This tutorial demonstrates how to synchronize data by using two examples: the basic user information table ods_user_info_d from a MySQL data source and the website access log file user_log.txt from an HttpFile data source. The data is synchronized to a private OSS bucket by using Data Integration batch synchronization tasks and is then accessed through external tables created with Spark SQL.
Objective
Use Data Integration to synchronize the basic user information stored in a MySQL data source and the website access logs of users stored in an HttpFile data source to a private OSS bucket. The basic user information and website access logs of users are provided by DataWorks. For more information about how to add the data sources used in this tutorial, see Add data sources. The following table describes the data to be synchronized.
| Source type | Data to be synchronized | Schema of the source table | Destination type |
| --- | --- | --- | --- |
| MySQL | Table ods_user_info_d: the basic user information | uid: the username. gender: the gender. age_range: the age range. zodiac: the zodiac sign. | OSS |
| HttpFile | File user_log.txt: the website access logs of users | A user access record occupies one row, in the format: $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" [unknown_content] | OSS |
After data is synchronized, create external tables by using Spark SQL to access synchronized data in the private OSS bucket.
Procedure
Step 1: Design a workflow
In this step, Data Integration nodes and E-MapReduce (EMR) Spark SQL nodes are used to obtain data. Data acquisition is part of the user profile analysis process. The ods_raw_log_d_2oss_spark node is used to synchronize the website access logs of users in an HttpFile data source to a private OSS bucket. Then, the ods_raw_log_d_spark node is used to create a simple external table to access the synchronized data in the private OSS bucket. The ods_user_info_d_2oss_spark node is used to synchronize the basic user information in a MySQL data source to a private OSS bucket. Then, the ods_user_info_d_spark node is used to create an external table to access the synchronized data in the private OSS bucket.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.
Design a workflow
Create a workflow.
Development components are used to develop data based on workflows. Before you create a node, you must create a workflow. For more information, see Create a workflow.
In this example, a workflow named User profile analysis_Spark is used.
Design the workflow.
After you create the workflow, the workflow canvas is automatically displayed. In the upper part of the workflow canvas, click Create Node, drag nodes to the workflow canvas, and then draw lines to configure dependencies between the nodes for data synchronization based on the workflow design.

In this example, no lineage exists between the zero load node and synchronization nodes. In this case, the dependencies between nodes are configured by drawing lines in the workflow. For more information about how to configure dependencies, see Scheduling dependency configuration guide. The following table describes the node types, the node names, and the functionality of each node.
| Node category | Node type | Node name | Node functionality |
| --- | --- | --- | --- |
| General | Zero load node | workshop_start_spark | Used to manage the entire workflow for user profile analysis. For example, a zero load node determines the time when the workflow starts to run. If the workflow in the workspace is complex, a zero load node makes the path of data flows in the workflow clearer. This node is a dry-run node. You do not need to edit the code of the node. |
| Data Integration | Batch synchronization | ods_raw_log_d_2oss_spark | Used to synchronize the website access logs of users stored in an HttpFile data source to a private OSS bucket. The synchronized data can be subsequently obtained by using Spark SQL. |
| Data Integration | Batch synchronization | ods_user_info_d_2oss_spark | Used to synchronize the basic user information stored in a MySQL data source to a private OSS bucket. The synchronized data can be subsequently obtained by using Spark SQL. |
| EMR | EMR Spark SQL | ods_raw_log_d_spark | Used to create the ods_raw_log_d_spark external table to access the synchronized website access logs of users in the private OSS bucket. |
| EMR | EMR Spark SQL | ods_user_info_d_spark | Used to create the ods_user_info_d_spark external table to access the synchronized basic user information in the private OSS bucket. |
Configure the scheduling logic
In this example, the workshop_start_spark zero load node is used to trigger the workflow to run at 00:30 every day. The following table describes the configurations of scheduling properties for the zero load node. You do not need to modify the scheduling configurations of other nodes. For information about the implementation logic, see Configure scheduling time for nodes in a workflow in different scenarios. For information about other scheduling configurations, see Overview.
Scheduling time: The scheduling time of the zero load node is set to 00:30. The zero load node triggers the current workflow to run at 00:30 every day.
Scheduling dependencies: The dependency configuration for this node is described in the following paragraph.
In DataWorks, every node in a workflow must depend on an upstream node. In this example, all nodes in the data synchronization phase depend on the workshop_start_spark zero load node. Therefore, the workshop_start_spark node triggers the data synchronization workflow to run.
Step 2: Configure data synchronization tasks
After you configure the workflow, you can double-click the ods_user_info_d_2oss_spark and ods_raw_log_d_2oss_spark nodes. On the configuration tabs of the nodes, configure parameters to synchronize the basic user information and website access logs of users to the private OSS bucket. Then, write the Spark SQL code on the ods_raw_log_d_spark and ods_user_info_d_spark nodes to create external tables and access the synchronized data in the private OSS bucket.
Synchronize the basic user information and website access logs of users to the private OSS bucket
Use Data Integration to synchronize the basic user information and website access logs of users that are provided by DataWorks to a directory of the private OSS bucket.
Configure the node to synchronize the website access logs of users to the private OSS bucket
You can use a batch synchronization task to synchronize the website access logs of users stored in the HttpFile data source to the private OSS bucket.
Synchronize the website access logs from the HttpFile data source to the OSS bucket that you created.
On the DataStudio page, double-click the ods_raw_log_d_2oss_spark node to go to the configuration tab of the node.
Establish network connections between the resource group that you want to use and the data sources.
After you complete the configuration of network connections and resources, click Next and complete the connectivity test as prompted.
Source
- Source: Set the value to HttpFile.
- Data Source Name: Set the value to user_behavior_analysis_httpfile.
Resource Group
- Select the serverless resource group that you purchased.
Destination
- Destination: Set the value to OSS.
- Data Source Name: Set the value to test_g, which specifies the name of the private OSS data source that you added.
Configure the data synchronization node.
Source
- File Path: Set the value to /user_log.txt.
- Text type: Set the value to text.
- Column Delimiter: Enter |.
- Compression format: The compression format of the OSS object. Valid values: None, Gzip, Bzip2, and Zip. Select None.
- Skip Header: Set the value to No.
Destination
- Text type: Set the value to text.
- Object Name (Path Included): The path of the OSS object. Configure this parameter based on the folder that you created in the OSS bucket. In this example, ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt is entered. ods_raw_log_d is the name of the folder that you created in the OSS bucket, and ${bizdate} indicates the date of the previous day.
- Column Delimiter: Enter |.
Configure the scheduling properties.
In the right-side navigation pane of the configuration tab of the node, click Properties. On the Properties tab, configure the scheduling properties and basic information for the node. The following table describes the configurations.
Note: DataWorks provides scheduling parameters that you can use to write daily data to different directories and objects of an OSS bucket in the scheduling scenario. The directories and objects are named based on data timestamps. Based on your business scenario, you can configure a variable in the ${Variable name} format in the directory when you configure the Object Name (Path Included) parameter for the private OSS bucket. Then, you can assign a scheduling parameter as a value to the variable on the Properties tab of the configuration tab of the node. This way, the directory and object name in the private OSS bucket are dynamically generated in the scheduling scenario. A resolved example is shown after the following settings.
Scheduling Parameter: In this section, click Add Parameter to add a scheduling parameter. Set Parameter Name to bizdate and set Parameter Value to $[yyyymmdd-1]. For more information, see Configure scheduling parameters.
Dependencies: In this section, make sure that the output table is used as the output name of the current node. The output table is named in the workspacename.nodename format. For more information, see Configure scheduling dependencies.
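As a concrete illustration of how the scheduling parameter is resolved (a sketch based on the object name configured above; the date follows the example used later in Step 3, where a task that runs on August 8, 2024 has the data timestamp 20240807):
Parameter: bizdate = $[yyyymmdd-1], which resolves to 20240807 for the instance that runs on August 8, 2024.
Configured object name: ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt
Resolved OSS object: ods_raw_log_d/log_20240807/log_20240807.txt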
After the configuration is complete, click the icon in the toolbar.
Configure the node to synchronize the basic user information to the private OSS bucket
You can use a batch synchronization task to synchronize the basic user information stored in the MySQL data source to the private OSS bucket.
On the DataStudio page, double-click the ods_user_info_d_2oss_spark node to go to the configuration tab of the node.
Establish network connections between the resource group that you want to use and the data sources.
After you complete the configuration of network connections and resources, click Next and complete the connectivity test as prompted.
Source
- Source: Set the value to MySQL.
- Data Source Name: Set the value to user_behavior_analysis_mysql.
Resource Group
- Select the serverless resource group that you purchased.
Destination
- Destination: Set the value to OSS.
- Data Source Name: Set the value to test_g, which specifies the name of the private OSS data source that you added.
Configure the data synchronization node.
Source
- Table: Select the ods_user_info_d table in the data source.
- Split key: The split key for the data to be read. We recommend that you use the primary key or an indexed column as the split key. Only fields of the INTEGER type are supported. In this example, uid is used.
Destination
- Text type: Set the value to text.
- Object Name (Path Included): The path of the OSS object. Configure this parameter based on the folder that you created in the OSS bucket. In this example, ods_user_info_d/user_${bizdate}/user_${bizdate}.txt is entered. ods_user_info_d is the name of the folder that you created in the OSS bucket, and ${bizdate} indicates the date of the previous day.
- Column Delimiter: Enter |.
Configure the scheduling properties.
In the right-side navigation pane of the configuration tab of the node, click Properties. On the Properties tab, configure the scheduling properties and basic information for the node. The following table describes the configurations.
Note: DataWorks provides scheduling parameters that you can use to write daily data to different directories and objects of an OSS bucket in the scheduling scenario. The directories and objects are named based on data timestamps. Based on your business scenario, you can configure a variable in the ${Variable name} format in the directory when you configure the Object Name (Path Included) parameter for the private OSS bucket. Then, you can assign a scheduling parameter as a value to the variable on the Properties tab of the configuration tab of the node. This way, the directory and object name in the private OSS bucket are dynamically generated in the scheduling scenario. A resolved example is shown after the following settings.
Scheduling Parameter: In this section, click Add Parameter to add a scheduling parameter. Set Parameter Name to bizdate and set Parameter Value to $[yyyymmdd-1]. For more information, see Configure scheduling parameters.
Dependencies: In this section, make sure that the output table is used as the output name of the current node. The output table is named in the workspacename.nodename format. For more information, see Configure scheduling dependencies.
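The same parameter resolution applies to the Object Name configured for this node (again for the data timestamp 20240807, as in the earlier example):
Configured object name: ods_user_info_d/user_${bizdate}/user_${bizdate}.txt
Resolved OSS object: ods_user_info_d/user_20240807/user_20240807.txt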
After the configuration is complete, click the icon in the toolbar.
Create external tables by using Spark SQL to load OSS data
After data is synchronized to the private OSS bucket by using the batch synchronization tasks, you can execute the CREATE statement of Spark SQL based on the generated OSS objects to create the ods_raw_log_d_spark and ods_user_info_d_spark external tables. Then, use LOCATION to access the synchronized basic user information and website access logs of users in the private OSS bucket for subsequent data processing.
Configure the ods_raw_log_d_spark node
Based on the ods_raw_log_d_spark external table created by using the EMR Spark SQL node, you can use LOCATION to access the website access logs of users that are synchronized to the private OSS bucket by the related batch synchronization task.
Configure the code.
-- Scenario: The following sample code uses Spark SQL statements. The ods_raw_log_d_spark external table is created by using the EMR Spark SQL node, and LOCATION is used to access the website access logs of users that are synchronized to the private OSS bucket by the related batch synchronization task.
-- Note:
-- DataWorks provides scheduling parameters that you can use to write daily incremental data to the corresponding partition of the destination table in the scheduling scenario.
-- In the actual development scenario, you can define variables in the node code in the ${Variable name} format. Then, you can assign scheduling parameters as values to the variables on the Properties tab of the configuration tab of the node. This way, the values of the scheduling parameters are dynamically replaced in the node code based on the configurations of the scheduling parameters in the scheduling scenario.
CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark
(
  `col` STRING
)
PARTITIONED BY (
  dt STRING
)
LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';

ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}')
LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';
Note: In the preceding code, a sample path is used for LOCATION. The path is based on the value of the Object Name (Path Included) parameter that you configured for the related batch synchronization node. You must set the LOCATION parameter to the path of the folder that you created. dw-emr-demo is the name of the OSS bucket that you created when you prepared the environment.
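Optionally, after the node has run, you can check that the partition was registered on the external table. The following statements are a minimal sketch that only inspects metadata and is not a required part of this tutorial:
-- List the partitions that have been added to the external table, for example dt=20240807.
SHOW PARTITIONS ods_raw_log_d_spark;
-- Show the table definition, including its LOCATION.
DESCRIBE FORMATTED ods_raw_log_d_spark;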
Configure the scheduling properties.
Configure the scheduling parameters for the ods_raw_log_d_spark node to obtain the private OSS log object with a specific data timestamp and write data to the corresponding partition with the same data timestamp in a Spark table.
Scheduling Parameter: In this section, click Add Parameter to add a scheduling parameter. Set Parameter Name to bizdate and set Parameter Value to $[yyyymmdd-1]. For more information, see Configure scheduling parameters.
Dependencies: In this section, make sure that the output table is used as the output name of the current node. The output table is named in the workspacename.nodename format. For more information, see Configure scheduling dependencies.
Note: In the SQL statements in this topic, the ${bizdate} scheduling parameter is configured and the value T-1 is assigned to the scheduling parameter. In batch computing scenarios, bizdate indicates the date on which a business transaction is conducted, which is often referred to as the data timestamp. For example, if you collect statistical data on the turnover of the previous day on the current day, the previous day is the date on which the business transaction is conducted and represents the data timestamp.
After the configuration is complete, click the icon.
Configure the ods_user_info_d_spark node
Based on the ods_user_info_d_spark external table created by using the EMR Spark SQL node, you can use LOCATION to access the basic user information that is synchronized to the private OSS bucket by the related batch synchronization task.
Configure the code.
-- Scenario: The following sample code uses Spark SQL statements. The ods_user_info_d_spark external table is created by using the EMR Spark SQL node, and LOCATION is used to access the basic user information that is synchronized to the private OSS bucket by the related batch synchronization task. The data is then written to the dt partition.
-- Note:
-- DataWorks provides scheduling parameters that you can use to write daily incremental data to the corresponding partition of the destination table in the scheduling scenario.
-- In the actual development scenario, you can define variables in the node code in the ${Variable name} format. Then, you can assign scheduling parameters as values to the variables on the Properties tab of the configuration tab of the node. This way, the values of the scheduling parameters are dynamically replaced in the node code based on the configurations of the scheduling parameters in the scheduling scenario.
CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark
(
  `uid` STRING COMMENT 'The user ID',
  `gender` STRING COMMENT 'The gender',
  `age_range` STRING COMMENT 'The age range',
  `zodiac` STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (
  dt STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/';

ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}')
LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/';
Note: In the preceding code, a sample path is used for LOCATION. The path is based on the value of the Object Name (Path Included) parameter that you configured for the related batch synchronization node. You must set the LOCATION parameter to the path of the folder that you created. dw-emr-demo is the name of the OSS bucket that you created when you prepared the environment.
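Optionally, you can verify that the partition resolves to the synchronized OSS objects and contains data. This is a minimal sketch, assuming the ${bizdate} scheduling parameter configured for this node:
-- Count the rows that were loaded for the partition of the current data timestamp.
SELECT dt, COUNT(*) AS row_cnt
FROM ods_user_info_d_spark
WHERE dt = '${bizdate}'
GROUP BY dt;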
Configure the scheduling properties.
Configure the scheduling parameters for the ods_user_info_d_spark node to obtain the private OSS user information object with a specific data timestamp and write data to the corresponding partition with the same data timestamp in a Spark table.
Scheduling Parameter: In this section, click Add Parameter to add a scheduling parameter. Set Parameter Name to bizdate and set Parameter Value to $[yyyymmdd-1]. For more information, see Configure scheduling parameters.
Dependencies: In this section, make sure that the output table is used as the output name of the current node. The output table is named in the workspacename.nodename format. For more information, see Configure scheduling dependencies.
After the configuration is complete, click the icon.
Step 3: Check the synchronized data
After all nodes in this topic are successfully run, click Ad Hoc Query in the left-side navigation pane of the DataStudio page. In the Ad Hoc Query pane, create an ad hoc query task of the EMR Spark SQL type and write SQL statements to check whether the external tables are created based on the EMR Spark SQL nodes as expected.
-- You need to update the partition filter condition to the data timestamp of your current operation. For example, if the task is run on August 8, 2024, the data timestamp is 20240807, which is one day earlier than the task running date.
SELECT * FROM ods_raw_log_d_spark WHERE dt = 'Data timestamp';   -- Query data in the ods_raw_log_d_spark table.
SELECT * FROM ods_user_info_d_spark WHERE dt = 'Data timestamp'; -- Query data in the ods_user_info_d_spark table.
In the SQL statements that are used to check the synchronized data, you can also replace the WHERE condition with dt = '${bizdate}'. In this case, click the Run with Parameters icon in the ad hoc query task, assign a value to the SQL placeholder ${bizdate}, and then run the ad hoc query task.
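For example, if the tasks ran on August 8, 2024, the data timestamp is 20240807 and the checks could look like the following sketch (the LIMIT clauses are added here only to keep the previews small):
SELECT * FROM ods_raw_log_d_spark WHERE dt = '20240807' LIMIT 10;
SELECT * FROM ods_user_info_d_spark WHERE dt = '20240807' LIMIT 10;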
What to do next
Data synchronization is complete. You can proceed with the next tutorial. In the next tutorial, you will learn how to process the basic user information and website access logs of users in Spark. For more information, see Process data.

