Use EMR Spark SQL nodes in DataWorks to transform raw log data and user records into a user profile table. This tutorial covers three transformation stages — log splitting, table joining, and aggregation — then shows you how to commit and run the workflow in production.
By the end of this tutorial, you will know how to split raw log data with built-in Spark SQL functions, join log and user tables, aggregate the result into a user profile table, and commit and run the workflow in production.
Prerequisites
Before you begin, make sure that you have:
- Completed data synchronization. For details, see Synchronize data.
- The ods_user_info_d_spark external table created based on an EMR Spark SQL node, which provides access to the basic user information synchronized to the private Object Storage Service (OSS) bucket.
- The ods_raw_log_d_spark external table created based on an EMR Spark SQL node, which provides access to the website access logs synchronized to the private OSS bucket.
Usage notes
EMR Serverless Spark workspaces do not support function registration. You cannot register custom functions to split log data or convert IP addresses to regions. In this tutorial, the ods_raw_log_d_spark table is split using a built-in Spark SQL function to generate the dwd_log_info_di_spark table.
How it works
The processing pipeline produces a user profile table through three stages:
1. Split ods_raw_log_d_spark using a built-in Spark SQL function → dwd_log_info_di_spark
2. Join dwd_log_info_di_spark and ods_user_info_d_spark on the uid field → dws_user_info_all_di_spark
3. Aggregate dws_user_info_all_di_spark using MAX and COUNT → ads_user_info_1d_spark
The dws_user_info_all_di_spark table contains many fields and a large volume of data. Consuming it directly without further processing may take a long time, which is why the aggregation stage is required.
Step 1: Design a workflow
Design the node structure and dependency graph before writing any SQL code. This gives you a clear picture of the data flow before you configure each node.
1. Go to the DataStudio page. Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and O&M > Data Development. Select the desired workspace from the drop-down list and click Go to Data Development.
2. Create the three processing nodes. In the upper part of the workflow canvas, click Create Node to create the nodes described in the following table.

   | Node category | Node type | Node name | Code logic |
   |---|---|---|---|
   | EMR | EMR Spark SQL | dwd_log_info_di_spark | Split ods_raw_log_d_spark to generate a new log table |
   | EMR | EMR Spark SQL | dws_user_info_all_di_spark | Join the new log table and the basic user information table to generate an aggregate table |
   | EMR | EMR Spark SQL | ads_user_info_1d_spark | Aggregate the joined table to generate a user profile table |

3. Configure the directed acyclic graph (DAG). Drag the nodes onto the workflow canvas and draw dependency lines between them to establish the execution order.

Step 2: Configure EMR Spark SQL nodes
Each of the three nodes requires the same configuration sequence: write the SQL code, set scheduling properties, optionally configure advanced Spark parameters, save, and verify the output.
All three nodes use the bizdate scheduling parameter. Set Parameter Name to bizdate and Parameter Value to $[yyyymmdd-1]. This assigns the previous day's date (T-1) as the data timestamp. In batch processing, bizdate represents the business date, which is one day before the node's scheduling time. For example, if a node runs on August 8, 2024, the data timestamp is 20240807. For details, see Configure scheduling parameters.
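The T-1 arithmetic behind $[yyyymmdd-1] can be sketched in a few lines of Python (an illustration only, not how DataWorks evaluates the expression internally):

```python
from datetime import date, timedelta

def bizdate(scheduling_date: date) -> str:
    """Return the T-1 business date in yyyymmdd format, mirroring $[yyyymmdd-1]."""
    return (scheduling_date - timedelta(days=1)).strftime("%Y%m%d")

# A node that runs on August 8, 2024 gets the previous day as its data timestamp.
print(bizdate(date(2024, 8, 8)))  # 20240807
```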
Configure the dwd_log_info_di_spark node
This node splits the raw log table ods_raw_log_d_spark — where each row is a single delimited string — into structured fields, and writes the result to dwd_log_info_di_spark.
Step 1: Configure the node code
Double-click the dwd_log_info_di_spark node to open its configuration tab. Enter the following SQL:
-- Split the ods_raw_log_d_spark table into structured fields and write to dwd_log_info_di_spark.
-- The source table uses ##@@ as the field delimiter within a single column (col).
-- DataWorks scheduling parameter ${bizdate} is replaced at runtime with the business date (T-1).
CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
ip STRING COMMENT 'The IP address',
uid STRING COMMENT 'The user ID',
tm STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
status STRING COMMENT 'The status code returned by the server',
bytes STRING COMMENT 'The number of bytes returned to the client',
method STRING COMMENT 'The request method',
url STRING COMMENT 'url',
protocol STRING COMMENT 'The protocol',
referer STRING,
device STRING,
identity STRING
)
PARTITIONED BY (dt STRING);
ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
SELECT ip,
uid,
tm,
status,
bytes,
regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
CASE
WHEN lower(agent) RLIKE 'android' THEN 'android'
WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
ELSE 'unknown'
END AS device,
CASE
WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
WHEN lower(agent) RLIKE 'feed'
OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
AND agent RLIKE '^(Mozilla|Opera)'
AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
ELSE 'unknown'
END AS identity
FROM (
SELECT
SPLIT(col, '##@@')[0] AS ip,
SPLIT(col, '##@@')[1] AS uid,
SPLIT(col, '##@@')[2] AS tm,
SPLIT(col, '##@@')[3] AS request,
SPLIT(col, '##@@')[4] AS status,
SPLIT(col, '##@@')[5] AS bytes,
SPLIT(col, '##@@')[6] AS referer,
SPLIT(col, '##@@')[7] AS agent
FROM ods_raw_log_d_spark
WHERE dt = '${bizdate}'
) a;
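To see what the SPLIT and regexp_extract calls above do to a single record, here is a small Python sketch that applies the same delimiter and regular expressions to a made-up sample row (the log line and its values are hypothetical):

```python
import re

# Hypothetical row from ods_raw_log_d_spark: one ##@@-delimited string per record.
raw = ("192.168.1.10##@@0016359##@@2024-08-07 10:00:00##@@GET /index.html HTTP/1.1"
       "##@@200##@@1024##@@https://example.com/home##@@Mozilla/5.0 (Windows NT 10.0)")

# SPLIT(col, '##@@')[0..7]
ip, uid, tm, request, status, bytes_, referer, agent = raw.split("##@@")

# The same patterns as the regexp_extract calls in the node code:
method   = re.match(r"(^[^ ]+) ", request).group(1)           # token before the first space
url      = re.match(r"^[^ ]+ (.*) [^ ]+$", request).group(1)  # everything between first and last token
protocol = re.search(r" ([^ ]+$)", request).group(1)          # token after the last space
host     = re.match(r"^[^/]+://([^/]+)", referer).group(1)    # host part of the referer

print(method, url, protocol, host)  # GET /index.html HTTP/1.1 example.com
```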
Step 2: Configure scheduling properties
On the Properties tab of the node configuration page:
- In the Scheduling Parameter section, click Add Parameter, set Parameter Name to bizdate, and set Parameter Value to $[yyyymmdd-1].
- In the Dependencies section, confirm that the output name follows the workspacename.nodename format. For details, see Configure scheduling dependencies.
- In the Schedule section, set Scheduling Cycle to Day. The scheduled run time is inherited from the workshop_start_spark zero load node, which acts as the trigger for all downstream nodes. This node runs after 00:30 every day without additional time configuration.
Step 3: (Optional) Configure advanced Spark parameters
On the Advanced Settings tab in the right-side navigation pane, configure the following Spark system parameters as needed.
| Parameter | Description | Example |
|---|---|---|
| SERVERLESS_RELEASE_VERSION | Changes the Spark engine version | "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)" |
| SERVERLESS_QUEUE_NAME | Changes the resource queue | "dev_queue" |
| SERVERLESS_SQL_COMPUTE | Modifies the SQL compute configuration | "sc-b4356b0af6039727" |
| FLOW_SKIP_SQL_ANALYZE | Controls SQL execution mode: true runs multiple statements at a time; false runs one statement at a time. Available only in the development environment. | true |
To add a custom Spark parameter, enter it on the Advanced Settings tab. For example, specifying spark.eventLog.enabled: false causes DataWorks to deliver it to the EMR Serverless Spark workspace as --conf spark.eventLog.enabled=false. To configure parameters globally, see Configure global Spark parameters. For all available parameters, see Spark Configuration.
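The key-to-flag translation described above can be illustrated with a short Python sketch (this is only a model of the described behavior, not DataWorks internals):

```python
def to_conf_flags(params: dict) -> list:
    """Render custom Spark parameters as spark-submit style --conf arguments,
    matching the mapping described in the text above."""
    return [f"--conf {key}={value}" for key, value in params.items()]

# A parameter entered as spark.eventLog.enabled: false is delivered as:
print(to_conf_flags({"spark.eventLog.enabled": "false"}))
# ['--conf spark.eventLog.enabled=false']
```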
Step 4: Save
Click the save icon in the top toolbar to save the node configuration.
Step 5: Verify the output
After the ancestor nodes and this node run successfully, verify that the split log table was created correctly.
In the left-side navigation pane of DataStudio, click Ad Hoc Query. Create an ad hoc query task of the EMR Spark SQL type and run the following query, replacing <data_timestamp> with the actual data timestamp (T-1):
SELECT * FROM dwd_log_info_di_spark WHERE dt = '<data_timestamp>';
Configure the dws_user_info_all_di_spark node
This node joins the structured log table dwd_log_info_di_spark with the user information table ods_user_info_d_spark on the uid field, producing the aggregate table dws_user_info_all_di_spark.
Step 1: Configure the node code
Double-click the dws_user_info_all_di_spark node to open its configuration tab. Enter the following SQL:
-- Join dwd_log_info_di_spark and ods_user_info_d_spark on uid and write to dws_user_info_all_di_spark.
-- DataWorks scheduling parameter ${bizdate} is replaced at runtime with the business date (T-1).
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
uid STRING COMMENT 'The user ID',
gender STRING COMMENT 'The gender',
age_range STRING COMMENT 'The age range',
zodiac STRING COMMENT 'The zodiac sign',
device STRING COMMENT 'Terminal type',
method STRING COMMENT 'HTTP request type',
url STRING COMMENT 'url',
`time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
)
PARTITIONED BY (dt STRING);
-- Add a partition.
ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
-- Join user information and log data and insert into the partition.
INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
SELECT
COALESCE(a.uid, b.uid) AS uid,
b.gender AS gender,
b.age_range AS age_range,
b.zodiac AS zodiac,
a.device AS device,
a.method AS method,
a.url AS url,
a.tm
FROM dwd_log_info_di_spark AS a
LEFT OUTER JOIN ods_user_info_d_spark AS b
ON a.uid = b.uid;
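The LEFT OUTER JOIN plus COALESCE semantics above can be sketched with a tiny in-memory example in Python (all sample rows are made up for demonstration):

```python
# Log rows (left side) and a user lookup table (right side), keyed by uid.
logs = [{"uid": "u1", "device": "android"}, {"uid": "u3", "device": "windows_pc"}]
users = {"u1": {"gender": "f", "zodiac": "leo"}}

joined = []
for row in logs:
    match = users.get(row["uid"])  # None when the uid has no user record
    joined.append({
        "uid": row["uid"],                              # COALESCE(a.uid, b.uid)
        "device": row["device"],
        "gender": match["gender"] if match else None,   # NULL for unmatched rows
        "zodiac": match["zodiac"] if match else None,
    })

print(joined)
```

Every log row is kept; user fields are NULL (None) where no matching uid exists, which is exactly why the aggregation step can later use MAX to collapse them.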
Step 2: Configure scheduling properties
Follow the same steps as for the dwd_log_info_di_spark node: set bizdate to $[yyyymmdd-1], confirm the output name in Dependencies, and set Scheduling Cycle to Day. This node also runs after 00:30 every day, inheriting its scheduled time from the workshop_start_spark zero load node.
Step 3: (Optional) Configure advanced Spark parameters
Follow the same steps as for the dwd_log_info_di_spark node. The available parameters (SERVERLESS_RELEASE_VERSION, SERVERLESS_QUEUE_NAME, SERVERLESS_SQL_COMPUTE, FLOW_SKIP_SQL_ANALYZE) are the same across all three nodes.
Step 4: Save
Click the save icon in the top toolbar to save the node configuration.
Step 5: Verify the output
After the ancestor nodes and this node run successfully, verify that the joined table was created correctly.
In the left-side navigation pane of DataStudio, click Ad Hoc Query. Create an ad hoc query task of the EMR Spark SQL type and run the following query, replacing <data_timestamp> with the actual data timestamp (T-1):
SELECT * FROM dws_user_info_all_di_spark WHERE dt = '<data_timestamp>';
Configure the ads_user_info_1d_spark node
This node aggregates the joined table dws_user_info_all_di_spark using MAX and COUNT operations, producing ads_user_info_1d_spark as the final user profile table.
Step 1: Configure the node code
Double-click the ads_user_info_1d_spark node to open its configuration tab. Enter the following SQL:
-- Aggregate dws_user_info_all_di_spark to generate the user profile table ads_user_info_1d_spark.
-- DataWorks scheduling parameter ${bizdate} is replaced at runtime with the business date (T-1).
CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
uid STRING COMMENT 'The user ID',
device STRING COMMENT 'The terminal type',
pv BIGINT COMMENT 'Page views (PV)',
gender STRING COMMENT 'The gender',
age_range STRING COMMENT 'The age range',
zodiac STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (dt STRING);
ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt = '${bizdate}')
SELECT uid,
MAX(device),
COUNT(0) AS pv,
MAX(gender),
MAX(age_range),
MAX(zodiac)
FROM dws_user_info_all_di_spark
WHERE dt = '${bizdate}'
GROUP BY uid;
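The GROUP BY logic above reduces to counting rows per uid and collapsing repeated profile fields with MAX. A minimal Python illustration (sample rows are hypothetical):

```python
rows = [
    {"uid": "u1", "device": "android", "gender": "f"},
    {"uid": "u1", "device": "android", "gender": "f"},
    {"uid": "u2", "device": "ipad", "gender": "m"},
]

profiles = {}
for r in rows:
    agg = profiles.setdefault(r["uid"], {"pv": 0, "device": "", "gender": ""})
    agg["pv"] += 1                                    # COUNT(0) AS pv
    agg["device"] = max(agg["device"], r["device"])   # MAX(device)
    agg["gender"] = max(agg["gender"], r["gender"])   # MAX(gender)

print(profiles)
```

Because the profile fields are identical across a user's rows, MAX simply picks that single value while COUNT tallies page views.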
Step 2: Configure scheduling properties
Follow the same steps as for the previous nodes: set bizdate to $[yyyymmdd-1], confirm the output name in Dependencies, and set Scheduling Cycle to Day.
Step 3: (Optional) Configure advanced Spark parameters
Follow the same steps as for the dwd_log_info_di_spark node.
Step 4: Save
Click the save icon in the top toolbar to save the node configuration.
Step 5: Verify the output
After the ancestor nodes and this node run successfully, verify that the user profile table was created correctly.
In the left-side navigation pane of DataStudio, click Ad Hoc Query. Create an ad hoc query task of the EMR Spark SQL type and run the following query, replacing <data_timestamp> with the actual data timestamp (T-1):
SELECT * FROM ads_user_info_1d_spark WHERE dt = '<data_timestamp>';
Step 3: Commit the workflow
After configuring all three nodes, run the workflow in the development environment to verify it works end-to-end, then commit and deploy it.
1. On the workflow configuration tab, click the run icon to run the workflow.
2. Wait until all nodes show the success icon. Then click the commit icon.
3. In the Commit dialog box, select the nodes to commit, enter a description, select Ignore I/O Inconsistency Alerts, and click Confirm.
4. After committing, deploy the nodes:
   1. In the upper-right corner of the workflow configuration tab, click Deploy. The Create Deploy Task page appears.
   2. Select the nodes to deploy and click Deploy. In the Create Deploy Task dialog box, click Deploy.
Step 4: Run the nodes in the production environment
Deployed nodes generate instances starting the next day. Use the data backfill feature to run the nodes immediately and verify they work in the production environment. For details, see Backfill data and view data backfill instances (new version).
1. After deploying the nodes, click Operation Center in the upper-right corner of the workflow configuration tab.
2. In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes. Find and click the workshop_start_spark zero load node to open its DAG.
3. Right-click the workshop_start_spark node and choose Run > Current and Descendent Nodes Retroactively.
4. In the Backfill Data panel, select the nodes for which to backfill data, configure the Data Timestamp parameter, and click Submit and Redirect. The page listing the data backfill instances appears.
5. Click Refresh until all SQL nodes show a successful status.
What's next
After the workflow runs successfully in the production environment, you can:
- Monitor scheduled runs in Operation Center under Auto Triggered Node O&M > Auto Triggered Nodes.
- Query the ads_user_info_1d_spark table to consume the user profile data for downstream applications.
- Adjust Spark advanced parameters (such as SERVERLESS_RELEASE_VERSION and SERVERLESS_QUEUE_NAME) to optimize performance for your workload.