In this topic, batch synchronization tasks in Data Integration are used to synchronize the basic user information stored in the MySQL table ods_user_info_d and the website access logs of users stored in the Object Storage Service (OSS) object user_log.txt to the StarRocks tables ods_user_info_d_starrocks and ods_raw_log_d_starrocks, respectively. This topic describes how to use the Data Integration service of DataWorks to synchronize data between heterogeneous data sources and complete the data synchronization phase of the data warehouse.
Prerequisites
The required environments are prepared for data synchronization. For more information, see Prepare environments.
Objective
Synchronize the data in the public data sources that are provided in this example to StarRocks to complete data synchronization in the workflow design.
| Source type | Data to be synchronized | Schema of the source table | Destination type | Destination table | Schema of the destination table |
| --- | --- | --- | --- | --- | --- |
| MySQL | Table: ods_user_info_d (basic user information) | Fields such as uid, gender, age_range, and zodiac, which are mapped by name to the destination table | StarRocks | ods_user_info_d_starrocks | uid, gender, age_range, zodiac, and dt (partition field); see the CREATE TABLE statement in Step 2 |
| HttpFile | Object: user_log.txt (website access logs of users) | A user access record occupies one row. | StarRocks | ods_raw_log_d_starrocks | col (raw log) and dt (partition field); see the CREATE TABLE statement in Step 2 |
Go to the DataStudio page
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.
Step 1: Design a workflow
Design a workflow
Create a workflow.
Development components are used to develop data based on workflows. Before you create a node, you must create a workflow. For more information, see Create a workflow.
In this example, a workflow named User profile analysis_StarRocks is used.
Design the workflow.
After the workflow is created, the workflow canvas is automatically displayed. In the upper part of the workflow canvas, click Create Node, drag nodes to the workflow canvas, and then draw lines to configure dependencies between the nodes for data synchronization based on the workflow design.

In this example, no lineage exists between the zero load node and synchronization nodes. In this case, the dependencies between nodes are configured by drawing lines in the workflow. For more information about how to configure dependencies, see Scheduling dependency configuration guide. The following table describes the node types, naming convention, and the functionality of each node.
| Node classification | Node type | Naming convention (named after the final output table) | Node functionality |
| --- | --- | --- | --- |
| General | Zero load node | workshop_start_starrocks | Used to manage the entire workflow for user profile analysis. For example, the zero load node determines the time when the workflow starts to run. If the workflow in the workspace is complex, a zero load node also makes the path of data flows in the workflow clearer. This node is a dry-run node. You do not need to edit the code of the node. |
| Database | StarRocks | ddl_ods_user_info_d_starrocks | Used to create the StarRocks table ods_user_info_d_starrocks, which receives the basic user information from the MySQL table. This database node is created before synchronization. |
| Database | StarRocks | ddl_ods_raw_log_d_starrocks | Used to create the StarRocks table ods_raw_log_d_starrocks, which receives the website access logs of users from the OSS object. This database node is created before synchronization. |
| Data Integration | Offline synchronization | ods_user_info_d_starrocks | Used to synchronize the basic user information stored in MySQL to the StarRocks table ods_user_info_d_starrocks. |
| Data Integration | Offline synchronization | ods_raw_log_d_starrocks | Used to synchronize the website access logs of users stored in OSS to the StarRocks table ods_raw_log_d_starrocks. |
Configure the scheduling logic
In this example, the zero load node workshop_start_starrocks is used to trigger the workflow to run at 00:30 every day. The following table describes the configurations of scheduling properties for the zero load node. You do not need to modify the scheduling configurations of other nodes. For information about the implementation logic, see Configure scheduling time for nodes in a workflow in different scenarios. For information about other scheduling configurations, see Overview.
| Configuration item | Screenshot | Description |
| --- | --- | --- |
| Scheduling time | | The scheduling time of the zero load node is set to 00:30. The zero load node triggers the current workflow to run at 00:30 every day. |
| Scheduling dependencies | | Every node in a DataWorks workflow must depend on an upstream node. In this example, all nodes in the data synchronization phase depend on the zero load node workshop_start_starrocks. Therefore, the running of the data synchronization workflow is triggered by the workshop_start_starrocks node. |
Step 2: Configure data synchronization tasks
Create the destination StarRocks tables
Before you synchronize data, you must create the destination StarRocks tables to store the raw data that you want to synchronize.
In this example, the destination StarRocks tables are generated based on the source table schemas. For more information, see the Objective section in this topic. On the configuration tab of the workflow, double-click the database nodes ddl_ods_user_info_d_starrocks and ddl_ods_raw_log_d_starrocks. On the configuration tab of each node, enter the statement used to create a StarRocks table and click the
icon.
ddl_ods_user_info_d_starrocks

```sql
CREATE TABLE IF NOT EXISTS ods_user_info_d_starrocks (
    uid       STRING COMMENT 'The user ID',
    gender    STRING COMMENT 'The gender',
    age_range STRING COMMENT 'The age range',
    zodiac    STRING COMMENT 'The zodiac sign',
    dt        STRING NOT NULL COMMENT 'The time'
)
DUPLICATE KEY(uid)
COMMENT 'User behavior analysis case - table that stores basic user information'
PARTITION BY(dt)
PROPERTIES("replication_num" = "1");
```

ddl_ods_raw_log_d_starrocks

```sql
CREATE TABLE IF NOT EXISTS ods_raw_log_d_starrocks (
    col STRING COMMENT 'Log',
    dt  DATE NOT NULL COMMENT 'The time'
)
DUPLICATE KEY(col)
COMMENT 'User behavior analysis case - table that stores the website access logs of users'
PARTITION BY(dt)
PROPERTIES("replication_num" = "1");
```
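After both database nodes finish running, you can optionally confirm that the destination tables exist. The following is a minimal sketch that you could run as an ad hoc query; it assumes that the query runs against the database in which the two tables above were created:

```sql
-- Optional check: confirm that both destination tables were created and inspect their columns.
SHOW TABLES LIKE 'ods%starrocks';
DESC ods_user_info_d_starrocks;
DESC ods_raw_log_d_starrocks;
```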
Configure a batch synchronization task to synchronize the basic user information
On the configuration tab of the workflow, double-click the batch synchronization node ods_user_info_d_starrocks to go to the configuration tab of the node. Configure the parameters to synchronize the basic user information from the MySQL table ods_user_info_d provided in this example to the StarRocks table ods_user_info_d_starrocks.
Configure network connections and a resource group.
After you finish configuring the source, resource group, and destination, click Next and complete the connectivity test as prompted. The following table describes the configurations.
| Parameter | Description |
| --- | --- |
| Source | Set the parameter to MySQL. Set the Data Source Name parameter to user_behavior_analysis_mysql. |
| Resource Group | Select the serverless resource group that you purchased in the environment preparation phase. |
| Destination | Set the parameter to StarRocks. Set the Data Source Name parameter to Doc_StarRocks_Storage_Compute_Tightly_01. |

Configure the task.
Configure the source and destination.
| Item | Parameter | Description |
| --- | --- | --- |
| Source | Table | Select the MySQL table ods_user_info_d. |
| Source | Split key | The split key for the data to be read. We recommend that you use the primary key or an indexed column as the split key. Only fields of the INTEGER type are supported. In this example, the uid field is used as the split key. |
| Destination | Table | Select the StarRocks table ods_user_info_d_starrocks. |
| Destination | Statement Run Before Writing | In this example, data is dynamically partitioned based on the dt field. To prevent the node from repeatedly writing data when it is rerun, the following SQL statement is used to delete the existing destination partition before each synchronization: ALTER TABLE ods_user_info_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE. In this statement, ${var} is a parameter. The scheduling parameter that you configure in the scheduling configuration phase is assigned to ${var} as a value, and the value is dynamically replaced in the node code based on the scheduling parameter configuration (see the example after this table). For more information, see Scheduling configuration. |
| Destination | StreamLoad Request Parameters | The StreamLoad request parameters, which must be in the JSON format: { "row_delimiter": "\\x02", "column_separator": "\\x01" } |
Configure field mappings.
Configure mappings between source fields and destination fields. You can assign a scheduling parameter to a variable as a value in the node code. This way, the value of the partition field in the StarRocks table can be dynamically replaced. In this case, the data of each day can be written to the data timestamp-based partition in the StarRocks table.
Click Map Fields with Same Name. The fields in the source MySQL table are automatically mapped to fields that have the same names in the destination table. In this case, data in the source fields is automatically written to the destination fields that have the same names.
Click Add, enter '${var}', and then manually map this field to the dt field of the StarRocks table.
Configure the scheduling properties.
On the configuration tab of the node, click Properties in the right-side navigation pane. For more information, see Scheduling properties of a node. The following table describes the configurations.
| Section | Description | Screenshot |
| --- | --- | --- |
| Scheduling Parameter | In this section, click Add Parameter to add a scheduling parameter. Set the Parameter Name parameter to var. Set the Parameter Value parameter to $[yyyymmdd-1]. | |
| Dependencies | In this section, make sure that the output table is used as the output name of the current node. The output name is in the workspacename.tablename format. | |
Configure a batch synchronization task to synchronize the website access logs of users
On the configuration tab of the workflow, double-click the batch synchronization node ods_raw_log_d_starrocks to go to the configuration tab of the node. Configure the parameters to synchronize the website access logs of users from the user_log.txt file in a public HttpFile data source to the StarRocks table ods_raw_log_d_starrocks.
Configure network connections and a resource group.
After you finish configuring the source, resource group, and destination, click Next and complete the connectivity test as prompted. The following table describes the configurations.
| Parameter | Description |
| --- | --- |
| Source | Set the parameter to HttpFile. Set the Data Source Name parameter to user_behavior_analysis_HttpFile. |
| Resource Group | Select the serverless resource group that you purchased in the environment preparation phase. |
| Destination | Set the parameter to StarRocks. Set the Data Source Name parameter to Doc_StarRocks_Storage_Compute_Tightly_01. |

Configure the task.
Configure the source and destination.
| Item | Parameter | Description |
| --- | --- | --- |
| Source | File Path | /user_log.txt |
| Source | File Type | text |
| Source | Column Delimiter | \| |
| Source | Skip Header | No. After you finish configuring the data sources, click Confirm Data Structure. |
| Destination | Table | Select the StarRocks table ods_raw_log_d_starrocks. |
| Destination | Statement Run Before Writing | In this example, data is dynamically partitioned based on the dt field. To prevent the node from repeatedly writing data when it is rerun, the following SQL statement is used to delete the existing destination partition before each synchronization: ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE. In this statement, ${var} is a parameter. The scheduling parameter that you configure in the scheduling configuration phase is assigned to ${var} as a value, and the value is dynamically replaced in the node code based on the scheduling parameter configuration (see the example after this table). |
| Destination | StreamLoad Request Parameters | The StreamLoad request parameters, which must be in the JSON format: { "row_delimiter": "\\x02", "column_separator": "\\x01" } |
Configure field mappings.
In the top toolbar on the configuration tab of the node, click the icon to change the node configuration mode from the codeless UI to the script mode. Configure mappings between fields in the HttpFile file and fields in the StarRocks table, and enable the value of the dt partition field in the StarRocks table to be dynamically replaced.

Additional configuration for columns in an HttpFile file:

```json
{ "type": "STRING", "value": "${var}" }
```

Complete sample script for the ods_raw_log_d_starrocks node:

```json
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "httpfile",
      "parameter": {
        "fileName": "/user_log.txt",
        "nullFormat": "",
        "compress": "",
        "requestMethod": "GET",
        "connectTimeoutSeconds": 60,
        "column": [
          {
            "index": 0,
            "type": "STRING"
          },
          {
            "type": "STRING",
            "value": "${var}"
          }
        ],
        "skipHeader": "false",
        "encoding": "UTF-8",
        "fieldDelimiter": "|",
        "fieldDelimiterOrigin": "|",
        "socketTimeoutSeconds": 3600,
        "envType": 0,
        "datasource": "user_behavior_analysis",
        "bufferByteSizeInKB": 1024,
        "fileFormat": "text"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "starrocks",
      "parameter": {
        "loadProps": {
          "row_delimiter": "\\x02",
          "column_separator": "\\x01"
        },
        "envType": 0,
        "datasource": "Doc_StarRocks_Storage_Compute_Tightly_01",
        "column": [
          "col",
          "dt"
        ],
        "tableComment": "",
        "table": "ods_raw_log_d_starrocks",
        "preSql": "ALTER TABLE ods_raw_log_d_starrocks DROP PARTITION IF EXISTS p${var} FORCE ; "
      },
      "name": "Writer",
      "category": "writer"
    },
    {
      "copies": 1,
      "parameter": {
        "nodes": [],
        "edges": [],
        "groups": [],
        "version": "2.0"
      },
      "name": "Processor",
      "category": "processor"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "locale": "zh",
    "speed": {
      "throttle": false,
      "concurrent": 2
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
```

Configure the scheduling properties.
On the configuration tab of the node, click Properties in the right-side navigation pane. On the Properties tab, configure the scheduling properties and basic information for the node. The following table describes the configurations.
| Section | Description | Screenshot |
| --- | --- | --- |
| Scheduling Parameter | In this section, click Add Parameter to add a scheduling parameter. Set the Parameter Name parameter to var. Set the Parameter Value parameter to $[yyyymmdd-1]. | |
| Dependencies | In this section, make sure that the output table is used as the output name of the current node. The output name is in the workspacename.tablename format. | |
Step 3: Verify the synchronized data
Run the workflow
Go to the configuration tab of the workflow.
Double-click the User profile analysis_StarRocks workflow below Business Flow. The workflow canvas appears.
Run the workflow.
In the top toolbar of the workflow canvas, click the Run icon to run the workflow in the data integration phase based on the configured dependencies.
View the status of the nodes.
If the nodes are in the successful state, the synchronization process is normal.
View node execution logs.
Right-click the ods_user_info_d_starrocks or ods_raw_log_d_starrocks node on the canvas and select View Log to view the synchronization details.
View the synchronization result
Create an ad hoc query.
In the left-side navigation pane of the DataStudio page, click the Ad hoc Query icon. The Ad Hoc Query pane appears. Right-click Ad Hoc Query and choose .
Query synchronization result tables.
```sql
-- In the query statements, change the partition key value to the data timestamp of the node.
-- For example, if the node is run on January 2, 2024, the data timestamp is 20240101,
-- which is one day earlier than the time when the node is run.
SELECT * FROM ods_raw_log_d_starrocks WHERE dt = <The data timestamp>;
SELECT * FROM ods_user_info_d_starrocks WHERE dt = <The data timestamp>;
```
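For example, assuming the node ran on January 2, 2024 (data timestamp 20240101), the queries could look like the following sketch. Note that in the DDL in Step 2, dt is a DATE column in ods_raw_log_d_starrocks and a STRING column in ods_user_info_d_starrocks, so the literals differ:

```sql
-- Hypothetical data timestamp: 20240101 (node run on January 2, 2024).
SELECT * FROM ods_raw_log_d_starrocks WHERE dt = '2024-01-01';    -- dt is a DATE column in this table
SELECT * FROM ods_user_info_d_starrocks WHERE dt = '20240101';    -- dt is a STRING column in this table
```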
What to do next
Data synchronization is complete. You can proceed with the next tutorial. In the next tutorial, you will learn how to process the basic user information and website access logs of users in StarRocks. For more information, see Process data.

