
DataWorks: Process data

Last Updated: Mar 26, 2026

Use E-MapReduce (EMR) Spark SQL nodes to transform raw user data into a structured user profile table. This tutorial walks you through building a three-node pipeline in a DataWorks workflow that refines data through the ODS, DWD, DWS, and ADS layers, and deploying it to the production environment.

By the end of this tutorial, you will know how to:

  • Add EMR Spark SQL nodes to a workflow and configure scheduling dependencies between them.

  • Write Spark SQL to split raw log data, join tables, and aggregate user profile data.

  • Configure debugging parameters and run the workflow to verify results.

  • Deploy the workflow to the production environment and trigger a backfill run.

Prerequisites

Before you begin, ensure that you have completed all steps in Synchronize data. That tutorial creates the User_profile_analysis_Spark workflow and provisions the following resources that this tutorial depends on:

  • The ods_user_info_d_spark and ods_raw_log_d_spark external tables in your workspace.

  • An OSS bucket (domain: dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com) to store intermediate and final output data.

  • A Spark computing resource and a serverless resource group associated with your workspace.

  • Workflow-level scheduling parameters, including bizdate (format: yyyymmdd).

Step 1: Add nodes and set up dependencies

The data flows through three EMR Spark SQL nodes in sequence:

  • dwd_log_info_di_spark: Splits raw log records from ods_raw_log_d_spark on the ##@@ delimiter and extracts structured fields (method, URL, protocol, device type, visitor identity) into the dwd_log_info_di_spark table.

  • dws_user_info_all_di_spark: Joins the cleaned log table with the user info table ods_user_info_d_spark on uid and writes the result to dws_user_info_all_di_spark.

  • ads_user_info_1d_spark: Aggregates the joined table by uid to produce a daily user profile summary in ads_user_info_1d_spark.
  1. Log in to the DataWorks console and go to Data Studio. In Workspace Directories, click the User_profile_analysis_Spark workflow.

  2. From the EMR section, drag an EMR SPARK SQL node onto the canvas. In the Create Node dialog box, enter the node name. Repeat this step to create all three nodes listed above.

  3. Draw lines on the canvas to set the scheduling dependencies: dwd_log_info_di_spark -> dws_user_info_all_di_spark -> ads_user_info_1d_spark


Lines can be drawn manually (as in this tutorial) or detected automatically using the automatic parsing feature. For automatic parsing, see Use the automatic parsing feature.

Step 2: Configure the data processing nodes

Configure the dwd_log_info_di_spark node

This node splits raw log records from ods_raw_log_d_spark using the ##@@ delimiter and applies regexp_extract to derive structured fields. The output goes to the dwd_log_info_di_spark table, partitioned by dt.

  1. On the canvas, hover over dwd_log_info_di_spark and click Open Node.

  2. Paste the following SQL into the code editor:

    Replace dw-spark-demo in the LOCATION clause of the Hive table (DLF-Legacy) version with the domain name of the OSS bucket that you created when you prepared the environment.

    Paimon table (DLF)

    -- Scenario: This Spark SQL code uses Spark functions to split the `col` column of the ods_raw_log_d_spark table by "##@@". It then generates multiple fields and writes the results to a new table named dwd_log_info_di_spark.
    
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
      ip STRING COMMENT 'IP address',
      uid STRING COMMENT 'User ID',
      tm STRING COMMENT 'Time in yyyymmddhh:mi:ss format',
      status STRING COMMENT 'Status code returned by the server',
      bytes STRING COMMENT 'Number of bytes returned to the client',
      method STRING COMMENT 'Request method',
      url STRING COMMENT 'URL',
      protocol STRING COMMENT 'Protocol',
      referer STRING,
      device STRING,
      identity STRING,
      dt STRING COMMENT 'Partition key' -- Best practice: Include the partition key as a column in the table.
    )
    PARTITIONED BY (dt)
    TBLPROPERTIES (
      'format' = 'paimon' -- Core: Declares the table as a Paimon table.
    );
    
    INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
    SELECT ip, 
           uid, 
           tm, 
           status, 
           bytes, 
           regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
           regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
           regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
           regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
           CASE 
               WHEN lower(agent) RLIKE 'android' THEN 'android' 
               WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' 
               WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' 
               WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' 
               WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' 
               WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' 
               ELSE 'unknown' 
           END AS device, 
           CASE 
               WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' 
               WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' 
               WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' 
               ELSE 'unknown' 
           END AS identity
    FROM (
        SELECT 
            SPLIT(col, '##@@')[0] AS ip, 
            SPLIT(col, '##@@')[1] AS uid, 
            SPLIT(col, '##@@')[2] AS tm, 
            SPLIT(col, '##@@')[3] AS request, 
            SPLIT(col, '##@@')[4] AS status, 
            SPLIT(col, '##@@')[5] AS bytes, 
            SPLIT(col, '##@@')[6] AS referer, 
            SPLIT(col, '##@@')[7] AS agent
        FROM ods_raw_log_d_spark
        WHERE dt = '${bizdate}'
    ) a;

    Hive table (DLF-Legacy)

    -- Split raw log records from ods_raw_log_d_spark using ##@@ as the delimiter,
    -- then derive method, URL, protocol, device type, and visitor identity fields.
    -- ${bizdate} is a scheduling parameter injected at runtime.
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
      ip STRING COMMENT 'The IP address',
      uid STRING COMMENT 'The user ID',
      tm STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
      status STRING COMMENT 'The status code returned by the server',
      bytes STRING COMMENT 'The number of bytes returned to the client',
      method STRING COMMENT 'The request method',
      url STRING COMMENT 'The URL',
      protocol STRING COMMENT 'The protocol',
      referer STRING,
      device STRING,
      identity STRING
    )
    PARTITIONED BY (dt STRING)
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/';
    
    ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
    SELECT ip,
           uid,
           tm,
           status,
           bytes,
           regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
           regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
           regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
           regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
           CASE
               WHEN lower(agent) RLIKE 'android' THEN 'android'
               WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
               WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
               WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
               WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
               WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
               ELSE 'unknown'
           END AS device,
           CASE
               WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
               WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
               WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
               ELSE 'unknown'
           END AS identity
    FROM (
        SELECT
            SPLIT(col, '##@@')[0] AS ip,
            SPLIT(col, '##@@')[1] AS uid,
            SPLIT(col, '##@@')[2] AS tm,
            SPLIT(col, '##@@')[3] AS request,
            SPLIT(col, '##@@')[4] AS status,
            SPLIT(col, '##@@')[5] AS bytes,
            SPLIT(col, '##@@')[6] AS referer,
            SPLIT(col, '##@@')[7] AS agent
        FROM ods_raw_log_d_spark
        WHERE dt = '${bizdate}'
    ) a;
  3. In the right-side navigation pane, click Debugging Configurations and set the following parameters. These are used when you run the workflow in Step 3.

    Parameter Description Example
    Computing Resource The Spark computing resource associated with your workspace Select from the drop-down list
    Resource Group The serverless resource group associated with your workspace Select from the drop-down list
    Script Parameters (bizdate) A date in yyyymmdd format; Data Studio substitutes it for ${bizdate} at runtime bizdate=20250223
  4. Click Save.

Scheduling parameters and policies are configured at the workflow level in this tutorial. You do not need to set them on individual nodes. To review node-level settings, click Properties in the right-side navigation pane. For details, see Node scheduling.
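The splitting and field-extraction logic of this node can be sketched outside Spark. The following Python snippet is illustrative only: the sample log record is made up, and Python's re module stands in for Spark's regexp_extract, using the same patterns as the SQL above.

```python
import re

# A made-up raw log record in the ##@@-delimited layout of ods_raw_log_d_spark.
raw = "##@@".join([
    "192.168.1.10",                  # ip
    "u1001",                         # uid
    "2025022310:15:42",              # tm
    "GET /index.html HTTP/1.1",      # request
    "200",                           # status
    "1024",                          # bytes
    "https://example.com/home",      # referer
    "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X)",  # agent
])

ip, uid, tm, request, status, nbytes, referer, agent = raw.split("##@@")

# The same patterns that the regexp_extract calls apply to the request field.
method   = re.match(r"(^[^ ]+) .*", request).group(1)
url      = re.match(r"^[^ ]+ (.*) [^ ]+$", request).group(1)
protocol = re.match(r".* ([^ ]+$)", request).group(1)

# Device classification mirrors the first CASE ... RLIKE chain in the SQL:
# the first matching keyword wins, so "iphone" is checked before "windows".
agent_l = agent.lower()
device = next(
    (name for key, name in [
        ("android", "android"), ("iphone", "iphone"), ("ipad", "ipad"),
        ("macintosh", "macintosh"), ("windows phone", "windows_phone"),
        ("windows", "windows_pc"),
    ] if key in agent_l),
    "unknown",
)

print(method, url, protocol, device)  # GET /index.html HTTP/1.1 iphone
```

Note that the url pattern uses a greedy middle group, so a request line whose path itself contains spaces is still split into exactly three parts: first token, middle, last token.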

Configure the dws_user_info_all_di_spark node

This node joins the cleaned log table with the user info dimension table on uid and writes the result to dws_user_info_all_di_spark, partitioned by dt.

  1. On the canvas, hover over dws_user_info_all_di_spark and click Open Node.

  2. Paste the following SQL into the code editor:

    Replace dw-spark-demo in the LOCATION clause of the Hive table (DLF-Legacy) version with your OSS bucket domain name.

    Paimon table (DLF)

    -- Scenario: This Spark SQL code joins the dwd_log_info_di_spark and ods_user_info_d_spark tables on the uid field and writes the result to the corresponding dt partition.
    
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
        uid        STRING COMMENT 'User ID',
        gender     STRING COMMENT 'Gender',
        age_range  STRING COMMENT 'Age range',
        zodiac     STRING COMMENT 'Zodiac sign',
        device     STRING COMMENT 'Device type',
        method     STRING COMMENT 'HTTP request type',
        url        STRING COMMENT 'URL',
        `time`     STRING COMMENT 'Time in yyyymmddhh:mi:ss format',
        dt STRING COMMENT 'Partition key' -- Best practice: Include the partition key as a column in the table.
    )
    PARTITIONED BY (dt)
    TBLPROPERTIES (
      'format' = 'paimon' -- Core: Declares the table as a Paimon table.
    );
    
    -- Insert data from the user and log tables.
    INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
    SELECT 
        COALESCE(a.uid, b.uid) AS uid,
        b.gender AS gender,    
        b.age_range AS age_range,
        b.zodiac AS zodiac,
        a.device AS device,
        a.method AS method,
        a.url AS url,
        a.tm
    FROM (
      SELECT * 
      FROM dwd_log_info_di_spark 
      WHERE dt='${bizdate}'
    ) a
    LEFT OUTER JOIN (
      SELECT * 
      FROM ods_user_info_d_spark 
      WHERE dt='${bizdate}'
    ) b
    ON 
        a.uid = b.uid;

    Hive table (DLF-Legacy)

    -- Join dwd_log_info_di_spark (log facts) and ods_user_info_d_spark (user dimensions) on uid.
    -- COALESCE handles cases where the uid exists in the log table but not in the user table.
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
        uid        STRING COMMENT 'The user ID',
        gender     STRING COMMENT 'The gender',
        age_range  STRING COMMENT 'The age range',
        zodiac     STRING COMMENT 'The zodiac sign',
        device     STRING COMMENT 'The terminal type',
        method     STRING COMMENT 'The HTTP request type',
        url        STRING COMMENT 'The URL',
        `time`     STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
    )
    PARTITIONED BY (dt STRING)
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/';
    
    ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');
    
    INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
    SELECT
        COALESCE(a.uid, b.uid) AS uid,
        b.gender AS gender,
        b.age_range AS age_range,
        b.zodiac AS zodiac,
        a.device AS device,
        a.method AS method,
        a.url AS url,
        a.tm
    FROM (
      SELECT *
      FROM dwd_log_info_di_spark
      WHERE dt='${bizdate}'
    ) a
    LEFT OUTER JOIN (
      SELECT *
      FROM ods_user_info_d_spark
      WHERE dt='${bizdate}'
    ) b
    ON a.uid = b.uid;
  3. Configure Debugging Configurations with the same parameter values as the previous node.

  4. Click Save.
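The join semantics of this node can be illustrated with a small Python sketch. The rows below are hypothetical; the point is that a LEFT OUTER JOIN keeps every log row, and user attributes come out as NULL (None) when the uid has no row in the user table. COALESCE(a.uid, b.uid) is a safeguard that keeps uid non-NULL.

```python
# Hypothetical rows standing in for one dt partition of each source table.
log_rows = [
    {"uid": "u1", "device": "iphone",  "method": "GET", "url": "/a", "tm": "2025022310:15:42"},
    {"uid": "u2", "device": "android", "method": "GET", "url": "/b", "tm": "2025022311:02:07"},
]
user_rows = [
    {"uid": "u1", "gender": "female", "age_range": "20-30", "zodiac": "Aries"},
]

users_by_uid = {r["uid"]: r for r in user_rows}

# LEFT OUTER JOIN on uid: every log row survives; missing user
# attributes become None, matching SQL NULL.
joined = []
for log in log_rows:
    user = users_by_uid.get(log["uid"], {})
    joined.append({
        "uid": log["uid"],            # COALESCE(a.uid, b.uid): a.uid is non-NULL here
        "gender": user.get("gender"),
        "age_range": user.get("age_range"),
        "zodiac": user.get("zodiac"),
        "device": log["device"],
        "method": log["method"],
        "url": log["url"],
        "time": log["tm"],
    })
```

Because u2 has no row in the user table, its gender, age_range, and zodiac are None in the result, just as the SQL join writes NULLs for that uid.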

Configure the ads_user_info_1d_spark node

This node aggregates the joined table by uid to produce the daily user profile summary in ads_user_info_1d_spark. Each row represents one user's daily page view count and profile attributes.

  1. On the canvas, hover over ads_user_info_1d_spark and click Open Node.

  2. Paste the following SQL into the code editor:

    Replace dw-spark-demo in the LOCATION clause of the Hive table (DLF-Legacy) version with your OSS bucket domain name.

    Paimon table (DLF)

    -- Scenario: This Spark SQL code uses Spark functions to further process the dws_user_info_all_di_spark table and writes the results to a new table named ads_user_info_1d_spark.
    
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
      uid STRING COMMENT 'User ID',
      device STRING COMMENT 'Device type',
      pv BIGINT COMMENT 'PV',
      gender STRING COMMENT 'Gender',
      age_range STRING COMMENT 'Age range',
      zodiac STRING COMMENT 'Zodiac sign',
      dt STRING COMMENT 'Partition key' -- Best practice: Include the partition key as a column in the table.
    )
    PARTITIONED BY (dt)
    TBLPROPERTIES (
      'format' = 'paimon' -- Core: Declares the table as a Paimon table.
    );
    
    INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(device)
      , COUNT(0) AS pv
      , MAX(gender)
      , MAX(age_range)
      , MAX(zodiac)
    FROM dws_user_info_all_di_spark
    WHERE dt = '${bizdate}'
    GROUP BY uid; 

    Hive table (DLF-Legacy)

    -- Aggregate dws_user_info_all_di_spark by uid to produce the final user profile.
    -- COUNT(0) computes page views (pv); MAX() picks a representative value for each attribute.
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
      uid STRING COMMENT 'The user ID',
      device STRING COMMENT 'The terminal type',
      pv BIGINT COMMENT 'pv',
      gender STRING COMMENT 'The gender',
      age_range STRING COMMENT 'The age range',
      zodiac STRING COMMENT 'The zodiac sign'
    )
    PARTITIONED BY (
      dt STRING
    )
    LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/';
    
    ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(device)
      , COUNT(0) AS pv
      , MAX(gender)
      , MAX(age_range)
      , MAX(zodiac)
    FROM dws_user_info_all_di_spark
    WHERE dt = '${bizdate}'
    GROUP BY uid;
  3. Configure Debugging Configurations with the same parameter values as the previous nodes.

  4. Click Save.
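The GROUP BY uid aggregation can be sketched in Python with hypothetical input rows. COUNT(0) counts one per row (the page view count), and MAX() collapses the per-row attribute values, which are identical within a uid after the join, into a single value per user.

```python
# Hypothetical rows from one dt partition of dws_user_info_all_di_spark.
rows = [
    {"uid": "u1", "device": "iphone",  "gender": "female", "age_range": "20-30", "zodiac": "Aries"},
    {"uid": "u1", "device": "iphone",  "gender": "female", "age_range": "20-30", "zodiac": "Aries"},
    {"uid": "u2", "device": "android", "gender": "male",   "age_range": "30-40", "zodiac": "Leo"},
]

profiles = {}
for r in rows:
    p = profiles.setdefault(r["uid"], {"pv": 0})
    p["pv"] += 1                       # COUNT(0): one increment per row
    for attr in ("device", "gender", "age_range", "zodiac"):
        # MAX(attr): keep the largest value seen so far for this uid.
        p[attr] = max(p.get(attr) or r[attr], r[attr])

print(profiles["u1"]["pv"], profiles["u2"]["pv"])  # 2 1
```

One row per uid comes out, so the result has the same shape as ads_user_info_1d_spark: a daily profile keyed by user ID with a pv count and the user's attributes.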

Step 3: Run and verify the workflow

  1. In the top toolbar of the workflow configuration tab, click Run. In the Enter runtime parameters dialog box, enter 20250223 (or a date of your choice) as the bizdate value and click OK.

  2. After all nodes complete, query the output to verify the results.

    1. In the DataWorks console, go to Data Analysis and Service > DataAnalysis, then click Go to DataAnalysis. In the left-side navigation pane, click SQL Query.

    2. Click the icon next to My Files and select Create File. Enter a file name and click OK.

    3. Open the file, then click the settings icon in the upper-right corner and configure:

      Parameter Value
      Workspace The workspace that contains the User_profile_analysis_Spark workflow
      Data Source Type EMR Spark SQL
      Data Source Name The EMR Serverless Spark computing resource associated with your workspace
    4. Click OK, then run the following query. Replace The data timestamp with the partition you want to check. For the debugging run in Step 3, this is the bizdate value you entered (for example, 20250223). For runs triggered by the scheduler, the data timestamp is one day earlier than the run date; for example, an instance that runs on February 23, 2025 processes the 20250222 partition.

      SELECT * FROM dwd_log_info_di_spark WHERE dt = 'The data timestamp';
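The relationship between a scheduled run date and its data timestamp is a one-day offset. A minimal Python sketch of that rule (the function name is ours, for illustration):

```python
from datetime import date, timedelta

def data_timestamp(run_date: date) -> str:
    """Data timestamp of a scheduled run: the day before the run date, as yyyymmdd."""
    return (run_date - timedelta(days=1)).strftime("%Y%m%d")

print(data_timestamp(date(2025, 2, 23)))  # 20250222
```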

Step 4: Deploy the workflow

Nodes must be deployed to the production environment before they can run on a schedule.

Scheduling parameters were configured at the workflow level in the data synchronization tutorial. You do not need to configure them for individual nodes before deploying.
  1. In the left-side navigation pane of Data Studio, click the Data Studio icon to return to the Data Studio page.

  2. In Workspace Directories, find and open the User_profile_analysis_Spark workflow.

  3. In the node toolbar, click Deploy.

  4. Click Start Deployment to Production Environment and follow the guided steps to complete the deployment.

Step 5: Run the nodes in the production environment

After deployment, scheduled instances are generated starting the following day. Run a backfill to verify the workflow immediately.

  1. Click Operation Center in the top navigation bar of the Data Studio page. Alternatively, click the products icon in the upper-left corner of the DataWorks console and choose All Products > Data Development And Task Operation > Operation Center.

  2. In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes. Find the zero load node workshop_start_spark and click its name.

  3. In the directed acyclic graph (DAG) of the node, right-click workshop_start_spark and choose Run > Current and Descendant Nodes Retroactively.

  4. In the Backfill Data panel, select the nodes to backfill, set the Data Timestamp, and click Submit and Redirect.

  5. On the Data Backfill page, click Refresh to check whether workshop_start_spark and its descendant nodes ran successfully.

To avoid unexpected charges after completing this tutorial, configure the Effective Period for all nodes in the workflow, or freeze the zero load node workshop_start_spark.

What's next

You now have a working Spark SQL pipeline that processes raw user data into a structured user profile table and runs on a daily schedule. From here, you can:

  • Visualize the results: Use DataAnalysis to build charts from the ads_user_info_1d_spark table and extract trends from the user profile data.

  • Monitor data quality: Set up monitoring rules on the output tables to detect and intercept bad data before it propagates downstream.

  • Explore data lineage: View the generated tables in Data Map and trace relationships between tables using data lineage.

  • Expose data via APIs: Use DataService Studio to publish the final processed data as standardized APIs for downstream consumers.