Use Spark SQL to clean, join, and aggregate raw data synchronized from OSS, and produce a user profile table you can query downstream. By the end of this tutorial, you will know how to:
Design a three-node workflow that implements ODS (Operational Data Store) → DWD (Data Warehouse Detail) → DWS (Data Warehouse Summary) → ADS (Application Data Store) data warehouse layers.
Configure each EMR Spark SQL node with the correct SQL logic and scheduling parameters.
Verify each intermediate table after it is generated.
Commit and deploy the workflow, then trigger a backfill run to validate the full pipeline in the production environment.
Prerequisites
Before you begin, ensure that you have:
Synchronized user information and website access logs to a private OSS bucket. For details, see Synchronize data.
Two external tables already created in DataWorks:
ods_user_info_d_spark: reads basic user information from OSS.
ods_raw_log_d_spark: reads website access logs from OSS.
Usage notes
EMR Serverless Spark workspaces do not support function registration. As a result, you cannot register custom functions to parse log fields or convert IP addresses to regions. This tutorial uses built-in Spark SQL functions instead — specifically, SPLIT and regexp_extract — to parse the ods_raw_log_d_spark table.
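To see how SPLIT behaves before pointing it at real data, the following minimal sketch splits one made-up log line in an ad hoc query. The sample row, its field values, and the CTE name sample_row are assumptions for illustration only; actual rows in ods_raw_log_d_spark may look different.

```sql
-- Hypothetical sample row: eight fields joined by the ##@@ delimiter.
WITH sample_row AS (
  SELECT '10.0.0.1##@@u001##@@2024080710:15:30##@@GET /index.html HTTP/1.1##@@200##@@1024##@@http://www.example.com/home##@@Mozilla/5.0 (iPhone)' AS col
)
SELECT
  SIZE(SPLIT(col, '##@@'))  AS field_count,  -- number of fields produced by the split
  SPLIT(col, '##@@')[0]     AS ip,
  SPLIT(col, '##@@')[1]     AS uid,
  SPLIT(col, '##@@')[3]     AS request
FROM sample_row;
```

With this sample, field_count is 8, which matches the eight positions (0 through 7) that the tutorial's SQL reads from each row.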
Background
The tables in this tutorial follow a standard data warehouse layering pattern:
| Layer | Table | Purpose |
|---|---|---|
| ODS (raw) | ods_raw_log_d_spark, ods_user_info_d_spark | External tables pointing to raw data in OSS |
| DWD (cleaned) | dwd_log_info_di_spark | Split raw log fields into structured columns |
| DWS (aggregated) | dws_user_info_all_di_spark | Join log and user info on uid |
| ADS (serving) | ads_user_info_1d_spark | Aggregate per user to produce the final user profile |
Step 1: Design the workflow
In DataWorks, create a workflow with three EMR Spark SQL nodes connected in sequence. Each node corresponds to one data warehouse layer.
Go to the DataStudio page. Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and O&M > Data Development. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.
In the upper part of the workflow canvas, click Create Node to create the following three nodes.
| Node category | Node type | Node name | Code logic |
|---|---|---|---|
| EMR | EMR Spark SQL | dwd_log_info_di_spark | Splits the ods_raw_log_d_spark table into structured fields to produce a clean log table. |
| EMR | EMR Spark SQL | dws_user_info_all_di_spark | Joins the clean log table and the user information table on uid to produce an aggregate table. |
| EMR | EMR Spark SQL | ads_user_info_1d_spark | Aggregates the dws_user_info_all_di_spark table by user to produce the final user profile table. |
Drag the three nodes onto the workflow canvas and draw dependency lines between them to set the execution order:
dwd_log_info_di_spark → dws_user_info_all_di_spark → ads_user_info_1d_spark.
Step 2: Configure EMR Spark SQL nodes
Configure the SQL code, scheduling parameters, and optional Spark system parameters for each node.
Configure the dwd_log_info_di_spark node
This node reads raw log records from ods_raw_log_d_spark, uses SPLIT to parse each row by the ##@@ delimiter, and applies regexp_extract to break the request field into method, url, and protocol columns. The result is written to a new partitioned table, dwd_log_info_di_spark.
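The regular expressions used by this node can be tried on literal strings first. The sketch below applies the same patterns to a made-up request and referer; the input values and the aliases t and referer_host are assumptions for illustration, not data from the real logs.

```sql
-- Hypothetical request and referer strings, not taken from the real log data.
SELECT
  regexp_extract(request, '(^[^ ]+) .*', 1)           AS method,        -- first token of the request
  regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1)    AS url,           -- everything between the first and last token
  regexp_extract(request, '.* ([^ ]+$)', 1)           AS protocol,      -- last token of the request
  regexp_extract(referer, '^[^/]+://([^/]+){1}', 1)   AS referer_host   -- host part of the referer URL
FROM VALUES
  ('GET /index.html HTTP/1.1', 'http://www.example.com/home')
  AS t(request, referer);
```

For this input the query returns GET, /index.html, HTTP/1.1, and www.example.com. When a pattern does not match, regexp_extract returns an empty string rather than NULL, which is worth keeping in mind when you check the parsed table.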
1. Configure the node code
Double-click the dwd_log_info_di_spark node to open its configuration tab, and enter the following SQL:
-- Splits ods_raw_log_d_spark using ##@@ as a delimiter
-- and writes the structured fields to dwd_log_info_di_spark.
-- ${bizdate} is a scheduling parameter replaced at runtime with the data timestamp (T-1).
CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
ip STRING COMMENT 'The IP address',
uid STRING COMMENT 'The user ID',
tm STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
status STRING COMMENT 'The status code returned by the server',
bytes STRING COMMENT 'The number of bytes returned to the client',
method STRING COMMENT 'The request method',
url STRING COMMENT 'url',
protocol STRING COMMENT 'The protocol',
referer STRING,
device STRING,
identity STRING
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt='${bizdate}')
SELECT ip,
uid,
tm,
status,
bytes,
regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
CASE
WHEN lower(agent) RLIKE 'android' THEN 'android'
WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
ELSE 'unknown'
END AS device,
CASE
WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
ELSE 'unknown'
END AS identity
FROM (
SELECT
SPLIT(col, '##@@')[0] AS ip,
SPLIT(col, '##@@')[1] AS uid,
SPLIT(col, '##@@')[2] AS tm,
SPLIT(col, '##@@')[3] AS request,
SPLIT(col, '##@@')[4] AS status,
SPLIT(col, '##@@')[5] AS bytes,
SPLIT(col, '##@@')[6] AS referer,
SPLIT(col, '##@@')[7] AS agent
FROM ods_raw_log_d_spark
WHERE dt = '${bizdate}'
) a;
2. Configure scheduling properties
| Section | Setting |
|---|---|
| Scheduling parameter | Click Add Parameter. Set Parameter Name to bizdate and Parameter Value to $[yyyymmdd-1]. For details, see Configure scheduling parameters. |
| Dependencies | Confirm that the output name follows the workspacename.nodename format. For details, see Configure scheduling dependencies. |
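If you want a rough idea of the value that $[yyyymmdd-1] produces for a run scheduled today, you can approximate it in an ad hoc query with built-in Spark SQL date functions. This is only an illustration: it relies on the current date and time zone of the Spark session, and it is not how DataWorks itself substitutes the scheduling parameter. The alias approx_bizdate is a hypothetical name.

```sql
-- Approximates T-1 in yyyyMMdd form from the Spark session's current date.
SELECT date_format(date_sub(current_date(), 1), 'yyyyMMdd') AS approx_bizdate;
```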
Set Scheduling Cycle to Day. The scheduled run time is determined by the workshop_start_spark zero load node, so all nodes in this workflow run after 00:30 daily.
4. Save the configuration
Click the save icon in the top toolbar to save the configuration.
5. Verify the split result
After the ancestor nodes and this node run successfully, click Ad Hoc Query in the left-side navigation pane of the DataStudio page. Create an ad hoc query task of the EMR Spark SQL type and run the following query to confirm that the log fields are parsed correctly. Check that method, url, protocol, device, and identity contain expected values:
-- Replace '20240807' with the actual bizdate value for your run.
-- The data timestamp is one day earlier than the scheduling date.
SELECT ip, uid, method, url, device, identity
FROM dwd_log_info_di_spark
WHERE dt = '20240807'
LIMIT 20;
In batch computing scenarios, bizdate represents the business date: the day on which the data was generated, not the day the node runs. For example, if the node runs on August 8, 2024, the data timestamp is 20240807.
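Sampling 20 rows shows whether parsing works at all, but a grouped count makes it easier to spot systematic problems. The following sketch, which assumes the same example partition value '20240807', summarizes the device and identity classification and counts rows where the request method could not be extracted.

```sql
-- Replace '20240807' with the data timestamp of your run.
SELECT device,
       identity,
       COUNT(*) AS row_cnt,
       -- regexp_extract yields '' when the pattern does not match the request field
       SUM(CASE WHEN method = '' THEN 1 ELSE 0 END) AS empty_method_cnt
FROM dwd_log_info_di_spark
WHERE dt = '20240807'
GROUP BY device, identity
ORDER BY row_cnt DESC;
```

A large share of rows classified as unknown, or a high empty_method_cnt, may indicate that the raw rows do not follow the expected ##@@ layout.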
Configure the dws_user_info_all_di_spark node
This node reads from dwd_log_info_di_spark and ods_user_info_d_spark, performs a left join on uid to attach user attributes (gender, age range, zodiac sign) to each log record, and writes the result to dws_user_info_all_di_spark.
1. Configure the node code
Double-click the dws_user_info_all_di_spark node to open its configuration tab, and enter the following SQL:
-- Joins the clean log table and the user information table on uid,
-- then writes the merged data to dws_user_info_all_di_spark.
-- ${bizdate} is replaced at runtime with the data timestamp (T-1).
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
uid STRING COMMENT 'The user ID',
gender STRING COMMENT 'The gender',
age_range STRING COMMENT 'The age range',
zodiac STRING COMMENT 'The zodiac sign',
device STRING COMMENT 'Terminal type',
method STRING COMMENT 'HTTP request type',
url STRING COMMENT 'url',
`time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
)
PARTITIONED BY (dt STRING);
-- Add a partition.
ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
-- Insert data from the basic user information table and the new log table.
INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
SELECT
COALESCE(a.uid, b.uid) AS uid,
b.gender AS gender,
b.age_range AS age_range,
b.zodiac AS zodiac,
a.device AS device,
a.method AS method,
a.url AS url,
a.tm
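FROM
dwd_log_info_di_spark AS a
LEFT OUTER JOIN
ods_user_info_d_spark AS b
ON
a.uid = b.uid
-- Read only the current data timestamp from the log table so that
-- earlier partitions are not rewritten into this partition.
WHERE a.dt = '${bizdate}';
2. Configure scheduling properties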
| Section | Setting |
|---|---|
| Scheduling parameter | Click Add Parameter. Set Parameter Name to bizdate and Parameter Value to $[yyyymmdd-1]. For details, see Configure scheduling parameters. |
| Dependencies | Confirm that the output name follows the workspacename.nodename format. For details, see Configure scheduling dependencies. |
Set Scheduling Cycle to Day. The scheduled run time is determined by the workshop_start_spark zero load node.
4. Save the configuration
Click the save icon in the top toolbar to save the configuration.
5. Verify the join result
After the ancestor nodes and this node run successfully, click Ad Hoc Query in the DataStudio page. Create an EMR Spark SQL ad hoc query task and run the following query. Confirm that gender, age_range, and zodiac are populated for rows where a matching uid exists in ods_user_info_d_spark, and that the total row count matches the row count in dwd_log_info_di_spark for the same partition:
-- Replace '20240807' with the actual bizdate value for your run.
SELECT uid, gender, age_range, zodiac, device, method
FROM dws_user_info_all_di_spark
WHERE dt = '20240807'
LIMIT 20;
Because this is a left join, rows in the log table without a matching uid in the user information table will have NULL values for gender, age_range, and zodiac.
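The row-count comparison and the share of unmatched rows can be checked in one query. This is a sketch under two assumptions: '20240807' stands in for your data timestamp, and each uid appears at most once in ods_user_info_d_spark (otherwise the join multiplies log rows and the counts will differ). The column aliases are hypothetical.

```sql
-- Replace '20240807' with the data timestamp of your run.
SELECT 'dwd_log_info_di_spark' AS table_name,
       COUNT(*)                AS row_cnt,
       CAST(NULL AS BIGINT)    AS rows_without_profile   -- not applicable to the log table
FROM dwd_log_info_di_spark
WHERE dt = '20240807'
UNION ALL
SELECT 'dws_user_info_all_di_spark',
       COUNT(*),
       -- rows where no user attribute was attached by the left join
       SUM(CASE WHEN gender IS NULL AND age_range IS NULL AND zodiac IS NULL THEN 1 ELSE 0 END)
FROM dws_user_info_all_di_spark
WHERE dt = '20240807';
```

Note that a row can also show NULL attributes when the matching user record itself has empty fields, so rows_without_profile is an upper bound on unmatched uids rather than an exact count.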
Configure the ads_user_info_1d_spark node
This node reads from dws_user_info_all_di_spark and groups records by uid, computing the page view count (pv) and selecting representative values for device, gender, age range, and zodiac sign. The result is written to ads_user_info_1d_spark as the final user profile table.
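The node's SQL uses MAX to pick the representative value for each attribute. The following self-contained sketch shows what that means for string columns; the three input rows and the alias t are made up for illustration.

```sql
-- Hypothetical rows: user u1 has two log records, one of them without a gender.
SELECT uid,
       MAX(device) AS device,   -- lexicographically greatest device string
       COUNT(0)    AS pv,       -- one count per log record
       MAX(gender) AS gender    -- MAX ignores NULL values
FROM VALUES
  ('u1', 'android',    'male'),
  ('u1', 'iphone',     NULL),
  ('u2', 'windows_pc', 'female')
  AS t(uid, device, gender)
GROUP BY uid
ORDER BY uid;
```

For u1 this returns iphone, 2, and male: MAX on strings picks the value that sorts last, not the most frequent one, so device is a stable but arbitrary representative when a user has several devices.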
1. Configure the node code
Double-click the ads_user_info_1d_spark node to open its configuration tab, and enter the following SQL:
-- Aggregates dws_user_info_all_di_spark by uid,
-- computing pv (page views) and representative user attributes.
-- ${bizdate} is replaced at runtime with the data timestamp (T-1).
CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
uid STRING COMMENT 'The user ID',
device STRING COMMENT 'The terminal type',
pv BIGINT COMMENT 'pv',
gender STRING COMMENT 'The gender',
age_range STRING COMMENT 'The age range',
zodiac STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}')
SELECT uid
, MAX(device) AS device
, COUNT(0) AS pv
, MAX(gender) AS gender
, MAX(age_range) AS age_range
, MAX(zodiac) AS zodiac
FROM dws_user_info_all_di_spark
WHERE dt = '${bizdate}'
GROUP BY uid;
2. Configure scheduling properties
| Section | Setting |
|---|---|
| Scheduling parameter | Click Add Parameter. Set Parameter Name to bizdate and Parameter Value to $[yyyymmdd-1]. For details, see Configure scheduling parameters. |
| Dependencies | Confirm that the output name follows the workspacename.nodename format. For details, see Configure scheduling dependencies. |
Set Scheduling Cycle to Day. The scheduled run time is determined by the workshop_start_spark zero load node.
3. (Optional) Configure Spark system parameters
Configure the same Spark system parameters described in the dwd_log_info_di_spark node's step 3.
4. Save the configuration
Click the save icon in the top toolbar to save the configuration.
5. Verify the user profile table
After the ancestor nodes and this node run successfully, click Ad Hoc Query in the DataStudio page. Create an EMR Spark SQL ad hoc query task and run the following query. Confirm that each user has exactly one row, pv is greater than zero, and profile attributes such as gender and device reflect the expected distribution:
-- Replace '20240807' with the actual bizdate value for your run.
SELECT uid, device, pv, gender, age_range, zodiac
FROM ads_user_info_1d_spark
WHERE dt = '20240807'
ORDER BY pv DESC
LIMIT 20;
pv equals COUNT(0) grouped by uid, representing the number of log records for that user on the data timestamp.
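The "exactly one row per user" and pv checks can also be expressed as queries. The sketch below assumes the example partition value '20240807'; the metric labels are hypothetical names.

```sql
-- Expect zero rows: any uid listed here has more than one profile row.
SELECT uid, COUNT(*) AS row_cnt
FROM ads_user_info_1d_spark
WHERE dt = '20240807'
GROUP BY uid
HAVING COUNT(*) > 1;

-- The two values should be equal, because pv counts the dws rows per uid.
SELECT 'ads_pv_total' AS metric, SUM(pv) AS value
FROM ads_user_info_1d_spark
WHERE dt = '20240807'
UNION ALL
SELECT 'dws_row_count', COUNT(*)
FROM dws_user_info_all_di_spark
WHERE dt = '20240807';
```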
Step 3: Commit the workflow
After configuring all three nodes, run the workflow to confirm it works, then commit and deploy it.
On the workflow configuration tab, click the run icon to run the workflow.
When the success icon appears next to all nodes, click the commit icon to commit the workflow.
In the Commit dialog box, select the nodes to commit, enter a description, select Ignore I/O Inconsistency Alerts, and click Confirm.
Deploy the committed workflow.
In the upper-right corner of the workflow configuration tab, click Deploy. The Create Deploy Task page appears.
Select the nodes to deploy and click Deploy. In the Create Deploy Task dialog box, click Deploy.
Step 4: Run the nodes in the production environment
After deploying the nodes, use the data backfill feature to trigger a run and confirm that all nodes execute successfully in the production environment. For details, see Backfill data and view data backfill instances (new version).
Click Operation Center in the upper-right corner of the workflow configuration tab.
In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes. Find and click the workshop_start_spark zero load node to open its DAG.
Right-click the workshop_start_spark node and choose Run > Current and Descendent Nodes Retroactively.
In the Backfill Data panel, select the nodes for which you want to backfill data, configure the Data Timestamp parameter, and click Submit and Redirect. The data backfill instances page appears.
Click Refresh until all SQL nodes show a successful status.
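Once the backfill instances succeed, one way to confirm that the expected partitions were written is to list them. This sketch assumes that your query runs against the environment in which the backfill wrote the tables; the dt value you look for is the Data Timestamp you chose in the Backfill Data panel.

```sql
-- Each table should list a dt=<data timestamp> partition for the backfilled date.
SHOW PARTITIONS dwd_log_info_di_spark;
SHOW PARTITIONS dws_user_info_all_di_spark;
SHOW PARTITIONS ads_user_info_1d_spark;
```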
What's next
After the workflow runs successfully, the ads_user_info_1d_spark table contains one row per user with their page view count and profile attributes for the data timestamp. You can now connect a BI tool or downstream application to this table for reporting and analysis.
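As an example of the kind of downstream query a report might issue, the following sketch summarizes users and page views by gender and age range. The partition value '20240807' and the aliases user_cnt and total_pv are assumptions for illustration.

```sql
-- Replace '20240807' with the data timestamp you want to report on.
SELECT gender,
       age_range,
       COUNT(*) AS user_cnt,   -- one row per user in the profile table
       SUM(pv)  AS total_pv
FROM ads_user_info_1d_spark
WHERE dt = '20240807'
GROUP BY gender, age_range
ORDER BY total_pv DESC;
```

Users whose uid had no match in the user information table appear in a group with NULL gender and age_range.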