This topic describes how to synchronize user information and website log data. You will create HttpFile and MySQL data sources, configure a data synchronization pipeline to an Object Storage Service (OSS) bucket, and create a Spark foreign table to parse the data. Finally, you will run a query to verify that the data was synchronized correctly.
Prerequisites
Prepare the required environment before you begin. For more information, see Prepare the environment.
1. Create data sources
To ensure that data can be processed in subsequent steps, you must add the following data sources to your DataWorks workspace to retrieve the raw data.
MySQL data source: This tutorial uses a data source named user_behavior_analysis_mysql to retrieve basic user information (ods_user_info_d) from MySQL.
HttpFile data source: This tutorial uses a data source named user_behavior_analysis_httpfile to retrieve user website access logs (user_log.txt) stored in OSS.
OSS data source: Stores the user information and website access records synchronized from the MySQL and HttpFile data sources. A Spark foreign table will later read this data.
Create a MySQL data source (user_behavior_analysis_mysql)
The user information provided in this tutorial is stored in a MySQL database. Create a MySQL data source to sync the user information data (ods_user_info_d) from the MySQL database to the private OSS bucket that you created in the Prepare the environment step.
Go to the Data Source page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Management Center.
In the navigation pane on the left, click Data Sources to go to the Data Sources page.
Click Add Data Source. Search for and select MySQL as the data source type.
On the Add MySQL Data Source page, configure the parameters. For this tutorial, use the same example values for both the development and production environments.
The following table describes the key parameters. You can keep the default values for the other parameters.
Parameter
Description
Data Source Name
Enter a name for the data source. For this tutorial, enter user_behavior_analysis_mysql.
Data Source Description
This data source is for DataWorks tutorials. Read data from this data source when you configure a batch sync task to access the test data provided by the platform. This data source can only be read in Data Integration scenarios. Other modules cannot use it.
Configuration Mode
Select Connection String Mode.
Connection Address
Host IP address: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
Port: 3306
Database Name
Enter the database name. For this tutorial, enter workshop.
Username
Enter the username. For this tutorial, enter workshop.
Password
Enter the password. For this tutorial, enter workshop#2017.
Authentication Method
No authentication.
In the Connection Configuration section, click Test Network Connectivity for both the production and development environments. Ensure that the connectivity status is Connected.
Important: Ensure that the resource group is attached to the workspace and has public network access enabled. Otherwise, data synchronization fails. For more information, see Prepare the environment.
If no resource group is available, follow the prompts in the Connection Configuration section and click Purchase or Associate Purchased Resource Group.
Click Complete Creation.
Create an HttpFile data source (user_behavior_analysis_httpfile)
The website access records provided in this tutorial are stored in OSS. Create an HttpFile data source to sync the website access records (user_log.txt) from OSS to the private OSS bucket that you created in the Prepare the environment step.
Click Data Sources in the navigation pane on the left.
Click Add Data Source. In the Add Data Source dialog box, search for and select HttpFile as the data source type.
On the Add HttpFile Data Source page, configure the parameters. For this tutorial, use the same example values for both the development and production environments.
The following table describes the key parameters. You can keep the default values for the other parameters.
Parameter
Description
Data Source Name
Enter the data source name. For this tutorial, enter user_behavior_analysis_httpfile.
Data Source Description
This data source is for DataWorks tutorials. Read data from this data source when you configure a batch sync task to access the test data provided by the platform. This data source can only be read in Data Integration scenarios. Other modules cannot use it.
URL
Set URL to https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com for both the development and production environments.
In the Connection Configuration section, click Test Network Connectivity for both the production and development environments. Ensure that the connectivity status is Connected.
Important: Ensure that the resource group is attached to the workspace and has public network access enabled. Otherwise, data synchronization fails. For more information, see Prepare the environment.
If no resource group is available, follow the prompts in the Connection Configuration section and click Purchase or Associate Purchased Resource Group.
Click Complete Creation.
Create an OSS data source
In this tutorial, you will sync user information from the MySQL data source and log information from the HttpFile data source to an OSS data source.
On the Management Center page, go to the Data Sources page and click Add Data Source.
In the Add Data Source dialog box, search for and select OSS as the data source type.
On the Add OSS Data Source page, configure the parameters. For this tutorial, use the example values for both the development environment and the production environment.
Parameter
Description
Data Source Name
Enter a name for the data source. This tutorial uses test_g.
Data Source Description
Enter a brief description for the data source.
Access Mode
Select AccessKey Mode.
AccessKey ID
The AccessKey ID of the current account. You can copy the AccessKey ID from the AccessKey page.
AccessKey Secret
Enter the AccessKey secret of the current account.
Important: The AccessKey secret is displayed only when you create it. It cannot be viewed later. Keep it secure. If an AccessKey is leaked or lost, delete it and create a new one.
Endpoint
Enter http://oss-cn-shanghai-internal.aliyuncs.com.
Bucket
The name of the private OSS bucket that you prepared. This tutorial uses dw-spark-demo.
Click Test Network Connectivity in the Connection Status (Development Environment) and Connection Status (Production Environment) columns for the specified resource group. Wait until the test is complete and the status changes to Connected.
Note: At least one resource group must be in the Connected state. Otherwise, you cannot use the codeless UI to create a sync task for this data source.
Click Complete Creation to create the OSS data source.
2. Build the synchronization pipeline
Click the icon in the upper-left corner and choose .
In the navigation pane on the left, click . In the Workspace Directories area, click , select Create Workflow, and set the workflow name. This tutorial uses User_profile_analysis_spark.
On the workflow orchestration page, drag a Zero Load node, two Batch Synchronization nodes, and two EMR Spark SQL nodes from the left side onto the canvas, and set a name for each node.
The example node names and their functions in this tutorial are as follows:
Node Type
Node Name
Function
Zero Load
workshop_start_spark
Manages the entire user profile analysis workflow, such as the start time of inner nodes. It clarifies the data forwarding path in complex workflows. This node is a dry-run task and does not require code editing.
Batch Synchronization
ods_raw_log_d_2oss_spark
Syncs user website access records from the HttpFile data source to the OSS bucket that you created.
Batch Synchronization
ods_user_info_d_2oss_spark
Syncs user information from MySQL to the OSS bucket that you created.
EMR Spark SQL
ods_raw_log_d_spark
Creates the ods_raw_log_d_spark foreign table to read the user website access records stored in OSS.
EMR Spark SQL
ods_user_info_d_spark
Creates the ods_user_info_d_spark foreign table to read the user information stored in OSS.
Manually drag lines to connect the nodes. Set the workshop_start_spark node as the ancestor node of the two batch synchronization nodes.
Configure the workflow scheduling properties.
On the workflow canvas, click Scheduling in the right-side pane and configure the parameters. The following table describes the key parameters. You can keep the default values for the other parameters.
Scheduling Parameter
Description
Scheduling Parameters
You can configure scheduling parameters for the entire workflow. The inner nodes of the workflow can directly use the configured scheduling parameters. In this tutorial, the parameter is set to bizdate=$[yyyymmdd-1] to obtain the date of the previous day (see the example after this table).
Scheduling Cycle
Set to Day for this tutorial.
Scheduling Time
In this tutorial, Scheduling Time is set to 00:30, which means the workflow starts daily at 00:30.
Scheduling Dependencies
The workflow has no upstream dependencies, so you do not need to configure any. For easier management, click Use Workspace Root Node to attach the workflow to the root node of the workspace. The naming format for the workspace root node is workspace_name_root.
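The workflow-level parameter is resolved at run time and can be referenced directly in the code of inner nodes. As a minimal illustration based on the queries used later in this tutorial: with bizdate=$[yyyymmdd-1], a scheduled run on February 24, 2025 resolves ${bizdate} to 20250223, so an inner EMR Spark SQL node (configured later in this tutorial) can filter the corresponding partition as follows.
-- ${bizdate} is replaced by the scheduler, or by the script parameter during a test run, for example with 20250223.
SELECT * FROM ods_raw_log_d_spark WHERE dt = '${bizdate}';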
3. Configure the sync tasks
Configure the initial node
On the workflow orchestration page, hover over the workshop_start_spark node and click Open Node.
On the right side of the workshop_start_spark node editing page, click Scheduling and configure the parameters. The following table describes the key parameters for this tutorial. Keep the default values for any parameters not mentioned.
Scheduling Configuration Parameter
Description
Resource Group
For this tutorial, select the Serverless resource group created in the Prepare the environment step.
Node Dependency Configuration
Because workshop_start_spark is the initial node, it has no upstream dependencies. Click Use Workspace Root Node to have the workflow triggered by the workspace root node. The workspace root node is named WorkspaceName_root.
Configure the user log synchronization pipeline (ods_raw_log_d_2oss_spark)
The ods_raw_log_d_2oss_spark batch synchronization node syncs user log information from the HttpFile data source to the private OSS data source.
On the business flow development panel, hover over the ods_raw_log_d_2oss_spark node and click Open Node to go to the node configuration page.
Configure the network and resources.
Parameter
Description
Source
Data Source: HttpFile.
Data Source Name: user_behavior_analysis_httpfile.
My Resource Group
Select the serverless resource group that you purchased when preparing the environment.
Destination
Data Destination: OSS.
Data Source Name: Select the private OSS data source that you created. This tutorial uses test_g.
Important: If the network connection fails, check whether public network access is enabled for the Serverless resource group.
Click Next to configure the sync task.
Configure Data Sources And Destinations
The following table describes the key parameters for this tutorial. Keep the default values for any parameters not mentioned.
Parameter
Description
Source
File Path: /user_log.txt.
Text Type: Select text.
Column Delimiter: Enter |.
Compression Format: Select None.
Skip Header: Select No.
Destination
Text Type: Select text.
File Name (including path): Enter a path based on your OSS directory. Example: ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt. In this path, ods_raw_log_d is the directory you created, and ${bizdate} represents the date of the previous day.
Column Delimiter: Enter |.
Confirm the Field Mapping and Channel Control.
DataWorks synchronizes data from the specified source fields to the specified destination fields based on the configured field mapping. You can also set the concurrency and configure policies for dirty data. For this tutorial, set Policy for Dirty Data Records to Disallow Dirty Data Records. You can keep the default values for the other settings. For more information, see Configure a sync task using the codeless UI.
Configure the test parameters.
On the right side of the batch sync task configuration page, click Run Configuration and set the following parameters. They are used when you run the test in Step 4.
Configuration Item
Configuration Description
Resource Group
Select the Serverless resource group that you purchased in the Prepare the environment step.
Script Parameter
Click Add Parameter and set bizdate to a value in the yyyymmdd format, for example, bizdate=20250223. During testing, DataStudio replaces the variables defined in the task with this constant.
(Optional) Configure scheduling properties.
For this tutorial, keep the default values for the scheduling configuration parameters. On the right side of the node editing page, click Scheduling. For more information about the parameters in the scheduling configuration, see Node scheduling configuration.
Scheduling Parameter: This tutorial configures scheduling parameters at the workflow level. Inner nodes do not need separate configuration and can use the parameters directly in the task or code.
Scheduling Policy: In the Delayed Execution Time parameter, you can specify how long a child node should wait to run after the workflow starts. This is not set in this tutorial.
In the toolbar at the top, click Save to save the current node.
Configure the user data synchronization pipeline (ods_user_info_d_2oss_spark)
The ods_user_info_d_2oss_spark batch synchronization node syncs user data from the MySQL data source to the private OSS data source.
On the workflow orchestration page, hover over the ods_user_info_d_2oss_spark node and click Open Node.
Configure the network and resources for the synchronization pipeline.
Parameter
Description
Source
Data Source: MySQL.
Data Source Name: user_behavior_analysis_mysql.
My Resource Group
Select the serverless resource group that you purchased when preparing the environment.
Destination
Data Destination: OSS.
Data Source Name: Select the private OSS data source that you created. This tutorial uses test_g.
Important: If the network connection fails, check whether public network access is enabled for the Serverless resource group.
Click Next to configure the sync task.
Configure Data Source And Destination
The following table describes the key parameters for this tutorial. Keep the default values for any parameters not mentioned.
Parameter
Description
Source
Table: Select ods_user_info_d from the data source.
Split Key: Use the primary key or an indexed column as the split key. Only integer fields are supported. Set the split key to uid (see the sketch after this table).
Destination
Text Type: Select text.
File Name (including path): Enter a path based on your OSS directory. Example: ods_user_info_d/user_${bizdate}/user_${bizdate}.txt. In this path, ods_user_info_d is the directory you created, and ${bizdate} represents the date of the previous day.
Column Delimiter: Enter |.
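For background, the split key described above lets Data Integration divide the source table into ranges so that the data can be read concurrently. The range boundaries below are hypothetical and only illustrate the idea; the actual ranges are computed by Data Integration at run time.
-- Hypothetical illustration of range-based reads with Split Key = uid and a concurrency of 2:
SELECT * FROM ods_user_info_d WHERE uid >= 0 AND uid < 5000;      -- read by thread 1
SELECT * FROM ods_user_info_d WHERE uid >= 5000 AND uid <= 10000; -- read by thread 2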
Confirm the Field Mapping and Channel Control.
DataWorks synchronizes data from the specified source fields to the specified destination fields based on the configured field mapping. You can also set the concurrency and configure policies for dirty data. For this tutorial, set Policy for Dirty Data Records to Disallow Dirty Data Records. You can keep the default values for the other settings. For more information, see Configure a sync task using the codeless UI.
Configure the test parameters.
On the right side of the batch sync task configuration page, click Run Configuration and set the following parameters. They are used when you run the test in Step 4.
Configuration Item
Configuration Description
Resource Group
Select the Serverless resource group that you purchased in the Prepare the environment step.
Script Parameter
Click Add Parameter and set bizdate to a value in the yyyymmdd format, for example, bizdate=20250223. During testing, DataStudio replaces the variables defined in the task with this constant.
(Optional) Configure scheduling properties.
For this tutorial, keep the default values for the scheduling configuration parameters. On the right side of the node editing page, click Scheduling. For more information about the parameters in the scheduling configuration, see Node scheduling configuration.
Scheduling Parameter: This tutorial configures scheduling parameters at the workflow level. Inner nodes do not need separate configuration and can use the parameters directly in the task or code.
Scheduling Policy: In the Delayed Execution Time parameter, you can specify how long a child node should wait to run after the workflow starts. This is not set in this tutorial.
In the toolbar at the top, click Save to save the current node.
4. Synchronize the data
Synchronize the data.
In the toolbar at the top of the workflow canvas, click Run. Set the values of the parameter variables defined in each node for this run. This tutorial uses 20250223, but you can change the value as needed. Click OK and wait for the run to complete.
View the synchronization result.
After the workflow runs successfully, log on to the OSS console and open the bucket that backs the OSS data source you created. Check whether the corresponding directories and data exist under the /ods_user_info_d and /ods_raw_log_d directories.
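For example, with the run date 20250223 and the destination paths configured earlier, the bucket used by the OSS data source (dw-spark-demo in this tutorial) should contain objects similar to the following:
ods_raw_log_d/log_20250223/log_20250223.txt
ods_user_info_d/user_20250223/user_20250223.txt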
5. Parse the data
After the data is synchronized, use Spark SQL to create foreign tables and parse the user information and website access records stored in OSS.
Create the log table (ods_raw_log_d_spark) and parse the data
After the batch synchronization task writes the user log data to the private OSS data source, the EMR Spark SQL node creates the ods_raw_log_d_spark foreign table over the generated OSS files so that the logs can be queried.
On the workflow orchestration page, hover over the ods_raw_log_d_spark node and click Open Node.
Edit the table creation statement.
Paimon table (DLF)
-- 1. Create an append-only Paimon table.
CREATE TABLE IF NOT EXISTS ods_raw_log_d_spark (
    `col` STRING,
    `dt` STRING  -- Also include the partition key as a regular column in the table. This is the recommended practice for Paimon.
)
PARTITIONED BY (dt)
TBLPROPERTIES (
    'format' = 'paimon'
);

-- 2. Create a temporary view to point to and parse the source files on OSS.
CREATE TEMPORARY VIEW source_of_logs (
    `value` STRING  -- The view has only one column to read the entire text line.
)
USING TEXT
OPTIONS (
    path 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'
);

INSERT INTO ods_raw_log_d_spark
SELECT
    value,             -- Your raw log row.
    '${bizdate}' AS dt -- Specify the partition value directly in the SELECT statement.
FROM source_of_logs;
Hive table (DLF-Legacy)
-- Scenario: The following Spark SQL statement creates the ods_raw_log_d_spark foreign table using an EMR Spark SQL node. The LOCATION clause is used to get the log information written to the private OSS bucket by the batch data integration task and add the corresponding dt partition.
-- Note:
-- DataWorks provides scheduling parameters that allow you to write daily incremental data to the corresponding business partition of the target table in a scheduling scenario.
-- In a real-world development scenario, you can define code variables in the ${variable_name} format and assign values to them on the scheduling configuration page. This allows for dynamic parameter input in your code during scheduled runs.
CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark (
    `col` STRING
)
PARTITIONED BY (
    dt STRING
)
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/';

ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}')
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/';
Note: Replace the location address in the code with your actual address. dw-spark-demo is the name of the OSS bucket that you created when you prepared the OSS environment.
Configure the test parameters.
On the right side of the EMR Spark SQL task configuration page, click Run Configuration and set the following parameters. They are used when you run the task in Step 6.
Configuration Item
Configuration Description
Computing Resource
Select the Spark computing resource that you attached in the Prepare the environment step.
Resource Group
Select the Serverless resource group that you purchased in the Prepare the environment step.
Script Parameter
Click Add Parameter and set bizdate to a value in the yyyymmdd format, for example, bizdate=20250223. During testing, DataStudio replaces the variables defined in the task with this constant.
(Optional) Configure scheduling properties.
For this tutorial, keep the default values for the scheduling configuration parameters. On the right side of the node editing page, click Scheduling. For more information about the parameters in the scheduling configuration, see Node scheduling configuration.
Scheduling Parameter: This tutorial configures scheduling parameters at the workflow level. Inner nodes do not need separate configuration and can use the parameters directly in the task or code.
Scheduling Policy: In the Delayed Execution Time parameter, you can specify how long a child node should wait to run after the workflow starts. This is not set in this tutorial.
In the toolbar at the top, click Save to save the current node.
Create the user table (ods_user_info_d_spark) and parse the data
After the batch synchronization task writes the user information to the private OSS data source, the EMR Spark SQL node creates the ods_user_info_d_spark foreign table over the generated OSS files so that the data can be queried.
On the workflow orchestration page, hover over the ods_user_info_d_spark node and click Open Node.
Edit the table creation statement.
Paimon table (DLF)
-- 1. Create a Paimon table as the destination.
CREATE TABLE IF NOT EXISTS ods_user_info_d_spark (
    `uid` STRING COMMENT 'User ID',
    `gender` STRING COMMENT 'Gender',
    `age_range` STRING COMMENT 'Age range',
    `zodiac` STRING COMMENT 'Zodiac',
    `dt` STRING COMMENT 'Partition date'
)
PARTITIONED BY (dt)
TBLPROPERTIES (
    'format' = 'paimon'
);

-- 2. Create a temporary view to point to and parse the source files on OSS.
CREATE TEMPORARY VIEW source_of_user_info (
    `value` STRING  -- The view has only one column to read the entire text line.
)
USING TEXT  -- Tell Spark that this is a plain text file.
OPTIONS (
    path 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/'
);

-- 3. Query and parse data from the temporary view and insert it into the Paimon table.
INSERT INTO ods_user_info_d_spark
SELECT
    -- Use the split function to split the raw text line by '|'.
    split(value, '\\|')[0] AS uid,
    split(value, '\\|')[1] AS gender,
    split(value, '\\|')[2] AS age_range,
    split(value, '\\|')[3] AS zodiac,
    '${bizdate}' AS dt  -- Assign a value to the partition field.
FROM source_of_user_info;
Hive table (DLF-Legacy)
-- Scenario: The following Spark SQL statement creates the ods_user_info_d_spark foreign table using an EMR Spark SQL node. The LOCATION clause is used to get the user information written to the private OSS bucket by the batch data integration task and write it to the corresponding dt partition.
-- Note:
-- DataWorks provides scheduling parameters that allow you to write daily incremental data to the corresponding business partition of the target table in a scheduling scenario.
-- In a real-world development scenario, you can define code variables in the ${variable_name} format and assign values to them on the scheduling configuration page. This allows for dynamic parameter input in your code during scheduled runs.
CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark (
    `uid` STRING COMMENT 'User ID',
    `gender` STRING COMMENT 'Gender',
    `age_range` STRING COMMENT 'Age range',
    `zodiac` STRING COMMENT 'Zodiac'
)
PARTITIONED BY (
    dt STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/';

ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}')
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/';
Note: Replace the location address in the code with your actual address. dw-spark-demo is the name of the OSS bucket that you created when you prepared the OSS environment.
Configure the test parameters.
On the right side of the EMR Spark SQL task configuration page, click Run Configuration and set the following parameters. They are used when you run the task in Step 6.
Configuration Item
Configuration Description
Computing Resource
Select the Spark computing resource that you attached in the Prepare the environment step.
Resource Group
Select the Serverless resource group that you purchased in the Prepare the environment step.
Script Parameter
Click Add Parameter and set bizdate to a value in the yyyymmdd format, for example, bizdate=20250223. During testing, DataStudio replaces the variables defined in the task with this constant.
(Optional) Configure scheduling properties.
For this tutorial, keep the default values for the scheduling configuration parameters. On the right side of the node editing page, click Scheduling. For more information about the parameters in the scheduling configuration, see Node scheduling configuration.
Scheduling Parameter: This tutorial configures scheduling parameters at the workflow level. Inner nodes do not need separate configuration and can use the parameters directly in the task or code.
Scheduling Policy: In the Delayed Execution Time parameter, you can specify how long a child node should wait to run after the workflow starts. This is not set in this tutorial.
In the toolbar at the top, click Save to save the current node.
6. Run the task
Run the workflow.
In the workflow toolbar, click Run. Set the values of the parameter variables that are defined in each node for this run. This tutorial uses 20250223, but you can modify the value as needed. Then, click OK and wait for the run to complete.
Query the data synchronization result.
After all nodes in this section have run successfully, write the following SQL queries to check whether the foreign tables created by the EMR SPARK SQL nodes have been generated correctly.
Verify the result of the ods_raw_log_d_spark table.
-- You need to update the partition filter condition to the actual data timestamp for your current operation. For example, if the task runs on 20250223, the data timestamp is 20250222, which is the day before the task run date.
SELECT * FROM ods_raw_log_d_spark WHERE dt = '${bizdate}'; -- Query the ods_raw_log_d_spark table.
Verify the result of the ods_user_info_d_spark table.
-- You need to update the partition filter condition to the actual data timestamp for your current operation. For example, if the task runs on 20250223, the data timestamp is 20250222, which is the day before the task run date.
SELECT * FROM ods_user_info_d_spark WHERE dt = '${bizdate}'; -- Query the ods_user_info_d_spark table.
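If you run these queries in a context where the ${bizdate} variable is not substituted, replace it with the literal partition value used for your test run, which is 20250223 in this tutorial. A minimal check, assuming that run date:
SELECT COUNT(*) FROM ods_raw_log_d_spark WHERE dt = '20250223';
SELECT * FROM ods_user_info_d_spark WHERE dt = '20250223' LIMIT 10;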
Next steps
You have now learned how to synchronize log data. You can continue to the next tutorial, where you will learn how to process and analyze the synchronized data. For more information, see Process data.
FAQ
Q: When I create a table, the error Option 'path' is not allowed for Normal Paimon table, please remove it in table options. is reported.
A: This is a syntax error. The CREATE TABLE statement for a normal Paimon table must not contain a path option. Create the table as shown in the Paimon table (DLF) section of this topic and in the sketch below.
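As a minimal sketch based on the Paimon DDL shown earlier in this topic, declare the table with only the format table property and reference the OSS path from a separate temporary view rather than a path table option:
-- Do not set a 'path' option on the Paimon table itself.
CREATE TABLE IF NOT EXISTS ods_raw_log_d_spark (
    `col` STRING,
    `dt` STRING
)
PARTITIONED BY (dt)
TBLPROPERTIES (
    'format' = 'paimon'
);

-- Reference the OSS path only in the temporary view that reads the source files.
CREATE TEMPORARY VIEW source_of_logs (`value` STRING)
USING TEXT
OPTIONS (
    path 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/'
);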