This tutorial walks you through building a three-stage Spark SQL pipeline in DataWorks that transforms raw user and log data from OSS into a user profile table. By the end, you will know how to:
- Set up a pipeline DAG with three dependent EMR Spark SQL nodes
- Write and run Spark SQL to parse, join, and aggregate data across the ODS, DWD, DWS, and ADS layers
About the dataset
The pipeline processes two source tables loaded in the previous Synchronize data step:
- ods_user_info_d_spark: User attribute data (gender, age range, zodiac sign), keyed by uid
- ods_raw_log_d_spark: Raw web server logs, with all fields concatenated into a single column using ##@@ as a delimiter
The goal is to combine these two sources into a single user profile table that shows each user's page views and device usage.
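To make the ##@@ format concrete, the following plain-Python sketch splits one raw log value into the eight fields that the DWD node later extracts with SPLIT(). The sample line is made up, and split_raw_log is a hypothetical helper, not part of DataWorks. Python's str.split treats the delimiter literally, whereas Spark's SPLIT() takes a regular expression. The two behave the same here because # and @ have no special meaning in a regex.

```python
# Field positions used by the dwd_log_info_di_spark node: SPLIT(col, '##@@')[0] .. [7]
FIELDS = ["ip", "uid", "tm", "request", "status", "bytes", "referer", "agent"]

def split_raw_log(col: str) -> dict:
    """Split one raw log value on the literal '##@@' delimiter into named fields.

    In this sketch, fields missing from a short line are returned as None.
    """
    parts = col.split("##@@")
    return {name: parts[i] if i < len(parts) else None for i, name in enumerate(FIELDS)}

# A made-up sample; real values come from the col column of ods_raw_log_d_spark.
line = "##@@".join([
    "192.0.2.10", "u001", "2025022310:15:00", "GET /index.html HTTP/1.1",
    "200", "512", "https://www.example.com/home", "Mozilla/5.0 (iPhone)",
])
row = split_raw_log(line)
print(row["request"])  # GET /index.html HTTP/1.1
```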
Prerequisites
Before you begin, ensure that you have:
- Completed Synchronize data, which loads data into the ods_user_info_d_spark and ods_raw_log_d_spark tables
- Access to the User_profile_analysis_Spark workflow in DataWorks Data Studio
How the pipeline works
The pipeline moves data from the ODS layer through three downstream warehouse layers (DWD, DWS, and ADS), each produced by one EMR Spark SQL node:
| Node | Source table(s) | Output table | What it does |
|---|---|---|---|
| dwd_log_info_di_spark | ods_raw_log_d_spark | dwd_log_info_di_spark | Splits raw log fields using ##@@ as a delimiter, then derives method, url, protocol, device, and identity |
| dws_user_info_all_di_spark | dwd_log_info_di_spark + ods_user_info_d_spark | dws_user_info_all_di_spark | Joins the fact log table with the user info table on uid to create an aggregate user log table |
| ads_user_info_1d_spark | dws_user_info_all_di_spark | ads_user_info_1d_spark | Groups by uid and aggregates to produce the final user profile |
All three nodes write to OSS-backed external tables partitioned by dt, using the ${bizdate} scheduling parameter to target the correct daily partition.
Step 1: Set up the data processing pipeline
Add three EMR Spark SQL nodes to the User_profile_analysis_Spark workflow and configure their scheduling dependencies.
- Log on to the DataWorks console and go to Data Studio. In the Workspace Directories section of the DATA STUDIO pane, find and click User_profile_analysis_Spark to open the workflow.
- Drag EMR SPARK SQL from the EMR section of the configuration tab onto the canvas. In the Create Node dialog box, set Name to dwd_log_info_di_spark.
- Repeat the previous step to create two more nodes: dws_user_info_all_di_spark and ads_user_info_1d_spark.
- Draw dependency lines on the canvas to connect the nodes in this order: dwd_log_info_di_spark → dws_user_info_all_di_spark → ads_user_info_1d_spark. When the lines are drawn correctly, the DAG shows this three-node chain.
  This tutorial configures scheduling dependencies by drawing lines. To let the system detect dependencies automatically, see Use the automatic parsing feature.
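The dependency lines determine the order in which the nodes run: a node starts only after its upstream nodes finish. As a rough illustration, the following sketch derives the same run order from the two edges drawn above. It uses Python's standard graphlib module and is not DataWorks code; the dependencies dict is a hypothetical representation of the canvas.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Maps each node to the upstream nodes it depends on (the lines drawn on the canvas).
dependencies = {
    "dws_user_info_all_di_spark": {"dwd_log_info_di_spark"},
    "ads_user_info_1d_spark": {"dws_user_info_all_di_spark"},
}

# static_order() yields every node after all of its predecessors;
# it raises graphlib.CycleError if the graph contains a cycle.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
# ['dwd_log_info_di_spark', 'dws_user_info_all_di_spark', 'ads_user_info_1d_spark']
```

Because the three nodes form a single chain, only one valid order exists. A graph with a cycle has no valid order at all, which is why the workflow must remain a DAG.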
Step 2: Configure the EMR Spark SQL nodes
Configure each node with its SQL code and debugging settings. The debugging settings are used when you test the workflow in Step 3. For a full walkthrough of the debugging workflow, see Debug a workflow.
Parse raw logs: dwd_log_info_di_spark
This node reads each row from ods_raw_log_d_spark, where all log fields are stored as a single string delimited by ##@@. The SQL uses SPLIT() to extract individual fields (ip, uid, tm, request, status, bytes, referer, agent), then applies regexp_extract and CASE/RLIKE expressions to derive method, url, protocol, device, and identity. The result is written to the DWD-layer table dwd_log_info_di_spark.
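The four regexp_extract calls split the request line (for example, GET /index.html HTTP/1.1) into method, URL, and protocol, and reduce the referer to its host name. The following Python sketch approximates them with re.search. Like Spark's regexp_extract, re.search takes the first match found anywhere in the string. The hypothetical helper regexp_extract (named after the Spark function) returns an empty string when the pattern or the requested group does not match, as Spark does. Python's re module and the Java regex engine that Spark uses agree on these particular patterns, but not on every pattern.

```python
import re

def regexp_extract(s: str, pattern: str, idx: int) -> str:
    """Rough Python stand-in for Spark's regexp_extract.

    Uses the first match anywhere in s and returns '' when the pattern or
    the requested group does not match.
    """
    m = re.search(pattern, s)
    if m is None:
        return ""
    return m.group(idx) or ""

def parse_request(request: str, referer: str) -> dict:
    # Same patterns as the dwd_log_info_di_spark node.
    return {
        "method": regexp_extract(request, r"(^[^ ]+) .*", 1),
        "url": regexp_extract(request, r"^[^ ]+ (.*) [^ ]+$", 1),
        "protocol": regexp_extract(request, r".* ([^ ]+$)", 1),
        "referer": regexp_extract(referer, r"^[^/]+://([^/]+){1}", 1),
    }

print(parse_request("GET /index.html HTTP/1.1", "https://www.example.com/home"))
# {'method': 'GET', 'url': '/index.html', 'protocol': 'HTTP/1.1', 'referer': 'www.example.com'}
```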
- On the canvas, hover over dwd_log_info_di_spark and click Open Node.
- Copy the following SQL and paste it into the code editor. Replace dw-spark-demo in the LOCATION clause with the name of the OSS bucket you created when preparing environments. If your bucket is not in the China (Shanghai) region, also replace oss-cn-shanghai-internal with the internal endpoint of your bucket's region.

```sql
-- Parses raw log rows from ods_raw_log_d_spark (fields separated by ##@@)
-- and writes structured records to dwd_log_info_di_spark.
-- ${bizdate} is a scheduling parameter in yyyymmdd format supplied at runtime.
CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
  ip       STRING COMMENT 'The IP address',
  uid      STRING COMMENT 'The user ID',
  tm       STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
  status   STRING COMMENT 'The status code that is returned by the server',
  bytes    STRING COMMENT 'The number of bytes that are returned to the client',
  method   STRING COMMENT 'The request method',
  url      STRING COMMENT 'url',
  protocol STRING COMMENT 'The protocol',
  referer  STRING,
  device   STRING,
  identity STRING
)
PARTITIONED BY (dt STRING)
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/';

-- Add the partition for the current data timestamp.
ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
SELECT
  ip,
  uid,
  tm,
  status,
  bytes,
  -- Derive method, url, and protocol from the request line; keep only the referer host.
  regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
  regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
  regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
  regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
  CASE
    WHEN lower(agent) RLIKE 'android' THEN 'android'
    WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
    WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
    WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
    WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
    WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
    ELSE 'unknown'
  END AS device,
  CASE
    WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN lower(agent) RLIKE 'feed'
      OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
    WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
      AND agent RLIKE '^(Mozilla|Opera)'
      AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
    ELSE 'unknown'
  END AS identity
FROM (
  -- Split the single raw column into its eight fields.
  SELECT
    SPLIT(col, '##@@')[0] AS ip,
    SPLIT(col, '##@@')[1] AS uid,
    SPLIT(col, '##@@')[2] AS tm,
    SPLIT(col, '##@@')[3] AS request,
    SPLIT(col, '##@@')[4] AS status,
    SPLIT(col, '##@@')[5] AS bytes,
    SPLIT(col, '##@@')[6] AS referer,
    SPLIT(col, '##@@')[7] AS agent
  FROM ods_raw_log_d_spark
  WHERE dt = '${bizdate}'
) a;
```

- In the right-side navigation pane, click Debugging Configurations and set the following parameters:

| Parameter | Description |
|---|---|
| Computing Resource | Select the Spark computing resource associated with your workspace when you prepared environments. |
| Resource Group | Select the serverless resource group associated with your workspace when you prepared environments. |
| Script Parameters | In the Parameter Value column for bizdate, enter a date in yyyymmdd format. Example: bizdate=20250223 |

- (Optional) Configure scheduling properties. The default values are sufficient for this tutorial. To adjust them, click Properties in the right-side navigation pane. For details on all Properties tab parameters, see Node scheduling.
  - Scheduling Parameters: Scheduling parameters are already configured at the workflow level. Inner nodes inherit these values automatically, so no separate configuration is needed.
  - Scheduling Policies: Skip the Time for Delayed Execution parameter for this tutorial.
- Click Save.
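The device and identity CASE expressions in the SQL above are first-match rules, so their order matters. A Windows Phone user agent also contains the substring windows, so it would be labeled windows_pc if that branch came first. The following plain-Python sketch mirrors both CASE expressions so you can try sample user-agent strings locally. classify_device and classify_identity are hypothetical helpers, not DataWorks or Spark functions. Like RLIKE, re.search matches anywhere in the string.

```python
import re

# (pattern, label) pairs in the same order as the CASE ... END AS device branches.
DEVICE_RULES = [
    ("android", "android"),
    ("iphone", "iphone"),
    ("ipad", "ipad"),
    ("macintosh", "macintosh"),
    ("windows phone", "windows_phone"),  # must precede the broader 'windows'
    ("windows", "windows_pc"),
]

def classify_device(agent: str) -> str:
    a = agent.lower()
    for pattern, label in DEVICE_RULES:
        if re.search(pattern, a):  # RLIKE matches anywhere in the string
            return label
    return "unknown"

def classify_identity(agent: str, url: str) -> str:
    """url is the value extracted from the request line, as in the SQL."""
    a = agent.lower()
    if re.search(r"(bot|spider|crawler|slurp)", a):
        return "crawler"
    if re.search("feed", a) or re.search("feed", url):
        return "feed"
    if (not re.search(r"(bot|spider|crawler|feed|slurp)", a)
            and re.search(r"^(Mozilla|Opera)", agent)  # case-sensitive, on the raw agent
            and not re.search("feed", url)):
        return "user"
    return "unknown"

print(classify_device("Mozilla/5.0 (Windows Phone 10.0)"))      # windows_phone
print(classify_identity("Mozilla/5.0 (Windows NT 10.0)", "/"))  # user
```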
Join user attributes with log facts: dws_user_info_all_di_spark
This node joins the parsed log table dwd_log_info_di_spark with the user attribute table ods_user_info_d_spark on the uid field using a LEFT OUTER JOIN. The result, which combines device and request data with user demographics, is written to the DWS-layer table dws_user_info_all_di_spark.
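A LEFT OUTER JOIN keeps every row from the left side (the log table) and fills the user columns with NULL when no user record matches. The following Python sketch shows these semantics on made-up rows. left_outer_join is a hypothetical helper, and the sketch is not how Spark executes the join. It groups users by uid so that duplicate user records would multiply log rows, just as they would in SQL.

```python
from collections import defaultdict

def left_outer_join(logs: list, users: list) -> list:
    """Mimic the dws node: keep every log row and attach matching user attributes."""
    users_by_uid = defaultdict(list)
    for user in users:
        users_by_uid[user["uid"]].append(user)

    joined = []
    for log in logs:
        # In SQL, a NULL uid never satisfies a.uid = b.uid, so it gets no match.
        matches = users_by_uid.get(log["uid"], []) if log["uid"] is not None else []
        # No match: keep the log row once, with NULL (None) user attributes.
        for user in matches or [None]:
            joined.append({
                # Same as COALESCE(a.uid, b.uid) here: b.uid is non-NULL only when it equals a.uid.
                "uid": log["uid"],
                "gender": user["gender"] if user else None,
                "age_range": user["age_range"] if user else None,
                "zodiac": user["zodiac"] if user else None,
                "device": log["device"],
                "method": log["method"],
                "url": log["url"],
                "time": log["tm"],
            })
    return joined

logs = [
    {"uid": "u1", "device": "iphone", "method": "GET", "url": "/a", "tm": "2025022310:00:00"},
    {"uid": "u2", "device": "android", "method": "GET", "url": "/b", "tm": "2025022310:05:00"},
]
users = [{"uid": "u1", "gender": "F", "age_range": "20-30", "zodiac": "Aries"}]
rows = left_outer_join(logs, users)
print(rows[1]["uid"], rows[1]["gender"])  # u2 None
```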
- On the canvas, hover over dws_user_info_all_di_spark and click Open Node.
- Copy the following SQL and paste it into the code editor. Replace dw-spark-demo in the LOCATION clause with the name of the OSS bucket you created when preparing environments. If your bucket is not in the China (Shanghai) region, also replace oss-cn-shanghai-internal with the internal endpoint of your bucket's region.

```sql
-- Joins dwd_log_info_di_spark (log facts) with ods_user_info_d_spark (user attributes)
-- on uid and writes the result to dws_user_info_all_di_spark.
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
  uid       STRING COMMENT 'The user ID',
  gender    STRING COMMENT 'The gender',
  age_range STRING COMMENT 'The age range',
  zodiac    STRING COMMENT 'The zodiac sign',
  device    STRING COMMENT 'The terminal type',
  method    STRING COMMENT 'The HTTP request type',
  url       STRING COMMENT 'url',
  `time`    STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
)
PARTITIONED BY (dt STRING)
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/';

-- Add a partition.
ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

-- Insert data from the basic user information table and the fact log table.
-- The LEFT OUTER JOIN keeps every log row, even when no user record matches.
INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
SELECT
  COALESCE(a.uid, b.uid) AS uid,
  b.gender    AS gender,
  b.age_range AS age_range,
  b.zodiac    AS zodiac,
  a.device    AS device,
  a.method    AS method,
  a.url       AS url,
  a.tm        AS `time`
FROM (
  SELECT * FROM dwd_log_info_di_spark WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT * FROM ods_user_info_d_spark WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;
```

- In the right-side navigation pane, click Debugging Configurations and set the same parameters as for the previous node:

| Parameter | Description |
|---|---|
| Computing Resource | Select the Spark computing resource associated with your workspace. |
| Resource Group | Select the serverless resource group associated with your workspace. |
| Script Parameters | bizdate=20250223 (or your target date in yyyymmdd format) |

- (Optional) Configure scheduling properties as described for the previous node.
- Click Save.
Aggregate into a user profile: ads_user_info_1d_spark
This node reads dws_user_info_all_di_spark, groups by uid, and collapses each user's log records into a single profile row. The aggregation counts page views with COUNT and uses MAX to pick one value each for device type, gender, age range, and zodiac sign. For strings, MAX returns the lexicographically largest non-null value. The result is written to the ADS-layer table ads_user_info_1d_spark.
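The following plain-Python sketch reproduces the GROUP BY on made-up rows so that the effect of COUNT and MAX is visible. build_profiles is a hypothetical helper, not part of DataWorks. MAX picks the largest string rather than the most frequent one. A user seen on both android and iphone is therefore reported as iphone, regardless of which device generated more page views.

```python
def build_profiles(rows: list) -> dict:
    """Mimic GROUP BY uid with COUNT(0) AS pv and MAX(...) for the attribute columns."""
    profiles = {}
    for row in rows:
        uid = row["uid"]  # SQL puts all NULL uids in one group; None keys behave the same here
        p = profiles.setdefault(uid, {"uid": uid, "pv": 0, "device": None,
                                      "gender": None, "age_range": None, "zodiac": None})
        p["pv"] += 1  # COUNT(0) counts every row, regardless of NULL columns
        for col in ("device", "gender", "age_range", "zodiac"):
            value = row[col]
            # MAX ignores NULLs and keeps the largest string seen so far.
            if value is not None and (p[col] is None or value > p[col]):
                p[col] = value
    return profiles

rows = [
    {"uid": "u1", "device": "android", "gender": "F", "age_range": "20-30", "zodiac": "Aries"},
    {"uid": "u1", "device": "iphone", "gender": "F", "age_range": "20-30", "zodiac": "Aries"},
    {"uid": "u2", "device": "windows_pc", "gender": None, "age_range": None, "zodiac": None},
]
profiles = build_profiles(rows)
print(profiles["u1"]["pv"], profiles["u1"]["device"])  # 2 iphone
```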
- On the canvas, hover over ads_user_info_1d_spark and click Open Node.
- Copy the following SQL and paste it into the code editor. Replace dw-spark-demo in the LOCATION clause with the name of the OSS bucket you created when preparing environments. If your bucket is not in the China (Shanghai) region, also replace oss-cn-shanghai-internal with the internal endpoint of your bucket's region.

```sql
-- Aggregates dws_user_info_all_di_spark by uid to produce one user profile row per user.
-- pv = page views (COUNT of log rows per uid).
CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
  uid       STRING COMMENT 'The user ID',
  device    STRING COMMENT 'The terminal type',
  pv        BIGINT COMMENT 'pv',
  gender    STRING COMMENT 'The gender',
  age_range STRING COMMENT 'The age range',
  zodiac    STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (dt STRING)
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/';

ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt = '${bizdate}')
SELECT
  uid,
  MAX(device)    AS device,
  COUNT(0)       AS pv,
  MAX(gender)    AS gender,
  MAX(age_range) AS age_range,
  MAX(zodiac)    AS zodiac
FROM dws_user_info_all_di_spark
WHERE dt = '${bizdate}'
GROUP BY uid;
```

- In the right-side navigation pane, click Debugging Configurations and set the same parameters as for the previous nodes.
- (Optional) Configure scheduling properties as described for the previous node.
- Click Save.
Step 3: Run and verify the pipeline
Run the workflow in debug mode, then query the output to confirm the data was processed correctly.
Run the workflow
- In the top toolbar of the workflow configuration tab, click Run.
- In the Enter runtime parameters dialog box, enter 20250223 as the bizdate value (or your target date), then click OK. Data Studio replaces every ${bizdate} variable in the node code with this value before running the nodes, and all three nodes run in the dependency order shown in the DAG.
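Conceptually, this substitution is a text replacement performed before the SQL reaches Spark. The following sketch imitates it with Python's string.Template, whose ${name} placeholder syntax happens to match. render is a hypothetical helper, and the sketch is not how DataWorks implements scheduling parameters. It uses safe_substitute instead of substitute because the node code contains other $ characters, such as the end-of-string anchors in the regexp_extract patterns, and substitute would reject those with a ValueError.

```python
from string import Template

def render(sql: str, bizdate: str) -> str:
    """Substitute ${bizdate} into node code (illustration only, not the DataWorks engine).

    safe_substitute leaves any other '$' untouched instead of raising ValueError.
    """
    return Template(sql).safe_substitute(bizdate=bizdate)

snippet = (
    "INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')\n"
    "SELECT regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol\n"
    "FROM ods_raw_log_d_spark WHERE dt = '${bizdate}'"
)
print(render(snippet, "20250223"))
```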
Query the output
After all nodes complete, run a spot-check query to verify the DWD table was populated correctly.
- In the DataWorks console, choose Data Analysis and Service > DataAnalysis in the left-side navigation pane, then click Go to DataAnalysis.
- In the left-side navigation pane, click SQL Query.
- Click the icon next to My Files and select Create File. Enter a file name and click OK.
- In the left-side navigation tree, click the file you just created to open it.
- In the upper-right corner, click the icon and configure the data source:

| Parameter | Value |
|---|---|
| Workspace | Select the workspace that contains User_profile_analysis_Spark |
| Data Source Type | EMR Spark SQL |
| Data Source Name | Select the EMR Serverless Spark computing resource associated with your workspace |

- Click OK.
- Run the following query to check whether the DWD table was populated. Replace 20250223 with the bizdate value you entered when you ran the workflow. To check the output of a scheduled instance instead, use its data timestamp, which is one day earlier than the date the instance runs. For example, an instance that runs on February 23, 2025 has a data timestamp of 20250222.

```sql
-- dt must match the bizdate value (yyyymmdd) used for the run.
SELECT * FROM dwd_log_info_di_spark WHERE dt = '20250223';
```

If the query returns rows with parsed fields (ip, uid, method, device, identity, and so on), the pipeline ran successfully.
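For scheduled instances, the relationship between the run date and the data timestamp is plain date arithmetic. The following sketch computes the dt value to query for an instance that runs on a given date, including across a month boundary. data_timestamp is a hypothetical helper, not a DataWorks function.

```python
from datetime import date, timedelta

def data_timestamp(run_date: date) -> str:
    """Return the data timestamp (bizdate) of a scheduled instance that runs on run_date.

    The data timestamp is the day before the run date, formatted as yyyymmdd.
    """
    return (run_date - timedelta(days=1)).strftime("%Y%m%d")

print(data_timestamp(date(2025, 2, 23)))  # 20250222
print(data_timestamp(date(2025, 3, 1)))   # 20250228 (timedelta handles the month boundary)
```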
Step 5: Run the workflow in production
After you deploy the nodes, the first scheduled instance runs the following day. To verify the workflow in the production environment right away, run a backfill, which triggers a run immediately. For details, see Backfill data and view data backfill instances (new version).
- Click Operation Center in the top navigation bar of the Data Studio page. Alternatively, click the icon in the upper-left corner of the DataWorks console and choose All Products > Data Development And Task Operation > Operation Center.
- In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes.
- Find the zero load node workshop_start_spark and click its name.
- In the directed acyclic graph (DAG) view, right-click workshop_start_spark and choose Run > Current and Descendant Nodes Retroactively.
- In the Backfill Data panel, select the nodes to backfill, set Data Timestamp, and click Submit and Redirect.
- On the Data Backfill page, click Refresh to check the run status. When workshop_start_spark and all of its descendant nodes show a Success status, the workflow is running correctly in production.
Clean up
To avoid ongoing charges after completing this tutorial, do one of the following:
- Configure the Effective Period for all nodes in the workflow to limit the time window during which scheduled instances run.
- Freeze the zero load node workshop_start_spark to stop all downstream nodes from running on schedule.
What's next
- Display data in a visualized manner: Use DataAnalysis to build charts from the user profile data in ads_user_info_1d_spark.
- Monitor data quality: Set up monitoring rules on the output tables to catch dirty data before it propagates downstream.
- Manage data: View generated tables in Data Map and trace relationships through data lineage.
- Use DataService Studio APIs to provide services: Expose the processed data through standardized APIs for downstream consumers.