This topic describes how to create HttpFile and MySQL data sources to retrieve user information and website log data. It also explains how to configure a data synchronization pipeline to sync this data to the Object Storage Service (OSS) bucket created during environment preparation. You will also create Spark foreign tables to parse the data in OSS and then query the data to verify the synchronization result.
Prerequisites
Before you start, prepare the required environment. For more information, see Prepare an environment.
Step 1: Create data sources
To ensure smooth data processing in subsequent steps, you must add the following data sources to your DataWorks workspace to retrieve the raw data.
MySQL data source: In this tutorial, the data source is named user_behavior_analysis_mysql. It is used to retrieve the basic user information (ods_user_info_d) stored in MySQL.
HttpFile data source: In this tutorial, the data source is named user_behavior_analysis_httpfile. It is used to retrieve the user website access records (user_log.txt) stored in OSS.
OSS data source: This data source is used to store the basic user information and website access records that are synchronized from the MySQL and HttpFile data sources. This allows Spark to read the data after foreign tables are created.
Create a MySQL data source (user_behavior_analysis_mysql)
The basic user information for this tutorial is stored in a MySQL database. You need to create a MySQL data source to synchronize the user data (ods_user_info_d) to the private OSS bucket that you created when you prepared the environment.
Go to the Data Source page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose More > Management Center. On the page that appears, select the desired workspace from the drop-down list and click Go to Management Center.
In the navigation pane on the left, click Data Source to go to the Data Source page.
Click Add Data Source. Search for and select MySQL.
On the Create MySQL Data Source page, configure the parameters. For this tutorial, use the same example values for both the development and production environments.
The key parameters are described below. You can keep the default values for the other parameters.
Data Source Name: Enter a name for the data source. This tutorial uses user_behavior_analysis_mysql.
Data Source Description: This data source is for DataWorks tutorials. Read from it in a batch synchronization task to access the test data provided by the platform. The data source can be read only in Data Integration scenarios; other modules cannot use it.
Configuration Mode: Select Connection String Mode.
Connection String: Set Host IP Address to rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com and Port to 3306.
Database Name: Enter the database name. This tutorial uses workshop.
Username: Enter the username. This tutorial uses workshop.
Password: Enter the password. This tutorial uses workshop#2017.
Authentication Option: No authentication.
In the Connection Configuration section, click Test Connectivity for the development and production environments. Ensure that the status is Connected.
Important: Ensure that the resource group is attached to the workspace and has Internet access. Otherwise, an error will occur during data synchronization. For more information, see Prepare an environment.
If no resource groups are available, follow the prompts in the connection configuration section. Click Purchase or Attach Purchased Resource Group.
Click Complete.
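Optionally, if you want to confirm outside of DataWorks that the source table is reachable, a minimal check from any MySQL client that can connect to the host above might look like the following. This step is not required by the tutorial; the table name ods_user_info_d comes from this tutorial, and everything else uses the connection values above.
-- Connect to the workshop database with the connection values above, then:
SELECT COUNT(*) FROM ods_user_info_d;  -- confirms that the sample user table exists and contains rows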
Create an HttpFile data source (user_behavior_analysis_httpfile)
The user website access records for this tutorial are stored in OSS. You need to create an HttpFile data source to synchronize the user website access records (user_log.txt) to the private OSS bucket that you created when you prepared the environment.
On the Management Center page, click Data Source in the navigation pane on the left.
Click Add Data Source. In the Add Data Source dialog box, search for and select HttpFile.
On the Create HttpFile Data Source page, configure the parameters. For this tutorial, use the same example values for both the development and production environments.
The key parameters are described below. You can keep the default values for the other parameters.
Data Source Name: Enter a name for the data source. This tutorial uses user_behavior_analysis_httpfile.
Data Source Description: This data source is for DataWorks tutorials. Read from it in a batch synchronization task to access the test data provided by the platform. The data source can be read only in Data Integration scenarios; other modules cannot use it.
URL Domain Name: Set the URL domain name for both the development and production environments to https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com.
In the Connection Configuration section, click Test Connectivity for the development and production environments. Ensure that the status is Connected.
Important: Ensure that the resource group is attached to the workspace and has Internet access. Otherwise, an error will occur during data synchronization. For more information, see Prepare an environment.
If no resource groups are available, follow the prompts in the connection configuration section. Click Purchase or Attach Purchased Resource Group.
Click Complete.
Create an OSS data source
In this tutorial, the user information from the MySQL data source and the log information from the HttpFile data source are synchronized to an OSS data source.
Go to the Data Sources page and click Add Data Source.
In the Add Data Source dialog box, search for and select OSS.
On the Add OSS Data Source page, configure the parameters. For this tutorial, use the example values for both the development and production environments.
Data Source Name: Enter a name for the data source. In this example, enter test_g.
Data Source Description: Enter a brief description for the data source.
Access Mode: Select Access Key Mode.
AccessKey ID: Enter the AccessKey ID of the current logon account. You can go to the AccessKey page to copy the AccessKey ID.
AccessKey Secret: Enter the AccessKey secret of the current logon account. Important: The AccessKey secret is displayed only when you create it and cannot be viewed later. Keep your AccessKey secret confidential. If an AccessKey is leaked or lost, delete it and create a new one.
Endpoint: Enter http://oss-cn-shanghai-internal.aliyuncs.com.
Bucket: Enter the name of the private OSS bucket that you prepared. In this example, the name is dw-spark-demo.
For the specified resource group, click Test Network Connectivity in the Connection Status (Development Environment) and Connection Status (Production Environment) columns. Wait for the test to complete and ensure that the status is Connected.
Note: Ensure that at least one resource group is in the Connected state. Otherwise, you cannot use the codeless UI to create a sync task for this data source.
Click Complete Creation to create the OSS data source.
Step 2: Build a synchronization pipeline
In the upper-left corner, click the icon and go to DataStudio.
In the navigation pane on the left, open the Workspace Directories area, click the create icon, and select Create Workflow. Enter a name for the workflow. For this tutorial, set the name to User_profile_analysis_spark.
On the workflow orchestration page, drag the Zero Load Node, Batch Synchronization, and EMR SPARK SQL nodes onto the canvas. Enter a name for each node.
The nodes used in this tutorial and their functions are as follows:
Zero Load Node (workshop_start_spark): Manages the entire user persona analysis workflow, such as the start time of the inner nodes, and clarifies the data forwarding path when the workflow is complex. This node is a dry-run node, so you do not need to edit its code.
Batch Synchronization (ods_raw_log_d_2oss_spark): Synchronizes the user website access records stored in OSS to the OSS bucket that you created.
Batch Synchronization (ods_user_info_d_2oss_spark): Synchronizes the basic user information stored in MySQL to the OSS bucket that you created.
EMR SPARK SQL (ods_raw_log_d_spark): Creates the ods_raw_log_d_spark foreign table to read the user website access records stored in OSS.
EMR SPARK SQL (ods_user_info_d_spark): Creates the ods_user_info_d_spark foreign table to read the basic user information stored in OSS.
Drag lines to connect the nodes. Set the workshop_start_spark node as the ancestor node of the two batch synchronization nodes.
Configure scheduling properties for the workflow.
On the workflow canvas, click Scheduling Configuration in the right-side pane and configure the parameters. The key settings are described below. You can keep the default values for the other parameters.
Scheduling Parameters: Set scheduling parameters for the entire workflow so that its inner nodes can use them directly. This tutorial sets bizdate=$[yyyymmdd-1] to get the previous day's date.
Scheduling Cycle: This tutorial uses Day.
Scheduling Time: This tutorial sets the scheduling time to 00:30, so the workflow starts at 00:30 every day.
Dependency Configuration: The workflow has no ancestor dependencies, so you do not need to configure this. For easier management, you can click Use Workspace Root Node to attach the workflow to the workspace root node. The workspace root node is named in the format workspace_name_root.
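For reference, the following minimal Spark SQL sketch shows how the workflow-level bizdate parameter is substituted at run time. The partition value is only an assumed example for an instance whose scheduled date is 2025-02-24.
-- In node code, reference the workflow-level scheduling parameter as a variable:
SELECT * FROM ods_user_info_d_spark WHERE dt = '${bizdate}';
-- For an instance scheduled on 2025-02-24, bizdate=$[yyyymmdd-1] resolves to 20250223,
-- so the statement that actually runs is:
SELECT * FROM ods_user_info_d_spark WHERE dt = '20250223';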
Step 3: Configure synchronization tasks
Configure the initial node
On the workflow orchestration page, hover over the workshop_start_spark node and click Open Node.
In the right-side pane of the workshop_start_spark node editing page, click Scheduling and configure the parameters. The key settings are described below. You can keep the default values for any parameters that are not mentioned.
Scheduling Type: For this tutorial, set the type to Dry-run.
Resource Group: For this tutorial, select the Serverless resource group created when you prepared the environment.
Scheduling Dependencies: Because workshop_start_spark is the initial node, it has no upstream dependencies. Click Use Workspace Root Node to have the workspace root node trigger the workflow. The workspace root node is named WorkspaceName_root.
Configure the user log synchronization pipeline (ods_raw_log_d_2oss_spark)
The ods_raw_log_d_2oss_spark batch synchronization node synchronizes user log information from the HttpFile data source to the private OSS data source.
On the workflow orchestration page, hover over the ods_raw_log_d_2oss_spark node and click Open Node to go to the node configuration page.
Configure the network and resources.
Source: Set the source to HttpFile and the data source name to user_behavior_analysis_httpfile.
Resource Group: Select the Serverless resource group purchased during environment preparation.
Destination: Set the destination to OSS and select the private OSS data source created earlier as the data source name. In this example, select test_g.
Click Next to configure the sync task.
Configure Source and Destination
The key settings are described below. You can keep the default values for any parameters that are not mentioned.
Source settings:
File Path: Enter /user_log.txt.
Text Type: Select text.
Column Delimiter: Enter |.
Compression Format: The options include None, Gzip, Bzip2, and Zip. Select None.
Skip Header: Select No.
Destination settings:
Text Type: Select text.
File Name (Including Path): Enter a path based on the directory in your OSS bucket, for example, ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt. Here, ods_raw_log_d is the directory that you created, and ${bizdate} represents the date of the previous day. For example, with bizdate=20250223, the file is written to ods_raw_log_d/log_20250223/log_20250223.txt.
Column Delimiter: Enter |.
Confirm the Field Mapping and Channel Control settings.
DataWorks maps source fields to destination fields. You can also configure the concurrency and the policy for handling dirty data. For this tutorial, set the Policy For Dirty Data parameter to Disallow Dirty Data. You can keep the default values for the other parameters. For more information, see Configure a batch synchronization node in the codeless UI.
Configure debugging parameters.
In the right-side pane of the batch synchronization task configuration page, click Debugging Configurations. Configure the following parameters to run a test with debugging parameters in Step 6.
Resource Group: Select the Serverless resource group purchased when you prepared the environment.
Script Parameters: Click Add Parameter and assign a value to bizdate in yyyymmdd format, such as bizdate=20250223. During debugging, DataStudio replaces the variables defined in the task with this constant.
(Optional) Configure scheduling properties.
You can keep the default values for the scheduling parameters in this tutorial. You can click Scheduling in the right-side pane of the node editing page. For more information about the parameters, see Configure scheduling properties for a node.
Scheduling Parameters: These are already configured for the workflow in this tutorial. You do not need to configure them for inner nodes. They can be used directly in tasks or code.
Scheduling Policies: You can set the Time for Delayed Execution parameter to specify how long a child node waits to run after the workflow starts. You do not need to set this parameter for this tutorial.
In the top toolbar, click Save to save the current node.
Configure the user data synchronization pipeline (ods_user_info_d_2oss_spark)
The ods_user_info_d_2oss_spark batch synchronization node synchronizes user data from the MySQL data source to the private OSS data source.
On the workflow orchestration page, hover over the ods_user_info_d_2oss_spark node and click Open Node.
Configure the network and resources for the synchronization pipeline.
Source: Set the source to MySQL and the data source name to user_behavior_analysis_mysql.
Resource Group: Select the Serverless resource group purchased during environment preparation.
Destination: Set the destination to OSS and select the private OSS data source created earlier as the data source name. In this example, select test_g.
Click Next to configure the sync task.
Configure Data Source and Destination
The key settings are described below. You can keep the default values for any parameters that are not mentioned.
Source settings:
Table: Select ods_user_info_d from the data source.
Split Key: Use a primary key or an indexed column as the shard key. Only integer fields are supported. Set the shard key to uid.
Destination settings:
Text Type: Select text.
File Name (Including Path): Enter a path based on the directory in your OSS bucket, for example, ods_user_info_d/user_${bizdate}/user_${bizdate}.txt. Here, ods_user_info_d is the directory that you created, and ${bizdate} represents the date of the previous day.
Column Delimiter: Enter |.
Confirm the Field Mapping and Channel Control settings.
DataWorks maps source fields to destination fields. You can also configure the concurrency and the policy for handling dirty data. For this tutorial, set the Policy For Dirty Data parameter to Disallow Dirty Data. You can keep the default values for the other parameters. For more information, see Configure a batch synchronization node in the codeless UI.
Configure debugging parameters.
In the right-side pane of the batch synchronization task configuration page, click Debugging Configurations. Configure the following parameters to run a test with debugging parameters in Step 6.
Resource Group: Select the Serverless resource group purchased when you prepared the environment.
Script Parameters: Click Add Parameter and assign a value to bizdate in yyyymmdd format, such as bizdate=20250223. During debugging, DataStudio replaces the variables defined in the task with this constant.
(Optional) Configure scheduling properties.
You can keep the default values for the scheduling parameters in this tutorial. You can click Scheduling in the right-side pane of the node editing page. For more information about the parameters, see Configure scheduling properties for a node.
Scheduling Parameters: These are already configured for the workflow in this tutorial. You do not need to configure them for inner nodes. They can be used directly in tasks or code.
Scheduling Policies: You can set the Time for Delayed Execution parameter to specify how long a child node waits to run after the workflow starts. You do not need to set this parameter for this tutorial.
In the top toolbar, click Save to save the current node.
Step 4: Synchronize data
Synchronize the data.
In the toolbar at the top of the workflow canvas, click Run. In the dialog box that opens, set the values for the parameter variables that are defined for the nodes in this run. For this tutorial, use 20250223. You can change this value as needed. Click OK and wait for the run to complete.
View the synchronization result.
After the workflow runs successfully, log on to the Object Storage Service console. Verify that the corresponding directories and data exist in the /ods_user_info_d and /ods_raw_log_d directories within the bucket of the newly created OSS data source.
Step 5: Parse data
After the data is synchronized, you can use Spark SQL to create foreign tables and parse the basic user information and website access records that are stored in OSS.
Create a log table (ods_raw_log_d_spark) and parse data
After data is synchronized to the private OSS data source by the batch integration task, you can use an EMR SPARK SQL node to create the ods_raw_log_d_spark foreign table based on the generated OSS file. Use the LOCATION clause to access the log information that is written to the private OSS Bucket by the batch data integration task.
On the workflow orchestration page, hover over the ods_raw_log_d_spark node and click Open Node.
Edit the table creation statement.
-- Scenario: The following Spark SQL code creates the ods_raw_log_d_spark foreign table using an EMR Spark SQL node.
-- It uses LOCATION to get the log information written to the private OSS bucket by the batch data integration task and adds the corresponding dt partition.
-- Note:
-- DataWorks provides scheduling parameters to write daily incremental data to the corresponding business partition of the destination table in a scheduling scenario.
-- In a development scenario, you can define code variables in the ${VariableName} format.
-- Then, on the scheduling configuration page, you can assign scheduling parameters as values to the variables. This allows for dynamic parameter input in the code during scheduling.
CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark
(
  `col` STRING
)
PARTITIONED BY (dt STRING)
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/';

ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}')
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_raw_log_d/log_${bizdate}/';
Note: Replace the LOCATION address in the code as needed. dw-spark-demo is the name of the OSS bucket that you created when you prepared the environment.
Configure debugging parameters.
In the right-side pane of the EMR SPARK SQL task configuration page, click Debugging Configurations. Configure the following parameters to run a test with debugging parameters in Step 6.
Computing Resource: Select the Spark computing resource attached when you prepared the environment.
Resource Group: Select the Serverless resource group purchased when you prepared the environment.
Script Parameters: Click Add Parameter and assign a value to bizdate in yyyymmdd format, such as bizdate=20250223. During debugging, DataStudio replaces the variables defined in the task with this constant.
(Optional) Configure scheduling properties.
You can keep the default values for the scheduling parameters in this tutorial. You can click Scheduling in the right-side pane of the node editing page. For more information about the parameters, see Configure scheduling properties for a node.
Scheduling Parameters: These are already configured for the workflow in this tutorial. You do not need to configure them for inner nodes. They can be used directly in tasks or code.
Scheduling Policies: You can set the Time for Delayed Execution parameter to specify how long a child node waits to run after the workflow starts. You do not need to set this parameter for this tutorial.
In the top toolbar, click Save to save the current node.
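After the workflow has run (you trigger the run in Step 6), you can optionally confirm that the partition was registered. The following is a minimal sketch that assumes the tutorial's example value bizdate=20250223:
-- List the partitions registered by the ALTER TABLE statement.
SHOW PARTITIONS ods_raw_log_d_spark;
-- Preview a few raw log rows from the partition written for bizdate=20250223.
SELECT * FROM ods_raw_log_d_spark WHERE dt = '20250223' LIMIT 10;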
Create a user table (ods_user_info_d_spark) and parse data
After data is synchronized to the private OSS data source by the batch integration task, you can use an EMR SPARK SQL node to create the ods_user_info_d_spark foreign table based on the generated OSS file. Use the LOCATION clause to access the user information that is written to the private OSS Bucket by the batch data integration task.
On the workflow orchestration page, hover over the ods_user_info_d_spark node and click Open Node.
Edit the table creation statement.
-- Scenario: The following Spark SQL code creates the ods_user_info_d_spark foreign table using an EMR Spark SQL node.
-- It uses LOCATION to get the user information written to the private OSS bucket by the batch data integration task and writes it to the corresponding dt partition.
-- Note:
-- DataWorks provides scheduling parameters to write daily incremental data to the corresponding business partition of the destination table in a scheduling scenario.
-- In a development scenario, you can define code variables in the ${VariableName} format.
-- Then, on the scheduling configuration page, you can assign scheduling parameters as values to the variables. This allows for dynamic parameter input in the code during scheduling.
CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark
(
  `uid`       STRING COMMENT 'User ID',
  `gender`    STRING COMMENT 'Gender',
  `age_range` STRING COMMENT 'Age range',
  `zodiac`    STRING COMMENT 'Zodiac sign'
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/';

ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}')
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ods_user_info_d/user_${bizdate}/';
Note: Replace the LOCATION address in the code as needed. dw-spark-demo is the name of the OSS bucket that you created when you prepared the environment.
Configure debugging parameters.
In the right-side pane of the EMR SPARK SQL task configuration page, click Debugging Configurations. Configure the following parameters to run a test with debugging parameters in Step 6.
Computing Resource: Select the Spark computing resource attached when you prepared the environment.
Resource Group: Select the Serverless resource group purchased when you prepared the environment.
Script Parameters: Click Add Parameter and assign a value to bizdate in yyyymmdd format, such as bizdate=20250223. During debugging, DataStudio replaces the variables defined in the task with this constant.
(Optional) Configure scheduling properties.
You can keep the default values for the scheduling parameters in this tutorial. You can click Scheduling in the right-side pane of the node editing page. For more information about the parameters, see Configure scheduling properties for a node.
Scheduling Parameters: These are already configured for the workflow in this tutorial. You do not need to configure them for inner nodes. They can be used directly in tasks or code.
Scheduling Policies: You can set the Time for Delayed Execution parameter to specify how long a child node waits to run after the workflow starts. You do not need to set this parameter for this tutorial.
In the top toolbar, click Save to save the current node.
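Likewise, once the workflow has run in Step 6, a quick spot-check of the user table could look like the following sketch, again assuming the example value bizdate=20250223:
-- Confirm that the partition exists and that the '|'-delimited columns are parsed as expected.
SELECT uid, gender, age_range, zodiac
FROM ods_user_info_d_spark
WHERE dt = '20250223'
LIMIT 10;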
Step 6: Run tasks
Synchronize data.
In the workflow toolbar, click Run. Set the values for the parameter variables that are defined for the nodes in this run. This tutorial uses 20250223 as an example. You can change the value as needed. Click OK and wait for the process to complete.
Query the data to verify the result.
Go to the SQL Query page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, click Go to DataAnalysis. In the left-side navigation pane of the page that appears, click SQL Query.
Configure an SQL query file.
Click the icon next to My Files to create a new file and enter a custom name for the SQL query file.
Click the newly created file to open the file editing page.
On the file editing page, click the icon in the upper-right corner to configure the workspace and other information for the SQL query. The configuration details are as follows:
Workspace: Select the workspace where the User_profile_analysis_spark workflow is located.
Data Source Type: Select EMR Spark SQL from the drop-down list.
Data Source Name: Select the EMR Serverless Spark computing resource that was attached to the development environment when you prepared the environment.
Click OK to complete the data source configuration.
Edit the query SQL.
After all nodes in this topic run successfully, run the following SQL queries to verify that the foreign tables created by the EMR SPARK SQL nodes are generated correctly.
-- Update the partition filter condition to the actual data timestamp of your run.
-- For example, if the task runs on 20250223, the data timestamp is 20250222, which is the day before the task's running date.
-- Query the ods_raw_log_d_spark table.
SELECT * FROM ods_raw_log_d_spark WHERE dt = 'Data timestamp';
-- Query the ods_user_info_d_spark table.
SELECT * FROM ods_user_info_d_spark WHERE dt = 'Data timestamp';
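For instance, because this tutorial assigns 20250223 to the bizdate variable when the workflow runs, the partitions created by the foreign-table nodes are dt='20250223', so the verification queries would be:
SELECT * FROM ods_raw_log_d_spark WHERE dt = '20250223';
SELECT * FROM ods_user_info_d_spark WHERE dt = '20250223';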
Next steps
You have now learned how to synchronize log data. You can proceed to the next tutorial to learn how to process and analyze the synchronized data. For more information, see Process data.