E-MapReduce: Synchronize data

Last Updated: Jun 25, 2025

This tutorial demonstrates how to synchronize data by using two examples: the basic user information table ods_user_info_d in a MySQL data source and the website access log file user_log.txt in an HttpFile data source. The data is synchronized to a private OSS bucket by using Data Integration batch synchronization tasks and is then accessed through external tables created by using Spark SQL.

Objective

  1. Use Data Integration to synchronize the basic user information stored in a MySQL data source and the website access logs of users stored in an HttpFile data source to a private OSS bucket. The basic user information and website access logs of users are provided by DataWorks. For more information about how to add the data sources, see Add data sources.

    | Source type | Data to be synchronized | Schema of the source table | Destination type |
    | --- | --- | --- | --- |
    | MySQL | Table: ods_user_info_d (basic user information) | uid (the username), gender (the gender), age_range (the age range), zodiac (the zodiac sign) | OSS |
    | HttpFile | File: user_log.txt (website access logs of users) | One user access record occupies one row: $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" [unknown_content]; | OSS |

  2. After data is synchronized, create external tables by using Spark SQL to access synchronized data in the private OSS bucket.

Procedure

Step 1: Design a workflow

Step 2: Configure data synchronization tasks

Step 3: Check the synchronized data

Step 1: Design a workflow

In this step, Data Integration nodes and E-MapReduce (EMR) Spark SQL nodes are used to obtain data. Data acquisition is part of the user profile analysis process. The ods_raw_log_d_2oss_spark node is used to synchronize the website access logs of users in an HttpFile data source to a private OSS bucket. Then, the ods_raw_log_d_spark node is used to create a simple external table to access the synchronized data in the private OSS bucket. The ods_user_info_d_2oss_spark node is used to synchronize the basic user information in a MySQL data source to a private OSS bucket. Then, the ods_user_info_d_spark node is used to create an external table to access the synchronized data in the private OSS bucket.

Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and O&M > Data Development. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.

Design a workflow

  1. Create a workflow.

    Development components are used to develop data based on workflows. Before you create a node, you must create a workflow. For more information, see Create a workflow.

    In this example, a workflow named User profile analysis_Spark is used.

  2. Design the workflow.

    After you create the workflow, the workflow canvas is automatically displayed. In the upper part of the workflow canvas, click Create Node, drag nodes to the workflow canvas, and then draw lines to configure dependencies between the nodes for data synchronization based on the workflow design.

    (Screenshot: the workflow canvas that shows the configured dependencies between the nodes.)

  3. In this example, no lineage exists between the zero load node and synchronization nodes. In this case, the dependencies between nodes are configured by drawing lines in the workflow. For more information about how to configure dependencies, see Scheduling dependency configuration guide. The following table describes the node types, the node names, and the functionality of each node.

    | Node category | Node type | Node name | Node functionality |
    | --- | --- | --- | --- |
    | General | Zero load node | workshop_start_spark | Used to manage the entire workflow for user profile analysis. For example, the zero load node determines the time when the workflow starts to run. If the workflow in the workspace is complex, a zero load node makes the path of data flows in the workflow clearer. This node is a dry-run node. You do not need to edit the code of the node. |
    | Data Integration | Batch synchronization | ods_raw_log_d_2oss_spark | Used to synchronize the website access logs of users stored in the HttpFile data source to the private OSS bucket. The synchronized data can be subsequently accessed by using Spark SQL. |
    | Data Integration | Batch synchronization | ods_user_info_d_2oss_spark | Used to synchronize the basic user information stored in the MySQL data source to the private OSS bucket. The synchronized data can be subsequently accessed by using Spark SQL. |
    | EMR | EMR Spark SQL | ods_raw_log_d_spark | Used to create the ods_raw_log_d_spark external table to access the synchronized website access logs of users in the private OSS bucket. |
    | EMR | EMR Spark SQL | ods_user_info_d_spark | Used to create the ods_user_info_d_spark external table to access the synchronized basic user information in the private OSS bucket. |

Configure the scheduling logic

In this example, the workshop_start_spark zero load node is used to trigger the workflow to run at 00:30 every day. The following table describes the configurations of scheduling properties for the zero load node. You do not need to modify the scheduling configurations of other nodes. For information about the implementation logic, see Configure scheduling time for nodes in a workflow in different scenarios. For information about other scheduling configurations, see Overview.

• Scheduling time: The scheduling time of the zero load node is set to 00:30. The zero load node triggers the current workflow to run at 00:30 every day.

• Scheduling dependencies: The workshop_start_spark zero load node has no ancestor nodes. In this case, you can configure the zero load node to depend on the root node of the workspace. The root node triggers the workshop_start_spark zero load node to run.

Note

Every node in a DataWorks workflow must depend on at least one other node. In this example, all nodes in the data synchronization phase depend on the workshop_start_spark zero load node. Therefore, the workshop_start_spark node triggers the data synchronization workflow to run.

Step 2: Configure data synchronization tasks

After you configure the workflow, you can double-click the ods_user_info_d_2oss_spark and ods_raw_log_d_2oss_spark nodes. On the configuration tabs of the nodes, configure parameters to synchronize the basic user information and website access logs of users to the private OSS bucket. Then, write the Spark SQL code on the ods_raw_log_d_spark and ods_user_info_d_spark nodes to create external tables and access the synchronized data in the private OSS bucket.

Synchronize the basic user information and website access logs of users to the private OSS bucket

Use Data Integration to synchronize the basic user information and website access logs of users that are provided by DataWorks to a directory of the private OSS bucket.

Configure the node to synchronize the website access logs of users to the private OSS bucket

You can use a batch synchronization task to synchronize the website access logs of users stored in the HttpFile data source to the private OSS bucket.

Synchronize the website access logs of the HttpFile data source to the created OSS bucket.

  1. On the DataStudio page, double-click the ods_raw_log_d_2oss_spark node to go to the configuration tab of the node.

  2. Establish network connections between the resource group that you want to use and the data sources.

    After you complete the configuration of network connections and resources, click Next and complete the connectivity test as prompted.

    Source
    • Source: Set the value to HttpFile.
    • Data Source Name: Set the value to user_behavior_analysis_httpfile.

    Resource Group
    • Select the serverless resource group that you purchased.

    Destination
    • Destination: Set the value to OSS.
    • Data Source Name: Set the value to test_g, which specifies the name of the private OSS data source that you added.

  3. Configure the data synchronization node.

    Source
    • File Path: Set the value to /user_log.txt.
    • Text type: Set the value to text.
    • Column Delimiter: Enter |.
    • Compression format: The compression format of the OSS object. Valid values: None, Gzip, Bzip2, and Zip. Select None.
    • Skip Header: Set the value to No.

    Destination
    • Text type: Set the value to text.
    • Object Name (Path Included): The path of the OSS object. Configure this parameter based on the folder that you created in the OSS bucket. In this example, ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt is entered. ods_raw_log_d is the name of the folder that you created in the OSS bucket. ${bizdate} indicates the date of the previous day.
    • Column Delimiter: Enter |.

  4. Configure the scheduling properties.

    In the right-side navigation pane of the configuration tab of the node, click Properties. On the Properties tab, configure the scheduling properties and basic information for the node. The following table describes the configurations.

    Note

    DataWorks provides scheduling parameters that you can use to write daily data to different directories and objects of an OSS bucket in the scheduling scenario. The directories and objects are named based on data timestamps.

    Based on your business scenario, you can configure a variable in the ${Variable name} format in the directory when you configure the Object Name (Path Included) parameter for the private OSS bucket. Then, you can assign a scheduling parameter as a value to the variable on the Properties tab of the configuration tab of the node. This way, the directory and object name in the private OSS bucket can be dynamically generated in the scheduling scenario. A quick way to preview the value that the ${bizdate} variable resolves to is shown in the sketch after these steps.

    Scheduling Parameter
    In this section, click Add Parameter to add a scheduling parameter.
    • Set Parameter Name to bizdate.
    • Set Parameter Value to $[yyyymmdd-1].
    For more information, see Configure scheduling parameters.

    Dependencies
    In this section, make sure that the output table is used as the output name of the current node. The output name is in the workspacename.nodename format. For more information, see Configure scheduling dependencies.

  5. After the configuration is complete, click the Save icon in the toolbar.
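
If you want to preview the value that the bizdate variable resolves to (as mentioned in the preceding note), you can run a quick ad hoc Spark SQL query such as the following sketch. This is for illustration only: the actual substitution of ${bizdate} is performed by DataWorks scheduling based on the $[yyyymmdd-1] expression, not by this query. The same logic applies to the ods_user_info_d_2oss_spark node that is configured in the next section.

-- Illustration only: compute the previous day in the yyyymmdd format, which is the value that $[yyyymmdd-1] resolves to.
-- For example, if the query is run on 2024-08-08, the result is 20240807, and the destination object becomes ods_raw_log_d/log_20240807/log_20240807.txt.
SELECT date_format(date_sub(current_date(), 1), 'yyyyMMdd') AS bizdate;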

Configure the node to synchronize the basic user information to the private OSS bucket

You can use a batch synchronization task to synchronize the basic user information stored in the MySQL data source to the private OSS bucket.

  1. On the DataStudio page, double-click the ods_user_info_d_2oss_spark node to go to the configuration tab of the node.

  2. Establish network connections between the resource group that you want to use and the data sources.

    After you complete the configuration of network connections and resources, click Next and complete the connectivity test as prompted.

    Source
    • Source: Set the value to MySQL.
    • Data Source Name: Set the value to user_behavior_analysis_mysql.

    Resource Group
    • Select the serverless resource group that you purchased.

    Destination
    • Destination: Set the value to OSS.
    • Data Source Name: Set the value to test_g, which specifies the name of the private OSS data source that you added.

  3. Configure the data synchronization node.

    Source
    • Table: Select the ods_user_info_d table in the data source.
    • Split key: The split key for the data to be read. We recommend that you use the primary key or an indexed column as the split key. Only fields of the INTEGER type are supported. In this example, uid is used.

    Destination
    • Text type: Set the value to text.
    • Object Name (Path Included): The path of the OSS object. Configure this parameter based on the folder that you created in the OSS bucket. In this example, ods_user_info_d/user_${bizdate}/user_${bizdate}.txt is entered. ods_user_info_d is the name of the folder that you created in the OSS bucket. ${bizdate} indicates the date of the previous day.
    • Column Delimiter: Enter |.

  4. Configure the scheduling properties.

    In the right-side navigation pane of the configuration tab of the node, click Properties. On the Properties tab, configure the scheduling properties and basic information for the node. The following table describes the configurations.

    Note

    DataWorks provides scheduling parameters that you can use to write daily data to different directories and objects of an OSS bucket in the scheduling scenario. The directories and objects are named based on data timestamps.

    Based on your business scenario, you can configure a variable in the ${Variable name} format in the directory when you configure the Object Name (Path Included) parameter for the private OSS bucket. Then, you can assign a scheduling parameter as a value to the variable on the Properties tab of the configuration tab of the node. This way, the directory and object name in the private OSS bucket can be dynamically generated in the scheduling scenario.

    Scheduling Parameter
    In this section, click Add Parameter to add a scheduling parameter.
    • Set Parameter Name to bizdate.
    • Set Parameter Value to $[yyyymmdd-1].
    For more information, see Configure scheduling parameters.

    Dependencies
    In this section, make sure that the output table is used as the output name of the current node. The output name is in the workspacename.nodename format. For more information, see Configure scheduling dependencies.

  5. After the configuration is complete, click the Save icon in the toolbar.

Create external tables by using Spark SQL to load OSS data

After data is synchronized to the private OSS bucket by using the batch synchronization tasks, you can execute the CREATE statement of Spark SQL based on the generated OSS objects to create the ods_raw_log_d_spark and ods_user_info_d_spark external tables. Then, use LOCATION to access the synchronized basic user information and website access logs of users in the private OSS bucket for subsequent data processing.

Configure the ods_raw_log_d_spark node

Based on the ods_raw_log_d_spark external table created by using the EMR Spark SQL node, you can use LOCATION to access the website access logs of users that are synchronized to the private OSS bucket by the related batch synchronization task.

  1. Configure the code.

    -- Scenario: SQL statements of Spark SQL are used in the following sample code. In the code, the ods_raw_log_d_spark external table is created by using the EMR Spark SQL node and LOCATION is used to access the website access logs of users that are synchronized to the private OSS bucket by the related batch synchronization task. 
    -- Note:
    --      DataWorks provides scheduling parameters that you can use to write daily incremental data to the corresponding partition of the destination table in the scheduling scenario. 
    --      In the actual development scenario, you can define variables in the node code in the ${Variable name} format. Then, you can assign scheduling parameters as values to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters are dynamically replaced in the node code based on the configurations of the scheduling parameters in the scheduling scenario.  
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark
    (
      `col` STRING
    ) 
    PARTITIONED BY (
      dt STRING
    )
    LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';
    
    ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/'
    ;
    Note

    In the preceding code, a sample path is used for LOCATION. The path is based on the value of the Object Name (Path Included) parameter that you specified when you configured the related batch synchronization node. You must set the LOCATION parameter to the path of the folder that you created. dw-emr-demo is the name of the OSS bucket that you created when you prepared the environment.

  2. Configure the scheduling properties.

    Configure the scheduling parameters for the ods_raw_log_d_spark node to obtain the private OSS log object with a specific data timestamp, and write data to the corresponding partition with the same data timestamp in a Spark table.

    Scheduling Parameter
    In this section, click Add Parameter to add a scheduling parameter.
    • Set Parameter Name to bizdate.
    • Set Parameter Value to $[yyyymmdd-1].
    For more information, see Configure scheduling parameters.

    Dependencies
    In this section, make sure that the output table is used as the output name of the current node. The output name is in the workspacename.nodename format. For more information, see Configure scheduling dependencies.

    Note

    In the SQL statements in this topic, the ${bizdate} scheduling parameter is configured and the value T-1 is assigned to the scheduling parameter. In batch computing scenarios, bizdate indicates the date on which a business transaction is conducted, which is often referred to as the data timestamp. For example, if you collect statistical data on the turnover of the previous day on the current day, the previous day is the date on which the business transaction is conducted and represents the data timestamp. An optional verification sketch is provided after these steps.

  3. After the configuration is complete, click the Save icon in the toolbar.
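
After the ods_raw_log_d_spark node has run at least once, you can optionally verify the result in an ad hoc Spark SQL query. The following is a minimal sketch that assumes a data timestamp of 20240807; replace the value with the data timestamp of your own run.

-- Optional verification sketch. Replace 20240807 with the data timestamp of your run.
SHOW PARTITIONS ods_raw_log_d_spark;                                 -- The dt=20240807 partition should be listed.
DESCRIBE FORMATTED ods_raw_log_d_spark PARTITION (dt = '20240807');  -- The partition location should be the OSS path written by ods_raw_log_d_2oss_spark.
SELECT COUNT(*) FROM ods_raw_log_d_spark WHERE dt = '20240807';      -- A non-zero count indicates that the external table can read the synchronized log file.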

Configure the ods_user_info_d_spark node

Based on the ods_user_info_d_spark external table created by using the EMR Spark SQL node, you can use LOCATION to access the basic user information that is synchronized to the private OSS bucket by the related batch synchronization task.

  1. Configure the code.

    -- Scenario: SQL statements of Spark SQL are used in the following sample code. In the code, the ods_user_info_d_spark external table is created by using the EMR Spark SQL node and LOCATION is used to access the basic user information that is synchronized to the private OSS bucket by the related batch synchronization task. Then, the data is written to the dt partition. 
    -- Note:
    --      DataWorks provides scheduling parameters that you can use to write daily incremental data to the corresponding partition of the destination table in the scheduling scenario. 
    --      In the actual development scenario, you can define variables in the node code in the ${Variable name} format. Then, you can assign scheduling parameters as values to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters are dynamically replaced in the node code based on the configurations of the scheduling parameters in the scheduling scenario. 
    CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark
    (
        `uid`        STRING COMMENT 'The user ID'
        ,`gender`    STRING COMMENT 'The gender'
        ,`age_range` STRING COMMENT 'The age range'
        ,`zodiac`    STRING COMMENT 'The zodiac sign'
    )
    PARTITIONED BY 
    (
        dt           STRING
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
    STORED AS TEXTFILE
    LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/'
    ;
    
    ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') 
    LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/'
    ;
    Note

    In the preceding code, a sample path is used for LOCATION. The path is based on the value of the Object Name (Path Included) parameter that you specified when you configured the related batch synchronization node. You must set the LOCATION parameter to the path of the folder that you created. dw-emr-demo is the name of the OSS bucket that you created when you prepared the environment. An optional check of the column delimiter is provided after these steps.

  2. Configure the scheduling properties.

    Configure the scheduling parameters for the ods_user_info_d_spark node to obtain the private OSS user information object with a specific data timestamp, and write data to the corresponding partition with the same data timestamp in a Spark table.

    Scheduling Parameter
    In this section, click Add Parameter to add a scheduling parameter.
    • Set Parameter Name to bizdate.
    • Set Parameter Value to $[yyyymmdd-1].
    For more information, see Configure scheduling parameters.

    Dependencies
    In this section, make sure that the output table is used as the output name of the current node. The output name is in the workspacename.nodename format. For more information, see Configure scheduling dependencies.

  3. After the configuration is complete, click the Save icon in the toolbar.
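
As with the log table, you can optionally confirm in an ad hoc query that the '|' delimiter declared in the DDL matches the delimiter configured for the ods_user_info_d_2oss_spark synchronization node. A minimal sketch, assuming a data timestamp of 20240807:

-- Optional check. Replace 20240807 with the data timestamp of your run.
-- If uid, gender, age_range, and zodiac are populated as separate columns, the delimiter in the DDL matches the one used by the synchronization node.
SELECT uid, gender, age_range, zodiac FROM ods_user_info_d_spark WHERE dt = '20240807' LIMIT 10;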

Step 3: Check the synchronized data

After all nodes in this topic are successfully run, click Ad Hoc Query in the left-side navigation pane of the DataStudio page. In the Ad Hoc Query pane, create an ad hoc query task of the EMR Spark SQL type and write SQL statements to check whether the external tables are created based on the EMR Spark SQL nodes as expected.

-- You need to update the partition filter condition to the data timestamp of your current operation. For example, if the task is run on August 8, 2024, the data timestamp is 20240807, which is one day earlier than the task running date. 
SELECT * FROM ods_raw_log_d_spark WHERE dt = 'Data timestamp';    -- Query data in the ods_raw_log_d_spark table.
SELECT * FROM ods_user_info_d_spark WHERE dt = 'Data timestamp';  -- Query data in the ods_user_info_d_spark table.
Note

In the SQL statements that are used to check the synchronized data, replace the WHERE condition with "dt = ${bizdate}". In the ad hoc query task, click the image (Run with Parameters) icon, assign a value to the SQL placeholder ${bizdate}, and then run the ad hoc query task.
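
For reference, the parameterized form of the two check queries looks like the following sketch. The ${bizdate} placeholder is assigned a value, such as 20240807, in the Run with Parameters dialog before the ad hoc query is run.

-- Parameterized check queries. Assign a value to ${bizdate} by using Run with Parameters.
SELECT * FROM ods_raw_log_d_spark WHERE dt = '${bizdate}';
SELECT * FROM ods_user_info_d_spark WHERE dt = '${bizdate}';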

What to do next

Data synchronization is complete. You can proceed with the next tutorial. In the next tutorial, you will learn how to process the basic user information and website access logs of users in Spark. For more information, see Process data.