DataWorks: Process data

Last Updated: Aug 17, 2023

This topic describes how to process the data that is synchronized to MaxCompute and obtain cleansed data in DataWorks.

Prerequisites

Data is prepared. For more information, see Prepare data.

Create tables

  1. Go to the DataStudio page.

    Log on to the DataWorks console. In the left-side navigation pane, choose Data Modeling and Development > DataStudio. On the page that appears, select the desired workspace from the drop-down list and click Go to DataStudio.

  2. On the Data Analytics tab, click the Show icon to the left of the required workflow to expand the workflow.
  3. Right-click MaxCompute and choose Create > Table.
  4. In the Create Table dialog box, set the Table Name parameter and click Create.
    Important The table name must be 1 to 64 characters in length. It must start with a letter and cannot contain special characters.
    In this example, you must create the following tables:
    • Create three tables to store the power consumption trend data, metrics data, and electricity-stealing flag data that are synchronized to MaxCompute and cleansed in DataWorks. Name the tables clean_trend_data, clean_indicators_data, and clean_steal_flag_data.
    • Create a table named data4ml to store the aggregated data.
  5. On the configuration tab of each table, click DDL Statement. Enter the following CREATE TABLE statements:
    -- Create a table for storing the cleansed power consumption trend data.
    CREATE TABLE clean_trend_data (
        uid bigint,
        trend bigint
    )
    PARTITIONED BY (dt string)
    LIFECYCLE 7;
    -- Create a table for storing the cleansed metrics data.
    CREATE TABLE clean_indicators_data (
        uid bigint,
        xiansun bigint,
        warnindicator bigint
    )
    COMMENT '*'
    PARTITIONED BY (ds string)
    LIFECYCLE 36000;
    -- Create a table for storing the cleansed electricity-stealing flag data.
    CREATE TABLE clean_steal_flag_data (
        uid bigint,
        flag bigint
    )
    COMMENT '*'
    PARTITIONED BY (ds string)
    LIFECYCLE 36000;
    -- Create a table for storing the aggregated data.
    CREATE TABLE data4ml (
        uid bigint,
        trend bigint,
        xiansun bigint,
        warnindicator bigint,
        flag bigint
    )
    COMMENT '*'
    PARTITIONED BY (ds string)
    LIFECYCLE 36000;
  6. After you enter the CREATE TABLE statements, click Generate Table Schema. Then, click OK.
  7. On the configuration tab of each table, enter the display name in the General section.
  8. After the configuration is complete, click Commit in Development Environment and Commit to Production Environment in sequence. You can verify the created tables as shown in the sketch after this procedure.
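
If you want to confirm that the four tables exist with the expected schemas after they are committed, you can run DESC statements in an ad-hoc query or on the MaxCompute client. The following is a minimal sketch that uses the table names from this example:

    -- Check the schema of each table created in this example.
    DESC clean_trend_data;
    DESC clean_indicators_data;
    DESC clean_steal_flag_data;
    DESC data4ml;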

Design the workflow

For information about how to create a workflow and configure the dependencies among nodes in the workflow, see Create a workflow.

On the workflow configuration tab, create two ODPS SQL nodes for data cleansing and data aggregation and configure the dependencies between nodes.

Configure ODPS SQL nodes

  • Configure the data cleansing node.
    1. Double-click the data cleansing node to go to the node configuration tab.
    2. Write the processing logic.
      Enter the following SQL statements:
      INSERT OVERWRITE TABLE clean_trend_data PARTITION (dt=${bdp.system.bizdate})
      SELECT uid
            ,trend
      FROM trend_data
      WHERE trend IS NOT NULL
      AND uid != 0
      AND dt = ${bdp.system.bizdate}
      ;
      
      INSERT OVERWRITE TABLE clean_steal_flag_data PARTITION (ds=${bdp.system.bizdate})
      SELECT uid
            ,flag
      FROM steal_flag_data
      WHERE uid != 0
      AND ds = ${bdp.system.bizdate}
      ;
      
      INSERT OVERWRITE TABLE clean_indicators_data PARTITION (ds=${bdp.system.bizdate})
      SELECT uid
            ,xiansun
            ,warnindicator
      FROM indicators_data
      WHERE uid != 0
      AND ds = ${bdp.system.bizdate}
      ;
    3. Click the Save icon in the top toolbar.
  • Configure the data aggregation node.
    1. Double-click the data aggregation node to go to the node configuration tab.
    2. Write the processing logic.
      Enter the following SQL statements. Both nodes use the ${bdp.system.bizdate} scheduling parameter; for how it resolves at run time, see the sketch after this list:
      INSERT OVERWRITE TABLE data4ml PARTITION (ds=${bdp.system.bizdate})
      SELECT a.uid
            ,trend
            ,xiansun
            ,warnindicator
            ,flag
      FROM
      (
          SELECT uid, trend FROM clean_trend_data WHERE dt=${bdp.system.bizdate}
      ) a
      FULL OUTER JOIN
      (
          SELECT uid, xiansun, warnindicator FROM clean_indicators_data WHERE ds=${bdp.system.bizdate}
      ) b
      ON a.uid = b.uid
      FULL OUTER JOIN
      (
          SELECT uid, flag FROM clean_steal_flag_data WHERE ds=${bdp.system.bizdate}
      ) c
      ON b.uid = c.uid
      ;
    3. Click the Save icon in the top toolbar.
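
Both nodes use the ${bdp.system.bizdate} scheduling parameter, which DataWorks replaces with the data timestamp of the scheduling cycle at run time. As a sketch, assuming the data timestamp 20190808 that is used in the verification example later in this topic, the first cleansing statement is executed roughly as follows; the actual value depends on your scheduling configuration:

    -- Resolved form of the first cleansing statement for the data timestamp 20190808 (illustration only).
    INSERT OVERWRITE TABLE clean_trend_data PARTITION (dt=20190808)
    SELECT uid
          ,trend
    FROM trend_data
    WHERE trend IS NOT NULL
    AND uid != 0
    AND dt = 20190808
    ;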

Commit the workflow

  1. On the workflow configuration tab, click the Submit icon in the top toolbar.
  2. In the Commit dialog box, select the nodes to be committed, set the Change description parameter, and then select Ignore I/O Inconsistency Alerts.
  3. Click Commit. The Committed successfully message appears.

Run the workflow

  1. On the workflow configuration tab, click the Run icon in the top toolbar.
  2. On the left-side navigation submenu, click the Ad-Hoc Query icon.
  3. On the Ad-Hoc Query tab, right-click Ad-Hoc Query and choose Create Node > ODPS SQL.
  4. Write and execute SQL statements to query the number of data records that are written to the clean_trend_data, clean_indicators_data, clean_steal_flag_data, and data4ml tables.
    Use the following SQL statements. In each statement, replace <Data timestamp> with the actual data timestamp. For example, if the node is run on August 9, 2019, the data timestamp is 20190808. A filled-in example follows this procedure.
    -- Check whether the data is written to MaxCompute.
    SELECT COUNT(*) FROM clean_trend_data WHERE dt=<Data timestamp>;
    SELECT COUNT(*) FROM clean_indicators_data WHERE ds=<Data timestamp>;
    SELECT COUNT(*) FROM clean_steal_flag_data WHERE ds=<Data timestamp>;
    SELECT COUNT(*) FROM data4ml WHERE ds=<Data timestamp>;
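
For example, for the data timestamp 20190808, the verification statements look like the following sketch. Depending on how string partition columns are compared in your MaxCompute project, you may need to quote the value, for example dt='20190808':

    -- Verification statements with the sample data timestamp filled in (illustration only).
    SELECT COUNT(*) FROM clean_trend_data WHERE dt=20190808;
    SELECT COUNT(*) FROM clean_indicators_data WHERE ds=20190808;
    SELECT COUNT(*) FROM clean_steal_flag_data WHERE ds=20190808;
    SELECT COUNT(*) FROM data4ml WHERE ds=20190808;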

Deploy the workflow

After you commit the workflow, the nodes in the workflow are in the development environment. You must deploy the configured nodes to the production environment because nodes in the development environment cannot be automatically scheduled.

Note Before you deploy the nodes to the production environment, test the node code to ensure that the code is correct.
  1. On the workflow configuration tab, click the Deploy icon in the top toolbar.
  2. On the Create Deploy Task page, select the nodes to be deployed and click Add to List.
  3. Click To-Be-Deployed Node List in the upper-right corner and click Deploy All.
  4. Go to the Deploy Tasks page and view the deployed nodes.

Run the nodes in the production environment

  1. After the nodes are deployed, click Operation Center in the upper-right corner.
  2. Choose Cycle Task Maintenance > Cycle Task. Select the required nodes.
  3. In the directed acyclic graph (DAG), right-click the start node and choose Run > Current and Descendent Nodes Retroactively.
  4. Select nodes to generate retroactive data and set the Data Timestamp parameter.
  5. Click OK.
  6. On the Patch Data page, click Refresh until all the SQL nodes are run.

What to do next

You have learned how to create SQL nodes and process raw data. You can now proceed to the next step to learn how to load the processed data and use Machine Learning Platform for AI (PAI) to build a model that identifies users who steal electricity or are involved in electricity leakage.