This topic describes how to use DataWorks to collect logs to an E-MapReduce compute engine instance.

Background information

To synchronize the source data used in this workshop, you must create an Object Storage Service (OSS) connection and a Relational Database Service (RDS) connection. You must also create a private OSS connection to your own OSS bucket, which stores the source data synchronized from OSS and RDS.

Create an OSS connection

  1. Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. On the Workspaces page, find the target workspace and click Data Integration in the Actions column.
    If you have logged on to a module of DataWorks, click the DataWorks icon in the upper-left corner and choose All Products > Data Integration to go to the Data Integration page.
  2. In the left-side navigation pane, click Connection. The Data Source page appears.
  3. On the Data Source page, click Add Connection in the upper-right corner.
  4. In the Add Connection dialog box that appears, select OSS.
  5. In the Add OSS Connection dialog box, set the parameters. You can use the example values listed in the following table.
    Parameter Description
    Connection Name The name of the connection, for example, oss_workshop_log.
    Description The description of the connection.
    Endpoint The OSS endpoint, for example, http://oss-cn-shanghai-internal.aliyuncs.com.
    Bucket The name of the OSS bucket, for example, dataworks-workshop.
    AccessKey ID The AccessKey ID of the account used to access the OSS bucket, for example, LTAINEhd4MZ8pX64.
    AccessKey Secret The AccessKey secret of the account used to access the OSS bucket, for example, lXnzUngTSebt3SfLYxZxoSjGAK6IaF.
  6. Click Test Connection.
  7. After the connectivity test is passed, click Complete.
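
If the connectivity test fails, it can help to check the endpoint, bucket, and AccessKey pair outside DataWorks. The following Python sketch assumes the OSS SDK for Python (oss2) and reuses the example values from the preceding table. Note that the internal endpoint is reachable only from Alibaba Cloud services in the same region, so use the public endpoint (for example, http://oss-cn-shanghai.aliyuncs.com) when you test from your own machine.

  # Minimal sketch: verify that the endpoint, bucket, and AccessKey pair of the
  # oss_workshop_log connection are valid. Assumes the oss2 SDK (pip install oss2).
  import oss2

  endpoint = "http://oss-cn-shanghai-internal.aliyuncs.com"  # example endpoint from the table
  bucket_name = "dataworks-workshop"                         # example bucket from the table
  access_key_id = "<your AccessKey ID>"
  access_key_secret = "<your AccessKey secret>"

  auth = oss2.Auth(access_key_id, access_key_secret)
  bucket = oss2.Bucket(auth, endpoint, bucket_name)

  # List a few objects to confirm that the credentials can read the bucket.
  result = bucket.list_objects(max_keys=5)
  for obj in result.object_list:
      print(obj.key)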

Create an RDS connection

  1. On the Data Source page, click Add Connection in the upper-right corner.
  2. In the Add Connection dialog box that appears, select MySQL.
  3. In the Add MySQL Connection dialog box, set the parameters. You can use the example values listed in the following table.
    Parameter Description
    Connect To The type of the connection. Set the value to ApsaraDB for RDS.
    Connection Name The name of the connection, for example, rds_workshop_log.
    Description The description of the connection.
    Region The region where the RDS instance resides, for example, China East 2 (Shanghai).
    RDS Instance ID The ID of the RDS instance, for example, rm-2ev0681lc7042g16u.
    RDS Instance Account ID The ID of the Alibaba Cloud account used to purchase the RDS instance, for example, 5600815724958382.
    Database Name The name of the RDS database, for example, workshop.
    Username The username used to connect to the database, for example, workshop.
    Password The password used to connect to the database, for example, workshop#2017.
  4. Click Test Connection.
  5. After the connectivity test is passed, click Complete.
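
If the connectivity test for the RDS connection fails, you can check the database credentials directly. The following Python sketch assumes the PyMySQL package; the host name is a hypothetical placeholder, and the username, password, database name, and table name are the example values used in this workshop.

  # Minimal sketch: verify the RDS for MySQL credentials of the rds_workshop_log
  # connection. Assumes PyMySQL (pip install pymysql); the host is a placeholder.
  import pymysql

  connection = pymysql.connect(
      host="<your-rds-instance>.mysql.rds.aliyuncs.com",  # hypothetical RDS endpoint
      port=3306,
      user="workshop",           # example username from the table
      password="workshop#2017",  # example password from the table
      database="workshop",       # example database name from the table
  )
  try:
      with connection.cursor() as cursor:
          # Confirm that the source table to be synchronized is visible.
          cursor.execute("SHOW TABLES LIKE 'ods_user_info_d'")
          print(cursor.fetchall())
  finally:
      connection.close()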

Create a private OSS connection

In this topic, data of the E-MapReduce compute engine instance is stored in your own OSS bucket.

  1. On the Data Source page, click Add Connection in the upper-right corner.
  2. In the Add Connection dialog box that appears, select OSS.
  3. In the Add OSS Connection dialog box, set parameters for the private OSS connection.
    Parameter Description
    Connection Name The name of the connection.
    Description The description of the connection.
    Endpoint The OSS endpoint, for example, http://oss-cn-shanghai-internal.aliyuncs.com.
    Bucket The name of the OSS bucket you created in the Prepare the environment topic, for example, dw-emr-demo.
    AccessKey ID The AccessKey ID of the account used to log on to DataWorks. You can log on to the Alibaba Cloud console, move the pointer over your profile picture in the upper-right corner, select AccessKey, and copy the AccessKey ID on the Security Management page that appears.
    AccessKey Secret The AccessKey secret of the account used to log on to DataWorks.
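
Before you move on, you can confirm that the private connection points to the correct bucket. The following Python sketch, again assuming the oss2 SDK, lists the top-level folders of the dw-emr-demo bucket so that you can check that the folders referenced later in this tutorial (for example, ods_user_info_d/ and ods_raw_log_d/) exist.

  # Minimal sketch: list the top-level "folders" (common prefixes) of the private
  # bucket. Assumes the oss2 SDK; the bucket and endpoint are the example values.
  import oss2

  auth = oss2.Auth("<your AccessKey ID>", "<your AccessKey secret>")
  bucket = oss2.Bucket(auth, "http://oss-cn-shanghai-internal.aliyuncs.com", "dw-emr-demo")

  result = bucket.list_objects(delimiter="/")
  print(result.prefix_list)  # expect entries such as ods_user_info_d/ and ods_raw_log_d/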

Create a workflow

Note Currently, the E-MapReduce features described in this tutorial are in public preview. Before you create a workflow, prepare the following information and submit a ticket so that Alibaba Cloud staff can allow your DataWorks workspace to access the E-MapReduce cluster. Only DataWorks Professional Edition or a more advanced edition supports E-MapReduce features.
  • User ID: the ID of the account used to create the E-MapReduce cluster.
  • Workspace ID: the ID of the DataWorks workspace. In the DataWorks console, click the Workspace Manage icon in the upper-right corner to go to the Workspace Management page. You can view the workspace ID on this page.
  • Region: the region of the E-MapReduce cluster, for example, China (Shanghai).
  1. In the DataWorks console, click the DataWorks icon in the upper-left corner and choose All Products > DataStudio.
  2. In the Data Analytics section, right-click Business Flow and select Create Workflow.
  3. In the Create Workflow dialog box that appears, set Workflow Name and Description.
  4. Click Create.
  5. On the workflow editing tab that appears, click and hold Zero-Load Node on the left and drag it to the editing section on the right. In the Create Node dialog box that appears, set Node Name to workstart and click Commit.
    Create two batch synchronization nodes in the same way and name them Log2oss and User2oss respectively.
  6. By drawing lines between nodes, configure the workstart node as the parent node of the two batch synchronization nodes.

Configure the workstart node

  1. In the Data Analytics section, double-click the workstart node in the corresponding workflow. On the node editing tab that appears, click the Properties tab on the right.
  2. In the Properties dialog box, click Use Root Node and set the root node of the workspace as the ancestor node of the workstart node.
    In the latest version of DataWorks, each node must have ancestor and descendant nodes. Therefore, you must set an ancestor node for the workstart node. In this example, the root node of the workspace is set as the ancestor node of the workstart node. The root node of the workspace is named in the Workspace name_root format.
  3. After the configuration is complete, click the Save icon in the upper-left corner.

Configure the batch synchronization nodes

  1. Configure the User2oss node. This node uses the RDS connection and the private OSS connection that you created to synchronize user information to your own OSS bucket.
    1. In the Data Analytics section, double-click the User2oss node. The node configuration tab appears.
    2. Select the source connection.
      Parameter Description
      Connection The connection for accessing the source data. Select MySQL and rds_workshop_log in sequence.
      Table The table from which data is synchronized. Set the value to ods_user_info_d.
      Filter The filter condition for the data to be synchronized. Currently, filtering based on the limit keyword is not supported. You can leave the parameter unspecified.
      Shard Key The shard key in the source data. We recommend that you use the primary key or an indexed column as the shard key. Only integer fields are supported. In this example, set the value to uid.
    3. Select the destination connection.
      Parameter Description
      Connection The private OSS connection that you created. In this example, select OSS and dw-emr-demo in sequence.
      Object Name Prefix The prefix of the OSS object for storing synchronized data. Set the value based on the folder that you created. For example, the value can be ods_user_info_d/user_${bizdate}/user_${bizdate}.txt.
      File Type The object type. Set the value to csv.
      Field Delimiter The column delimiter. Set the value to ,.
      Encoding The encoding format. Default value: UTF-8.
      Null String The string that represents null. You can leave the parameter unspecified.
      Time Format The time format. You can leave the parameter unspecified.
      Solution to Duplicate Prefixes The method of processing duplicate prefixes. Set the value to Replace the Original File.
    4. Configure the mapping between fields in the source and destination tables.
    5. Set parameters in the Channel step and click the Save icon.
    6. Click the Switch to Code Editor icon in the toolbar and add the following parameters to the existing code: "writeSingleObject": "true" and "suffix": ".txt". A sample writer snippet with these parameters is provided after this procedure.
      Note
      • You can add the writeSingleObject and suffix parameters only in the code editor.
      • The value of the object parameter must be the same as the folder created in the OSS bucket.
    7. After the configuration is completed, click the Save icon in the toolbar.
  2. Configure the Log2oss node. This node uses the OSS connection and the private OSS connection to synchronize log information to your own OSS bucket.
    1. In the Data Analytics section, double-click the Log2oss node. The node configuration tab appears.
    2. Select the source connection.
      Parameter Description
      Connection The connection for accessing the source data. Select OSS and oss_workshop_log in sequence.
      Object Name Prefix The prefix of the OSS object for storing the data to be synchronized. Set the value to user_log.txt.
      File Type The object type. Set the value to csv.
      Field Delimiter The column delimiter. Set the value to |.
      Encoding The encoding format. Default value: UTF-8.
      Null String The string that represents null. You can leave the parameter unspecified.
      Compression Format The compression format of the OSS object. Valid values: None, Gzip, Bzip2, and Zip. In this example, set the value to None.
      Include Header Specifies whether to include headers. Default value: No.
    3. Select the destination connection.
      Parameter Description
      Connection The private OSS connection that you created. In this example, select OSS and dw-emr-demo in sequence.
      Object Name Prefix The prefix of the OSS object for storing synchronized data. Set the value based on the folder that you created. For example, the value can be ods_raw_log_d/user_log_${bizdate}/user_log_${bizdate}.txt.
      File Type The object type. Set the value to csv.
      Field Delimiter The column delimiter. Set the value to ,.
      Encoding The encoding format. Default value: UTF-8.
      Null String The string that represents null. You can leave the parameter unspecified.
      Time Format The time format. You can leave the parameter unspecified.
      Solution to Duplicate Prefixes The method of processing duplicate prefixes. Set the value to Replace the Original File.
    4. Configure the mapping between fields in the source and destination tables.
      Notice Only one column in the source table contains data. Delete the empty columns from the Source section.
    5. Set parameters in the Channel step and click the Save icon.
    6. Click the Switch to Code Editor icon in the toolbar and add the following parameters to the existing code: "writeSingleObject": "true" and "suffix": ".txt", as shown in the sample snippet after this procedure.
      Note
      • You can add the writeSingleObject and suffix parameters only in the code editor.
      • The value of the object parameter must be the same as the folder created in the OSS bucket.
    7. After the configuration is completed, click the Save icon in the toolbar.
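
For reference, the following sketch shows roughly where the writeSingleObject and suffix parameters land in the script-mode configuration of a batch synchronization node. Only those two keys come from the steps above; the surrounding keys and values mirror the User2oss settings and are assumptions, so the script that DataWorks generates for your node may differ.

  import json

  # Illustrative sketch of the OSS Writer "parameter" section after the two keys
  # are added in the code editor. writeSingleObject and suffix are the parameters
  # named in the steps above; the other keys are assumptions based on the
  # User2oss node settings, not the exact script generated by DataWorks.
  writer_parameter = {
      "datasource": "dw-emr-demo",  # private OSS connection
      "object": "ods_user_info_d/user_${bizdate}/user_${bizdate}.txt",  # object name prefix
      "fileFormat": "csv",
      "fieldDelimiter": ",",
      "encoding": "UTF-8",
      "writeSingleObject": "true",  # write the result as a single object
      "suffix": ".txt",             # file name suffix of the written object
  }
  print(json.dumps(writer_parameter, indent=2))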

Create tables

  1. In the Data Analytics section, click the created workflow, right-click EMR, and choose Create > EMR Hive.
  2. In the Create Node dialog box that appears, set Node Name and click Commit.

    Create two E-MapReduce Hive nodes and name them ods_user_info_d and ods_raw_log_d. In each node, create one table: the ods_user_info_d table stores the user information synchronized from RDS, and the ods_raw_log_d table stores the log information synchronized from OSS.

  3. On the editing tab of each E-MapReduce Hive node, select an E-MapReduce compute engine instance, enter the table creation statements, and then click the Save and Submit icons in sequence to run the statements.
    • Create the ods_user_info_d table.
      Double-click the ods_user_info_d node. On the node editing tab that appears, enter the table creation statements.
      CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d
      (
          `uid` STRING COMMENT 'The ID of the client user',
          `gender` STRING COMMENT 'The gender of the user',
          `age_range` STRING COMMENT 'The age range of the user',
          `zodiac` STRING COMMENT 'The zodiac sign of the user'
      ) PARTITIONED BY (
        dt STRING
      )
      ROW FORMAT delimited fields terminated by ','
      location 'oss://dw-emr-demo/ods_user_info_d/';
      
      alter table ods_user_info_d add partition (dt=${bizdate})
      location 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/';
      Note The path specified in the location parameter in the preceding code is only an example. Set the parameter to the path of the folder that you created.
    • Create the ods_raw_log_d table.
      Double-click the ods_raw_log_d node. On the node editing tab that appears, enter the table creation statements.
      -- Create a table to store logs synchronized from OSS.
      CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d
      (
        `col` STRING
      ) PARTITIONED BY (
        dt STRING
      );
      
      alter table ods_raw_log_d add partition (dt=${bizdate})
      location 'oss://dw-emr-demo/ods_raw_log_d/user_log_${bizdate}/';
      Note The path specified in the location parameter in the preceding code is only an example. Set the parameter to the path of the folder that you created.
  4. View the data synchronization results.
    After the table creation statements are run, enter a query statement on the editing tab of each E-MapReduce Hive node.
    Note In the query statement, change the partition key value to the data timestamp of the node. For example, if the node is run on November 7, 2019, the data timestamp is 20191106, which is one day before the node is run.
    • Query data in the ods_user_info_d table.
      SELECT * FROM ods_user_info_d WHERE dt=<data timestamp of the node>; -- The data timestamp is one day before the node is run.
    • Query data in the ods_raw_log_d table.
      SELECT * FROM ods_raw_log_d WHERE dt=<data timestamp of the node>; -- The data timestamp is one day before the node is run.