E-MapReduce: Process data

Last Updated: Mar 26, 2026

Use Spark SQL to clean, join, and aggregate raw data synchronized from OSS, and produce a user profile table you can query downstream. By the end of this tutorial, you will know how to:

  1. Design a three-node workflow that implements ODS (Operational Data Store) → DWD (Data Warehouse Detail) → DWS (Data Warehouse Summary) → ADS (Application Data Store) data warehouse layers.

  2. Configure each EMR Spark SQL node with the correct SQL logic and scheduling parameters.

  3. Verify each intermediate table after it is generated.

  4. Commit and deploy the workflow, then trigger a backfill run to validate the full pipeline in the production environment.

Prerequisites

Before you begin, ensure that you have:

  • Synchronized user information and website access logs to a private OSS bucket. For details, see Synchronize data.

  • Two external tables already created in DataWorks:

    • ods_user_info_d_spark — reads basic user information from OSS.

    • ods_raw_log_d_spark — reads website access logs from OSS.

Usage notes

EMR Serverless Spark workspaces do not support function registration. As a result, you cannot register custom functions to parse log fields or convert IP addresses to regions. This tutorial uses built-in Spark SQL functions instead — specifically, SPLIT and regexp_extract — to parse the ods_raw_log_d_spark table.
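
As a minimal sketch of how these two built-in functions work together, the query below parses a single inline log line without reading any table. The sample record and all of its field values are hypothetical; only the ##@@ delimiter, the field order, and the regular expressions are taken from this tutorial.

```sql
-- Hypothetical sample line; real rows come from ods_raw_log_d_spark.
-- Field order: ip, uid, tm, request, status, bytes, referer, agent.
SELECT
    SPLIT(col, '##@@')[0] AS ip,
    SPLIT(col, '##@@')[1] AS uid,
    regexp_extract(SPLIT(col, '##@@')[3], '(^[^ ]+) .*', 1)        AS method,
    regexp_extract(SPLIT(col, '##@@')[3], '^[^ ]+ (.*) [^ ]+$', 1) AS url,
    regexp_extract(SPLIT(col, '##@@')[3], '.* ([^ ]+$)', 1)        AS protocol
FROM (
    SELECT '192.0.2.10##@@u0001##@@2024080712:00:00##@@GET /index.html HTTP/1.1##@@200##@@1024##@@https://example.com/home##@@Mozilla/5.0' AS col
) t;
-- Expected row: 192.0.2.10, u0001, GET, /index.html, HTTP/1.1
```

Running a query like this in an ad hoc query task is a quick way to test a regular expression before you change the node code.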

Background

The tables in this tutorial follow a standard data warehouse layering pattern:

Layer | Table | Purpose
ODS (raw) | ods_raw_log_d_spark, ods_user_info_d_spark | External tables pointing to raw data in OSS
DWD (cleaned) | dwd_log_info_di_spark | Split raw log fields into structured columns
DWS (aggregated) | dws_user_info_all_di_spark | Join log and user info on uid
ADS (serving) | ads_user_info_1d_spark | Aggregate per user to produce the final user profile

Step 1: Design the workflow

In DataWorks, create a workflow with three EMR Spark SQL nodes connected in sequence. Each node corresponds to one data warehouse layer.

  1. Go to the DataStudio page. Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and O&M > Data Development. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.

  2. In the upper part of the workflow canvas, click Create Node to create the following three nodes.

    Node category | Node type | Node name | Code logic
    EMR | EMR Spark SQL | dwd_log_info_di_spark | Splits the ods_raw_log_d_spark table into structured fields to produce a clean log table.
    EMR | EMR Spark SQL | dws_user_info_all_di_spark | Joins the clean log table and the user information table on uid to produce an aggregate table.
    EMR | EMR Spark SQL | ads_user_info_1d_spark | Aggregates the dws_user_info_all_di_spark table by user to produce the final user profile table.

  3. Drag the three nodes onto the workflow canvas and draw dependency lines between them to set the execution order: dwd_log_info_di_spark → dws_user_info_all_di_spark → ads_user_info_1d_spark.

Step 2: Configure EMR Spark SQL nodes

Configure the SQL code, scheduling parameters, and optional Spark system parameters for each node.

Configure the dwd_log_info_di_spark node

This node reads raw log records from ods_raw_log_d_spark, uses SPLIT to parse each row by the ##@@ delimiter, and applies regexp_extract to break the request field into method, url, and protocol columns. The result is written to a new partitioned table, dwd_log_info_di_spark.

1. Configure the node code

Double-click the dwd_log_info_di_spark node to open its configuration tab, and enter the following SQL:

-- Splits ods_raw_log_d_spark using ##@@ as a delimiter
-- and writes the structured fields to dwd_log_info_di_spark.
-- ${bizdate} is a scheduling parameter replaced at runtime with the data timestamp (T-1).

CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
  ip STRING COMMENT 'The IP address',
  uid STRING COMMENT 'The user ID',
  tm STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
  status STRING COMMENT 'The status code returned by the server',
  bytes STRING COMMENT 'The number of bytes returned to the client',
  method STRING COMMENT 'The request method',
  url STRING COMMENT 'url',
  protocol STRING COMMENT 'The protocol',
  referer STRING,
  device STRING,
  identity STRING
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt='${bizdate}')
SELECT ip,
       uid,
       tm,
       status,
       bytes,
       regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
       regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
       regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
       regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
       CASE
           WHEN lower(agent) RLIKE 'android' THEN 'android'
           WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
           WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
           WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
           WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
           WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
           ELSE 'unknown'
       END AS device,
       CASE
           WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
           WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
           WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
           ELSE 'unknown'
       END AS identity
FROM (
    SELECT
        SPLIT(col, '##@@')[0] AS ip,
        SPLIT(col, '##@@')[1] AS uid,
        SPLIT(col, '##@@')[2] AS tm,
        SPLIT(col, '##@@')[3] AS request,
        SPLIT(col, '##@@')[4] AS status,
        SPLIT(col, '##@@')[5] AS bytes,
        SPLIT(col, '##@@')[6] AS referer,
        SPLIT(col, '##@@')[7] AS agent
    FROM ods_raw_log_d_spark
    WHERE dt = '${bizdate}'
) a;
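
The device CASE expression is evaluated top to bottom, so the order of its branches matters: 'windows phone' has to be tested before 'windows', and any agent that contains 'android' is classified as android before later branches are reached. The following sketch applies the same expression to a few inline user agent strings. The strings are shortened, hypothetical examples, not values from the tutorial data.

```sql
-- Hypothetical user agent strings, used only to exercise the branch order.
SELECT
    agent,
    CASE
        WHEN lower(agent) RLIKE 'android'       THEN 'android'
        WHEN lower(agent) RLIKE 'iphone'        THEN 'iphone'
        WHEN lower(agent) RLIKE 'ipad'          THEN 'ipad'
        WHEN lower(agent) RLIKE 'macintosh'     THEN 'macintosh'
        WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
        WHEN lower(agent) RLIKE 'windows'       THEN 'windows_pc'
        ELSE 'unknown'
    END AS device
FROM VALUES
    ('Mozilla/5.0 (Linux; Android 13)'),
    ('Mozilla/5.0 (compatible; MSIE 10.0; Windows Phone 8.0)'),
    ('Mozilla/5.0 (Windows NT 10.0; Win64; x64)'),
    ('curl/8.0')
  AS t(agent);
-- Expected device values, in input order:
-- android, windows_phone, windows_pc, unknown
```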

2. Configure scheduling properties

Section | Setting
Scheduling parameter | Click Add Parameter. Set Parameter Name to bizdate and Parameter Value to $[yyyymmdd-1]. For details, see Configure scheduling parameters.
Dependencies | Confirm that the output name follows the workspacename.nodename format. For details, see Configure scheduling dependencies.

Set Scheduling Cycle to Day. The scheduled run time is determined by the workshop_start_spark zero load node: all nodes in this workflow run after 00:30 daily.
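
To illustrate what $[yyyymmdd-1] resolves to, the sketch below reproduces the same date arithmetic in Spark SQL for a fixed scheduling date. The date 2024-08-08 is an assumed example. In a real run, DataWorks substitutes ${bizdate} before the SQL is submitted, so this query is only an illustration and is not part of the node code.

```sql
-- Assumed scheduling date: 2024-08-08. The data timestamp is one day earlier.
SELECT date_format(date_sub(DATE '2024-08-08', 1), 'yyyyMMdd') AS bizdate;
-- Returns 20240807
```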

3. (Optional) Configure Spark system parameters

On the Advanced Settings tab in the right-side pane, you can configure the following system parameters:

Parameter | Description
SERVERLESS_RELEASE_VERSION | Changes the Serverless Spark engine version. Example: "SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"
SERVERLESS_QUEUE_NAME | Changes the resource queue. Example: "SERVERLESS_QUEUE_NAME": "dev_queue"
SERVERLESS_SQL_COMPUTE | Changes the SQL compute resource. Example: "SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"
FLOW_SKIP_SQL_ANALYZE | Controls how SQL statements are executed. true: run multiple statements at once. false: run one statement at a time. Available in the development environment only.
Custom Spark parameters | Add any custom Spark parameter on this tab. DataWorks converts it to --conf key=value format for the EMR Serverless Spark workspace. You can also configure global Spark parameters.

For the full list of Spark configuration options, see Spark Configuration.
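
One way to check whether a custom Spark parameter reached the session is to read it back with a SET statement at the top of the node code. The sketch below uses spark.sql.shuffle.partitions purely as an illustrative key. Whether a given key can be overridden in your EMR Serverless Spark workspace, and whether an ad hoc query shares the node's settings, are assumptions you should verify.

```sql
-- SET with a key and no value returns the current setting for this session.
SET spark.sql.shuffle.partitions;
```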

4. Save the configuration

Click the Save icon in the top toolbar.

5. Verify the split result

After the ancestor nodes and this node run successfully, click Ad Hoc Query in the left-side navigation pane of the DataStudio page. Create an ad hoc query task of the EMR Spark SQL type and run the following query to confirm that the log fields are parsed correctly. Check that method, url, protocol, device, and identity contain expected values:

-- Replace '20240807' with the actual bizdate value for your run.
-- The data timestamp is one day earlier than the scheduling date.
SELECT ip, uid, method, url, protocol, device, identity
FROM dwd_log_info_di_spark
WHERE dt = '20240807'
LIMIT 20;

Note: In batch computing scenarios, bizdate represents the business date, that is, the day on which the data was generated, not the day the node runs. For example, if the node runs on August 8, 2024, the data timestamp is 20240807.
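
Beyond sampling rows, an aggregate check can show how many records failed to parse. regexp_extract returns an empty string when the pattern does not match, so empty or NULL values in method, url, or protocol point to malformed request fields. The following is a sketch that assumes the same example partition as above.

```sql
-- Replace '20240807' with the actual bizdate value for your run.
SELECT
    COUNT(*) AS total_rows,
    SUM(CASE WHEN method IS NULL OR method = '' THEN 1 ELSE 0 END)     AS rows_without_method,
    SUM(CASE WHEN url IS NULL OR url = '' THEN 1 ELSE 0 END)           AS rows_without_url,
    SUM(CASE WHEN protocol IS NULL OR protocol = '' THEN 1 ELSE 0 END) AS rows_without_protocol,
    SUM(CASE WHEN device = 'unknown' THEN 1 ELSE 0 END)                AS rows_unknown_device,
    SUM(CASE WHEN identity = 'unknown' THEN 1 ELSE 0 END)              AS rows_unknown_identity
FROM dwd_log_info_di_spark
WHERE dt = '20240807';
```

A small share of unknown devices or identities is expected for unusual user agents. A large share of empty method or url values suggests that the delimiter or field order differs from what the node code assumes.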

Configure the dws_user_info_all_di_spark node

This node reads from dwd_log_info_di_spark and ods_user_info_d_spark, performs a left join on uid to attach user attributes (gender, age range, zodiac sign) to each log record, and writes the result to dws_user_info_all_di_spark.

1. Configure the node code

Double-click the dws_user_info_all_di_spark node to open its configuration tab, and enter the following SQL:

-- Joins the clean log table and the user information table on uid,
-- then writes the merged data to dws_user_info_all_di_spark.
-- ${bizdate} is replaced at runtime with the data timestamp (T-1).

CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
    uid        STRING COMMENT 'The user ID',
    gender     STRING COMMENT 'The gender',
    age_range  STRING COMMENT 'The age range',
    zodiac     STRING COMMENT 'The zodiac sign',
    device     STRING COMMENT 'Terminal type',
    method     STRING COMMENT 'HTTP request type',
    url        STRING COMMENT 'url',
    `time`     STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
)
PARTITIONED BY (dt STRING);

-- Add a partition.
ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

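-- Insert data from the basic user information table and the new log table.
-- The log side is restricted to the current partition so that each run
-- processes only the records for ${bizdate}.
-- If ods_user_info_d_spark is also partitioned by dt, filter it the same way.
INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
SELECT
    COALESCE(a.uid, b.uid) AS uid,
    b.gender AS gender,
    b.age_range AS age_range,
    b.zodiac AS zodiac,
    a.device AS device,
    a.method AS method,
    a.url AS url,
    a.tm AS `time`
FROM
    (SELECT * FROM dwd_log_info_di_spark WHERE dt = '${bizdate}') AS a
LEFT OUTER JOIN
    ods_user_info_d_spark AS b
ON
    a.uid = b.uid;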

2. Configure scheduling properties

Section | Setting
Scheduling parameter | Click Add Parameter. Set Parameter Name to bizdate and Parameter Value to $[yyyymmdd-1]. For details, see Configure scheduling parameters.
Dependencies | Confirm that the output name follows the workspacename.nodename format. For details, see Configure scheduling dependencies.

Set Scheduling Cycle to Day. The scheduled run time is determined by the workshop_start_spark zero load node.

3. (Optional) Configure Spark system parameters

Configure the same Spark system parameters described in the dwd_log_info_di_spark node's step 3.

4. Save the configuration

Click the Save icon in the top toolbar.

5. Verify the join result

After the ancestor nodes and this node run successfully, click Ad Hoc Query on the DataStudio page. Create an EMR Spark SQL ad hoc query task and run the following query. Confirm that gender, age_range, and zodiac are populated for rows where a matching uid exists in ods_user_info_d_spark, and that the total row count matches the row count in dwd_log_info_di_spark for the same partition:

-- Replace '20240807' with the actual bizdate value for your run.
SELECT uid, gender, age_range, zodiac, device, method
FROM dws_user_info_all_di_spark
WHERE dt = '20240807'
LIMIT 20;

Note: Because this is a left join, rows in the log table without a matching uid in the user information table will have NULL values for gender, age_range, and zodiac.
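
The row-count comparison mentioned above can be sketched as a single query. The third count treats rows where all three user attributes are NULL as unmatched, which is an approximation: a matched user whose attributes are all NULL would be counted too. The example partition value is the same placeholder used above.

```sql
-- Replace '20240807' with the actual bizdate value for your run.
SELECT 'dwd_log_info_di_spark' AS source, COUNT(*) AS row_count
FROM dwd_log_info_di_spark
WHERE dt = '20240807'
UNION ALL
SELECT 'dws_user_info_all_di_spark' AS source, COUNT(*) AS row_count
FROM dws_user_info_all_di_spark
WHERE dt = '20240807'
UNION ALL
SELECT 'dws rows without user attributes' AS source, COUNT(*) AS row_count
FROM dws_user_info_all_di_spark
WHERE dt = '20240807'
  AND gender IS NULL AND age_range IS NULL AND zodiac IS NULL;
```

If the second count is larger than the first, a likely cause is that a uid appears more than once in ods_user_info_d_spark, which makes the join produce several output rows for one log record.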

Configure the ads_user_info_1d_spark node

This node reads from dws_user_info_all_di_spark and groups records by uid, computing the page view count (pv) and selecting representative values for device, gender, age range, and zodiac sign. The result is written to ads_user_info_1d_spark as the final user profile table.

1. Configure the node code

Double-click the ads_user_info_1d_spark node to open its configuration tab, and enter the following SQL:

-- Aggregates dws_user_info_all_di_spark by uid,
-- computing pv (page views) and representative user attributes.
-- ${bizdate} is replaced at runtime with the data timestamp (T-1).

CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
  uid STRING COMMENT 'The user ID',
  device STRING COMMENT 'The terminal type',
  pv BIGINT COMMENT 'pv',
  gender STRING COMMENT 'The gender',
  age_range STRING COMMENT 'The age range',
  zodiac STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt = '${bizdate}')
SELECT uid,
       MAX(device) AS device,
       COUNT(0) AS pv,
       MAX(gender) AS gender,
       MAX(age_range) AS age_range,
       MAX(zodiac) AS zodiac
FROM dws_user_info_all_di_spark
WHERE dt = '${bizdate}'
GROUP BY uid;
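
The "representative value" chosen by MAX on a STRING column is the greatest string in sort order, with NULL values ignored. It is not the most frequent value. The sketch below shows this on a few inline rows. The uid, device, and gender values are hypothetical.

```sql
-- Hypothetical rows that mimic dws_user_info_all_di_spark for two users.
SELECT uid,
       MAX(device) AS device,
       COUNT(0)    AS pv,
       MAX(gender) AS gender
FROM VALUES
    ('u1', 'android',    'female'),
    ('u1', 'windows_pc', 'female'),
    ('u2', 'iphone',     CAST(NULL AS STRING))
  AS t(uid, device, gender)
GROUP BY uid
ORDER BY uid;
-- Expected rows:
-- u1, windows_pc, 2, female
-- u2, iphone,     1, NULL
```

For a user who visits from several devices, the profile therefore records the device name that sorts last, which is worth keeping in mind when you interpret the device column.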

2. Configure scheduling properties

Section | Setting
Scheduling parameter | Click Add Parameter. Set Parameter Name to bizdate and Parameter Value to $[yyyymmdd-1]. For details, see Configure scheduling parameters.
Dependencies | Confirm that the output name follows the workspacename.nodename format. For details, see Configure scheduling dependencies.

Set Scheduling Cycle to Day. The scheduled run time is determined by the workshop_start_spark zero load node.

3. (Optional) Configure Spark system parameters

Configure the same Spark system parameters described in the dwd_log_info_di_spark node's step 3.

4. Save the configuration

Click the Save icon in the top toolbar.

5. Verify the user profile table

After the ancestor nodes and this node run successfully, click Ad Hoc Query on the DataStudio page. Create an EMR Spark SQL ad hoc query task and run the following query. Confirm that each user has exactly one row, pv is greater than zero, and profile attributes such as gender and device reflect the expected distribution:

-- Replace '20240807' with the actual bizdate value for your run.
SELECT uid, device, pv, gender, age_range, zodiac
FROM ads_user_info_1d_spark
WHERE dt = '20240807'
ORDER BY pv DESC
LIMIT 20;

Note: pv is COUNT(0) grouped by uid, that is, the number of log records for that user on the data timestamp.
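
The "one row per user" and page view checks can also be expressed as queries rather than inspected by eye. The following is a sketch that assumes the same example partition as above.

```sql
-- Replace '20240807' with the actual bizdate value for your run.

-- Should return no rows: each uid appears at most once per partition.
SELECT uid, COUNT(*) AS rows_per_uid
FROM ads_user_info_1d_spark
WHERE dt = '20240807'
GROUP BY uid
HAVING COUNT(*) > 1;

-- The two values should be equal: every dws record contributes to exactly one pv.
SELECT 'sum of pv in ads' AS metric, SUM(pv) AS value
FROM ads_user_info_1d_spark
WHERE dt = '20240807'
UNION ALL
SELECT 'row count in dws' AS metric, COUNT(*) AS value
FROM dws_user_info_all_di_spark
WHERE dt = '20240807';
```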

Step 3: Commit the workflow

After configuring all three nodes, run the workflow to confirm it works, then commit and deploy it.

  1. On the workflow configuration tab, click the Run icon to run the workflow.

  2. When the Success icon appears next to all nodes, click the Commit icon to commit.

  3. In the Commit dialog box, select the nodes to commit, enter a description, select Ignore I/O Inconsistency Alerts, and click Confirm.

  4. Deploy the committed workflow.

    1. In the upper-right corner of the workflow configuration tab, click Deploy. The Create Deploy Task page appears.

    2. Select the nodes to deploy and click Deploy. In the Create Deploy Task dialog box, click Deploy.

Step 4: Run the nodes in the production environment

After deploying the nodes, use the data backfill feature to trigger a run and confirm that all nodes execute successfully in the production environment. For details, see Backfill data and view data backfill instances (new version).

  1. Click Operation Center in the upper-right corner of the workflow configuration tab.

  2. In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes. Find and click the workshop_start_spark zero load node to open its DAG.

  3. Right-click the workshop_start_spark node and choose Run > Current and Descendent Nodes Retroactively.

  4. In the Backfill Data panel, select the nodes for which you want to backfill data, configure the Data Timestamp parameter, and click Submit and Redirect. The data backfill instances page appears.

  5. Click Refresh until all SQL nodes show a successful status.
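
Once the backfill instances succeed, listing the partitions of each table is a simple way to confirm that data for the chosen data timestamp was written. This sketch assumes the query runs against the same database that the production nodes write to.

```sql
-- Each table should list a dt=<data timestamp> partition for the backfilled date.
SHOW PARTITIONS dwd_log_info_di_spark;
SHOW PARTITIONS dws_user_info_all_di_spark;
SHOW PARTITIONS ads_user_info_1d_spark;
```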

What's next

After the workflow runs successfully, the ads_user_info_1d_spark table contains one row per user with their page view count and profile attributes for the data timestamp. You can now connect a BI tool or downstream application to this table for reporting and analysis.
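
As an example of the kind of query a report might issue, the sketch below summarizes users and page views by gender and device. The grouping columns are one possible choice for illustration, and the partition value is the same placeholder used in the verification steps.

```sql
-- Replace '20240807' with the data timestamp you want to report on.
SELECT gender,
       device,
       COUNT(*) AS users,
       SUM(pv)  AS total_pv
FROM ads_user_info_1d_spark
WHERE dt = '20240807'
GROUP BY gender, device
ORDER BY total_pv DESC;
```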