Storage and feature analysis of massive power equipment monitoring data

1 Background knowledge

On-line monitoring of power equipment refers to the continuous or periodic automatic monitoring of the condition of power equipment without taking it out of service. It combines sensor technology, wide-area communication technology and information processing technology. On-line monitoring is an important means of realizing condition-based operation and maintenance of power equipment and of raising the level of production and operations management, and it has positive, far-reaching significance for improving the intelligence of the power grid.

With the advancement of smart grid construction, online monitoring of power equipment has developed rapidly and become a clear trend. The volume of monitoring data has grown steadily and now constitutes big data for power equipment monitoring, which poses a serious technical challenge to the data storage and processing capabilities of online monitoring systems.

Power equipment monitoring big data is characterized by large volume, many data types, low value density and high required processing speed. Grid companies' monitoring systems currently rely too heavily on centralized SAN storage and SOA-based data integration, built mainly on enterprise-level relational databases. Limited by capacity, scalability and access speed, these systems store only secondarily processed, well-understood data, and the associative queries and transaction processing that relational databases excel at are of little use in data analysis. New big data storage and processing technologies are urgently needed.

Partial discharge (PD) data of a transformer is typical power equipment monitoring data. Phase-resolved partial discharge (PRPD) analysis covers the process from feature extraction to pattern recognition. This article introduces, step by step, how to use MaxCompute to implement feature extraction on partial discharge monitoring data.

PD signal analysis mainly includes three sub-processes: (1) Extraction of the basic parameters n-q-φ: scan the PD signal and record each discharge peak value together with its corresponding discharge phase. (2) Spectrum construction and statistical feature calculation: divide the phase axis into windows, count the average discharge magnitude and the number of discharges per window, and build the average discharge magnitude phase distribution spectrum (q_ave-φ) and the discharge count phase distribution spectrum (n-φ). Based on these spectra, taking the phase window index i as the random variable, compute statistical features such as skewness (Sk) and steepness (Ku) to form the discharge feature vector. (3) Discharge type recognition. This article will introduce how to use MapReduce to implement the first sub-process.
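Sub-process (2) is not implemented later in this article, but once the spectra are available the features are straightforward to compute. The sketch below is a plain-Java illustration (class and method names are hypothetical, and it assumes the standard central-moment definitions of skewness and excess steepness), treating the phase window index i as the random variable and the spectrum values as weights:

```java
import java.util.Arrays;

// Illustrative helper: skewness (Sk) and steepness (Ku) of a phase
// distribution spectrum, using standard moment formulas.
public class SpectrumStats {
    // Sk = E[(i - mu)^3] / sigma^3, with the normalized spectrum as weights.
    public static double skewness(double[] p) {
        double[] w = normalize(p);
        double mu = mean(w);
        double m2 = 0, m3 = 0;
        for (int i = 0; i < w.length; i++) {
            double d = i - mu;
            m2 += d * d * w[i];
            m3 += d * d * d * w[i];
        }
        return m3 / Math.pow(m2, 1.5);
    }

    // Ku = E[(i - mu)^4] / sigma^4 - 3 (excess steepness relative to normal).
    public static double steepness(double[] p) {
        double[] w = normalize(p);
        double mu = mean(w);
        double m2 = 0, m4 = 0;
        for (int i = 0; i < w.length; i++) {
            double d = i - mu;
            m2 += d * d * w[i];
            m4 += d * d * d * d * w[i];
        }
        return m4 / (m2 * m2) - 3;
    }

    private static double[] normalize(double[] p) {
        double s = Arrays.stream(p).sum();
        return Arrays.stream(p).map(v -> v / s).toArray();
    }

    private static double mean(double[] w) {
        double mu = 0;
        for (int i = 0; i < w.length; i++) mu += i * w[i];
        return mu;
    }

    public static void main(String[] args) {
        double[] spectrum = {1, 4, 6, 4, 1}; // symmetric spectrum: Sk is 0
        System.out.println(skewness(spectrum));
        System.out.println(steepness(spectrum));
    }
}
```

A symmetric spectrum yields Sk = 0; a spectrum flatter than normal yields a negative Ku, which is how these features discriminate discharge types.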

MaxCompute (formerly ODPS) is a massive data processing platform provided by Alibaba Cloud. It is designed mainly for the storage and batch computation of structured data at scales up to the EB level, and has been widely used for data warehousing and BI analysis at large Internet companies, website log analysis, and e-commerce transaction analysis.

In addition, this article will use odpscmd as the client for all operations on MaxCompute. odpscmd is a Java program that accesses MaxCompute from the command line; with it you can perform tasks including data queries, uploads and downloads. It requires a JRE to run, so please download and install JRE 1.6 or later.

This article will use MapReduce programming to carry out the feature-analysis computation. MapReduce is a distributed data processing model first proposed by Google; it has since received broad attention in industry and is widely applied in scenarios such as search, web access log analysis, text statistics, large-scale data mining, machine learning, natural language processing and ad recommendation.
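The model itself can be illustrated without any MaxCompute dependency. The minimal, self-contained Java sketch below (illustrative names) mimics the three phases: map emits key/value pairs, the framework groups them by key, and reduce aggregates per key, here for a simple word count:

```java
import java.util.*;
import java.util.stream.*;

// A minimal local illustration of the MapReduce model:
// map emits (word, 1) pairs, the shuffle groups by key, reduce sums per key.
public class MiniMapReduce {
    public static Map<String, Long> wordCount(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\s+"))) // map: emit words
                .collect(Collectors.groupingBy(w -> w,              // shuffle: group by key
                        Collectors.counting()));                    // reduce: count per key
    }

    public static void main(String[] args) {
        Map<String, Long> counts = wordCount(Arrays.asList("a b a", "b c"));
        System.out.println(counts);
    }
}
```

The FSMapper/FSReducer pair developed later in this article follows exactly this pattern, with (time, phase) keys and (phase, amplitude) values instead of words and counts.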

2 Analysis process

2.1 Project creation, table creation and data upload

(1) Create a MaxCompute project

Open the official website of Alibaba Cloud.

Log in with an existing Alibaba Cloud account.

Enter the Alibaba Cloud management console, and select "Big Data (Data Plus) > Big Data Computing Service" from the left navigation bar to enter the MaxCompute management console.

Click "Create Project" below to create a new MaxCompute project.

Select "I/O post-payment", fill in the project name and project description, and click "OK".

After the creation is complete, you can see the project you just created in the project list.

(2) Install and configure odpscmd

Prepare the JRE environment locally: download and install JRE 1.6 or later.

Download the odpscmd tool from the official website of Alibaba Cloud.

Unzip and configure <ODPS_CLIENT>/conf/odps_config.ini

project_name=[project_name]
access_id=******************
access_key=*********************
end_point=

The access_id and access_key can be obtained from the Alibaba Cloud management console; set project_name to the MaxCompute project you just created.

After the configuration is complete, run <ODPS_CLIENT>/bin/odpscmd to enter the interactive mode. The project name will appear as a prompt.

(3) Build a table and add partitions

1) Create ODS_PD table to store original transformer partial discharge monitoring data.

In odpscmd, execute the following SQL statement to create a table.

create table if not exists ODS_PD (
    Time string,
    Phase bigint,
    Value bigint
) partitioned by (DeviceID string, Date string);

When "ok" appears, it means that the table is created successfully. You can use the "ls tables;" command to view the created table.

Add partitions to the ODS_PD table.

alter table ODS_PD add if not exists partition (DeviceID='001', Date='20171116');

You can use "show partitions ODS_PD;" to verify whether the added partition is successful.

2) Create the target feature table DW_NQF

In odpscmd, execute the following SQL statement to create a table.

create table if not exists DW_NQF (
    Time string,
    Phase bigint,
    MaxV bigint
) partitioned by (DeviceID string, Date string);

Add partitions to the DW_NQF table.

alter table DW_NQF add if not exists partition (DeviceID='001', Date='20171116');

(4) Use Tunnel to upload data

Run the tunnel command in odpscmd to upload the local data file monitor_data.csv (available as an attachment to this article) into the ODS_PD table. Adjust the path in the following command to match the actual location of the file.

tunnel upload d:/Clouder/jfdata/monitor_data.csv ODS_PD/deviceid=001,date=20171116;

2.2 MapReduce program development, local debugging and running

(1) Preparation of local development environment

This article uses Eclipse as the development environment, please download and install it in advance.

Find and download the ODPS for Eclipse plug-in from the official website, unzip it, and copy the plug-in to the plugins subdirectory of the Eclipse installation directory. Start Eclipse and check whether an ODPS entry appears in the New Project wizard.


When an ODPS type project can be created, it means that the local development environment is ready.

(2) MapReduce program development

Create an ODPS project in Eclipse and name it NQF. In order for Eclipse to access MaxCompute correctly, the local path of odpscmd needs to be configured correctly when creating the project.

Add the Mapper class (FSMapper), Reducer class (FSReducer), MapReduce driver class (FSDriver) and the helper class R in turn.

The code of FSMapper is as follows:

import java.io.IOException;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.TaskContext;

public class FSMapper extends MapperBase {
    private Record word;
    private Record one;
    private int max = 8; // amplitude threshold

    @Override
    public void setup(TaskContext context) throws IOException {
        word = context.createMapOutputKeyRecord();
        one = context.createMapOutputValueRecord();
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context) throws IOException {
        long phase = record.getBigint("phase"); // phase column
        long val = record.getBigint("value");   // amplitude column
        if (Math.abs(val) > max) {
            word.set(new Object[]{record.get("time"), record.get("phase")}); // (time, phase) as key
            one.set(new Object[]{phase, val});
            context.write(word, one);
        }
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}

The code of FSReducer is as follows:

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.Reducer.TaskContext;

public class FSReducer extends ReducerBase {
    private Record result = null;
    private R left;
    private R middle;
    private R right;

    @Override
    public void setup(TaskContext context) throws IOException {
        result = context.createOutputRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
        // Fill a three-point window (left, middle, right) from the sorted values.
        left = new R();
        middle = new R();
        right = new R();
        Record temp;
        if (values.hasNext()) {
            temp = values.next();
            left.phase = temp.getBigint("phase");
            left.val = temp.getBigint("value");
        } else {
            left = null;
        }
        if (values.hasNext()) {
            temp = values.next();
            middle.phase = temp.getBigint("phase");
            middle.val = temp.getBigint("value");
        } else {
            middle = null;
        }
        if (values.hasNext()) {
            temp = values.next();
            right.phase = temp.getBigint("phase");
            right.val = temp.getBigint("value");
        } else {
            right = null;
        }
        // The middle point is a discharge peak if its absolute amplitude
        // exceeds both neighbors.
        if (left != null && middle != null && right != null) {
            if (Math.abs(middle.val) > Math.abs(right.val) && Math.abs(middle.val) > Math.abs(left.val)) {
                result.set("time", key.get("time").toString());
                result.set("phase", middle.phase);
                result.set("maxv", middle.val);
                context.write(result);
            }
        }
        // Slide the window across the remaining values.
        while (values.hasNext()) {
            left.val = middle.val;
            left.phase = middle.phase;
            middle.val = right.val;
            middle.phase = right.phase;
            temp = values.next();
            right.phase = temp.getBigint("phase");
            right.val = temp.getBigint("value");
            if (Math.abs(middle.val) > Math.abs(right.val) && Math.abs(middle.val) > Math.abs(left.val)) {
                result.set("time", key.get("time").toString());
                result.set("phase", middle.phase);
                result.set("maxv", middle.val);
                context.write(result);
            }
        }
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
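The core of the reducer is a three-point sliding window over the amplitude sequence of one time group: a point is emitted when its absolute amplitude exceeds both of its neighbors. That window logic can be checked in isolation with a plain-Java sketch (illustrative names, no ODPS dependency):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone version of the reducer's peak test: keep index i when
// |v[i]| > |v[i-1]| and |v[i]| > |v[i+1]|.
public class PeakFinder {
    public static List<Integer> localPeaks(long[] values) {
        List<Integer> peaks = new ArrayList<>();
        for (int i = 1; i + 1 < values.length; i++) {
            if (Math.abs(values[i]) > Math.abs(values[i - 1])
                    && Math.abs(values[i]) > Math.abs(values[i + 1])) {
                peaks.add(i);
            }
        }
        return peaks;
    }

    public static void main(String[] args) {
        long[] signal = {1, 5, 2, -9, 3, 4, 1};
        System.out.println(localPeaks(signal)); // indices of discharge peaks
    }
}
```

Note that, like the reducer, this compares absolute values, so a strongly negative half-cycle discharge (the -9 above) is detected as a peak just as a positive one is.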

The code of FSDriver is as follows:

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

public class FSDriver {
    public static void main(String[] args) throws OdpsException {
        if (args.length != 2) {
            System.err.println("Usage: FSDriver <in_table> <out_table>");
            System.exit(2);
        }
        JobConf job = new JobConf();
        job.setMapperClass(FSMapper.class);
        job.setReducerClass(FSReducer.class);
        // Secondary sort: the key is (time, phase), but partitioning and
        // grouping use time only, so each reducer call sees one time group
        // with its values sorted by phase.
        job.setMapOutputKeySchema(SchemaUtils.fromString("time:string,phase:bigint"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("phase:bigint,value:bigint"));
        job.setPartitionColumns(new String[]{"time"});
        job.setOutputGroupingColumns(new String[]{"time"});
        job.setOutputKeySortColumns(new String[]{"time", "phase"});
        // Partitioned input
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).partSpec("deviceid=001/date=20171116").build(), job);
        // Partitioned output
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).partSpec("deviceid=001/date=20171116").build(), job);
        try {
            JobClient.runJob(job);
        } catch (OdpsException e) {
            e.printStackTrace();
        }
    }
}

The code of R is as follows:

public class R {
    public long phase;
    public long val;

    public R() {
        phase = 0;
        val = 0;
    }
}

(3) Local test

Open FSDriver.java and right-click "Run As > Run Configurations".

In the ODPS Config tab, select the correct ODPS project.

In the Arguments tab, enter the operating parameters: ods_pd dw_nqf, and click "Run" to perform a local test run.

During the first run, Eclipse downloads a small amount of test data from MaxCompute. After the run completes, you can inspect the input data used for the test and the generated result data in the local Warehouse directory.

(4) Package and upload resources

Once the local test results are verified, export the jar package: in Eclipse, execute "File > Export", select "JAR File" and export it to a local path.

Under odpscmd, execute the command to add resources and upload the jar to MaxCompute.

add jar d:/jar/NQF.jar;

(5) Program execution on MaxCompute

Under odpscmd, execute the jar command to run the program (adjust the file path to your environment):

jar -resources NQF.jar -classpath d:\jar\NQF.jar FSDriver ods_pd dw_nqf; 
