DataWorks: Process data

Last Updated: Feb 19, 2025

This topic describes how to use E-MapReduce (EMR) Hive nodes in DataWorks to process the data in the ods_user_info_d_emr and ods_raw_log_d_emr tables, which are stored in an Object Storage Service (OSS) bucket after synchronization, and produce user profile data.

Prerequisites

Before you start this tutorial, complete the steps in Data synchronization.

Step 1: Design a workflow

For information on configuring dependencies between workflow nodes, refer to Data synchronization.

Double-click the newly created workflow to open its edit page. Click Create Node, choose EMR Hive, and drag it onto the canvas on the right. In the Create Node dialog box, enter a node name and click Confirm.

Create three EMR Hive nodes named dwd_log_info_di_emr, dws_user_info_all_di_emr, and ads_user_info_1d_emr respectively, and set up their dependencies as illustrated below.

  • dwd_log_info_di_emr: Cleans the raw OSS log data.

  • dws_user_info_all_di_emr: Aggregates the cleaned log data with basic user information.

  • ads_user_info_1d_emr: Generates the final user profile data.

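[Figure: workflow dependencies. ods_raw_log_d_emr → dwd_log_info_di_emr; dwd_log_info_di_emr and ods_user_info_d_emr → dws_user_info_all_di_emr → ads_user_info_1d_emr]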

Step 2: Create a function

To format the synchronized raw log data correctly, we need to use functions to convert it into the target format. In this example, a function code package for translating IP addresses into regions is provided. Download the function code package to your local machine, register it as a function in DataWorks, and then call the function as needed.

Upload resources

  1. Download the ip2region-emr.jar file.

  2. On the Data Development page, open the WorkShop workflow, right-click EMR, and select Create Resource > EMR JAR. Configure the parameters for the new resource, then click Create.

    Key parameters include the following:

    • Storage Path: Choose the OSS bucket in the preparation environment where the EMR cluster configuration is stored.

    • Upload File: Choose the ip2region-emr.jar file you have downloaded.

    Configure other parameters as per your business needs or use the default settings.

  3. Click the submit icon in the toolbar to submit the resource to the EMR engine project in the development environment.

Register a function

  1. On the Data Development page, open the workflow, right-click EMR, and select Create Function.

  2. In the Create Function dialog box, enter "getregion" in the Function Name field, then click Create and configure the function's information.

    Key parameters include the following:

    • Resource: Select the ip2region-emr.jar file.

    • Class Name: Enter org.alidata.emr.udf.Ip2Region.

    Configure other parameters as per your business needs or use the default settings.

  3. Click the submit icon in the toolbar to submit the function to the EMR engine project in the development environment.
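
Once the function is submitted, a quick smoke test can confirm that it resolves before it is used in the nodes below. The following is a minimal sketch, assuming the statements are run from an EMR Hive node or ad hoc query bound to the same EMR engine and database in which getregion was registered. The IP address is an arbitrary sample value, and the returned region depends on the IP library bundled in ip2region-emr.jar.

```sql
-- Show the metadata Hive holds for the registered function.
DESCRIBE FUNCTION EXTENDED getregion;

-- Call the UDF on a literal. '8.8.8.8' is only a sample input.
SELECT getregion('8.8.8.8') AS region;
```

If the second statement fails with a "function not found" style error, the resource or function was probably not submitted to the engine that the query runs on.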

Step 3: Configure EMR Hive nodes

Create dwd_log_info_di_emr node

1. Edit code

Double-click the dwd_log_info_di_emr node to access the node configuration tab. Enter the following statements in the configuration tab that appears.

Note

If your workspace is associated with multiple EMR compute engines in DataStudio, select the necessary EMR Engine. If there is only one associated EMR compute engine, selection is not required.

--Create the DWD layer table
CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
  ip STRING COMMENT 'The IP address',
  uid STRING COMMENT 'The ID of the user',
  `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
  status STRING COMMENT 'The status code that is returned by the server',
  bytes STRING COMMENT 'The number of bytes that are returned to the client',
  region STRING COMMENT 'The region, which is obtained based on the IP address',
  method STRING COMMENT 'The type of the HTTP request',
  url STRING COMMENT 'The URL',
  protocol STRING COMMENT 'The version number of HTTP',
  referer STRING COMMENT 'The source URL',
  device STRING COMMENT 'The terminal type ',
  identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

set hive.vectorized.execution.enabled = false;
INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
SELECT ip
  , uid
  , tm
  , status
  , bytes 
  , getregion(ip) AS region --Obtain the region by using the user defined function (UDF) based on the IP address. 
  , regexp_extract(request, '(^[^ ]+) .*') AS method --Use a regular expression to split the request into three fields.
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
  , regexp_extract(request, '.* ([^ ]+$)') AS protocol 
  , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  --Use a regular expression to scrub the referer and obtain a more accurate URL.
  , CASE
    WHEN lower(agent) RLIKE 'android' THEN 'android' --Obtain the terminal and access types from the value of the agent parameter.
    WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
    WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
    WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
    WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
    WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
    ELSE 'unknown'
  END AS device
  , CASE
    WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN lower(agent) RLIKE 'feed'
    OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
    WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
    AND agent RLIKE '^(Mozilla|Opera)' --Use a group for alternation; square brackets would match a single character only.
    AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
    ELSE 'unknown'
  END AS identity
  FROM (
    SELECT SPLIT(col, '##@@')[0] AS ip
    , SPLIT(col, '##@@')[1] AS uid
    , SPLIT(col, '##@@')[2] AS tm
    , SPLIT(col, '##@@')[3] AS request
    , SPLIT(col, '##@@')[4] AS status
    , SPLIT(col, '##@@')[5] AS bytes
    , SPLIT(col, '##@@')[6] AS referer
    , SPLIT(col, '##@@')[7] AS agent
    FROM ods_raw_log_d_emr
  WHERE dt = '${bizdate}'
) a;
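
To see what the SPLIT and regexp_extract calls above produce, the same expressions can be applied to a single literal line. This is a sketch only: the sample line is hypothetical and merely follows the eight-field, ##@@-delimited layout that the statement assumes for ods_raw_log_d_emr.col.

```sql
-- Hypothetical sample line; real rows come from ods_raw_log_d_emr.col.
SELECT SPLIT(line, '##@@')[0] AS ip                                         -- 10.0.0.1
  , SPLIT(line, '##@@')[3] AS request                                       -- GET /index.html HTTP/1.1
  , regexp_extract(SPLIT(line, '##@@')[3], '(^[^ ]+) .*') AS method         -- GET
  , regexp_extract(SPLIT(line, '##@@')[3], '^[^ ]+ (.*) [^ ]+$') AS url     -- /index.html
  , regexp_extract(SPLIT(line, '##@@')[3], '.* ([^ ]+$)') AS protocol       -- HTTP/1.1
  , regexp_extract(SPLIT(line, '##@@')[6], '^[^/]+://([^/]+){1}') AS referer -- example.com
FROM (
  SELECT '10.0.0.1##@@u001##@@2025021800:30:00##@@GET /index.html HTTP/1.1##@@200##@@512##@@http://example.com/home##@@Mozilla/5.0' AS line
) t;
```

When regexp_extract is called without a third argument, it returns the first capture group, which is why each pattern wraps the wanted part in parentheses.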

2. Configure scheduling information

To set up the scheduling scenario, configure the system to trigger the ods_raw_log_d_emr node every day at 00:30. This will synchronize the user_log.txt data from OSS to the ods_raw_log_d_emr table in EMR. Subsequently, the dwd_log_info_di_emr node processes the data in the ods_raw_log_d_emr table, and the results are stored in the business time partition of the dwd_log_info_di_emr table.

Configure the following items:

  • Scheduling Parameters: In the Scheduling Parameters area, add the following parameter:

      • Parameter name: bizdate

      • Parameter value: $[yyyymmdd-1]

  • Time Attributes: Set Rerun Attributes to Rerun Regardless Of Success Or Failure.

  • Scheduling Dependencies: Confirm that the output table is set as the output of this node. The format is WorkSpaceName.NodeName.

Note

To configure time attributes, set the Scheduling Cycle to daily. There's no need to configure the Timed Scheduling Time separately for the current node. The daily trigger time for this node is determined by the timed scheduling time of the virtual node workshop_start_emr within the workflow, ensuring it is scheduled to run after 00:30 each day.
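
As an illustration of how the bizdate parameter is substituted, consider an instance whose scheduled time falls on 2025-02-19 (a hypothetical date). The value $[yyyymmdd-1] resolves to the scheduled date minus one day, so every ${bizdate} placeholder in the node code is replaced with 20250218 before the statement reaches Hive:

```sql
-- As written in the node:
--   ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
-- As executed for an instance scheduled on 2025-02-19 (hypothetical date):
ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='20250218');
```

Each daily run therefore writes to the partition of the previous day, which is the business date of the data being processed.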

3. Save configuration

Configure any other items that this example requires. When you are done, click the save icon in the toolbar of the node editing page to save the configuration.

Create dws_user_info_all_di_emr node

1. Edit code

Double-click the dws_user_info_all_di_emr node to access the node configuration tab. Enter the following statements in the configuration tab that appears.

Note

If your workspace is associated with multiple EMR compute engines in DataStudio, select the necessary EMR Engine. If there is only one associated EMR compute engine, selection is not required.

--Create the DWS layer table
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
  uid STRING COMMENT 'The ID of the user',
  gender STRING COMMENT 'The gender of the user',
  age_range STRING COMMENT 'The age range of the user',
  zodiac STRING COMMENT 'The zodiac sign of the user',
  region STRING COMMENT 'The region, which is obtained based on the IP address',
  device STRING COMMENT 'The terminal type ',
  identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown',
  method STRING COMMENT 'The type of the HTTP request',
  url STRING COMMENT 'The URL',
  referer STRING COMMENT 'The source URL',
  `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
  , b.gender
  , b.age_range
  , b.zodiac
  , a.region
  , a.device
  , a.identity
  , a.method
  , a.url
  , a.referer
  , a.`time`
FROM (
  SELECT *
  FROM dwd_log_info_di_emr
  WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT *
  FROM ods_user_info_d_emr
  WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;
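
Because the statement uses a LEFT OUTER JOIN, every log row is kept even when its uid has no match in ods_user_info_d_emr; in that case the user attributes are NULL. A check along the following lines can show how many rows lack user information after a run. It is a sketch that assumes gender is populated for every user in ods_user_info_d_emr, and the date literal is a hypothetical business date to replace with the partition you ran.

```sql
-- '20250218' is a hypothetical business date.
SELECT COUNT(*) AS total_rows
  , SUM(CASE WHEN gender IS NULL THEN 1 ELSE 0 END) AS rows_without_user_info
FROM dws_user_info_all_di_emr
WHERE dt = '20250218';
```

A large rows_without_user_info value usually means that the two source partitions for that date do not cover the same users.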

2. Configure scheduling information

To implement the scheduling scenario, configure the system so that every day at 00:30, once the upstream tasks ods_user_info_d_emr and dwd_log_info_di_emr have completed, the dws_user_info_all_di_emr node is triggered. This will merge the tables ods_user_info_d_emr and dwd_log_info_di_emr, and write the results into the dws_user_info_all_di_emr table.

Configure the following items:

  • Scheduling Parameters: In the Scheduling Parameters area, add the following parameter:

      • Parameter name: bizdate

      • Parameter value: $[yyyymmdd-1]

  • Time Attributes: Set Rerun Attributes to Rerun Regardless Of Success Or Failure.

  • Scheduling Dependencies: Confirm that the output table is set as the output of this node. The format is WorkSpaceName.NodeName.

Note

To configure time attributes, set the Scheduling Cycle to daily. There's no need to configure the Timed Scheduling Time separately for the current node. The daily trigger time for this node is determined by the timed scheduling time of the virtual node workshop_start_emr within the workflow, ensuring it is scheduled to run after 00:30 each day.

3. Save configuration

Configure any other items that this example requires. When you are done, click the save icon in the toolbar of the node editing page to save the configuration.

Create ads_user_info_1d_emr node

1. Edit code

Double-click the ads_user_info_1d_emr node to access the node configuration tab. Enter the following statements in the configuration tab that appears.

Note

If your workspace is associated with multiple EMR compute engines in DataStudio, select the necessary EMR Engine. If there is only one associated EMR compute engine, selection is not required.

--Create the ADS layer table
CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
  uid STRING COMMENT 'The ID of the user',
  region STRING COMMENT 'The region, which is obtained based on the IP address',
  device STRING COMMENT 'The terminal type ',
  pv BIGINT COMMENT 'pv',
  gender STRING COMMENT 'The gender of the user',
  age_range STRING COMMENT 'The age range of the user',
  zodiac STRING COMMENT 'The zodiac sign of the user'
)
PARTITIONED BY (
  dt STRING
);

ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');

INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
SELECT uid
  , MAX(region)
  , MAX(device)
  , COUNT(0) AS pv
  , MAX(gender)
  , MAX(age_range)
  , MAX(zodiac)
FROM dws_user_info_all_di_emr
WHERE dt = '${bizdate}'
GROUP BY uid;
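
Since pv is COUNT(0) per uid, the sum of pv in the ADS partition should equal the number of rows in the matching DWS partition. The queries below sketch that consistency check and list the most active users. The date literal is a hypothetical business date.

```sql
-- These two numbers are expected to match for the same partition.
SELECT SUM(pv) AS total_pv
FROM ads_user_info_1d_emr
WHERE dt = '20250218';

SELECT COUNT(*) AS total_rows
FROM dws_user_info_all_di_emr
WHERE dt = '20250218';

-- Ten users with the most page views on that date.
SELECT uid, region, device, pv
FROM ads_user_info_1d_emr
WHERE dt = '20250218'
ORDER BY pv DESC
LIMIT 10;
```

Note that MAX(region), MAX(device), and the other MAX calls pick one value per user; if a user appears with several devices in a day, only the lexicographically largest one is kept.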

2. Configure scheduling information

After the upstream dws_user_info_all_di_emr node task merges the ods_user_info_d_emr and dwd_log_info_di_emr tables, the ads_user_info_1d_emr node task can be triggered to further process the data and generate consumable data.

Configure the following items:

  • Scheduling Parameters: In the Scheduling Parameters area, add the following parameter:

      • Parameter name: bizdate

      • Parameter value: $[yyyymmdd-1]

  • Time Attributes: Set Rerun Attributes to Rerun Regardless Of Success Or Failure.

  • Scheduling Dependencies: Confirm that the output table is set as the output of this node. The format is WorkSpaceName.NodeName.

Note

To configure time attributes, set the Scheduling Cycle to daily. There's no need to configure the Timed Scheduling Time separately for the current node. The daily trigger time for this node is determined by the timed scheduling time of the virtual node workshop_start_emr within the workflow, ensuring it is scheduled to run after 00:30 each day.

3. Save configuration

Configure any other items that this example requires. When you are done, click the save icon in the toolbar of the node editing page to save the configuration.

Step 4: Commit the workflow

After configuring the workflow, test it to ensure it runs as expected. If the test is successful, commit the workflow and wait for deployment.

  1. On the workflow editing page, click Run to execute the workflow.

  2. Once every node in the workflow shows a success status, click Submit to submit the successfully executed workflow.

  3. In the Submit dialog box, select the nodes that require submission, check the Ignore Warnings About Inconsistent Input And Output option, and then click Confirm.

  4. Following a successful submission, publish each workflow node.

    1. Click Publish on the right side of the page to access the Create Deployment Package page.

    2. Select the nodes you want to publish, click Publish Selected Items, and in the Confirm Publication dialog box, click Publish.

Step 5: Run the nodes in the production environment

After the nodes are published, instances are created and scheduled to run the following day. To verify that the published workflow works in the production environment without waiting, you can run a data backfill on it. For more information, see Perform data backfill and view data backfill instances (new version).

  1. Once the nodes have been successfully published, click Operation Center in the upper right corner.

    Alternatively, you can go to the workflow editing page and click Go To Operation on the toolbar to open the Operation Center page.

  2. Click Periodic Task Operation > Periodic Task in the left navigation bar to access the Periodic Task page, and then click the workshop_start_emr virtual node.

  3. In the DAG diagram on the right, right-click the workshop_start_emr node and select Data Backfill > Current Node And Descendant Nodes.

  4. Select the tasks requiring data backfill, enter the business date, and click Submit And Jump.

  5. On the data backfill page, you can click Refresh until all SQL tasks have run successfully.
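
After the backfill instances succeed, the output can be confirmed from Hive. The following sketch assumes the queries run against the production EMR engine and that the backfill used the hypothetical business date 20250218.

```sql
-- Each table should list a partition for the backfilled business date.
SHOW PARTITIONS dwd_log_info_di_emr;
SHOW PARTITIONS dws_user_info_all_di_emr;
SHOW PARTITIONS ads_user_info_1d_emr;

-- The final table should contain one row per user for that date.
SELECT COUNT(*) AS user_count
FROM ads_user_info_1d_emr
WHERE dt = '20250218';
```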

What to do next

In periodic task scheduling scenarios, to ensure the table data produced by the tasks meets expectations, consider implementing data quality monitoring. For more information, see Configure data quality monitoring.