In this tutorial, you use batch synchronization tasks in Data Integration to move data from two sources — basic user information stored in the ods_user_info_d MySQL table and website access logs stored in the user_log.txt HttpFile — into a private Object Storage Service (OSS) bucket. You then create external tables using Spark SQL so that E-MapReduce (EMR) can query the synchronized data directly from OSS.
Prerequisites
Before you begin, make sure that you have:
- A serverless resource group purchased in your DataWorks workspace
- A private OSS bucket created with the following folders: `ods_raw_log_d` and `ods_user_info_d`
- A private OSS data source named `test_g` added to DataWorks
- An HttpFile data source named `user_behavior_analysis_httpfile` added to DataWorks
- A MySQL data source named `user_behavior_analysis_mysql` added to DataWorks
What you'll learn
By the end of this tutorial, you will have:
- Designed a workflow with scheduling dependencies in DataWorks
- Configured batch synchronization tasks to move data from MySQL and HttpFile to a private OSS bucket
- Created Spark SQL external tables that point to the synchronized OSS data
- Verified the synchronized data using an ad hoc query
How it works
This tutorial covers the data acquisition phase of a user profile analysis pipeline. Two batch synchronization nodes move raw data into OSS — one from an HttpFile source (website access logs) and one from a MySQL source (basic user information). Two EMR Spark SQL nodes then create external tables that point to the OSS data using LOCATION, so Spark can query the data without copying it again.
The bizdate scheduling parameter represents the business date — the date on which the source data was generated, which is always one day before the task runs (T-1). For example, if the task runs on August 8, 2024, bizdate equals 20240807. This parameter is used to organize OSS objects and Spark partitions by date.
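The T-1 relationship can be sketched in a few lines of Python (the dates below are examples for illustration, not values produced by DataWorks itself):

```python
from datetime import date, timedelta

def bizdate(run_date: date) -> str:
    """Return the business date (T-1) in yyyymmdd format,
    matching the $[yyyymmdd-1] scheduling parameter."""
    return (run_date - timedelta(days=1)).strftime("%Y%m%d")

# A task that runs on August 8, 2024 processes data generated on August 7, 2024.
print(bizdate(date(2024, 8, 8)))  # 20240807
```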
Step 1: Design a workflow
Log on to the DataWorks console. In the top navigation bar, select the region. In the left-side navigation pane, choose Data Development and O&M > Data Development. Select your workspace from the drop-down list and click Go to Data Development.
Create a workflow
Create a workflow named User profile analysis_Spark. For instructions, see Create a workflow.
Add nodes and configure dependencies
After the workflow is created, the workflow canvas opens automatically. Click Create Node in the upper part of the canvas, drag the nodes onto the canvas, and draw lines to set the dependency order. For details on designing the dependency graph, see Workflow design.
The zero load node has no lineage relationship with the synchronization nodes, so you must configure dependencies by drawing lines manually. For more information, see Scheduling dependency configuration guide.
The workflow uses five nodes:
| Node category | Node type | Node name | Function |
|---|---|---|---|
| General | Zero load node | workshop_start_spark | Triggers the entire workflow at 00:30 every day. This is a dry-run node; no code is required. |
| Data Integration | Batch synchronization | ods_raw_log_d_2oss_spark | Syncs website access logs from HttpFile to the private OSS bucket. |
| Data Integration | Batch synchronization | ods_user_info_d_2oss_spark | Syncs basic user information from MySQL to the private OSS bucket. |
| EMR | EMR Spark SQL | ods_raw_log_d_spark | Creates the ods_raw_log_d_spark external table to access the synchronized logs in OSS. |
| EMR | EMR Spark SQL | ods_user_info_d_spark | Creates the ods_user_info_d_spark external table to access the synchronized user information in OSS. |
Configure the scheduling logic
The workshop_start_spark zero load node triggers the entire workflow at 00:30 every day. All other nodes depend on this node, so you only need to configure the scheduling properties for the zero load node.
| Configuration item | Description |
|---|---|
| Scheduling time | Set to 00:30. The node triggers the workflow daily at this time. |
| Scheduling dependencies | Configure the node to depend on the root node of the workspace, which triggers workshop_start_spark to run. |
For more information about scheduling, see Configure scheduling time for nodes in a workflow in different scenarios and Overview.
Step 2: Configure data synchronization tasks
Double-click each node on the workflow canvas to open its configuration tab.
Sync website access logs from HttpFile to OSS
Configure the ods_raw_log_d_2oss_spark node to move the user_log.txt file from HttpFile to your private OSS bucket.
The user_log.txt file contains website access logs where each record occupies one row in the following format:
`$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" [unknown_content];`
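The synchronization task moves each line as-is; parsing into fields happens in a later processing step. As a rough illustration of how the leading fields of this format line up (the sample line and regular expression are hypothetical, and the trailing `[unknown_content]` segment is ignored here):

```python
import re

# Illustrative regex for the leading fields of the access-log format above.
# This is a sketch, not the parser actually used by the pipeline.
LOG_PATTERN = re.compile(
    r'^(?P<remote_addr>\S+) - (?P<remote_user>\S+) '
    r'\[(?P<time_local>[^\]]+)\] "(?P<request>[^"]*)" '
    r'(?P<status>\d{3}) (?P<body_bytes_sent>\d+)'
)

sample = ('192.168.1.1 - frank [10/Oct/2024:13:55:36 +0800] '
          '"GET /index.html HTTP/1.1" 200 2326 "-" "Mozilla/5.0"')
m = LOG_PATTERN.match(sample)
print(m.group("status"))  # 200
```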
1. On the DataStudio page, double-click the `ods_raw_log_d_2oss_spark` node.

2. Set up the network connections between the resource group and the data sources, then click Next and complete the connectivity test.

   | Parameter | Value |
   |---|---|
   | Source | Source: HttpFile; Data source name: `user_behavior_analysis_httpfile` |
   | Resource group | Select the serverless resource group you purchased |
   | Destination | Destination: OSS; Data source name: `test_g` |

3. Configure the synchronization parameters.

   The `${bizdate}` variable in the object path is replaced at runtime by the scheduling parameter. This writes each day's logs to a separate date-based directory in OSS.

   | Parameter | Value |
   |---|---|
   | Source — File path | `/user_log.txt` |
   | Source — Text type | text |
   | Source — Column delimiter | `\|` |
   | Source — Compression format | None |
   | Source — Skip header | No |
   | Destination — Text type | text |
   | Destination — Object name (path included) | `ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt` |
   | Destination — Column delimiter | `\|` |

4. On the Properties tab, configure the scheduling parameter so the daily data lands in its own directory.

   | Section | Configuration |
   |---|---|
   | Scheduling parameter | Click Add Parameter. Set Parameter name to `bizdate` and Parameter value to `$[yyyymmdd-1]`. See Configure scheduling parameters. |
   | Dependencies | Make sure the output name follows the `workspacename.nodename` format. See Configure scheduling dependencies. |

5. Click the Save icon to save the node.
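The way the scheduler substitutes `${bizdate}` into the object name can be approximated with a simple string replacement (the template matches the table above; the substitution mechanics shown here are a simplification of what DataWorks does at runtime):

```python
from datetime import date, timedelta

OBJECT_TEMPLATE = "ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt"

def resolve_object_name(template: str, run_date: date) -> str:
    """Replace ${bizdate} with the T-1 business date, as the
    scheduler does at runtime (simplified)."""
    biz = (run_date - timedelta(days=1)).strftime("%Y%m%d")
    return template.replace("${bizdate}", biz)

print(resolve_object_name(OBJECT_TEMPLATE, date(2024, 8, 8)))
# ods_raw_log_d/log_20240807/log_20240807.txt
```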
Sync basic user information from MySQL to OSS
Configure the ods_user_info_d_2oss_spark node to move the ods_user_info_d table from MySQL to the private OSS bucket.
1. On the DataStudio page, double-click the `ods_user_info_d_2oss_spark` node.

2. Set up the network connections between the resource group and the data sources, then click Next and complete the connectivity test.

   | Parameter | Value |
   |---|---|
   | Source | Source: MySQL; Data source name: `user_behavior_analysis_mysql` |
   | Resource group | Select the serverless resource group you purchased |
   | Destination | Destination: OSS; Data source name: `test_g` |

3. Configure the synchronization parameters.

   | Parameter | Value |
   |---|---|
   | Source — Table | `ods_user_info_d` |
   | Source — Split key | `uid` (primary key; must be an INTEGER-type column) |
   | Destination — Text type | text |
   | Destination — Object name (path included) | `ods_user_info_d/user_${bizdate}/user_${bizdate}.txt` |
   | Destination — Column delimiter | `\|` |

4. On the Properties tab, configure the scheduling parameter.

   | Section | Configuration |
   |---|---|
   | Scheduling parameter | Click Add Parameter. Set Parameter name to `bizdate` and Parameter value to `$[yyyymmdd-1]`. See Configure scheduling parameters. |
   | Dependencies | Make sure the output name follows the `workspacename.nodename` format. See Configure scheduling dependencies. |

5. Click the Save icon to save the node.
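The split key lets Data Integration divide the source table into ranges that parallel readers can fetch concurrently, which is why an INTEGER-type column works best. A simplified sketch of range-based splitting on an integer key (the actual splitting strategy is internal to Data Integration; `split_ranges` is an illustrative stand-in):

```python
def split_ranges(min_uid: int, max_uid: int, shards: int):
    """Divide [min_uid, max_uid] into contiguous ranges, one per
    parallel reader. Illustrative only; not Data Integration's code."""
    step = (max_uid - min_uid + 1) // shards
    ranges = []
    lo = min_uid
    for i in range(shards):
        # The last shard absorbs any remainder.
        hi = max_uid if i == shards - 1 else lo + step - 1
        ranges.append((lo, hi))
        lo = hi + 1
    return ranges

print(split_ranges(1, 100, 4))  # [(1, 25), (26, 50), (51, 75), (76, 100)]
```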
Create external tables using Spark SQL to access OSS data
After the batch synchronization nodes write data to OSS, use EMR Spark SQL nodes to create external tables. Each external table uses LOCATION to point to the OSS path, so Spark can query the data directly without importing it.
Configure the ods_raw_log_d_spark node
1. On the DataStudio page, double-click the `ods_raw_log_d_spark` node and enter the following code.

   Replace `dw-emr-demo` with the name of the OSS bucket you created when preparing the environment. The LOCATION path must match the Object name (path included) value set in the `ods_raw_log_d_2oss_spark` batch synchronization node.

   ```sql
   -- Creates an external table for the website access logs synchronized to OSS.
   -- The LOCATION points to the date-partitioned directory written by the batch sync task.
   -- The ${bizdate} variable is replaced at runtime by the scheduling parameter.
   CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark
   (
     `col` STRING
   )
   PARTITIONED BY
   (
     dt STRING
   )
   LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';

   ALTER TABLE ods_raw_log_d_spark
   ADD IF NOT EXISTS PARTITION (dt = '${bizdate}')
   LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/';
   ```

2. On the Properties tab, configure the scheduling parameter so the node reads the correct date-based OSS object.

   | Section | Configuration |
   |---|---|
   | Scheduling parameter | Click Add Parameter. Set Parameter name to `bizdate` and Parameter value to `$[yyyymmdd-1]`. See Configure scheduling parameters. |
   | Dependencies | Make sure the output name follows the `workspacename.nodename` format. See Configure scheduling dependencies. |

3. Click the Save icon to save the node.
Configure the ods_user_info_d_spark node
1. On the DataStudio page, double-click the `ods_user_info_d_spark` node and enter the following code.

   Replace `dw-emr-demo` with the name of the OSS bucket you created when preparing the environment. The LOCATION path must match the Object name (path included) value set in the `ods_user_info_d_2oss_spark` batch synchronization node.

   ```sql
   -- Creates an external table for the basic user information synchronized to OSS.
   -- The LOCATION points to the date-partitioned directory written by the batch sync task.
   -- The ${bizdate} variable is replaced at runtime by the scheduling parameter.
   CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark
   (
     `uid` STRING COMMENT 'The user ID',
     `gender` STRING COMMENT 'The gender',
     `age_range` STRING COMMENT 'The age range',
     `zodiac` STRING COMMENT 'The zodiac sign'
   )
   PARTITIONED BY
   (
     dt STRING
   )
   ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
   STORED AS TEXTFILE
   LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/';

   ALTER TABLE ods_user_info_d_spark
   ADD IF NOT EXISTS PARTITION (dt = '${bizdate}')
   LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/';
   ```

2. On the Properties tab, configure the scheduling parameter.

   | Section | Configuration |
   |---|---|
   | Scheduling parameter | Click Add Parameter. Set Parameter name to `bizdate` and Parameter value to `$[yyyymmdd-1]`. See Configure scheduling parameters. |
   | Dependencies | Make sure the output name follows the `workspacename.nodename` format. See Configure scheduling dependencies. |

3. Click the Save icon to save the node.
Step 3: Verify the synchronized data
After all nodes run successfully, use an ad hoc query to confirm that the external tables were created correctly.
1. In the left-side navigation pane of the DataStudio page, click Ad Hoc Query.

2. In the Ad Hoc Query pane, create an ad hoc query task of the EMR Spark SQL type.

3. Enter the following SQL to query both tables.

   ```sql
   -- Replace 'Data timestamp' with the actual bizdate value for your run.
   -- For example, if the task ran on August 8, 2024, use 20240807.
   SELECT * FROM ods_raw_log_d_spark WHERE dt = 'Data timestamp';
   SELECT * FROM ods_user_info_d_spark WHERE dt = 'Data timestamp';
   ```

4. Click the Run with Parameters icon, assign a value to `${bizdate}`, and run the query.
What's next
Data synchronization is complete. Continue to the next tutorial to learn how to process the synchronized data using Spark. For more information, see Process data.