DataWorks:Process data

Last Updated:Mar 26, 2026

This tutorial walks you through building a scheduled EMR Hive workflow in DataWorks that transforms raw OSS log data into user profile data. By the end, you will know how to:

  • Design a 3-node Hive workflow with explicit dependencies (ODS → DW → ADS)

  • Register a JAR-based user-defined function (UDF) for IP-to-region translation

  • Write Hive SQL for each data layer: cleansing, aggregation, and profiling

  • Configure scheduling parameters and node dependencies

  • Commit, deploy, and validate the workflow via data backfill

The workflow reads from two source tables — ods_user_info_d_emr (basic user information) and ods_raw_log_d_emr (website access logs) — and produces a daily user profile table in the ADS layer.

Prerequisites

Before you begin, ensure that you have:

Step 1: Design a workflow

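The workflow contains three EMR Hive nodes: dwd_log_info_di_emr, dws_user_info_all_di_emr, and ads_user_info_1d_emr. They run downstream of the existing workshop_start_emr and ods_raw_log_d_emr nodes, in the following order: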

workshop_start_emr → ods_raw_log_d_emr → dwd_log_info_di_emr → dws_user_info_all_di_emr → ads_user_info_1d_emr

Each node serves a specific purpose in the data pipeline:

| Node | Data layer | Purpose |
| --- | --- | --- |
| dwd_log_info_di_emr | ODS (cleansed) | Parses raw log fields and translates IP addresses to regions |
| dws_user_info_all_di_emr | DW | Joins cleansed log data with user profile data |
| ads_user_info_1d_emr | ADS (RPT) | Aggregates per-user metrics for downstream consumption |
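
The run order follows from the node dependencies. As a minimal Python sketch (not part of the DataWorks workflow itself), the dependency graph can be written down and sorted topologically. The edge from workshop_start_emr to ods_user_info_d_emr is an assumption based on the rest of this tutorial; the other edges come from the order shown above.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each node to the set of nodes it depends on (its ancestor nodes).
# Assumption: ods_user_info_d_emr is also triggered by workshop_start_emr.
DEPENDENCIES = {
    "workshop_start_emr": set(),
    "ods_raw_log_d_emr": {"workshop_start_emr"},
    "ods_user_info_d_emr": {"workshop_start_emr"},
    "dwd_log_info_di_emr": {"ods_raw_log_d_emr"},
    "dws_user_info_all_di_emr": {"dwd_log_info_di_emr", "ods_user_info_d_emr"},
    "ads_user_info_1d_emr": {"dws_user_info_all_di_emr"},
}


def run_order(dependencies):
    """Return one valid execution order in which every node follows its ancestors."""
    return list(TopologicalSorter(dependencies).static_order())


order = run_order(DEPENDENCIES)
print(" -> ".join(order))
```

Any order the sorter returns starts with workshop_start_emr and ends with ads_user_info_1d_emr; the relative position of the two ODS nodes can vary because they do not depend on each other.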

To create the nodes and set up dependencies:

  1. In the Scheduled Workflow pane of DataStudio, double-click the WorkShop workflow.

  2. On the workflow configuration tab, click Create Node and drag EMR Hive onto the canvas.

  3. In the Create Node dialog box, set Name and click Confirm.

  4. Repeat for all three nodes, then connect them in the order listed at the start of this step.

Step 2: Register a UDF for IP-to-region translation

The dwd_log_info_di_emr node calls a UDF named getregion to translate each visitor's IP address into a geographic region. The UDF is packaged as a JAR file that you upload as an EMR resource, then register as a Hive function.
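
This tutorial does not describe how ip2region-emr.jar resolves addresses internally. Purely to illustrate the idea of an IP-to-region lookup, the following Python sketch uses a sorted range table and binary search. The function name lookup_region, the region labels, and the ranges (documentation-only address blocks) are all hypothetical and are not the data or logic shipped in the JAR.

```python
import bisect
import ipaddress

# Hypothetical ranges built from documentation-only address blocks.
# They are NOT the data shipped in ip2region-emr.jar.
RANGES = sorted(
    (int(net.network_address), int(net.broadcast_address), region)
    for net, region in [
        (ipaddress.ip_network("192.0.2.0/24"), "region-a"),
        (ipaddress.ip_network("198.51.100.0/24"), "region-b"),
        (ipaddress.ip_network("203.0.113.0/24"), "region-c"),
    ]
)
STARTS = [start for start, _, _ in RANGES]


def lookup_region(ip):
    """Return the region whose range contains ip, or 'unknown' if none does."""
    value = int(ipaddress.ip_address(ip))
    # Find the last range whose start is <= value, then check its end.
    index = bisect.bisect_right(STARTS, value) - 1
    if index >= 0 and RANGES[index][0] <= value <= RANGES[index][1]:
        return RANGES[index][2]
    return "unknown"


print(lookup_region("198.51.100.7"))
print(lookup_region("10.0.0.1"))
```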

Upload the JAR resource

  1. Download the ip2region-emr.jar package.

  2. In DataStudio, find the WorkShop workflow, right-click EMR, and choose Create Resource > EMR JAR.

  3. In the Create Resource dialog box, set the key parameters and click Create.

    | Parameter | Value |
    | --- | --- |
    | Storage path | The OSS bucket you specified for your EMR cluster during environment preparation |
    | File | The downloaded ip2region-emr.jar package |

  4. Click the commit icon in the toolbar to commit the resource to the EMR project in the development environment.

Register the function

  1. In DataStudio, find the WorkShop workflow, right-click EMR, and select Create Function.

  2. In the Create Function dialog box, set Name to getregion and click Create.

  3. On the function configuration tab, set the key parameters.

    | Parameter | Value |
    | --- | --- |
    | Resource | ip2region-emr.jar |
    | Class Name | org.alidata.emr.udf.Ip2Region |

  4. Click the commit icon in the toolbar to commit the function to the EMR project in the development environment.

Step 3: Configure the EMR Hive nodes

Configure each node by editing its Hive SQL, setting scheduling parameters, and saving. All three nodes share the same scheduling parameter (bizdate = $[yyyymmdd-1]) and run after 00:30 daily, triggered by the workshop_start_emr zero load node.
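
To make the bizdate value concrete: $[yyyymmdd-1] resolves to the day before the scheduled run, formatted as yyyymmdd. The following Python sketch reproduces that arithmetic outside DataWorks; the function name bizdate_for is hypothetical.

```python
from datetime import date, timedelta


def bizdate_for(scheduled_day: date) -> str:
    """Mimic $[yyyymmdd-1]: the day before the scheduled run, as yyyymmdd."""
    return (scheduled_day - timedelta(days=1)).strftime("%Y%m%d")


print(bizdate_for(date(2026, 3, 26)))  # 20260325
print(bizdate_for(date(2026, 3, 1)))   # 20260228
```

An instance scheduled for March 26 therefore reads and writes the dt='20260325' partition in every node.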

Note If multiple EMR compute engines are associated with DataStudio in your workspace, select an EMR compute engine on the node configuration tab. If only one compute engine is associated, skip this step.

Node 1: dwd_log_info_di_emr

This node parses the raw ##@@-delimited log records from ods_raw_log_d_emr, calls the getregion UDF to resolve IP addresses to regions, and writes the cleansed data to the ODS-layer table dwd_log_info_di_emr.
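
The parsing step can be tried outside Hive. The following Python sketch splits one ##@@-delimited record into the eight raw fields and applies regular expressions to derive the method, URL, protocol, and referer host. The sample record and the helper names are hypothetical, and the sketch assumes every record contains all eight fields.

```python
import re

FIELDS = ["ip", "uid", "tm", "request", "status", "bytes", "referer", "agent"]

# Hypothetical sample record; real log content may differ.
SAMPLE = "##@@".join([
    "192.0.2.10",
    "u001",
    "2026032510:15:30",
    "GET /index.html HTTP/1.1",
    "200",
    "512",
    "http://example.com/home",
    "Mozilla/5.0 (Windows NT 10.0)",
])


def first_group(pattern, text):
    """Return capture group 1 of the first match, or '' when nothing matches."""
    match = re.search(pattern, text)
    return match.group(1) if match else ""


def parse_record(col):
    """Split a raw record and derive method, url, protocol, and referer host."""
    row = dict(zip(FIELDS, col.split("##@@")))
    request = row["request"]
    row["method"] = first_group(r"^([^ ]+) ", request)
    row["url"] = first_group(r"^[^ ]+ (.*) [^ ]+$", request)
    row["protocol"] = first_group(r".* ([^ ]+$)", request)
    row["referer"] = first_group(r"^[^/]+://([^/]+){1}", row["referer"])
    return row


parsed = parse_record(SAMPLE)
print(parsed["method"], parsed["url"], parsed["protocol"], parsed["referer"])
```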

Edit node code

Double-click dwd_log_info_di_emr to open the node configuration tab and enter the following SQL:

-- Create a table at the ODS layer.
CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
  ip STRING COMMENT 'The IP address',
  uid STRING COMMENT 'The user ID',
  `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
  status STRING COMMENT 'The status code that is returned by the server',
  bytes STRING COMMENT 'The number of bytes that are returned to the client',
  region STRING COMMENT 'The region, which is obtained based on the IP address',
  method STRING COMMENT 'The HTTP request type',
  url STRING COMMENT 'url',
  protocol STRING COMMENT 'The version number of HTTP',
  referer STRING COMMENT 'The source URL',
  device STRING COMMENT 'The terminal type',
  identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

set hive.vectorized.execution.enabled = false;
INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
SELECT ip
  , uid
  , tm
  , status
  , bytes
  , getregion(ip) AS region -- Obtain the region by using a UDF based on the IP address.
  , regexp_extract(request, '^([^ ]+) ', 1) AS method -- Use regular expressions to extract three fields (method, URL, and protocol) from the request.
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
  , regexp_extract(request, '.* ([^ ]+$)') AS protocol
  , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  -- Use a regular expression to cleanse the HTTP referrer to obtain a more accurate URL.
  , CASE
    WHEN lower(agent) RLIKE 'android' THEN 'android' -- Obtain the terminal and access types from the value of the agent parameter.
    WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
    WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
    WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
    WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
    WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
    ELSE 'unknown'
  END AS device
  , CASE
    WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN lower(agent) RLIKE 'feed'
    OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
    WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
    AND agent RLIKE '^(Mozilla|Opera)' -- A group, not a character class, so that only agents starting with Mozilla or Opera match.
    AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
    ELSE 'unknown'
  END AS identity
FROM (
  SELECT SPLIT(col, '##@@')[0] AS ip
    , SPLIT(col, '##@@')[1] AS uid
    , SPLIT(col, '##@@')[2] AS tm
    , SPLIT(col, '##@@')[3] AS request
    , SPLIT(col, '##@@')[4] AS status
    , SPLIT(col, '##@@')[5] AS bytes
    , SPLIT(col, '##@@')[6] AS referer
    , SPLIT(col, '##@@')[7] AS agent
  FROM ods_raw_log_d_emr
  WHERE dt = '${bizdate}'
) a;
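
The two CASE expressions are order-sensitive: 'windows phone' must be tested before 'windows', and the crawler and feed checks run before the user check. The following Python sketch mirrors that ordering so you can try sample user agents locally. The function names are hypothetical, and the sketch assumes the intent of the last identity rule is "agent starts with Mozilla or Opera".

```python
import re

# Checked top to bottom, like the WHEN branches; the first hit wins.
DEVICE_RULES = [
    ("android", "android"),
    ("iphone", "iphone"),
    ("ipad", "ipad"),
    ("macintosh", "macintosh"),
    ("windows phone", "windows_phone"),
    ("windows", "windows_pc"),
]


def classify_device(agent):
    """Return the terminal type derived from the user agent string."""
    lowered = agent.lower()
    for needle, label in DEVICE_RULES:
        if needle in lowered:
            return label
    return "unknown"


def classify_identity(agent, url):
    """Return crawler, feed, user, or unknown for one request."""
    lowered = agent.lower()
    if re.search(r"bot|spider|crawler|slurp", lowered):
        return "crawler"
    if "feed" in lowered or "feed" in url:
        return "feed"
    # Crawlers and feeds are already excluded at this point.
    if re.match(r"(Mozilla|Opera)", agent):
        return "user"
    return "unknown"


print(classify_device("Mozilla/5.0 (compatible; MSIE 10.0; Windows Phone 8.0)"))
print(classify_identity("Mozilla/5.0 (compatible; Googlebot/2.1)", "/index.html"))
```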

Configure scheduling properties

| Section | Parameter | Value |
| --- | --- | --- |
| Scheduling Parameter | bizdate | $[yyyymmdd-1] |
| Schedule | Rerun | Allow Regardless of Running Status |
| Schedule | Scheduling Cycle | Day |
| Dependencies | Output table | <Workspace name>.dwd_log_info_di_emr |

After this node's ancestor node (ods_raw_log_d_emr) finishes synchronizing data from OSS to the EMR table at 00:30 daily, this node runs automatically and writes the processed data into the ${bizdate} partition of dwd_log_info_di_emr.

Save the node

Click the save icon in the top toolbar to save the node configuration.

Node 2: dws_user_info_all_di_emr

This node joins the cleansed log data from dwd_log_info_di_emr with the user profile data from ods_user_info_d_emr to produce an enriched per-visit record at the DW layer.

Edit node code

Double-click dws_user_info_all_di_emr to open the node configuration tab and enter the following SQL:

-- Create a table at the DW layer.
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
  uid STRING COMMENT 'The user ID',
  gender STRING COMMENT 'The gender',
  age_range STRING COMMENT 'The age range',
  zodiac STRING COMMENT 'The zodiac sign',
  region STRING COMMENT 'The region, which is obtained based on the IP address',
  device STRING COMMENT 'The terminal type',
  identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown',
  method STRING COMMENT 'The HTTP request type',
  url STRING COMMENT 'url',
  referer STRING COMMENT 'The source URL',
  `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
  , b.gender
  , b.age_range
  , b.zodiac
  , a.region
  , a.device
  , a.identity
  , a.method
  , a.url
  , a.referer
  , a.`time`
FROM (
  SELECT *
  FROM dwd_log_info_di_emr
  WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT *
  FROM ods_user_info_d_emr
  WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;
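
Because this is a LEFT OUTER JOIN, every log row is kept even when no matching user exists; the user attributes are then NULL. It also means COALESCE(a.uid, b.uid) always yields the log row's uid, since b.uid is only non-NULL when it equals a.uid. The following Python sketch illustrates these semantics with hypothetical sample rows, under the assumption that uid is unique in the user table.

```python
def left_join_on_uid(log_rows, user_rows):
    """Keep every log row; attach user attributes when the uid matches, else None."""
    users = {user["uid"]: user for user in user_rows}  # assumes one row per uid
    joined = []
    for log in log_rows:
        user = users.get(log["uid"], {})
        joined.append({
            "uid": log["uid"],
            "gender": user.get("gender"),
            "age_range": user.get("age_range"),
            "zodiac": user.get("zodiac"),
            "region": log["region"],
            "device": log["device"],
        })
    return joined


# Hypothetical sample data.
logs = [
    {"uid": "u001", "region": "region-a", "device": "iphone"},
    {"uid": "u999", "region": "region-b", "device": "android"},
]
users = [{"uid": "u001", "gender": "female", "age_range": "30-40", "zodiac": "Leo"}]

joined = left_join_on_uid(logs, users)
for row in joined:
    print(row)
```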

Configure scheduling properties

| Section | Parameter | Value |
| --- | --- | --- |
| Scheduling Parameter | bizdate | $[yyyymmdd-1] |
| Schedule | Rerun | Allow Regardless of Running Status |
| Schedule | Scheduling Cycle | Day |
| Dependencies | Output table | <Workspace name>.dws_user_info_all_di_emr |

This node runs daily, after 00:30, once both of its ancestor nodes (ods_user_info_d_emr and dwd_log_info_di_emr) have finished.

Save the node

Click the save icon in the top toolbar to save the node configuration.

Node 3: ads_user_info_1d_emr

This node aggregates the per-visit records in dws_user_info_all_di_emr into one row per user, computing page view (PV) count and selecting representative attribute values for each user's profile.

Edit node code

Double-click ads_user_info_1d_emr to open the node configuration tab and enter the following SQL:

-- Create a table at the RPT layer.
CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
  uid STRING COMMENT 'The user ID',
  region STRING COMMENT 'The region, which is obtained based on the IP address',
  device STRING COMMENT 'The terminal type',
  pv BIGINT COMMENT 'pv',
  gender STRING COMMENT 'The gender',
  age_range STRING COMMENT 'The age range',
  zodiac STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
SELECT uid
  , MAX(region)
  , MAX(device)
  , COUNT(0) AS pv
  , MAX(gender)
  , MAX(age_range)
  , MAX(zodiac)
FROM dws_user_info_all_di_emr
WHERE dt = '${bizdate}'
GROUP BY uid;
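
The aggregation collapses all visits of a user into one row: COUNT(0) counts the visits, and MAX picks the greatest non-NULL string for each attribute. The following Python sketch reproduces this grouping on hypothetical sample rows; the function names are illustrative only.

```python
from collections import defaultdict

ATTRIBUTES = ["region", "device", "gender", "age_range", "zodiac"]


def max_ignoring_none(values):
    """Like SQL MAX: the greatest non-NULL value, or None if all are NULL."""
    present = [value for value in values if value is not None]
    return max(present) if present else None


def aggregate_by_uid(rows):
    """Return {uid: profile} with a page view count and one value per attribute."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["uid"]].append(row)
    profiles = {}
    for uid, visits in groups.items():
        profile = {"pv": len(visits)}
        for name in ATTRIBUTES:
            profile[name] = max_ignoring_none(visit.get(name) for visit in visits)
        profiles[uid] = profile
    return profiles


# Hypothetical per-visit rows.
rows = [
    {"uid": "u001", "region": "region-a", "device": "iphone",
     "gender": "female", "age_range": "30-40", "zodiac": "Leo"},
    {"uid": "u001", "region": "region-a", "device": "windows_pc",
     "gender": "female", "age_range": "30-40", "zodiac": "Leo"},
    {"uid": "u999", "region": "region-b", "device": "android",
     "gender": None, "age_range": None, "zodiac": None},
]

profiles = aggregate_by_uid(rows)
print(profiles["u001"])
```

Note that MAX on strings compares them lexicographically, so for a user seen on several devices the selected device is simply the one that sorts last, not the most frequent one.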

Configure scheduling properties

| Section | Parameter | Value |
| --- | --- | --- |
| Scheduling Parameter | bizdate | $[yyyymmdd-1] |
| Schedule | Rerun | Allow Regardless of Running Status |
| Schedule | Scheduling Cycle | Day |
| Dependencies | Output table | <Workspace name>.ads_user_info_1d_emr |

This node runs after dws_user_info_all_di_emr completes.

Save the node

Click the save icon in the top toolbar to save the node configuration.

Step 4: Commit the workflow

Before committing, run the workflow to confirm all nodes complete without errors.

  1. On the workflow configuration tab, click the Run icon to run the workflow.

  2. Confirm that the Success icon appears next to every node, then click the Commit icon to commit.

  3. In the Commit dialog box, select the nodes to commit, enter a description, select Ignore I/O Inconsistency Alerts, and click Confirm.

  4. Deploy the committed nodes:

    1. In the upper-right corner of the workflow configuration tab, click Deploy. The Create Deploy Task page appears.

    2. Select the nodes to deploy and click Deploy. In the Create Deploy Task dialog box, click Deploy.

Step 5: Run the nodes in the production environment

After deployment, newly created instances are scheduled to run the following day. Use data backfill to trigger an immediate run and verify the nodes work correctly in the production environment. For details, see Backfill data and view data backfill instances (new version).

  1. Click Operation Center in the upper-right corner of the workflow configuration tab.

  2. In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes. Click workshop_start_emr to open its directed acyclic graph (DAG).

  3. In the DAG, right-click the workshop_start_emr node and choose Run > Current and Descendant Nodes Retroactively.

  4. In the Backfill Data panel, select the nodes to backfill, set the Data Timestamp parameter, and click Submit and Redirect.

  5. On the data backfill page, click Refresh until all SQL nodes are successfully run.
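
Once the backfill instances succeed, you may also want to confirm that each output partition actually contains rows. This check is not part of the documented procedure; as a hedged sketch, the following Python helper (a hypothetical name) only builds the row-count queries for a given data timestamp, which you could then run in an ad hoc Hive query.

```python
OUTPUT_TABLES = [
    "dwd_log_info_di_emr",
    "dws_user_info_all_di_emr",
    "ads_user_info_1d_emr",
]


def row_count_queries(bizdate):
    """Build one COUNT(*) query per output table for the given dt partition."""
    if len(bizdate) != 8 or not bizdate.isdigit():
        raise ValueError("bizdate must look like yyyymmdd")
    return [
        f"SELECT COUNT(*) FROM {table} WHERE dt = '{bizdate}';"
        for table in OUTPUT_TABLES
    ]


queries = row_count_queries("20260325")
for query in queries:
    print(query)
```

A count of zero for a partition suggests that the corresponding node ran without input data for that date.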

What to do next

To monitor the quality of the data generated by your scheduled nodes, configure data quality monitoring rules. For details, see Monitor data quality.