All Products
Search
Document Center

DataWorks:Process data

Last Updated:Mar 26, 2026

Use EMR Spark SQL nodes in DataWorks to transform raw log data and user records into a user profile table. This tutorial covers three transformation stages — log splitting, table joining, and aggregation — then shows you how to commit and run the workflow in production.

By the end of this tutorial, you will know how to:

  1. Design a workflow with EMR Spark SQL nodes

  2. Configure Spark SQL code and scheduling properties for each node

  3. Commit and deploy the workflow

  4. Run nodes in the production environment using data backfill

Prerequisites

Before you begin, make sure that you have:

  • Completed data synchronization. For details, see Synchronize data.

  • The ods_user_info_d_spark external table created based on an EMR Spark SQL node, which provides access to the basic user information synchronized to the private Object Storage Service (OSS) bucket.

  • The ods_raw_log_d_spark external table created based on an EMR Spark SQL node, which provides access to the website access logs synchronized to the private OSS bucket.

Usage notes

EMR Serverless Spark workspaces do not support function registration. You cannot register custom functions to split log data or convert IP addresses to regions. In this tutorial, the ods_raw_log_d_spark table is split using a built-in Spark SQL function to generate the dwd_log_info_di_spark table.

How it works

The processing pipeline produces a user profile table through three stages:

  1. Split ods_raw_log_d_spark using a built-in Spark SQL function → dwd_log_info_di_spark

  2. Join dwd_log_info_di_spark and ods_user_info_d_spark on the uid field → dws_user_info_all_di_spark

  3. Aggregate dws_user_info_all_di_spark using MAX and COUNT → ads_user_info_1d_spark

The dws_user_info_all_di_spark table contains many fields and a large volume of data. Consuming it directly without further processing may take a long time, which is why the aggregation stage is required.

Step 1: Design a workflow

Design the node structure and dependency graph before writing any SQL code. This gives you a clear picture of the data flow before you configure each node.

  1. Go to the DataStudio page. Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and O&M > Data Development. Select the desired workspace from the drop-down list and click Go to Data Development.

  2. Create the three processing nodes. In the upper part of the workflow canvas, click Create Node to create the nodes described in the following table.

    Node category Node type Node name Code logic
    EMR EMR Spark SQL dwd_log_info_di_spark Split ods_raw_log_d_spark to generate a new log table
    EMR EMR Spark SQL dws_user_info_all_di_spark Join the new log table and the basic user information table to generate an aggregate table
    EMR EMR Spark SQL ads_user_info_1d_spark Aggregate the joined table to generate a user profile table
  3. Configure the directed acyclic graph (DAG). Drag the nodes onto the workflow canvas and draw dependency lines between them to establish the execution order.

    image

Step 2: Configure EMR Spark SQL nodes

Each of the three nodes requires the same configuration sequence: write the SQL code, set scheduling properties, optionally configure advanced Spark parameters, save, and verify the output.

All three nodes use the bizdate scheduling parameter. Set Parameter Name to bizdate and Parameter Value to $[yyyymmdd-1]. This assigns the previous day's date (T-1) as the data timestamp. In batch processing, bizdate represents the business date, which is one day before the node's scheduling time. For example, if a node runs on August 8, 2024, the data timestamp is 20240807. For details, see Configure scheduling parameters.

Configure the dwd_log_info_di_spark node

This node splits the raw log table ods_raw_log_d_spark — where each row is a single delimited string — into structured fields, and writes the result to dwd_log_info_di_spark.

Step 1: Configure the node code

Double-click the dwd_log_info_di_spark node to open its configuration tab. Enter the following SQL:

-- Split the ods_raw_log_d_spark table into structured fields and write to dwd_log_info_di_spark.
-- The source table uses ##@@ as the field delimiter within a single column (col).
-- DataWorks scheduling parameter ${bizdate} is replaced at runtime with the business date (T-1).

CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
  ip       STRING COMMENT 'The IP address',
  uid      STRING COMMENT 'The user ID',
  tm       STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
  status   STRING COMMENT 'The status code returned by the server',
  bytes    STRING COMMENT 'The number of bytes returned to the client',
  method   STRING COMMENT 'The request method',
  url      STRING COMMENT 'url',
  protocol STRING COMMENT 'The protocol',
  referer  STRING,
  device   STRING,
  identity STRING
)
PARTITIONED BY (dt STRING);

ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
SELECT ip,
       uid,
       tm,
       status,
       bytes,
       regexp_extract(request, '(^[^ ]+) .*', 1)          AS method,
       regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1)   AS url,
       regexp_extract(request, '.* ([^ ]+$)', 1)           AS protocol,
       regexp_extract(referer, '^[^/]+://([^/]+){1}', 1)   AS referer,
       CASE
           WHEN lower(agent) RLIKE 'android'       THEN 'android'
           WHEN lower(agent) RLIKE 'iphone'        THEN 'iphone'
           WHEN lower(agent) RLIKE 'ipad'          THEN 'ipad'
           WHEN lower(agent) RLIKE 'macintosh'     THEN 'macintosh'
           WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
           WHEN lower(agent) RLIKE 'windows'       THEN 'windows_pc'
           ELSE 'unknown'
       END AS device,
       CASE
           WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
           WHEN lower(agent) RLIKE 'feed'
             OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
           WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
            AND agent RLIKE '^(Mozilla|Opera)'
            AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
           ELSE 'unknown'
       END AS identity
FROM (
    SELECT
        SPLIT(col, '##@@')[0] AS ip,
        SPLIT(col, '##@@')[1] AS uid,
        SPLIT(col, '##@@')[2] AS tm,
        SPLIT(col, '##@@')[3] AS request,
        SPLIT(col, '##@@')[4] AS status,
        SPLIT(col, '##@@')[5] AS bytes,
        SPLIT(col, '##@@')[6] AS referer,
        SPLIT(col, '##@@')[7] AS agent
    FROM ods_raw_log_d_spark
    WHERE dt = '${bizdate}'
) a;

Step 2: Configure scheduling properties

On the Properties tab of the node configuration page:

  • In the Scheduling Parameter section, click Add Parameter, set Parameter Name to bizdate, and set Parameter Value to $[yyyymmdd-1].

  • In the Dependencies section, confirm that the output name follows the workspacename.nodename format. For details, see Configure scheduling dependencies.

In the Schedule section, set Scheduling Cycle to Day. The scheduled run time is inherited from the workshop_start_spark zero load node, which acts as the trigger for all downstream nodes. This node runs after 00:30 every day without additional time configuration.

Step 3: (Optional) Configure advanced Spark parameters

On the Advanced Settings tab in the right-side navigation pane, configure the following Spark system parameters as needed.

Parameter Description Example
SERVERLESS_RELEASE_VERSION Changes the Spark engine version "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"
SERVERLESS_QUEUE_NAME Changes the resource queue "dev_queue"
SERVERLESS_SQL_COMPUTE Modifies the SQL compute configuration "sc-b4356b0af6039727"
FLOW_SKIP_SQL_ANALYZE Controls SQL execution mode: true runs multiple statements at a time; false runs one statement at a time. Available only in the development environment. true

To add a custom Spark parameter, enter it on the Advanced Settings tab. For example, specifying spark.eventLog.enabled: false causes DataWorks to deliver it to the EMR Serverless Spark workspace as --conf spark.eventLog.enabled=false. To configure parameters globally, see Configure global Spark parameters. For all available parameters, see Spark Configuration.

Step 4: Save

Click the save icon in the top toolbar to save the node configuration.

Step 5: Verify the output

After the ancestor nodes and this node run successfully, verify that the split log table was created correctly.

In the left-side navigation pane of DataStudio, click Ad Hoc Query. Create an ad hoc query task of the EMR Spark SQL type and run the following query, replacing <data_timestamp> with the actual data timestamp (T-1):

SELECT * FROM dwd_log_info_di_spark WHERE dt = '<data_timestamp>';

Configure the dws_user_info_all_di_spark node

This node joins the structured log table dwd_log_info_di_spark with the user information table ods_user_info_d_spark on the uid field, producing the aggregate table dws_user_info_all_di_spark.

Step 1: Configure the node code

Double-click the dws_user_info_all_di_spark node to open its configuration tab. Enter the following SQL:

-- Join dwd_log_info_di_spark and ods_user_info_d_spark on uid and write to dws_user_info_all_di_spark.
-- DataWorks scheduling parameter ${bizdate} is replaced at runtime with the business date (T-1).

CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
    uid       STRING COMMENT 'The user ID',
    gender    STRING COMMENT 'The gender',
    age_range STRING COMMENT 'The age range',
    zodiac    STRING COMMENT 'The zodiac sign',
    device    STRING COMMENT 'Terminal type',
    method    STRING COMMENT 'HTTP request type',
    url       STRING COMMENT 'url',
    `time`    STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
)
PARTITIONED BY (dt STRING);

-- Add a partition.
ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

-- Join user information and log data and insert into the partition.
INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
SELECT
    COALESCE(a.uid, b.uid) AS uid,
    b.gender               AS gender,
    b.age_range            AS age_range,
    b.zodiac               AS zodiac,
    a.device               AS device,
    a.method               AS method,
    a.url                  AS url,
    a.tm
FROM dwd_log_info_di_spark AS a
LEFT OUTER JOIN ods_user_info_d_spark AS b
ON a.uid = b.uid;

Step 2: Configure scheduling properties

Follow the same steps as for the dwd_log_info_di_spark node: set bizdate to $[yyyymmdd-1], confirm the output name in Dependencies, and set Scheduling Cycle to Day. This node also runs after 00:30 every day, inheriting its scheduled time from the workshop_start_spark zero load node.

Step 3: (Optional) Configure advanced Spark parameters

Follow the same steps as for the dwd_log_info_di_spark node. The available parameters (SERVERLESS_RELEASE_VERSION, SERVERLESS_QUEUE_NAME, SERVERLESS_SQL_COMPUTE, FLOW_SKIP_SQL_ANALYZE) are the same across all three nodes.

Step 4: Save

Click the save icon in the top toolbar to save the node configuration.

Step 5: Verify the output

After the ancestor nodes and this node run successfully, verify that the joined table was created correctly.

In the left-side navigation pane of DataStudio, click Ad Hoc Query. Create an ad hoc query task of the EMR Spark SQL type and run the following query, replacing <data_timestamp> with the actual data timestamp (T-1):

SELECT * FROM dws_user_info_all_di_spark WHERE dt = '<data_timestamp>';

Configure the ads_user_info_1d_spark node

This node aggregates the joined table dws_user_info_all_di_spark using MAX and COUNT operations, producing ads_user_info_1d_spark as the final user profile table.

Step 1: Configure the node code

Double-click the ads_user_info_1d_spark node to open its configuration tab. Enter the following SQL:

-- Aggregate dws_user_info_all_di_spark to generate the user profile table ads_user_info_1d_spark.
-- DataWorks scheduling parameter ${bizdate} is replaced at runtime with the business date (T-1).

CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
  uid       STRING COMMENT 'The user ID',
  device    STRING COMMENT 'The terminal type',
  pv        BIGINT COMMENT 'Page views (PV)',
  gender    STRING COMMENT 'The gender',
  age_range STRING COMMENT 'The age range',
  zodiac    STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (dt STRING);

ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt = '${bizdate}')
SELECT uid,
       MAX(device),
       COUNT(0) AS pv,
       MAX(gender),
       MAX(age_range),
       MAX(zodiac)
FROM dws_user_info_all_di_spark
WHERE dt = '${bizdate}'
GROUP BY uid;

Step 2: Configure scheduling properties

Follow the same steps as for the previous nodes: set bizdate to $[yyyymmdd-1], confirm the output name in Dependencies, and set Scheduling Cycle to Day.

Step 3: (Optional) Configure advanced Spark parameters

Follow the same steps as for the dwd_log_info_di_spark node.

Step 4: Save

Click the save icon in the top toolbar to save the node configuration.

Step 5: Verify the output

After the ancestor nodes and this node run successfully, verify that the user profile table was created correctly.

In the left-side navigation pane of DataStudio, click Ad Hoc Query. Create an ad hoc query task of the EMR Spark SQL type and run the following query, replacing <data_timestamp> with the actual data timestamp (T-1):

SELECT * FROM ads_user_info_1d_spark WHERE dt = '<data_timestamp>';

Step 3: Commit the workflow

After configuring all three nodes, run the workflow in the development environment to verify it works end-to-end, then commit and deploy it.

  1. On the workflow configuration tab, click the run icon to run the workflow.

  2. Wait until all nodes show the success icon. Then click the commit icon.

  3. In the Commit dialog box, select the nodes to commit, enter a description, select Ignore I/O Inconsistency Alerts, and click Confirm.

  4. After committing, deploy the nodes:

    1. In the upper-right corner of the workflow configuration tab, click Deploy. The Create Deploy Task page appears.

    2. Select the nodes to deploy and click Deploy. In the Create Deploy Task dialog box, click Deploy.

Step 4: Run the nodes in the production environment

Deployed nodes generate instances starting the next day. Use the data backfill feature to run the nodes immediately and verify they work in the production environment. For details, see Backfill data and view data backfill instances (new version).

  1. After deploying the nodes, click Operation Center in the upper-right corner of the workflow configuration tab.

  2. In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes. Find and click the workshop_start_spark zero load node to open its DAG.

  3. Right-click the workshop_start_spark node and choose Run > Current and Descendent Nodes Retroactively.

  4. In the Backfill Data panel, select the nodes for which to backfill data, configure the Data Timestamp parameter, and click Submit and Redirect. The page listing the data backfill instances appears.

  5. Click Refresh until all SQL nodes show a successful status.

What's next

After the workflow runs successfully in the production environment, you can:

  • Monitor scheduled runs in Operation Center under Auto Triggered Node O&M > Auto Triggered Nodes.

  • Query the ads_user_info_1d_spark table to consume the user profile data for downstream applications.

  • Adjust Spark advanced parameters (such as SERVERLESS_RELEASE_VERSION and SERVERLESS_QUEUE_NAME) to optimize performance for your workload.