Use E-MapReduce (EMR) Spark SQL nodes to transform raw user data into a structured user profile table. This tutorial walks you through building a three-node pipeline in a DataWorks workflow that moves data from the ODS layer through the DWD and DWS layers to the ADS layer, and deploying it to production.
By the end of this tutorial, you will know how to:
- Add EMR Spark SQL nodes to a workflow and configure scheduling dependencies between them.
- Write Spark SQL to split raw log data, join tables, and aggregate user profile data.
- Configure debugging parameters and run the workflow to verify results.
- Deploy the workflow to the production environment and trigger a backfill run.
Prerequisites
Before you begin, ensure that you have completed all steps in Synchronize data. That tutorial creates the User_profile_analysis_Spark workflow and provisions the following resources that this tutorial depends on:
- The `ods_user_info_d_spark` and `ods_raw_log_d_spark` external tables in your workspace.
- An OSS bucket (domain: `dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com`) to store intermediate and final output data.
- A Spark computing resource and a serverless resource group associated with your workspace.
- Workflow-level scheduling parameters, including `bizdate` (format: `yyyymmdd`).
Step 1: Add nodes and set up dependencies
The data flows through three EMR Spark SQL nodes in sequence:
| Node name | What it does |
|---|---|
| `dwd_log_info_di_spark` | Splits raw log records from `ods_raw_log_d_spark` using the `##@@` delimiter and extracts structured fields (method, URL, protocol, device type, visitor identity) into the `dwd_log_info_di_spark` table. |
| `dws_user_info_all_di_spark` | Joins the cleaned log table with the user info table `ods_user_info_d_spark` on `uid` and writes the result to `dws_user_info_all_di_spark`. |
| `ads_user_info_1d_spark` | Aggregates the joined table by `uid` to produce a daily user profile summary in `ads_user_info_1d_spark`. |
1. Log in to the DataWorks console and go to Data Studio. In Workspace Directories, click the `User_profile_analysis_Spark` workflow.
2. From the EMR section, drag an EMR SPARK SQL node onto the canvas. In the Create Node dialog box, enter the node name. Repeat this step to create all three nodes listed above.
3. Draw lines on the canvas to set the scheduling dependencies: `dwd_log_info_di_spark` → `dws_user_info_all_di_spark` → `ads_user_info_1d_spark`.

   Lines can be drawn manually (as in this tutorial) or detected automatically using the automatic parsing feature. For automatic parsing, see Use the automatic parsing feature.
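The dependency lines only encode execution order: a node becomes runnable once all of its upstream nodes succeed. As a rough illustration (plain Python, not DataWorks code), a scheduler can derive the run order from these edges with a topological sort:

```python
from collections import defaultdict, deque

# Edges exactly as drawn on the canvas: upstream -> downstream.
edges = [
    ("dwd_log_info_di_spark", "dws_user_info_all_di_spark"),
    ("dws_user_info_all_di_spark", "ads_user_info_1d_spark"),
]

# Kahn-style topological sort: a node runs only after all upstreams finish.
indegree = defaultdict(int)
downstream = defaultdict(list)
nodes = set()
for up, down in edges:
    nodes.update([up, down])
    indegree[down] += 1
    downstream[up].append(down)

queue = deque(n for n in nodes if indegree[n] == 0)
order = []
while queue:
    n = queue.popleft()
    order.append(n)
    for d in downstream[n]:
        indegree[d] -= 1
        if indegree[d] == 0:
            queue.append(d)

print(order)
```

With a straight chain the result is obvious, but the same procedure generalizes when a node has several upstream dependencies.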
Step 2: Configure the data processing nodes
Configure the dwd_log_info_di_spark node
This node splits raw log records from ods_raw_log_d_spark using the ##@@ delimiter and applies regexp_extract to derive structured fields. The output goes to the dwd_log_info_di_spark table, partitioned by dt.
1. On the canvas, hover over `dwd_log_info_di_spark` and click Open Node.
2. Paste the following SQL into the code editor.

   Replace `dw-spark-demo` in the `LOCATION` clause with the domain name of your OSS bucket, created when you prepared your environment.

   **Paimon table (DLF)**
   ```sql
   -- Scenario: This Spark SQL code uses Spark functions to split the `col` column
   -- of the ods_raw_log_d_spark table by "##@@". It then generates multiple fields
   -- and writes the results to a new table named dwd_log_info_di_spark.
   CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
       ip STRING COMMENT 'IP address',
       uid STRING COMMENT 'User ID',
       tm STRING COMMENT 'Time in yyyymmddhh:mi:ss format',
       status STRING COMMENT 'Status code returned by the server',
       bytes STRING COMMENT 'Number of bytes returned to the client',
       method STRING COMMENT 'Request method',
       url STRING COMMENT 'URL',
       protocol STRING COMMENT 'Protocol',
       referer STRING,
       device STRING,
       identity STRING,
       dt STRING COMMENT 'Partition key' -- Best practice: Include the partition key as a column in the table.
   )
   PARTITIONED BY (dt)
   TBLPROPERTIES (
       'format' = 'paimon' -- Core: Declares the table as a Paimon table.
   );

   INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
   SELECT ip, uid, tm, status, bytes,
       regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
       regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
       regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
       regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
       CASE
           WHEN lower(agent) RLIKE 'android' THEN 'android'
           WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
           WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
           WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
           WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
           WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
           ELSE 'unknown'
       END AS device,
       CASE
           WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
           WHEN lower(agent) RLIKE 'feed'
               OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
           WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
               AND agent RLIKE '^(Mozilla|Opera)'
               AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
           ELSE 'unknown'
       END AS identity
   FROM (
       SELECT
           SPLIT(col, '##@@')[0] AS ip,
           SPLIT(col, '##@@')[1] AS uid,
           SPLIT(col, '##@@')[2] AS tm,
           SPLIT(col, '##@@')[3] AS request,
           SPLIT(col, '##@@')[4] AS status,
           SPLIT(col, '##@@')[5] AS bytes,
           SPLIT(col, '##@@')[6] AS referer,
           SPLIT(col, '##@@')[7] AS agent
       FROM ods_raw_log_d_spark
       WHERE dt = '${bizdate}'
   ) a;
   ```

   **Hive table (DLF-Legacy)**
   ```sql
   -- Split raw log records from ods_raw_log_d_spark using ##@@ as the delimiter,
   -- then derive method, URL, protocol, device type, and visitor identity fields.
   -- ${bizdate} is a scheduling parameter injected at runtime.
   CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
       ip STRING COMMENT 'The IP address',
       uid STRING COMMENT 'The user ID',
       tm STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
       status STRING COMMENT 'The status code returned by the server',
       bytes STRING COMMENT 'The number of bytes returned to the client',
       method STRING COMMENT 'The request method',
       url STRING COMMENT 'url',
       protocol STRING COMMENT 'The protocol',
       referer STRING,
       device STRING,
       identity STRING
   )
   PARTITIONED BY (dt STRING)
   LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/';

   ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

   INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
   SELECT ip, uid, tm, status, bytes,
       regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
       regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
       regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
       regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
       CASE
           WHEN lower(agent) RLIKE 'android' THEN 'android'
           WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
           WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
           WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
           WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
           WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
           ELSE 'unknown'
       END AS device,
       CASE
           WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
           WHEN lower(agent) RLIKE 'feed'
               OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
           WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
               AND agent RLIKE '^(Mozilla|Opera)'
               AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
           ELSE 'unknown'
       END AS identity
   FROM (
       SELECT
           SPLIT(col, '##@@')[0] AS ip,
           SPLIT(col, '##@@')[1] AS uid,
           SPLIT(col, '##@@')[2] AS tm,
           SPLIT(col, '##@@')[3] AS request,
           SPLIT(col, '##@@')[4] AS status,
           SPLIT(col, '##@@')[5] AS bytes,
           SPLIT(col, '##@@')[6] AS referer,
           SPLIT(col, '##@@')[7] AS agent
       FROM ods_raw_log_d_spark
       WHERE dt = '${bizdate}'
   ) a;
   ```
3. In the right-side navigation pane, click Debugging Configurations and set the following parameters. These are used when you run the workflow in Step 3.

   | Parameter | Description | Example |
   |---|---|---|
   | Computing Resource | The Spark computing resource associated with your workspace | Select from the drop-down list |
   | Resource Group | The serverless resource group associated with your workspace | Select from the drop-down list |
   | Script Parameters (`bizdate`) | A date in `yyyymmdd` format; Data Studio substitutes it for `${bizdate}` at runtime | `bizdate=20250223` |

4. Click Save.
Scheduling parameters and policies are configured at the workflow level in this tutorial. You do not need to set them on individual nodes. To review node-level settings, click Properties in the right-side navigation pane. For details, see Node scheduling.
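To see concretely what this node's split-and-extract logic does, here is a plain-Python sketch that applies the same `##@@` split and the same three `regexp_extract` patterns used in the SQL to one sample record. The record itself is hypothetical, invented for illustration:

```python
import re

# A hypothetical raw log record in the ods_raw_log_d_spark `col` format:
# eight fields joined by the ##@@ delimiter.
raw = "##@@".join([
    "192.168.1.10",                        # ip
    "u0001",                               # uid
    "2025022310:15:42",                    # tm
    "GET /product/42?feed=1 HTTP/1.1",     # request
    "200",                                 # status
    "512",                                 # bytes
    "https://example.com/home",            # referer
    "Mozilla/5.0 (iPhone; CPU iPhone OS)", # agent
])

fields = raw.split("##@@")   # SPLIT(col, '##@@')[0..7]
request = fields[3]

# The same patterns as the regexp_extract calls in the node SQL.
method = re.search(r'(^[^ ]+) .*', request).group(1)
url = re.search(r'^[^ ]+ (.*) [^ ]+$', request).group(1)
protocol = re.search(r'.* ([^ ]+$)', request).group(1)

print(method, url, protocol)  # → GET /product/42?feed=1 HTTP/1.1
```

The device and identity columns then classify the `agent` and `request` strings with the same kind of pattern matching, using the RLIKE cases shown in the SQL.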
Configure the dws_user_info_all_di_spark node
This node joins the cleaned log table with the user info dimension table on uid and writes the result to dws_user_info_all_di_spark, partitioned by dt.
1. On the canvas, hover over `dws_user_info_all_di_spark` and click Open Node.
2. Paste the following SQL into the code editor.

   Replace `dw-spark-demo` in the `LOCATION` clause with your OSS bucket domain name.

   **Paimon table (DLF)**
   ```sql
   -- Scenario: This Spark SQL code joins the dwd_log_info_di_spark and
   -- ods_user_info_d_spark tables on the uid field and writes the result
   -- to the corresponding dt partition.
   CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
       uid STRING COMMENT 'User ID',
       gender STRING COMMENT 'Gender',
       age_range STRING COMMENT 'Age range',
       zodiac STRING COMMENT 'Zodiac sign',
       device STRING COMMENT 'Device type',
       method STRING COMMENT 'HTTP request type',
       url STRING COMMENT 'URL',
       `time` STRING COMMENT 'Time in yyyymmddhh:mi:ss format',
       dt STRING COMMENT 'Partition key' -- Best practice: Include the partition key as a column in the table.
   )
   PARTITIONED BY (dt)
   TBLPROPERTIES (
       'format' = 'paimon' -- Core: Declares the table as a Paimon table.
   );

   -- Insert data from the user and log tables.
   INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
   SELECT
       COALESCE(a.uid, b.uid) AS uid,
       b.gender AS gender,
       b.age_range AS age_range,
       b.zodiac AS zodiac,
       a.device AS device,
       a.method AS method,
       a.url AS url,
       a.tm
   FROM (
       SELECT * FROM dwd_log_info_di_spark WHERE dt = '${bizdate}'
   ) a
   LEFT OUTER JOIN (
       SELECT * FROM ods_user_info_d_spark WHERE dt = '${bizdate}'
   ) b
   ON a.uid = b.uid;
   ```

   **Hive table (DLF-Legacy)**
   ```sql
   -- Join dwd_log_info_di_spark (log facts) and ods_user_info_d_spark (user dimensions) on uid.
   -- COALESCE handles cases where the uid exists in the log table but not in the user table.
   CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
       uid STRING COMMENT 'The user ID',
       gender STRING COMMENT 'The gender',
       age_range STRING COMMENT 'The age range',
       zodiac STRING COMMENT 'The zodiac sign',
       device STRING COMMENT 'The terminal type',
       method STRING COMMENT 'The HTTP request type',
       url STRING COMMENT 'url',
       `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
   )
   PARTITIONED BY (dt STRING)
   LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/';

   ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

   INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
   SELECT
       COALESCE(a.uid, b.uid) AS uid,
       b.gender AS gender,
       b.age_range AS age_range,
       b.zodiac AS zodiac,
       a.device AS device,
       a.method AS method,
       a.url AS url,
       a.tm
   FROM (
       SELECT * FROM dwd_log_info_di_spark WHERE dt = '${bizdate}'
   ) a
   LEFT OUTER JOIN (
       SELECT * FROM ods_user_info_d_spark WHERE dt = '${bizdate}'
   ) b
   ON a.uid = b.uid;
   ```
3. Configure Debugging Configurations with the same parameter values as the previous node.
4. Click Save.
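The join step can be pictured with a toy example. The following plain-Python sketch (hypothetical data, not from the tutorial) mirrors the LEFT OUTER JOIN semantics in this node's SQL: every log row survives, and user attributes come back as NULL (`None` here) when the `uid` has no match in the user table:

```python
# Hypothetical cleaned log rows (the "a" side of the join).
log_rows = [
    {"uid": "u001", "device": "iphone", "url": "/a"},
    {"uid": "u999", "device": "android", "url": "/b"},  # no user record
]
# Hypothetical user dimension rows keyed by uid (the "b" side).
user_rows = {"u001": {"gender": "F", "age_range": "20-25", "zodiac": "Leo"}}

joined = []
for a in log_rows:
    b = user_rows.get(a["uid"])  # LEFT OUTER JOIN: None when unmatched
    joined.append({
        "uid": a["uid"],  # COALESCE(a.uid, b.uid): the log-side uid is always set here
        "gender": b["gender"] if b else None,
        "age_range": b["age_range"] if b else None,
        "zodiac": b["zodiac"] if b else None,
        "device": a["device"],
        "url": a["url"],
    })

print(joined)
```

Both input rows appear in the output; the `u999` row keeps its log fields but carries `None` for every user attribute.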
Configure the ads_user_info_1d_spark node
This node aggregates the joined table by uid to produce the daily user profile summary in ads_user_info_1d_spark. Each row represents one user's daily page view count and profile attributes.
1. On the canvas, hover over `ads_user_info_1d_spark` and click Open Node.
2. Paste the following SQL into the code editor.

   Replace `dw-spark-demo` in the `LOCATION` clause with your OSS bucket domain name.

   **Paimon table (DLF)**
   ```sql
   -- Scenario: This Spark SQL code uses Spark functions to further process the
   -- dws_user_info_all_di_spark table and writes the results to a new table
   -- named ads_user_info_1d_spark.
   CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
       uid STRING COMMENT 'User ID',
       device STRING COMMENT 'Device type',
       pv BIGINT COMMENT 'PV',
       gender STRING COMMENT 'Gender',
       age_range STRING COMMENT 'Age range',
       zodiac STRING COMMENT 'Zodiac sign',
       dt STRING COMMENT 'Partition key' -- Best practice: Include the partition key as a column in the table.
   )
   PARTITIONED BY (dt)
   TBLPROPERTIES (
       'format' = 'paimon' -- Core: Declares the table as a Paimon table.
   );

   INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt = '${bizdate}')
   SELECT uid,
       MAX(device),
       COUNT(0) AS pv,
       MAX(gender),
       MAX(age_range),
       MAX(zodiac)
   FROM dws_user_info_all_di_spark
   WHERE dt = '${bizdate}'
   GROUP BY uid;
   ```

   **Hive table (DLF-Legacy)**
   ```sql
   -- Aggregate dws_user_info_all_di_spark by uid to produce the final user profile.
   -- COUNT(0) computes page views (pv); MAX() picks a representative value for each attribute.
   CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
       uid STRING COMMENT 'The user ID',
       device STRING COMMENT 'The terminal type',
       pv BIGINT COMMENT 'pv',
       gender STRING COMMENT 'The gender',
       age_range STRING COMMENT 'The age range',
       zodiac STRING COMMENT 'The zodiac sign'
   )
   PARTITIONED BY (dt STRING)
   LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/';

   ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

   INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt = '${bizdate}')
   SELECT uid,
       MAX(device),
       COUNT(0) AS pv,
       MAX(gender),
       MAX(age_range),
       MAX(zodiac)
   FROM dws_user_info_all_di_spark
   WHERE dt = '${bizdate}'
   GROUP BY uid;
   ```
3. Configure Debugging Configurations with the same parameter values as the previous nodes.
4. Click Save.
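The final aggregation is a straightforward GROUP BY. This plain-Python sketch (hypothetical rows, for illustration only) mirrors it: `COUNT(0)` becomes a per-`uid` page view counter, while the profile attributes collapse to one value per user. In this sketch the attributes are identical within a `uid`, so keeping the first row's values stands in for the SQL's `MAX()`:

```python
# Hypothetical joined rows, as they might appear in dws_user_info_all_di_spark.
rows = [
    {"uid": "u001", "device": "iphone", "gender": "F"},
    {"uid": "u001", "device": "iphone", "gender": "F"},
    {"uid": "u002", "device": "windows_pc", "gender": "M"},
]

profiles = {}
for r in rows:
    # First row for a uid sets the attributes; stand-in for MAX() when
    # values are consistent per uid.
    p = profiles.setdefault(r["uid"], {"pv": 0, "device": r["device"], "gender": r["gender"]})
    p["pv"] += 1  # COUNT(0): one page view per row

print(profiles["u001"]["pv"])  # → 2
```

Each entry in `profiles` corresponds to one output row of `ads_user_info_1d_spark`: one user, one daily page view count, one set of profile attributes.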
Step 3: Run and verify the workflow
1. In the top toolbar of the workflow configuration tab, click Run. In the Enter runtime parameters dialog box, enter `20250223` (or a date of your choice) as the `bizdate` value and click OK.
2. After all nodes complete, query the output to verify the results:
   1. In the DataWorks console, go to Data Analysis and Service > DataAnalysis, then click Go to DataAnalysis. In the left-side navigation pane, click SQL Query.
   2. Click the icon next to My Files and select Create File. Enter a file name and click OK.
   3. Open the file, then click the settings icon in the upper-right corner and configure the following:

      | Parameter | Value |
      |---|---|
      | Workspace | The workspace that contains the `User_profile_analysis_Spark` workflow |
      | Data Source Type | EMR Spark SQL |
      | Data Source Name | The EMR Serverless Spark computing resource associated with your workspace |

   4. Click OK, then run the following query. Replace `The data timestamp` with the `bizdate` value you used. For example, if the node is scheduled to run on February 23, 2025, the data timestamp is `20250222` (one day earlier than the scheduling date).

      ```sql
      SELECT * FROM dwd_log_info_di_spark WHERE dt = 'The data timestamp';
      ```
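The relationship between the scheduling date and the data timestamp can be checked with a couple of lines of Python (illustrative only): in daily scheduling, `bizdate` resolves to the day before the run date.

```python
from datetime import date, timedelta

# A node scheduled to run on 2025-02-23 processes the previous day's data,
# so its data timestamp (bizdate) resolves to 20250222.
run_date = date(2025, 2, 23)  # the scheduling date
bizdate = (run_date - timedelta(days=1)).strftime("%Y%m%d")
print(bizdate)  # → 20250222
```

Use the resulting value as the `dt` partition filter in the verification query.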
Step 4: Deploy the workflow
Nodes must be deployed to the production environment before they can run on a schedule.
Scheduling parameters were configured at the workflow level in the data synchronization tutorial. You do not need to configure them for individual nodes before deploying.
1. In the left-side navigation pane of Data Studio, click the Data Studio icon to return to the Data Studio page.
2. In Workspace Directories, find and open the `User_profile_analysis_Spark` workflow.
3. In the node toolbar, click Deploy.
4. Click Start Deployment to Production Environment and follow the guided steps to complete the deployment.
Step 5: Run the nodes in the production environment
After deployment, scheduled instances are generated starting the following day. Run a backfill to verify the workflow immediately.
1. Click Operation Center in the top navigation bar of the Data Studio page. Alternatively, click the products icon in the upper-left corner of the DataWorks console and choose All Products > Data Development And Task Operation > Operation Center.
2. In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes. Find the zero load node `workshop_start_spark` and click its name.
3. In the directed acyclic graph (DAG) of the node, right-click `workshop_start_spark` and choose Run > Current and Descendant Nodes Retroactively.
4. In the Backfill Data panel, select the nodes to backfill, set the Data Timestamp, and click Submit and Redirect.
5. On the Data Backfill page, click Refresh to check whether `workshop_start_spark` and its descendant nodes ran successfully.
To avoid unexpected charges after completing this tutorial, configure the Effective Period for all nodes in the workflow, or freeze the zero load node workshop_start_spark.
What's next
You now have a working Spark SQL pipeline that processes raw user data into a structured user profile table and runs on a daily schedule. From here, you can:
- Visualize the results: Use DataAnalysis to build charts from the `ads_user_info_1d_spark` table and extract trends from the user profile data.
- Monitor data quality: Set up monitoring rules on the output tables to detect and intercept bad data before it propagates downstream.
- Explore data lineage: View the generated tables in Data Map and trace relationships between tables using data lineage.
- Expose data via APIs: Use DataService Studio to publish the final processed data as standardized APIs for downstream consumers.