E-MapReduce: Process data

Last Updated: Mar 26, 2026

Use EMR Hive nodes in DataWorks to transform raw OSS data into a user profile dataset. This tutorial walks you through a three-stage pipeline: cleanse raw web logs, join them with user records, and aggregate the results into a consumable ADS table.

Pipeline overview:

Stage | Node | Input | Output
Cleanse (ODS → DWD) | dwd_log_info_di_emr | ods_raw_log_d_emr | Structured log records with region, device, and identity fields
Join (DWD → DWS) | dws_user_info_all_di_emr | dwd_log_info_di_emr + ods_user_info_d_emr | Combined log and user demographics
Summarize (DWS → ADS) | ads_user_info_1d_emr | dws_user_info_all_di_emr | Daily user profile with page view (PV) counts

Node dependency: dwd_log_info_di_emr → dws_user_info_all_di_emr → ads_user_info_1d_emr

Prerequisites

Before you begin, make sure that you have:

  • Synchronized the source data as described in Synchronize data

  • Prepared an Object Storage Service (OSS) bucket for the environment (used as the storage path in Step 2)

Step 1: Design the workflow

In the Scheduled Workflow pane of the DataStudio page, double-click the workflow. On the workflow configuration tab, click EMR Hive in the EMR section. In the Create Node dialog box, set Name and click Confirm.

Create the following three EMR Hive nodes, then configure their dependencies as shown in the figure:

  • dwd_log_info_di_emr — cleanses raw OSS log data

  • dws_user_info_all_di_emr — joins cleansed logs with basic user information

  • ads_user_info_1d_emr — generates the final user profile data

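[Figure: the three EMR Hive nodes connected in the order dwd_log_info_di_emr → dws_user_info_all_di_emr → ads_user_info_1d_emr]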

Step 2: Register a UDF for IP-to-region lookup

The raw logs store visitor IP addresses, but the pipeline needs region labels. Register a User-Defined Function (UDF) that translates IP addresses into region names.

Upload the JAR resource

  1. Download the ip2region-emr.jar package.

  2. On the DataStudio page, find the WorkShop workflow, right-click EMR, and choose Create Resource > EMR JAR. Configure the following parameters in the dialog box. Set other parameters based on your requirements, or keep the defaults.

    Parameter | Value
    Storage path | The OSS bucket you prepared
    File | The downloaded ip2region-emr.jar

  3. Click the commit icon in the toolbar to commit the resource to the EMR project in the development environment.

Register the function

  1. On the DataStudio page, find the WorkShop workflow, right-click EMR, and select Create Function.

  2. In the Create Function dialog box, set Name to getregion and click Create. Configure the following parameters on the tab that appears. Set other parameters based on your requirements, or keep the defaults.

    Parameter | Value
    Resource | ip2region-emr.jar
    Class Name | org.alidata.emr.udf.Ip2Region

  3. Click the commit icon in the toolbar to commit the function to the EMR project in the development environment.

After this step, you can call getregion(ip) in any HiveQL statement within the project.
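
A quick way to confirm the registration is to call the function once from an ad hoc query or a temporary EMR Hive node. The following is a minimal sketch: the IP address is a placeholder from a documentation range, chosen only for illustration, and the region string returned depends on the data bundled in ip2region-emr.jar.

```sql
-- Confirm that the function name resolves in the current project.
-- An error here suggests that the function was not committed.
DESCRIBE FUNCTION getregion;

-- Call the UDF on a literal. The IP below is a hypothetical sample value.
SELECT getregion('203.0.113.10') AS region;
```

If the second statement returns a value rather than an error, the JAR resource and the class name are wired up correctly.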

Step 3: Configure the EMR Hive nodes

If multiple EMR compute engines are associated with DataStudio in your workspace, select the appropriate compute engine for each node. If only one compute engine is associated, skip this selection.

Configure dwd_log_info_di_emr (cleanse raw logs)

This node parses the raw log table (ods_raw_log_d_emr) and writes structured records to dwd_log_info_di_emr. Each raw log row is a single string delimited by ##@@, with fields in the order: ip, uid, time, request, status, bytes, referer, agent.
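
To see how the ##@@ delimiter maps to the eight fields, the split can be tried on a single literal row first. This is a sketch only: the sample row is hypothetical and not taken from the dataset, and the column name col is the one used by the node code in this tutorial.

```sql
-- Split one hypothetical raw row and inspect a few fields.
SELECT
    SPLIT(col, '##@@')[0]     AS ip
  , SPLIT(col, '##@@')[3]     AS request
  , SPLIT(col, '##@@')[7]     AS agent
  , SIZE(SPLIT(col, '##@@'))  AS field_count   -- 8 for a well-formed row
FROM (
  SELECT '203.0.113.10##@@u0001##@@2026032510:15:30##@@GET /index.html HTTP/1.1##@@200##@@1024##@@http://example.com/home##@@Mozilla/5.0 (Windows NT 10.0)' AS col
) t;

-- Count rows in the source partition whose field count differs from 8.
SELECT COUNT(*) AS malformed_rows
FROM ods_raw_log_d_emr
WHERE dt = '${bizdate}'
  AND SIZE(SPLIT(col, '##@@')) <> 8;
```

A nonzero malformed_rows count means that some rows will produce NULL fields after cleansing, which is worth knowing before you inspect the DWD output.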

Add the HiveQL code

Double-click the dwd_log_info_di_emr node to open its configuration tab. Enter the following statements:

-- Create the DWD-layer table for cleansed log data.
CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
  ip       STRING COMMENT 'Visitor IP address',
  uid      STRING COMMENT 'User ID',
  `time`   STRING COMMENT 'Request timestamp (yyyymmddhh:mi:ss)',
  status   STRING COMMENT 'HTTP status code returned by the server',
  bytes    STRING COMMENT 'Bytes returned to the client',
  region   STRING COMMENT 'Region derived from the IP address via UDF',
  method   STRING COMMENT 'HTTP method (GET, POST, etc.)',
  url      STRING COMMENT 'Request URL path',
  protocol STRING COMMENT 'HTTP version',
  referer  STRING COMMENT 'Referring URL (domain only)',
  device   STRING COMMENT 'Device type: android, iphone, ipad, macintosh, windows_phone, windows_pc, or unknown',
  identity STRING COMMENT 'Access type: crawler, feed, user, or unknown'
)
PARTITIONED BY (dt STRING);

ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

SET hive.vectorized.execution.enabled = false;

-- Parse and cleanse raw log data for the current business date.
-- Each raw row is a single string with fields separated by ##@@.
-- Field order: [0]=ip, [1]=uid, [2]=tm, [3]=request, [4]=status, [5]=bytes, [6]=referer, [7]=agent
INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
SELECT
    ip
  , uid
  , tm
  , status
  , bytes
  , getregion(ip)                                            AS region   -- UDF: translates IP to region name
  , regexp_extract(request, '(^[^ ]+ )')                    AS method   -- Extracts HTTP method from request string
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$')           AS url      -- Extracts URL path from request string
  , regexp_extract(request, '.* ([^ ]+$)')                   AS protocol -- Extracts HTTP version from request string
  , regexp_extract(referer, '^[^/]+://([^/]+){1}')           AS referer  -- Strips path; keeps only the referring domain
  , CASE
      WHEN lower(agent) RLIKE 'android'       THEN 'android'
      WHEN lower(agent) RLIKE 'iphone'        THEN 'iphone'
      WHEN lower(agent) RLIKE 'ipad'          THEN 'ipad'
      WHEN lower(agent) RLIKE 'macintosh'     THEN 'macintosh'
      WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
      WHEN lower(agent) RLIKE 'windows'       THEN 'windows_pc'
      ELSE 'unknown'
    END AS device   -- Derived from the User-Agent string
  , CASE
      WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)'                              THEN 'crawler'
      WHEN lower(agent) RLIKE 'feed'
        OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed'                  THEN 'feed'
      WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
        AND agent RLIKE '^(Mozilla|Opera)'
        AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed'             THEN 'user'
      ELSE 'unknown'
    END AS identity -- Derived from the User-Agent string and URL path
FROM (
  -- Split each raw log row into individual fields using the ##@@ delimiter.
  SELECT
      SPLIT(col, '##@@')[0] AS ip
    , SPLIT(col, '##@@')[1] AS uid
    , SPLIT(col, '##@@')[2] AS tm
    , SPLIT(col, '##@@')[3] AS request
    , SPLIT(col, '##@@')[4] AS status
    , SPLIT(col, '##@@')[5] AS bytes
    , SPLIT(col, '##@@')[6] AS referer
    , SPLIT(col, '##@@')[7] AS agent
  FROM ods_raw_log_d_emr
  WHERE dt = '${bizdate}'
) a;
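
The regular expressions in the statement above are easier to follow when run against literals. The sketch below applies the same patterns with Hive's regexp_extract to a hypothetical request string and referer. The sample values are assumptions made for illustration.

```sql
-- Apply the cleansing patterns to hypothetical sample values.
SELECT
    regexp_extract(request, '(^[^ ]+ )', 1)            AS method    -- 'GET ' (the capture group includes the trailing space)
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1)   AS url       -- '/index.html'
  , regexp_extract(request, '.* ([^ ]+$)', 1)          AS protocol  -- 'HTTP/1.1'
  , regexp_extract(referer, '^[^/]+://([^/]+){1}', 1)  AS referer   -- 'example.com'
FROM (
  SELECT 'GET /index.html HTTP/1.1' AS request
       , 'http://example.com/home'  AS referer
) t;
```

Because the method pattern captures the space that follows the verb, wrapping the expression in trim() is one option if downstream consumers compare method values exactly.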

Configure scheduling properties

Configuration item | Value
Add Parameter (in the Scheduling Parameter section) | Name: bizdate / Value: $[yyyymmdd-1]
Dependencies | Set the output table to workspacename.dwd_log_info_di_emr

The Scheduling Cycle is set to Day. Skip the Scheduled time field: the run time is inherited from the workshop_start_emr root node, which triggers all descendant nodes after 00:30 every day.

After you save these settings, the dwd_log_info_di_emr node runs automatically once the ancestor node ods_raw_log_d_emr finishes syncing data from OSS.
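
To make the parameter substitution concrete: assuming $[yyyymmdd-1] resolves to the day before the scheduled run date, an instance scheduled for 2026-03-27 (a date chosen only for illustration) would run the node with ${bizdate} replaced by 20260326. A sketch of how to inspect that partition afterwards:

```sql
-- Assumes the instance ran with bizdate = 20260326 (hypothetical date).
SHOW PARTITIONS dwd_log_info_di_emr;

-- Distribution of cleansed rows by device and identity for that partition.
SELECT device, identity, COUNT(*) AS row_count
FROM dwd_log_info_di_emr
WHERE dt = '20260326'
GROUP BY device, identity
ORDER BY row_count DESC;
```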

Save the node configuration

Click the save icon in the top toolbar to save the node.

Configure dws_user_info_all_di_emr (join logs with user records)

This node joins the cleansed log data with the user demographics table and writes the combined records to dws_user_info_all_di_emr.

Add the HiveQL code

Double-click the dws_user_info_all_di_emr node to open its configuration tab. Enter the following statements:

-- Create the DWS-layer table combining log behavior and user demographics.
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
  uid       STRING COMMENT 'User ID',
  gender    STRING COMMENT 'Gender',
  age_range STRING COMMENT 'Age range',
  zodiac    STRING COMMENT 'Zodiac sign',
  region    STRING COMMENT 'Region derived from IP address',
  device    STRING COMMENT 'Device type',
  identity  STRING COMMENT 'Access type: crawler, feed, user, or unknown',
  method    STRING COMMENT 'HTTP method',
  url       STRING COMMENT 'Request URL path',
  referer   STRING COMMENT 'Referring domain',
  `time`    STRING COMMENT 'Request timestamp (yyyymmddhh:mi:ss)'
)
PARTITIONED BY (dt STRING);

ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

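-- Join cleansed log data (left) with user demographics (right) on uid.
-- The LEFT OUTER JOIN keeps every log row; a uid that appears in the logs but not in
-- the user table yields NULL gender, age_range, and zodiac. COALESCE(a.uid, b.uid)
-- returns a.uid whenever it is non-NULL.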
INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
SELECT
    COALESCE(a.uid, b.uid) AS uid
  , b.gender
  , b.age_range
  , b.zodiac
  , a.region
  , a.device
  , a.identity
  , a.method
  , a.url
  , a.referer
  , a.`time`
FROM (
  SELECT * FROM dwd_log_info_di_emr WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT * FROM ods_user_info_d_emr WHERE dt = '${bizdate}'
) b ON a.uid = b.uid;
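
After the node runs, it can be useful to check how many log rows found no matching user record. The query below is a sketch that treats a NULL gender as "no match"; that is an assumption, and it holds only if gender is always populated in ods_user_info_d_emr.

```sql
-- Count joined rows and rows that carry no user demographics.
SELECT
    COUNT(*)                                          AS total_rows
  , SUM(CASE WHEN gender IS NULL THEN 1 ELSE 0 END)   AS rows_without_user_match
FROM dws_user_info_all_di_emr
WHERE dt = '${bizdate}';
```

A high rows_without_user_match value usually points to crawler traffic or to uid values that are missing from the user table for that business date.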

Configure scheduling properties

Configuration item | Value
Add Parameter (in the Scheduling Parameter section) | Name: bizdate / Value: $[yyyymmdd-1]
Dependencies | Set the output table to workspacename.dws_user_info_all_di_emr

The Scheduling Cycle is set to Day. Skip the Scheduled time field: the run time is determined by the workshop_start_emr root node.

After you save these settings, the dws_user_info_all_di_emr node runs automatically once both ancestor nodes (ods_user_info_d_emr and dwd_log_info_di_emr) have finished.

Save the node configuration

Click the save icon in the top toolbar to save the node.

Configure ads_user_info_1d_emr (generate user profiles)

This node aggregates the joined data into a daily user profile, grouped by uid, and writes the results to ads_user_info_1d_emr.

Add the HiveQL code

Double-click the ads_user_info_1d_emr node to open its configuration tab. Enter the following statements:

-- Create the ADS-layer table for daily user profiles.
CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
  uid       STRING COMMENT 'User ID',
  region    STRING COMMENT 'Region derived from IP address',
  device    STRING COMMENT 'Device type',
  pv        BIGINT COMMENT 'Page view count',
  gender    STRING COMMENT 'Gender',
  age_range STRING COMMENT 'Age range',
  zodiac    STRING COMMENT 'Zodiac sign'
)
PARTITIONED BY (dt STRING);

ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

-- Aggregate per-user behavior for the current business date.
-- COUNT(0) counts all log rows per uid as the page view total.
INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
SELECT
    uid
  , MAX(region)    AS region
  , MAX(device)    AS device
  , COUNT(0)       AS pv
  , MAX(gender)    AS gender
  , MAX(age_range) AS age_range
  , MAX(zodiac)    AS zodiac
FROM dws_user_info_all_di_emr
WHERE dt = '${bizdate}'
GROUP BY uid;
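
Because pv is a row count per uid, the sum of pv in the ADS partition should equal the number of rows in the corresponding DWS partition. The following sketch reconciles the two. The metric labels and the row_total column name are illustrative, not part of the tutorial's schema.

```sql
-- Reconcile the ADS page view total against the DWS row count.
SELECT metric, row_total
FROM (
  SELECT 'dws_row_count' AS metric, COUNT(*) AS row_total
  FROM dws_user_info_all_di_emr
  WHERE dt = '${bizdate}'
  UNION ALL
  SELECT 'ads_pv_total' AS metric, SUM(pv) AS row_total
  FROM ads_user_info_1d_emr
  WHERE dt = '${bizdate}'
) t;
```

If the two values differ, the ADS partition was likely built from a different DWS snapshot and the node should be rerun.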

Configure scheduling properties

Configuration item | Value
Add Parameter (in the Scheduling Parameter section) | Name: bizdate / Value: $[yyyymmdd-1]
Dependencies | Set the output table to workspacename.ads_user_info_1d_emr

The Scheduling Cycle is set to Day. Skip the Scheduled time field: the run time is determined by the workshop_start_emr root node.

After you save these settings, the ads_user_info_1d_emr node runs automatically once dws_user_info_all_di_emr finishes.

Save the node configuration

Click the save icon in the top toolbar to save the node.

Step 4: Test and commit the workflow

  1. On the workflow configuration tab, click the run icon to run the workflow.

  2. When the success icon appears next to all nodes, click the commit icon to commit.

  3. In the Commit dialog box, select the nodes to commit, enter a description, and select Ignore I/O Inconsistency Alerts.

  4. Click Confirm.

  5. On the Deployment Package Creation page, deploy the nodes.

Step 5: Run nodes in the production environment

Deployed nodes generate instances scheduled for the next day. Use the data backfill feature to trigger an immediate run and verify that the pipeline works end to end in production.

  1. Click Operation Center in the upper-right corner (or in the top toolbar of the workflow configuration tab).

  2. In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes. Click the name of the workshop_start_emr zero load node.

  3. In the directed acyclic graph (DAG) on the right, right-click the workshop_start_emr node and choose Run > Current and Descendant Nodes Retroactively.

  4. In the Backfill Data panel, select the nodes to backfill, set the Data Timestamp parameter, and click Submit and Redirect. The data backfill instances page opens.

  5. Click Refresh until all nodes show a successful status.
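
Once the backfill instances succeed, a short query against the ADS table confirms that the partition was written. This is a sketch under assumptions: the date literal is hypothetical and should match the Data Timestamp you selected, and the table is assumed to be reachable under the same name in the production environment.

```sql
-- Replace 20260326 with the Data Timestamp used for the backfill (hypothetical value).
SELECT uid, region, device, pv, gender, age_range, zodiac
FROM ads_user_info_1d_emr
WHERE dt = '20260326'
ORDER BY pv DESC
LIMIT 10;
```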

What's next

To monitor the quality of data generated by these nodes, configure data quality rules in DataWorks. For more information about backfilling data and managing backfill instances, see Backfill data and view data backfill instances (new version).