This topic describes how to process data collected to MaxCompute and generate cleansed data in DataWorks.

Prerequisites

Data is prepared. For more information, see Prepare data.

Create tables

  1. On the Data Analytics tab, click Show to the left of the target workflow.
  2. Right-click MaxCompute and choose Create > Table.
  3. In the Create Table dialog box that appears, set Table Name and click Commit.
    Notice The table name must start with a letter, cannot contain Chinese characters or special characters, and can be up to 64 characters in length.
    In this example, you must create the following tables:
    • Create three tables to store the power consumption trend data, metrics data, and electricity-stealing flag data that are synchronized to MaxCompute and cleansed in DataWorks. Name the tables clean_trend_data, clean_indicators_data, and clean_steal_flag_data, respectively.
    • Create a table named data4ml to store the aggregated data.
  4. On the editing tab of each table, click DDL Statement. In the DDL Statement dialog box that appears, enter the following CREATE TABLE statements:
    -- Create a table for storing the cleansed power consumption trend data.
    CREATE TABLE `clean_trend_data` (
        `uid` bigint,
        `trend` bigint
    )
    PARTITIONED BY (dt string)
    LIFECYCLE 7;
    -- Create a table for storing the cleansed metrics data.
    CREATE TABLE `clean_indicators_data` (
        `uid` bigint,
        `xiansun` bigint,
        `warnindicator` bigint
    )
    COMMENT '*'
    PARTITIONED BY (ds string)
    LIFECYCLE 36000;
    -- Create a table for storing the cleansed electricity-stealing flag data.
    CREATE TABLE `clean_steal_flag_data` (
        `uid` bigint,
        `flag` bigint
    )
    COMMENT '*'
    PARTITIONED BY (ds string)
    LIFECYCLE 36000;
    -- Create a table for storing the aggregated data.
    CREATE TABLE `data4ml` (
        `uid` bigint,
        `trend` bigint,
        `xiansun` bigint,
        `warnindicator` bigint,
        `flag` bigint
    )
    COMMENT '*'
    PARTITIONED BY (ds string)
    LIFECYCLE 36000;
  5. After you enter the CREATE TABLE statements, click Generate Table Structure and then click Confirm Operation.
  6. On the table configuration tab that appears, enter a display name for the table in the Basic Properties section.
  7. When you are finished, click Commit to Development Environment and then Commit to Production Environment.
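
If you want to confirm that the four tables were created with the expected schemas, one optional check is to run DESC statements, for example in an ad-hoc query:
  -- Optional check: view the schema of each table created in this example.
  DESC clean_trend_data;
  DESC clean_indicators_data;
  DESC clean_steal_flag_data;
  DESC data4ml;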

Design the workflow

For more information about how to create a workflow and configure the dependencies among workflow nodes, see Create a workflow.

On the Data Analytics tab, double-click the target workflow. On the workflow dashboard that appears, create two ODPS SQL nodes for data cleansing and data aggregation and configure the dependencies between workflow nodes.

Configure the ODPS SQL nodes

  • Configure the data cleansing node.
    1. In the directed acyclic graph (DAG) of the workflow, double-click the data cleansing node to go to the node configuration tab.
    2. Write the processing logic.
      Write the following SQL statements:
      INSERT OVERWRITE TABLE clean_trend_data PARTITION(dt=${bdp.system.bizdate})
      SELECT  uid
              ,trend
      FROM    trend_data
      WHERE   trend IS NOT NULL
      AND     uid != 0
      AND     dt = ${bdp.system.bizdate}
      ;  
      
      INSERT OVERWRITE TABLE clean_steal_flag_data PARTITION(ds=${bdp.system.bizdate})
      SELECT  uid
              ,flag
      FROM    steal_flag_data
      WHERE   uid ! = 0
      AND     ds = ${bdp.system.bizdate}
      ;
      
      INSERT OVERWRITE TABLE clean_indicators_data PARTITION(ds=${bdp.system.bizdate})
      SELECT  uid
              ,xiansun,warnindicator
      FROM    indicators_data
      WHERE   uid ! = 0
      AND     ds = ${bdp.system.bizdate}
      ;
    3. Click the Save icon in the toolbar.
  • Configure the data aggregation node.
    1. In the DAG of the workflow, double-click the data aggregation node to go to the node configuration tab.
    2. Write the processing logic.
      Write the following SQL statements:
      INSERT OVERWRITE TABLE data4ml PARTITION (ds=${bdp.system.bizdate})
      SELECT  a.uid
              ,trend
              ,xiansun
              ,warnindicator
              ,flag
      FROM    
      (
          SELECT uid,trend FROM clean_trend_data where dt=${bdp.system.bizdate}
      )a  
      FULL OUTER JOIN 
      (
          SELECT uid,xiansun,warnindicator FROM  clean_indicators_data where ds=${bdp.system.bizdate}
      )b
      ON      a.uid = b.uid
      FULL OUTER JOIN 
      (
          SELECT uid,flag FROM  clean_steal_flag_data where ds=${bdp.system.bizdate}
      )c 
      ON      b.uid = c.uid
      ;
    3. Click the Save icon in the toolbar.
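
Both nodes use the ${bdp.system.bizdate} scheduling parameter, which DataWorks replaces with the data timestamp when a node instance runs. As an illustration only, assuming the data cleansing node runs on August 9, 2019, the parameter resolves to 20190808 and the first cleansing statement is executed as follows:
  -- Illustration: the first cleansing statement after parameter substitution.
  -- Assumed run date: August 9, 2019, so the data timestamp is 20190808.
  INSERT OVERWRITE TABLE clean_trend_data PARTITION(dt=20190808)
  SELECT  uid
          ,trend
  FROM    trend_data
  WHERE   trend IS NOT NULL
  AND     uid != 0
  AND     dt = 20190808
  ;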

Commit the workflow

  1. On the workflow dashboard, click the Commit icon in the toolbar.
  2. In the Commit dialog box that appears, select the nodes to be committed, set Description, and then select Ignore I/O Inconsistency Alerts.
  3. Click Commit. The message Committed successfully appears.

Run the workflow

  1. On the workflow dashboard, click the Run icon in the toolbar.
  2. On the left-side navigation submenu, click the Ad-Hoc Query icon.
  3. On the Ad-Hoc Query tab that appears, right-click Ad-Hoc Query and choose Create Node > ODPS SQL.
  4. In the Create Node dialog box that appears, enter the node name and click Commit. On the node configuration tab that appears, write and execute SQL statements to query the number of data records written to the clean_trend_data, clean_indicators_data, clean_steal_flag_data, and data4ml tables.
    Note Use the following SQL statements. In each statement, replace the placeholder partition value with the data timestamp of the ad-hoc query node, as shown in the example after these statements. For example, if the node is run on August 9, 2019, the data timestamp is 20190808, which is one day before the node is run.
    -- Check whether data is written to MaxCompute.
    SELECT count(*) from clean_trend_data where dt=<data timestamp of the ad-hoc query node>;
    SELECT count(*) from clean_indicators_data where ds=<data timestamp of the ad-hoc query node>;
    SELECT count(*) from clean_steal_flag_data where ds=<data timestamp of the ad-hoc query node>;
    SELECT count(*) from data4ml where ds=<data timestamp of the ad-hoc query node>;
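    For example, if the ad-hoc query node is run on August 9, 2019, the data timestamp is 20190808 and the checks are written as follows:
    -- Example with the data timestamp filled in (assumed run date: August 9, 2019).
    SELECT count(*) from clean_trend_data where dt=20190808;
    SELECT count(*) from clean_indicators_data where ds=20190808;
    SELECT count(*) from clean_steal_flag_data where ds=20190808;
    SELECT count(*) from data4ml where ds=20190808;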

Deploy the workflow

After you commit the workflow, the nodes in the workflow are in the development environment. To run the configured nodes, you must deploy them to the production environment. Nodes in the development environment cannot be automatically scheduled.

Note Before you deploy the nodes to the production environment, test the node code to make sure that the code is correct.
  1. On the workflow dashboard, click the Deploy icon in the toolbar.
  2. On the Create Deploy Task page that appears, select the nodes to be deployed and click Add to List.
  3. Click To-Be-Deployed Node List in the upper-right corner. In the Nodes to Deploy pane that appears, click Deploy All.
  4. Go to the Deploy Tasks page and view the deployed nodes.

Run the nodes in the production environment

  1. After you deploy the workflow, click Operation Center in the upper-right corner.
  2. On the Operation Center page that appears, choose Cycle Task Maintenance > Cycle Task in the left-side navigation pane. On the Cycle Task page that appears, click the start node.
  3. In the directed acyclic graph (DAG) that appears on the right, right-click the start node and choose Run > Current and Descendant Nodes Retroactively.
  4. In the Patch Data dialog box that appears, select nodes to generate retroactive data and set Data Timestamp.
  5. Click OK.
  6. On the Patch Data page that appears, click Refresh until all the selected nodes are run successfully.
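
If you want to confirm that the retroactive runs wrote data to the result table, one optional check is to list its partitions with an ad-hoc query (assuming the query runs against the project in which the nodes were executed):
  -- Optional check: list the partitions generated by the retroactive runs.
  SHOW PARTITIONS data4ml;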

What to do next

Now, you have learned how to create SQL nodes and process raw data. You can proceed with the next step to learn how to load processed data and build a model for identifying users who steal electricity or are involved in electricity leakage by using Machine Learning Platform for AI (PAI). For more information, see Build a data model.