This topic describes how to use DataWorks to collect logs to an E-MapReduce (EMR) compute engine instance.

Prerequisites

The environment required for the operations that are described in this topic is prepared. For more information, see Prepare the environment.

Background information

In this workshop, you must add an Object Storage Service (OSS) bucket and an ApsaraDB RDS instance from which you want to read data. You must also add an OSS bucket to which you want to write data.

Add an OSS bucket from which you want to read data

  1. Go to the Data Source page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. On the Workspaces page, find the workspace in which you want to add a data source and click Data Integration in the Actions column.
      If you are using another service of DataWorks, click the icon in the upper-left corner and choose All Products > Data Aggregation > Data Integration to go to the Data Integration page.
    4. In the left-side navigation pane, choose Data Source > Data Sources.
  2. On the Data Source page, click Add data source in the upper-right corner.
  3. In the Add data source dialog box, click OSS.
  4. In the Add OSS data source dialog box, set the parameters based on your business requirements. The following table describes how to set the parameters in this workshop.
    Parameter Description
    Data Source Name The name of the data source. Enter oss_workshop_log.
    Data source description The description of the data source.
    Endpoint The OSS endpoint. Enter http://oss-cn-shanghai-internal.aliyuncs.com.
    Bucket The name of the OSS bucket. Enter new-dataworks-workshop.
    AccessKey ID The AccessKey ID that is used to connect to OSS. Enter LTAI4FvGT3iU4xjKotpUMAjS.
    AccessKey Secret The AccessKey secret that is used to connect to OSS. Enter 9RSUoRmNxpRC9EhC4m9PjuG7Jzy7px.
  5. On the Data Integration tab, click Test connectivity in the Operation column of each resource group.
    A sync node uses only one resource group. To ensure that your sync nodes can be properly run, you must test the connectivity of all the resource groups for Data Integration on which your sync nodes will be run. If you need to test the connectivity of multiple resource groups for Data Integration at a time, select the resource groups and click Batch test connectivity. For more information, see Select a network connectivity solution.
  6. After the connection passes the connectivity test, click Complete.

Add an ApsaraDB RDS instance from which you want to read data

  1. On the Data Source page, click Add data source in the upper-right corner.
  2. In the Add data source dialog box, click MySQL.
  3. In the Add MySQL data source dialog box, set the parameters based on your business requirements. The following table describes how to set the parameters in this workshop.
    Parameter Description
    Data source type The type of the data source. Select Alibaba Cloud instance mode.
    Data Source Name The name of the data source. Enter rds_workshop_log.
    Data source description The description of the data source.
    Region The region where the ApsaraDB RDS instance resides. Select China East 2 (Shanghai).
    RDS instance ID The ID of the ApsaraDB RDS instance. Enter rm-2ev0681lc7042g16u.
    RDS instance account ID The ID of the Alibaba Cloud account that is used to purchase the ApsaraDB RDS instance. Enter 5600815724958382.
    Database name The name of the ApsaraDB RDS database. Enter workshop.
    User name The username that is used to connect to the ApsaraDB RDS database. Enter workshop.
    Password The password that is used to connect to the ApsaraDB RDS database. Enter workshop#2017.
  4. On the Data Integration tab, click Test connectivity in the Operation column of each resource group.
    A sync node uses only one resource group. To ensure that your sync nodes can be properly run, you must test the connectivity of all the resource groups for Data Integration on which your sync nodes will be run. If you need to test the connectivity of multiple resource groups for Data Integration at a time, select the resource groups and click Batch test connectivity. For more information, see Select a network connectivity solution.
  5. After the connection passes the connectivity test, click Complete.

Add an OSS bucket to which you want to write data

In this workshop, data of the EMR compute engine instance is stored in the OSS bucket that you created when you prepared the environment.

  1. On the Data Source page, click Add data source in the upper-right corner.
  2. In the Add data source dialog box, click OSS.
  3. In the Add OSS data source dialog box, set the parameters based on your business requirements.
    Parameter Description
    Data Source Name The name of the data source. In this workshop, enter dw_emr_demo.
    Data source description The description of the data source.
    Endpoint The OSS endpoint. Enter http://oss-cn-shanghai-internal.aliyuncs.com.
    Bucket The name of the OSS bucket that you created when you prepared the environment. Enter dw-emr-demo.
    AccessKey ID The AccessKey ID of the account that is used to log on to DataWorks. You can go to the Security Management page to copy the AccessKey ID.
    AccessKey Secret The AccessKey secret of the account that is used to log on to DataWorks.
  4. On the Data Integration tab, click Test connectivity in the Operation column of each resource group.
    A sync node uses only one resource group. To ensure that your sync nodes can be properly run, you must test the connectivity of all the resource groups for Data Integration on which your sync nodes will be run. If you need to test the connectivity of multiple resource groups for Data Integration at a time, select the resource groups and click Batch test connectivity. For more information, see Select a network connectivity solution.
  5. After the connection passes the connectivity test, click Complete.

Create a workflow

  1. On the Data Source page, click the icon in the upper-left corner and choose All Products > Data Development > DataStudio. The DataStudio page appears.
  2. On the Data Analytics tab, right-click Business Flow and select Create Workflow.
  3. In the Create Workflow dialog box, set the Workflow Name and Description parameters.
    Notice The workflow name can be a maximum of 128 characters in length and can contain letters, digits, underscores (_), and periods (.).
  4. Click Create.
  5. Double-click the new workflow to go to the workflow configuration tab. Drag Zero-Load Node in the General section to the canvas on the right. In the Create Node dialog box, set the Node Name parameter to workstart and click Commit.

    Drag Batch Synchronization in the Data Integration section to the canvas on the right to create two batch synchronization nodes named Log2oss and User2oss.

  6. Drag directed lines to configure the workstart node as the ancestor node of the two batch synchronization nodes.

Configure the workstart node

  1. On the Data Analytics tab, double-click the workstart node in the new workflow. On the node configuration tab that appears, click Properties in the right-side navigation pane.
  2. In the Dependencies section, click Use Root Node to configure the root node of the workspace as the ancestor node of the workstart node.
    In the latest version of DataWorks, each node must have its ancestor and descendant nodes. Therefore, you must configure an ancestor node for the workstart node. In this workshop, the root node of the workspace is configured as the ancestor node of the workstart node. The root node of the workspace is named in the Workspace name_root format.
  3. Click the Save icon in the upper-left corner.

Configure batch synchronization nodes

  1. Configure the User2oss node.
    1. On the Data Analytics tab, double-click the User2oss node in the new workflow. The node configuration tab appears.
    2. Configure a source.
      Parameter Description
      Connection The type and name of the source. Select MySQL and rds_workshop_log in sequence.
      Table The table from which data is synchronized. Select the ods_user_info_d table.
      Filter The condition used to filter the data that you want to synchronize. Filtering based on the LIMIT keyword is not supported. You can leave this parameter unspecified.
      Shard Key The shard key for the data to be synchronized. We recommend that you use the primary key or an indexed column as the shard key. Only fields of the INTEGER type are supported. In this workshop, enter uid.
    3. Configure a destination.
      Parameter Description
      Connection The type and name of the destination. Select OSS and dw_emr_demo in sequence.
      Object Name Prefix The prefix of the OSS object for storing the synchronized data. Set this parameter based on the folder that you created. In this workshop, enter ods_user_info_d/user_${bizdate}/user_${bizdate}.txt.
      File Type The object type. Select text.
      Field Delimiter The column delimiter. Enter |.
      Encoding The encoding format. Default value: UTF-8.
      Null String The string that represents null. You can leave this parameter unspecified.
      Time Format The time format. You can leave this parameter unspecified.
      Solution to Duplicate Prefixes The method that is used to process duplicate prefixes. Select Replace the Original File.
    4. Configure the mappings between fields in the source and destination.
    5. Configure channel control policies and click the Save icon in the top toolbar.
    6. Click the Switch to Code Editor icon in the top toolbar. Add the following settings to the existing code: "writeSingleObject": "true" and "suffix": ".txt". For an example of where these settings fit in the writer configuration, see the sketch that follows this procedure.
      Note
      • You can add the writeSingleObject and suffix parameters only in the code editor.
      • The path that is specified in the object parameter must point to the folder that you created in the OSS bucket.
    7. Click the Save icon in the top toolbar.
  2. Configure the Log2oss node.
    1. On the Data Analytics tab, double-click the Log2oss node in the new workflow. The node configuration tab appears.
    2. Configure a source.
      Parameter Description
      Connection The type and name of the source. Select OSS and oss_workshop_log in sequence.
      Object Name Prefix The prefix of the OSS object for storing the data to be synchronized. Enter user_log.txt.
      File Type The object type. Select text.
      Field Delimiter The column delimiter. Enter |.
      Encoding The encoding format. Default value: UTF-8.
      Null String The string that represents null. You can leave this parameter unspecified.
      Compression Format The compression format of the OSS object. Valid values: None, Gzip, Bzip2, and Zip. Select None.
      Include Header Specifies whether to include the table header. Default value: No.
    3. Configure a destination.
      Parameter Description
      Connection The type and name of the destination. Select OSS and dw_emr_demo in sequence.
      Object Name Prefix The prefix of the OSS object for storing the synchronized data. Set this parameter based on the folder that you created. In this workshop, enter ods_raw_log_d/user_log_${bizdate}/user_log_${bizdate}.txt.
      File Type The object type. Select text.
      Field Delimiter The column delimiter. Enter |.
      Encoding The encoding format. Default value: UTF-8.
      Null String The string that represents null. You can leave this parameter unspecified.
      Time Format The time format. You can leave this parameter unspecified.
      Solution to Duplicate Prefixes The method that is used to process duplicate prefixes. Select Replace the Original File.
    4. Configure the mappings between fields in the source and destination.
      Notice Only one column in the source table contains data. Remove the empty columns from the Source section.
    5. Configure channel control policies and click the Save icon in the top toolbar.
    6. Click the Switch to Code Editor icon in the top toolbar. Add the following settings to the existing code: "writeSingleObject": "true" and "suffix": ".txt".
      Note
      • You can add the writeSingleObject and suffix parameters only in the code editor.
      • The path that is specified in the object parameter must point to the folder that you created in the OSS bucket.
    7. Click the Save icon in the top toolbar.
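
The writeSingleObject and suffix settings for the User2oss and Log2oss nodes are added in the writer part of the script that is displayed in the code editor. The following sketch shows roughly where the two settings sit in the writer parameters of the User2oss node. The sketch is illustrative only: the exact structure of the script that DataWorks generates may differ, and only writeSingleObject and suffix are added manually. The other values correspond to the settings that are described in the preceding tables.

  {
      "stepType": "oss",
      "name": "Writer",
      "category": "writer",
      "parameter": {
          "datasource": "dw_emr_demo",
          "object": "ods_user_info_d/user_${bizdate}/user_${bizdate}.txt",
          "fileFormat": "text",
          "fieldDelimiter": "|",
          "encoding": "UTF-8",
          "writeSingleObject": "true",
          "suffix": ".txt"
      }
  }

When a node runs, the ${bizdate} scheduling parameter is replaced with the data timestamp of the node. For example, a run with a data timestamp of 20191106 writes a single object to ods_user_info_d/user_20191106/user_20191106.txt, which matches the partition location that is used in the table creation statements in the next section.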

Create tables to which you want to write data

  1. On the Data Analytics tab, click the new workflow, right-click EMR, and then choose Create > EMR Hive.
  2. In the Create Node dialog box, set the Node Name parameter and click Commit.

    In this workshop, you must create two EMR Hive nodes named ods_user_info_d and ods_raw_log_d. The ods_user_info_d node is used to create a table to store the user information that is synchronized from ApsaraDB RDS, and the ods_raw_log_d node is used to create a table to store the logs that are synchronized from OSS.

  3. On the configuration tab of each EMR Hive node, select an EMR compute engine instance, enter CREATE TABLE statements, and then click the Save and Run icons in sequence to execute the statements.
    • Create the ods_user_info_d table.
      Double-click the ods_user_info_d node. On the node configuration tab that appears, enter the following CREATE TABLE statement.
      CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d
      (
          `uid` STRING COMMENT 'User ID',
          `gender` STRING COMMENT 'Gender',
          `age_range` STRING COMMENT 'Age range',
          `zodiac` STRING COMMENT 'Zodiac sign'
      ) PARTITIONED BY (
        dt STRING
      )
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
      LOCATION 'oss://dw-emr-demo/ods_user_info_d/';
      
      ALTER TABLE ods_user_info_d ADD IF NOT EXISTS PARTITION (dt=${bizdate})
      LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/';
      Note In the preceding code, a sample path is specified in the location parameter. You must set the parameter to the path of the created folder.
    • Create a table named ods_raw_log_d.
      Double-click the ods_raw_log_d node. On the node configuration tab that appears, enter the table creation statements.
      -- Create a table to store the logs that are synchronized from OSS.
      CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d
      (
        `col` STRING
      ) PARTITIONED BY (
        dt STRING
      );
      ALTER TABLE ods_raw_log_d ADD IF NOT EXISTS PARTITION (dt=${bizdate})
      LOCATION 'oss://dw-emr-demo/ods_raw_log_d/user_log_${bizdate}/';
      Note In the preceding code, a sample path is specified in the location parameter. You must set the parameter to the path of the created folder.
  4. View the data synchronization results.
    After the table creation statements are executed, enter a query statement on the configuration tab of each EMR Hive node.
    Note In the query statement, change the partition key value to the data timestamp of the node. For example, if the node is run on November 7, 2019, the data timestamp is 20191106, which is one day before the node is run.
    • Query data in the ods_user_info_d table.
      SELECT * FROM ods_user_info_d WHERE dt=<Data timestamp of the node>; -- The data timestamp is one day before the node is run.
    • Query data in the ods_raw_log_d table.
      SELECT * FROM ods_raw_log_d WHERE dt=<Data timestamp of the node>; -- The data timestamp is one day before the node is run.
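    For example, assuming that the nodes were run on November 7, 2019 and the data timestamp is therefore 20191106, the queries would look like the following. The date is a sample value only; use the data timestamp of your own run.
      SELECT * FROM ods_user_info_d WHERE dt='20191106'; -- Queries the user information that was written to the 20191106 partition.
      SELECT * FROM ods_raw_log_d WHERE dt='20191106'; -- Queries the raw logs that were written to the 20191106 partition.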

What to do next

You now understand how to collect and synchronize data and can proceed with the next tutorial, in which you will learn how to compute and analyze the collected data. For more information, see Process data.