This tutorial walks you through building a scheduled EMR Hive workflow in DataWorks that transforms raw OSS log data into user profile data. By the end, you will know how to:
Design a 3-node Hive workflow with explicit dependencies (ODS → DW → ADS)
Register a JAR-based user-defined function (UDF) for IP-to-region translation
Write Hive SQL for each data layer: cleansing, aggregation, and profiling
Configure scheduling parameters and node dependencies
Commit, deploy, and validate the workflow via data backfill
The workflow reads from two source tables — ods_user_info_d_emr (basic user information) and ods_raw_log_d_emr (website access logs) — and produces a daily user profile table in the ADS layer.
Prerequisites
Before you begin, ensure that you have:
Completed data synchronization. For details, see Synchronize data
Step 1: Design a workflow
The workflow contains three EMR Hive nodes that run in the following order:
workshop_start_emr → ods_raw_log_d_emr → dwd_log_info_di_emr → dws_user_info_all_di_emr → ads_user_info_1d_emrEach node serves a specific purpose in the data pipeline:
| Node | Data layer | Purpose |
|---|---|---|
dwd_log_info_di_emr | ODS (cleansed) | Parses raw log fields and translates IP addresses to regions |
dws_user_info_all_di_emr | DW | Joins cleansed log data with user profile data |
ads_user_info_1d_emr | ADS (RPT) | Aggregates per-user metrics for downstream consumption |
To create the nodes and set up dependencies:
In the Scheduled Workflow pane of DataStudio, double-click the WorkShop workflow.
On the workflow configuration tab, click Create Node and drag EMR Hive onto the canvas.
In the Create Node dialog box, set Name and click Confirm.
Repeat for all three nodes, then connect them as shown:

Step 2: Register a UDF for IP-to-region translation
The dwd_log_info_di_emr node calls a UDF named getregion to translate each visitor's IP address into a geographic region. The UDF is packaged as a JAR file that you upload as an EMR resource, then register as a Hive function.
Upload the JAR resource
Download the ip2region-emr.jar package.
In DataStudio, find the WorkShop workflow, right-click EMR, and choose Create Resource > EMR JAR.
In the Create Resource dialog box, set the key parameters and click Create.
Parameter Value Storage path The OSS bucket you specified for your EMR cluster during environment preparation File The downloaded ip2region-emr.jarpackage
Click the
icon in the toolbar to commit the resource to the EMR project in the development environment.
Register the function
In DataStudio, find the WorkShop workflow, right-click EMR, and select Create Function.
In the Create Function dialog box, set Name to
getregionand click Create.On the function configuration tab, set the key parameters.
Parameter Value Resource ip2region-emr.jarClass Name org.alidata.emr.udf.Ip2Region
Click the
icon in the toolbar to commit the function to the EMR project in the development environment.
Step 3: Configure the EMR Hive nodes
Configure each node by editing its Hive SQL, setting scheduling parameters, and saving. All three nodes share the same scheduling parameter (bizdate = $[yyyymmdd-1]) and run after 00:30 daily, triggered by the workshop_start_emr zero load node.
Node 1: dwd_log_info_di_emr
This node parses the raw ##@@-delimited log records from ods_raw_log_d_emr, calls the getregion UDF to resolve IP addresses to regions, and writes the cleansed data to the ODS-layer table dwd_log_info_di_emr.
Edit node code
Double-click dwd_log_info_di_emr to open the node configuration tab and enter the following SQL:
-- Create a table at the ODS layer.
CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
ip STRING COMMENT 'The IP address',
uid STRING COMMENT 'The user ID',
`time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
status STRING COMMENT 'The status code that is returned by the server',
bytes STRING COMMENT 'The number of bytes that are returned to the client',
region STRING COMMENT 'The region, which is obtained based on the IP address',
method STRING COMMENT 'The HTTP request type',
url STRING COMMENT 'url',
protocol STRING COMMENT 'The version number of HTTP',
referer STRING COMMENT 'The source URL',
device STRING COMMENT 'The terminal type',
identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown'
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
set hive.vectorized.execution.enabled = false;
INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
SELECT ip
, uid
, tm
, status
, bytes
, getregion(ip) AS region -- Obtain the region by using a UDF based on the IP address.
, regexp_substr(request, '(^[^ ]+ )') AS method -- Use a regular expression to extract three fields from the request.
, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
, regexp_extract(request, '.* ([^ ]+$)') AS protocol
, regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer -- Use a regular expression to cleanse the HTTP referrer to obtain a more accurate URL.
, CASE
WHEN lower (agent) RLIKE 'android' THEN 'android' -- Obtain the terminal and access types from the value of the agent parameter.
WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
ELSE 'unknown'
END AS device
, CASE
WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
WHEN lower(agent) RLIKE 'feed'
OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
AND agent RLIKE '^[Mozilla|Opera]'
AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
ELSE 'unknown'
END AS identity
FROM (
SELECT SPLIT(col, '##@@')[0] AS ip
, SPLIT(col, '##@@')[1] AS uid
, SPLIT(col, '##@@')[2] AS tm
, SPLIT(col, '##@@')[3] AS request
, SPLIT(col, '##@@')[4] AS status
, SPLIT(col, '##@@')[5] AS bytes
, SPLIT(col, '##@@')[6] AS referer
, SPLIT(col, '##@@')[7] AS agent
FROM ods_raw_log_d_emr
WHERE dt = '${bizdate}'
) a;Configure scheduling properties
| Section | Parameter | Value |
|---|---|---|
| Scheduling Parameter | bizdate | $[yyyymmdd-1] |
| Schedule | Rerun | Allow Regardless of Running Status |
| Schedule | Scheduling Cycle | Day |
| Dependencies | Output table | <Workspace name>.dwd_log_info_di_emr |
After this node's ancestor node (ods_raw_log_d_emr) finishes synchronizing data from OSS to the EMR table at 00:30 daily, this node runs automatically and writes the processed data into the ${bizdate} partition of dwd_log_info_di_emr.
Save the node
Click the
icon in the top toolbar to save the node configuration.
Node 2: dws_user_info_all_di_emr
This node joins the cleansed log data from dwd_log_info_di_emr with the user profile data from ods_user_info_d_emr to produce an enriched per-visit record at the DW layer.
Edit node code
Double-click dws_user_info_all_di_emr to open the node configuration tab and enter the following SQL:
-- Create a table at the DW layer.
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
uid STRING COMMENT 'The user ID',
gender STRING COMMENT 'The gender',
age_range STRING COMMENT 'The age range',
zodiac STRING COMMENT 'The zodiac sign',
region STRING COMMENT 'The region, which is obtained based on the IP address',
device STRING COMMENT 'The terminal type',
identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown',
method STRING COMMENT 'The HTTP request type',
url STRING COMMENT 'url',
referer STRING COMMENT 'The source URL',
`time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
, b.gender
, b.age_range
, b.zodiac
, a.region
, a.device
, a.identity
, a.method
, a.url
, a.referer
, a.`time`
FROM (
SELECT *
FROM dwd_log_info_di_emr
WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
SELECT *
FROM ods_user_info_d_emr
WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;Configure scheduling properties
| Section | Parameter | Value |
|---|---|---|
| Scheduling Parameter | bizdate | $[yyyymmdd-1] |
| Schedule | Rerun | Allow Regardless of Running Status |
| Schedule | Scheduling Cycle | Day |
| Dependencies | Output table | <Workspace name>.dws_user_info_all_di_emr |
This node runs after both ods_user_info_d_emr and dwd_log_info_di_emr finish at 00:30 daily.
Save the node
Click the
icon in the top toolbar to save the node configuration.
Node 3: ads_user_info_1d_emr
This node aggregates the per-visit records in dws_user_info_all_di_emr into one row per user, computing page view (PV) count and selecting representative attribute values for each user's profile.
Edit node code
Double-click ads_user_info_1d_emr to open the node configuration tab and enter the following SQL:
-- Create a table at the RPT layer.
CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
uid STRING COMMENT 'The user ID',
region STRING COMMENT 'The region, which is obtained based on the IP address',
device STRING COMMENT 'The terminal type',
pv BIGINT COMMENT 'pv',
gender STRING COMMENT 'The gender',
age_range STRING COMMENT 'The age range',
zodiac STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (
dt STRING
);
ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
SELECT uid
, MAX(region)
, MAX(device)
, COUNT(0) AS pv
, MAX(gender)
, MAX(age_range)
, MAX(zodiac)
FROM dws_user_info_all_di_emr
WHERE dt = '${bizdate}'
GROUP BY uid;Configure scheduling properties
| Section | Parameter | Value |
|---|---|---|
| Scheduling Parameter | bizdate | $[yyyymmdd-1] |
| Schedule | Rerun | Allow Regardless of Running Status |
| Schedule | Scheduling Cycle | Day |
| Dependencies | Output table | <Workspace name>.ads_user_info_1d_emr |
This node runs after dws_user_info_all_di_emr completes.
Save the node
Click the
icon in the top toolbar to save the node configuration.
Step 4: Commit the workflow
Before committing, run the workflow to confirm all nodes complete without errors.
On the workflow configuration tab, click the
icon to run the workflow.Confirm that the
icon appears next to every node, then click the
icon to commit.In the Commit dialog box, select the nodes to commit, enter a description, select Ignore I/O Inconsistency Alerts, and click Confirm.
Deploy the committed nodes:
In the upper-right corner of the workflow configuration tab, click Deploy. The Create Deploy Task page appears.
Select the nodes to deploy and click Deploy. In the Create Deploy Task dialog box, click Deploy.
Step 5: Run the nodes in the production environment
After deployment, newly created instances are scheduled to run the following day. Use data backfill to trigger an immediate run and verify the nodes work correctly in the production environment. For details, see Backfill data and view data backfill instances (new version).
Click Operation Center in the upper-right corner of the workflow configuration tab.
In the left-side navigation pane, choose Auto Triggered Node O&M > Auto Triggered Nodes. Click workshop_start_emr to open its directed acyclic graph (DAG).
In the DAG, right-click the workshop_start_emr node and choose Run > Current and Descendant Nodes Retroactively.
In the Backfill Data panel, select the nodes to backfill, set the Data Timestamp parameter, and click Submit and Redirect.
On the data backfill page, click Refresh until all SQL nodes are successfully run.
What to do next
To monitor the quality of the data generated by your scheduled nodes, configure data quality monitoring rules. For details, see Monitor data quality.