
E-MapReduce: Process data

Last Updated: Mar 26, 2026

This tutorial walks you through building a three-stage Spark SQL pipeline in DataWorks that transforms raw user and log data from OSS into a user profile table. By the end, you will know how to:

  1. Set up a pipeline DAG with three dependent EMR Spark SQL nodes

  2. Write and run Spark SQL to parse, join, and aggregate data across ODS, DWD, DWS, and ADS layers

  3. Query results and verify the pipeline output

  4. Run the workflow in production using backfill

About the dataset

The pipeline processes two source tables loaded in the previous Synchronize data step:

  • ods_user_info_d_spark: User attribute data (gender, age range, zodiac sign), keyed by uid

  • ods_raw_log_d_spark: Raw web server logs, with all fields concatenated into a single column using ##@@ as a delimiter

The goal is to combine these two sources into a single user profile table that shows each user's page views and device usage.

Prerequisites

Before you begin, ensure that you have:

  • Completed Synchronize data, which loads data into the ods_user_info_d_spark and ods_raw_log_d_spark tables

  • Access to the User_profile_analysis_Spark workflow in DataWorks Data Studio

How the pipeline works

The pipeline moves data from the ODS layer through three downstream warehouse layers (DWD, DWS, and ADS). An EMR Spark SQL node handles each layer:

| Node | Source table(s) | Output table | What it does |
| --- | --- | --- | --- |
| dwd_log_info_di_spark | ods_raw_log_d_spark | dwd_log_info_di_spark | Splits raw log fields on the ##@@ delimiter, then derives method, url, protocol, device, and identity |
| dws_user_info_all_di_spark | dwd_log_info_di_spark and ods_user_info_d_spark | dws_user_info_all_di_spark | Joins the fact log table with the user info table on uid to create an aggregate user log table |
| ads_user_info_1d_spark | dws_user_info_all_di_spark | ads_user_info_1d_spark | Groups by uid and aggregates the rows into the final user profile |

All three nodes write to OSS-backed external tables partitioned by dt, using the ${bizdate} scheduling parameter to target the correct daily partition.
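Each CREATE TABLE statement uses IF NOT EXISTS. As a result, the log_${bizdate} suffix in its LOCATION clause is resolved only on the run that first creates the table, and later runs leave that location unchanged.

The following Python sketch shows where daily partitions end up. It assumes the Hive-style default layout, in which a partition added without its own LOCATION is stored under <table location>/dt=<value>/. The template string and dates are example values, not output captured from DataWorks.

```python
# Sketch: where daily partitions land, assuming the Hive-style default layout
# <table location>/dt=<value>/ for partitions added without their own LOCATION.
# The bucket, endpoint, and dates are the tutorial's example values.
TABLE_LOCATION_TEMPLATE = (
    "oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/"
    "dwd_log_info_di_spark/log_{bizdate}/"
)


def table_location(first_bizdate: str) -> str:
    """LOCATION as fixed by the run that first creates the table (CREATE TABLE IF NOT EXISTS)."""
    return TABLE_LOCATION_TEMPLATE.format(bizdate=first_bizdate)


def partition_path(location: str, dt: str) -> str:
    """Default directory of PARTITION (dt = '<dt>') when ALTER TABLE ... ADD PARTITION gives no LOCATION."""
    return f"{location.rstrip('/')}/dt={dt}/"


if __name__ == "__main__":
    location = table_location("20250222")  # the first run creates the table
    for day in ["20250222", "20250223", "20250224"]:
        print(partition_path(location, day))  # every day lands under log_20250222/
```

In practice, when you browse the bucket, look for all partitions under the log_ directory of the first run date, not under one directory per day.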

Step 1: Set up the data processing pipeline

Add three EMR Spark SQL nodes to the User_profile_analysis_Spark workflow and configure their scheduling dependencies.

  1. Log on to the DataWorks console and go to Data Studio. In the Workspace Directories section of the DATA STUDIO pane, find and click User_profile_analysis_Spark to open the workflow.

  2. Drag EMR SPARK SQL from the EMR section of the configuration tab onto the canvas. In the Create Node dialog box, set the Name to dwd_log_info_di_spark.

  3. Repeat step 2 to create two more nodes: dws_user_info_all_di_spark and ads_user_info_1d_spark.

  4. Draw dependency lines on the canvas to connect the nodes in order. When the lines are drawn correctly, the DAG shows: dwd_log_info_di_spark → dws_user_info_all_di_spark → ads_user_info_1d_spark.

    This tutorial configures scheduling dependencies by drawing lines. To let the system detect dependencies automatically, see Use the automatic parsing feature.

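The dependency lines form a directed acyclic graph (DAG), and each node starts only after its upstream node finishes successfully. As an illustration only (this is not how DataWorks schedules nodes internally), the Python standard-library graphlib module can derive a valid run order from the same three edges. The DEPENDENCIES mapping is a hypothetical encoding of the lines drawn above.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical encoding of the canvas: each node maps to the set of upstream nodes it depends on.
DEPENDENCIES = {
    "dwd_log_info_di_spark": set(),
    "dws_user_info_all_di_spark": {"dwd_log_info_di_spark"},
    "ads_user_info_1d_spark": {"dws_user_info_all_di_spark"},
}


def run_order(deps: dict[str, set[str]]) -> list[str]:
    """Return an order in which every node comes after all of its upstream nodes."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    print(" -> ".join(run_order(DEPENDENCIES)))
```

If the mapping contained a cycle, static_order() would raise graphlib.CycleError instead of returning an order.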

Step 2: Configure the EMR Spark SQL nodes

Configure each node with its SQL code and debugging settings. The debugging settings are used when you test the workflow in Step 3. For a full walkthrough of the debugging workflow, see Debug a workflow.

Parse raw logs: dwd_log_info_di_spark

This node reads each row from ods_raw_log_d_spark, where all log fields are stored as a single string delimited by ##@@. The SQL uses SPLIT() to extract individual fields (ip, uid, tm, request, status, bytes, referer, agent), then applies regexp_extract and CASE/RLIKE expressions to derive method, url, protocol, device, and identity. The result is written to the DWD-layer table dwd_log_info_di_spark.
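To see what the SPLIT and regexp_extract calls produce for a single row, the following Python sketch reproduces the parsing step locally. It is an approximation, not the SQL engine:

  • Python's re.search stands in for the Java regular expressions that Spark uses. The patterns here behave the same in both.
  • The sample log line in the test is invented.
  • Treating out-of-range SPLIT positions as NULL assumes Spark's non-ANSI behavior.

```python
import re

DELIMITER = "##@@"  # contains no regex metacharacters, so a literal split matches SPLIT()
FIELDS = ["ip", "uid", "tm", "request", "status", "bytes", "referer", "agent"]


def regexp_extract(text, pattern, group):
    """Approximates Spark's regexp_extract: first match anywhere, '' when nothing matches."""
    if text is None:
        return None  # NULL in, NULL out
    match = re.search(pattern, text)
    if match is None or match.group(group) is None:
        return ""
    return match.group(group)


def parse_raw_log(col):
    """Split one ods_raw_log_d_spark row and derive method, url, protocol, and referer host."""
    parts = col.split(DELIMITER)
    # Assumption: SPLIT(col, '##@@')[i] yields NULL past the end of the array
    # (ANSI mode off), so pad missing positions with None.
    parts += [None] * (len(FIELDS) - len(parts))
    row = dict(zip(FIELDS, parts))
    request = row["request"]
    return {
        "ip": row["ip"],
        "uid": row["uid"],
        "tm": row["tm"],
        "status": row["status"],
        "bytes": row["bytes"],
        "method": regexp_extract(request, r"(^[^ ]+) .*", 1),
        "url": regexp_extract(request, r"^[^ ]+ (.*) [^ ]+$", 1),
        "protocol": regexp_extract(request, r".* ([^ ]+$)", 1),
        "referer": regexp_extract(row["referer"], r"^[^/]+://([^/]+){1}", 1),
    }
```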

  1. On the canvas, hover over dwd_log_info_di_spark and click Open Node.

  2. Copy the following SQL and paste it into the code editor:

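    Replace dw-spark-demo in the LOCATION clause with the name of the OSS bucket you created when you prepared the environment. If the bucket is not in the China (Shanghai) region, also replace oss-cn-shanghai-internal with the internal endpoint prefix of the bucket's region.
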
    -- Parses raw log rows from ods_raw_log_d_spark (fields separated by ##@@)
    -- and writes structured records to dwd_log_info_di_spark.
    -- ${bizdate} is a scheduling parameter in yyyymmdd format supplied at runtime.
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
      ip STRING COMMENT 'The IP address',
      uid STRING COMMENT 'The user ID',
      tm STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
      status STRING COMMENT 'The status code that is returned by the server',
      bytes STRING COMMENT 'The number of bytes that are returned to the client',
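      method STRING COMMENT 'The request method',
      url STRING COMMENT 'The requested URL',
      protocol STRING COMMENT 'The protocol',
      referer STRING COMMENT 'The referrer host',
      device STRING COMMENT 'The terminal type',
      identity STRING COMMENT 'The visitor type: crawler, feed, user, or unknown'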
    )
    PARTITIONED BY (dt STRING)
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/';
    
    ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
    SELECT ip,
           uid,
           tm,
           status,
           bytes,
           regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
           regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
           regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
           regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
           CASE
               WHEN lower(agent) RLIKE 'android' THEN 'android'
               WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
               WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
               WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
               WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
               WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
               ELSE 'unknown'
           END AS device,
           CASE
               WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
               WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
               WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
               ELSE 'unknown'
           END AS identity
    FROM (
        SELECT
            SPLIT(col, '##@@')[0] AS ip,
            SPLIT(col, '##@@')[1] AS uid,
            SPLIT(col, '##@@')[2] AS tm,
            SPLIT(col, '##@@')[3] AS request,
            SPLIT(col, '##@@')[4] AS status,
            SPLIT(col, '##@@')[5] AS bytes,
            SPLIT(col, '##@@')[6] AS referer,
            SPLIT(col, '##@@')[7] AS agent
        FROM ods_raw_log_d_spark
        WHERE dt = '${bizdate}'
    ) a;
  3. In the right-side navigation pane, click Debugging Configurations and set the following parameters:

    | Parameter | Description |
    | --- | --- |
    | Computing Resource | Select the Spark computing resource that you associated with your workspace when you prepared the environment. |
    | Resource Group | Select the serverless resource group that you associated with your workspace when you prepared the environment. |
    | Script Parameters | In the Parameter Value column for bizdate, enter a date in yyyymmdd format, for example bizdate=20250223. |
  4. (Optional) Configure scheduling properties. Default scheduling property values are sufficient for this tutorial. To adjust them, click Properties in the right-side navigation pane. For details on all Properties tab parameters, see Node scheduling.

    • Scheduling Parameters: Scheduling parameters are already configured at the workflow level. Inner nodes inherit these values automatically, so no separate configuration is needed.

    • Scheduling Policies: Skip the Time for Delayed Execution parameter for this tutorial.

  5. Click Save.
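The two CASE expressions in the SQL above are evaluated top to bottom, and the first matching branch wins. The following Python sketch mirrors them so you can try user-agent strings before running the node. The rlike and not_rlike helpers are hypothetical stand-ins for RLIKE and NOT RLIKE: they match anywhere in the string, and they treat NULL input as "not true," just as a CASE branch does.

```python
import re


def rlike(value, pattern):
    """SQL `value RLIKE pattern`: True on a match anywhere; NULL input counts as not matched."""
    return value is not None and re.search(pattern, value) is not None


def not_rlike(value, pattern):
    """SQL `value NOT RLIKE pattern`: NULL input is also treated as not true."""
    return value is not None and re.search(pattern, value) is None


# First matching rule wins, as in a SQL CASE expression, so 'windows phone' must precede 'windows'.
DEVICE_RULES = [
    ("android", "android"),
    ("iphone", "iphone"),
    ("ipad", "ipad"),
    ("macintosh", "macintosh"),
    ("windows phone", "windows_phone"),
    ("windows", "windows_pc"),
]


def classify_device(agent):
    """Mirror of the device CASE expression."""
    lowered = agent.lower() if agent is not None else None
    for pattern, label in DEVICE_RULES:
        if rlike(lowered, pattern):
            return label
    return "unknown"


def classify_identity(agent, url):
    """Mirror of the identity CASE expression; url is the value derived from request."""
    lowered = agent.lower() if agent is not None else None
    if rlike(lowered, r"(bot|spider|crawler|slurp)"):
        return "crawler"
    if rlike(lowered, "feed") or rlike(url, "feed"):
        return "feed"
    if (not_rlike(lowered, r"(bot|spider|crawler|feed|slurp)")
            and rlike(agent, r"^(Mozilla|Opera)")
            and not_rlike(url, "feed")):
        return "user"
    return "unknown"
```

Note that the ^(Mozilla|Opera) check runs against the original agent string, not the lowercased one, so it is case-sensitive in both the SQL and the sketch.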

Join user attributes with log facts: dws_user_info_all_di_spark

This node joins the parsed log table dwd_log_info_di_spark with the user attribute table ods_user_info_d_spark on the uid field. It uses a LEFT OUTER JOIN, so every log row is kept even when no matching user record exists. The result combines device and request data with user demographics and is written to the DWS-layer table dws_user_info_all_di_spark.
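The following Python sketch models the join on in-memory rows to show the LEFT OUTER JOIN semantics. The function name and the sample rows in the test are hypothetical. The sketch shows two behaviors of the SQL join:

  • Log rows without a matching uid keep NULL (None) demographic columns.
  • A uid that appears more than once in ods_user_info_d_spark duplicates the matching log rows.

```python
from collections import defaultdict


def left_outer_join(logs, users):
    """Mirror of: FROM logs a LEFT OUTER JOIN users b ON a.uid = b.uid."""
    users_by_uid = defaultdict(list)
    for b in users:
        if b["uid"] is not None:  # NULL keys never satisfy a.uid = b.uid
            users_by_uid[b["uid"]].append(b)

    rows = []
    for a in logs:
        matches = users_by_uid.get(a["uid"], []) if a["uid"] is not None else []
        # An unmatched log row is kept once, with NULL (None) user columns.
        for b in matches or [None]:
            rows.append({
                "uid": a["uid"],  # COALESCE(a.uid, b.uid) always equals a.uid in this join
                "gender": b["gender"] if b is not None else None,
                "age_range": b["age_range"] if b is not None else None,
                "zodiac": b["zodiac"] if b is not None else None,
                "device": a["device"],
                "method": a["method"],
                "url": a["url"],
                "time": a["tm"],
            })
    return rows
```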

  1. On the canvas, hover over dws_user_info_all_di_spark and click Open Node.

  2. Copy the following SQL and paste it into the code editor:

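    Replace dw-spark-demo in the LOCATION clause with the name of the OSS bucket you created when you prepared the environment. If the bucket is not in the China (Shanghai) region, also replace oss-cn-shanghai-internal with the internal endpoint prefix of the bucket's region.
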
    -- Joins dwd_log_info_di_spark (log facts) with ods_user_info_d_spark (user attributes)
    -- on uid and writes the result to dws_user_info_all_di_spark.
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
        uid        STRING COMMENT 'The user ID',
        gender     STRING COMMENT 'The gender',
        age_range  STRING COMMENT 'The age range',
        zodiac     STRING COMMENT 'The zodiac sign',
        device     STRING COMMENT 'The terminal type',
        method     STRING COMMENT 'The HTTP request type',
        url        STRING COMMENT 'The requested URL',
        `time`     STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
    )
    PARTITIONED BY (dt STRING)
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/';
    
    -- Add a partition.
    ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    -- Insert data from the basic user information table and the fact log table.
    INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
    SELECT
        COALESCE(a.uid, b.uid) AS uid,
        b.gender AS gender,
        b.age_range AS age_range,
        b.zodiac AS zodiac,
        a.device AS device,
        a.method AS method,
        a.url AS url,
        a.tm AS `time`
    FROM (
      SELECT *
      FROM dwd_log_info_di_spark
      WHERE dt='${bizdate}'
    ) a
    LEFT OUTER JOIN (
      SELECT *
      FROM ods_user_info_d_spark
      WHERE dt='${bizdate}'
    ) b
    ON
        a.uid = b.uid;
  3. In the right-side navigation pane, click Debugging Configurations and set the same parameters as in the previous node:

    | Parameter | Description |
    | --- | --- |
    | Computing Resource | Select the Spark computing resource associated with your workspace. |
    | Resource Group | Select the serverless resource group associated with your workspace. |
    | Script Parameters | bizdate=20250223 (or your target date in yyyymmdd format) |
  4. (Optional) Configure scheduling properties as described for the previous node.

  5. Click Save.

Aggregate into a user profile: ads_user_info_1d_spark

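This node reads dws_user_info_all_di_spark, groups the rows by uid, and collapses each user's log records into a single profile row. The aggregation computes page views (pv, the count of log rows per uid) and uses MAX to pick one value each for device type, gender, age range, and zodiac sign. Because these columns are strings, MAX returns the lexicographically greatest non-NULL value, not the most frequent one. The result is written to the ADS-layer table ads_user_info_1d_spark.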
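A Python sketch of the same GROUP BY, using hypothetical in-memory rows, makes the MAX behavior concrete. MAX skips NULLs and, for strings, returns the greatest value in sort order. Python compares strings by code point, which is assumed here to match Spark's default binary string comparison for the values in this tutorial.

```python
def max_non_null(values):
    """SQL MAX: ignores NULLs and returns NULL when every value is NULL."""
    present = [v for v in values if v is not None]
    return max(present) if present else None


def build_profiles(rows):
    """Mirror of: SELECT uid, MAX(device), COUNT(0) AS pv, MAX(gender), ... GROUP BY uid."""
    groups = {}
    for row in rows:
        # A NULL uid forms its own group, as in SQL GROUP BY.
        groups.setdefault(row["uid"], []).append(row)
    profiles = []
    for uid, group in groups.items():
        profiles.append({
            "uid": uid,
            "device": max_non_null(r["device"] for r in group),
            "pv": len(group),  # COUNT(0) counts every row in the group
            "gender": max_non_null(r["gender"] for r in group),
            "age_range": max_non_null(r["age_range"] for r in group),
            "zodiac": max_non_null(r["zodiac"] for r in group),
        })
    return profiles
```

With these rules, a user seen on both android and iphone devices is reported as iphone, and pv counts every log row for that uid.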

  1. On the canvas, hover over ads_user_info_1d_spark and click Open Node.

  2. Copy the following SQL and paste it into the code editor:

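    Replace dw-spark-demo in the LOCATION clause with the name of the OSS bucket you created when you prepared the environment. If the bucket is not in the China (Shanghai) region, also replace oss-cn-shanghai-internal with the internal endpoint prefix of the bucket's region.
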
    -- Aggregates dws_user_info_all_di_spark by uid to produce one user profile row per user.
    -- pv = page views (COUNT of log rows per uid).
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
      uid STRING COMMENT 'The user ID',
      device STRING COMMENT 'The terminal type',
      pv BIGINT COMMENT 'Page views: the number of log rows for the user',
      gender STRING COMMENT 'The gender',
      age_range STRING COMMENT 'The age range',
      zodiac STRING COMMENT 'The zodiac sign'
    )
    PARTITIONED BY (
      dt STRING
    )
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/';
    
    ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(device) AS device
      , COUNT(0) AS pv
      , MAX(gender) AS gender
      , MAX(age_range) AS age_range
      , MAX(zodiac) AS zodiac
    FROM dws_user_info_all_di_spark
    WHERE dt = '${bizdate}'
    GROUP BY uid;
  3. In the right-side navigation pane, click Debugging Configurations and set the same parameters as in the previous nodes.

  4. (Optional) Configure scheduling properties as described for the previous node.

  5. Click Save.

Step 3: Run and verify the pipeline

Run the workflow in debug mode, then query the output to confirm the data was processed correctly.

Run the workflow

  1. In the top toolbar of the workflow configuration tab, click Run.

  2. In the Enter runtime parameters dialog box, enter 20250223 as the bizdate value (or your target date), then click OK. Data Studio replaces all ${bizdate} variables in the node code with this value before executing. All three nodes run in the dependency order shown in the DAG.
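The following Python sketch illustrates that text substitution: every ${bizdate} in the node code becomes the value you entered. render and check_bizdate are hypothetical helpers for illustration, not DataWorks APIs.

```python
import re
from datetime import datetime

# Matches ${name} placeholders such as ${bizdate}.
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def check_bizdate(value):
    """Accept only an 8-digit yyyymmdd string that is a real calendar date."""
    if len(value) != 8 or not value.isdigit():
        raise ValueError(f"bizdate must be yyyymmdd, got {value!r}")
    datetime.strptime(value, "%Y%m%d")  # raises ValueError for impossible dates such as 20250230
    return value


def render(sql, params):
    """Replace every ${name} in sql with params[name]; unknown names raise KeyError."""
    def substitute(match):
        name = match.group(1)
        if name not in params:
            raise KeyError(f"no value supplied for ${{{name}}}")
        return params[name]

    return PLACEHOLDER.sub(substitute, sql)


if __name__ == "__main__":
    sql = "INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')"
    print(render(sql, {"bizdate": check_bizdate("20250223")}))
```

Validating the format first catches a value such as 2025-02-23, which would point every node at a dt partition that the yyyymmdd-partitioned source tables do not contain.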

Query the output

After all nodes complete, run a spot-check query to verify the DWD table was populated correctly.

  1. In the DataWorks console, choose Data Analysis and Service > DataAnalysis in the left-side navigation pane, then click Go to DataAnalysis.

  2. In the left-side navigation pane, click SQL Query.

  3. Click the icon next to My Files and select Create File. Enter a file name and click OK.

  4. In the left-side navigation tree, click the file you just created to open it.

  5. In the upper-right corner, click the icon to configure the data source:

    | Parameter | Value |
    | --- | --- |
    | Workspace | Select the workspace that contains User_profile_analysis_Spark. |
    | Data Source Type | EMR Spark SQL |
    | Data Source Name | Select the EMR Serverless Spark computing resource associated with your workspace. |
  6. Click OK.

  7. Run the following query to check whether the DWD table was populated:

    -- Replace 20250223 with the bizdate value you entered when you ran the workflow.
    -- For scheduled runs, the data timestamp is one day earlier than the scheduling date:
    -- if the node runs on February 23, 2025, the data timestamp is 20250222.
    SELECT * FROM dwd_log_info_di_spark WHERE dt = '20250223';

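    If the query returns rows with parsed fields (ip, uid, method, device, identity, and so on), the dwd_log_info_di_spark node ran successfully. Run the same query against dws_user_info_all_di_spark and ads_user_info_1d_spark to confirm that the downstream nodes also produced data.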
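The relationship between scheduling date and data timestamp in the query comment can be written as a small Python helper. data_timestamp and spot_check_sql are hypothetical names used only for illustration. For a debug run, skip the date arithmetic and use the bizdate you entered directly.

```python
from datetime import date, timedelta

OUTPUT_TABLES = ["dwd_log_info_di_spark", "dws_user_info_all_di_spark", "ads_user_info_1d_spark"]


def data_timestamp(run_date: date) -> str:
    """Data timestamp of a scheduled instance: the day before it runs, as yyyymmdd."""
    return (run_date - timedelta(days=1)).strftime("%Y%m%d")


def spot_check_sql(table: str, bizdate: str) -> str:
    """Spot-check query for one output table and one daily partition."""
    return f"SELECT * FROM {table} WHERE dt = '{bizdate}';"


if __name__ == "__main__":
    bizdate = data_timestamp(date(2025, 2, 23))  # scheduled run on 2025-02-23 -> 20250222
    for table in OUTPUT_TABLES:
        print(spot_check_sql(table, bizdate))
```

The one-day offset also crosses month boundaries correctly. For example, a run on March 1, 2025 reads data timestamp 20250228.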

Step 5: Run the workflow in production

After you deploy the nodes, the first scheduled instance runs the following day. To verify the workflow in the production environment immediately, run a backfill and confirm that the workflow behaves as expected. For details, see Backfill data and view data backfill instances (new version).

  1. Click Operation Center in the top navigation bar of the Data Studio page. Alternatively, click the icon in the upper-left corner of the DataWorks console and choose All Products > Data Development And Task Operation > Operation Center.

  2. In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes.

  3. Find the zero load node workshop_start_spark and click its name.

  4. In the directed acyclic graph (DAG) view, right-click workshop_start_spark and choose Run > Current and Descendant Nodes Retroactively.

  5. In the Backfill Data panel, select the nodes to backfill, set the Data Timestamp, and click Submit and Redirect.

  6. On the Data Backfill page, click Refresh to check the run status. When workshop_start_spark and all descendant nodes show a Success status, the workflow is running correctly in production.
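Choosing Current and Descendant Nodes Retroactively on workshop_start_spark selects that node plus every node downstream of it. The following Python sketch computes that set with a breadth-first walk. The DOWNSTREAM mapping is hypothetical: it assumes workshop_start_spark is upstream of dwd_log_info_di_spark and collapses any synchronization nodes in between into a single edge.

```python
from collections import deque

# Hypothetical downstream edges. workshop_start_spark is the zero load root node; any
# synchronization nodes between it and dwd_log_info_di_spark are collapsed into one edge.
DOWNSTREAM = {
    "workshop_start_spark": ["dwd_log_info_di_spark"],
    "dwd_log_info_di_spark": ["dws_user_info_all_di_spark"],
    "dws_user_info_all_di_spark": ["ads_user_info_1d_spark"],
    "ads_user_info_1d_spark": [],
}


def current_and_descendants(root, downstream):
    """Breadth-first walk returning the selected node and every node reachable below it."""
    order = [root]
    seen = {root}
    queue = deque([root])
    while queue:
        node = queue.popleft()
        for child in downstream.get(node, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order
```

Starting from the root node therefore covers the whole workflow, while starting from a middle node such as dws_user_info_all_di_spark would backfill only that node and ads_user_info_1d_spark.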

Clean up

To avoid ongoing charges after completing this tutorial, do one of the following:

  • Configure the Effective Period for all nodes in the workflow to limit the time window during which scheduled instances run.

  • Freeze the zero load node workshop_start_spark to stop all downstream nodes from running on schedule.

What's next