All Products
Search
Document Center

DataWorks:Process data

Last Updated:May 29, 2025

This topic describes how to use E-MapReduce (EMR) Hive nodes in DataWorks to process data in the ods_user_info_d_emr and the ods_raw_log_d_emr tables, which are stored in an Object Storage Service (OSS) bucket after synchronization, to obtain the desired user profile data. The ods_user_info_d_emr table stores basic user information, and the ods_raw_log_d_emr table stores website access logs.

Prerequisites

The required data is synchronized. For more information, see Synchronize data.

Step 1: Design a data processing link

Log on to the DataWorks console and go to the DATA STUDIO pane of the Data Studio page. In the Workspace Directories section of the DATA STUDIO pane, find the workflow workshop_emr that you create when you synchronize data. On the configuration tab of the workflow, configure the workflow.

You must create three EMR Hive nodes named dwd_log_info_di_emr, dws_user_info_all_di_emr, and ads_user_info_1d_emr. Then, configure scheduling dependencies for these nodes, as shown in the following figure.

The following table lists node names that are used in this tutorial and the functionalities of the nodes.

Node type

Node name

Node functionality

imageEMR Hive node

dwd_log_info_di_emr

This node is used to cleanse raw OSS log data. This node splits data in the ods_raw_log_d_emr table and synchronizes the data to multiple fields in the dwd_log_info_di_emr table based on a built-in function or a user-defined function (UDF) such as getregion.

imageEMR Hive node

dws_user_info_all_di_emr

This node is used to aggregate the cleansed OSS log data and basic user information data. This node aggregates data in the basic user information table ods_user_info_d_emr and the processed log data table dwd_log_info_di_emr and synchronizes the aggregation result to the dws_user_info_all_di_emr table.

imageEMR Hive node

ads_user_info_1d_emr

This node is used to further process data in the dws_user_info_all_di_emr table and synchronize the processed data to the ads_user_info_1d_emr table to generate final user profile data.

image

Step 2: Register a UDF

To ensure that data can be processed as expected, you must register an EMR UDF named getregion to split the log data structure that is synchronized to EMR when you synchronize data into a table.

Upload an EMR JAR resource (ip2region.jar)

  1. Download the required JAR package.

    In this example, you need to download the ip2region-emr.jar package.

  2. Create an EMR JAR resource.

    1. Go to the Workspaces page in the DataWorks console. In the top navigation bar, select a desired region. Find the desired workspace and choose Shortcuts > Data Studio in the Actions column.

    2. In the left-side navigation pane of the Data Studio page, click the image icon to go to the RESOURCE MANAGEMENT pane.

    3. In the RESOURCE MANAGEMENT pane, click Create. In the Create Resource dialog box, select EMR Jar from the Type drop-down list and configure the Name parameter based on your business requirements. Then, click OK.

    4. On the configuration tab of the EMR JAR resource, configure the following parameters.

      Parameter

      Description

      File Source

      Select On-premises.

      File Content

      Click Upload to upload the downloaded JAR package ip2region-emr.jar.

      Storage Path

      Select OSS.

      Select the OSS bucket that you specified for the EMR cluster created during environment preparation.

      Data Source

      Select the computing resource that you associate with the workspace when you synchronize data.

      Resource Group

      Select the serverless resource group that you create when you prepare environments.

    5. In the top toolbar of the configuration tab, click Save. Then, click Deploy to deploy the EMR JAR resource to the development environment and production environment.

Register an EMR UDF (getregion)

  1. Create an EMR UDF.

    In the RESOURCE MANAGEMENT pane, find the created EMR JAR resource, right-click the resource name, and then choose Create Function > EMR Function. In the Create Resource or Function dialog box, set the Name parameter to getregion and click OK.

  2. Register the EMR UDF.

    On the configuration tab of the EMR UDF, configure the following parameters.

    Parameter

    Description

    Function Type

    Select OTHER.

    Data Source

    Select the computing resource that you associate with the workspace when you synchronize data.

    EMR Database

    Select default.

    Resource Group

    Select the serverless resource group that you create when you prepare environments.

    Owner

    Select a user who has the required permissions as the owner.

    Class Name

    Enter org.alidata.emr.udf.Ip2Region.

    Resources

    Select the name of the EMR JAR resource that you create.

  3. Deploy the EMR UDF.

    In the top toolbar of the configuration tab, click Save. Then, click Deploy to deploy the EMR UDF to the development environment and production environment.

Step 3: Configure the EMR nodes

To perform data processing, you must schedule the related EMR Hive node to implement each layer of processing logic. In this tutorial, complete sample code for data processing is provided. You must configure the code separately for the dwd_log_info_di_emr, dws_user_info_all_di_emr, and ads_user_info_1d_emr nodes.

Configure the dwd_log_info_di_emr node

  1. Write code for the node.

    In the canvas of the workflow, move the pointer over the dwd_log_info_di_emr node and click Open Node. In the Reminder message, click Save and Open to go to the configuration tab of the node. In the code editor of the configuration tab, enter the following SQL statements:

    -- Create a table at the ODS layer.
    CREATE TABLE IF NOT EXISTS dwd_log_info_di_emr (
      ip STRING COMMENT 'The IP address',
      uid STRING COMMENT 'The user ID',
      `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
      status STRING COMMENT 'The status code that is returned by the server',
      bytes STRING COMMENT 'The number of bytes that are returned to the client',
      region STRING COMMENT 'The region, which is obtained based on the IP address',
      method STRING COMMENT 'The HTTP request type',
      url STRING COMMENT 'url',
      protocol STRING COMMENT 'The version number of HTTP',
      referer STRING COMMENT 'The source URL',
      device STRING COMMENT 'The terminal type',
      identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown'
    )
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE dwd_log_info_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    set hive.vectorized.execution.enabled = false;
    INSERT OVERWRITE TABLE dwd_log_info_di_emr PARTITION (dt='${bizdate}')
    SELECT ip
      , uid
      , tm
      , status
      , bytes 
      , getregion(ip) AS region -- Obtain the region by using a UDF based on the IP address.  
      , regexp_extract(request, '(^[^ ]+) .*') AS method -- Use a regular expression to extract three fields from the request. 
      , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
      , regexp_extract(request, '.* ([^ ]+$)') AS protocol 
      , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  -- Use a regular expression to cleanse the HTTP referrer to obtain a more accurate URL. 
      , CASE
        WHEN lower(agent) RLIKE 'android' THEN 'android' -- Obtain the terminal and access types from the value of the agent parameter. 
        WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
        WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
        WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
        WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
        WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
        ELSE 'unknown'
      END AS device
      , CASE
        WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
        WHEN lower(agent) RLIKE 'feed'
        OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
        WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
        AND agent RLIKE '^[Mozilla|Opera]'
        AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
        ELSE 'unknown'
      END AS identity
      FROM (
        SELECT SPLIT(col, '##@@')[0] AS ip
        , SPLIT(col, '##@@')[1] AS uid
        , SPLIT(col, '##@@')[2] AS tm
        , SPLIT(col, '##@@')[3] AS request
        , SPLIT(col, '##@@')[4] AS status
        , SPLIT(col, '##@@')[5] AS bytes
        , SPLIT(col, '##@@')[6] AS referer
        , SPLIT(col, '##@@')[7] AS agent
        FROM ods_raw_log_d_emr
      WHERE dt = '${bizdate}'
    ) a;
  2. Configure debugging parameters.

    In the right-side navigation pane of the configuration tab of the node, click Debugging Configurations. On the Debugging Configurations tab, configure the following parameters. These parameters are used to test the workflow in Step 4.

    Parameter

    Description

    Computing Resource

    Select the EMR computing resource that is associated with the workspace when you prepare environments.

    Resource Group

    Select the serverless resource group that you purchase when you prepare environments.

    Script Parameters

    You do not need to configure this parameter. In the sample code provided in this tutorial, the ${bizdate} variable is used to represent the data timestamp. When you debug the workflow by following Step 4, you can set the value of the variable to a constant, such as 20250223. When the workflow is run, the variables defined for the inner nodes of the workflow are replaced with the constant.

  3. After the configuration is complete, click the image icon in the top toolbar to save the node.

Configure the dws_user_info_all_di_emr node

  1. Write code for the node.

    In the canvas of the workflow, move the pointer over the dws_user_info_all_di_emr node and click Open Node. In the Reminder message, click Save and Open to go to the configuration tab of the node. In the code editor of the configuration tab, enter the following SQL statements:

    -- Create a table at the DW layer.
    CREATE TABLE IF NOT EXISTS dws_user_info_all_di_emr (
      uid STRING COMMENT 'The user ID',
      gender STRING COMMENT 'The gender',
      age_range STRING COMMENT 'The age range',
      zodiac STRING COMMENT 'The zodiac sign',
      region STRING COMMENT 'The region, which is obtained based on the IP address',
      device STRING COMMENT 'The terminal type',
      identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown',
      method STRING COMMENT 'The HTTP request type',
      url STRING COMMENT 'url',
      referer STRING COMMENT 'The source URL',
      `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
    )
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE dws_user_info_all_di_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    INSERT OVERWRITE TABLE dws_user_info_all_di_emr PARTITION (dt='${bizdate}')
    SELECT COALESCE(a.uid, b.uid) AS uid
      , b.gender
      , b.age_range
      , b.zodiac
      , a.region
      , a.device
      , a.identity
      , a.method
      , a.url
      , a.referer
      , a.`time`
    FROM (
      SELECT *
      FROM dwd_log_info_di_emr
      WHERE dt = '${bizdate}'
    ) a
    LEFT OUTER JOIN (
      SELECT *
      FROM ods_user_info_d_emr
      WHERE dt = '${bizdate}'
    ) b
    ON a.uid = b.uid;
  2. Configure debugging parameters.

    In the right-side navigation pane of the configuration tab of the node, click Debugging Configurations. On the Debugging Configurations tab, configure the following parameters. These parameters are used to test the workflow in Step 4.

    Parameter

    Description

    Computing Resource

    Select the EMR computing resource that is associated with the workspace when you prepare environments.

    Resource Group

    Select the serverless resource group that you purchase when you prepare environments.

    Script Parameters

    You do not need to configure this parameter. In the sample code provided in this tutorial, the ${bizdate} variable is used to represent the data timestamp. When you debug the workflow by following Step 4, you can set the value of the variable to a constant, such as 20250223. When the workflow is run, the variables defined for the inner nodes of the workflow are replaced with the constant.

  3. After the configuration is complete, click the image icon in the top toolbar to save the node.

Configure the ads_user_info_1d_emr node

  1. Write code for the node.

    In the canvas of the workflow, move the pointer over the ads_user_info_1d_emr node and click Open Node. In the Reminder message, click Save and Open to go to the configuration tab of the node. In the code editor of the configuration tab, enter the following SQL statements:

    -- Create a table at the RPT layer.
    CREATE TABLE IF NOT EXISTS ads_user_info_1d_emr (
      uid STRING COMMENT 'The user ID',
      region STRING COMMENT 'The region, which is obtained based on the IP address',
      device STRING COMMENT 'The terminal type',
      pv BIGINT COMMENT 'pv',
      gender STRING COMMENT 'The gender',
      age_range STRING COMMENT 'The age range',
      zodiac STRING COMMENT 'The zodiac sign'
    )
    PARTITIONED BY (
      dt STRING
    );
    
    ALTER TABLE ads_user_info_1d_emr ADD IF NOT EXISTS PARTITION (dt='${bizdate}');
    
    INSERT OVERWRITE TABLE ads_user_info_1d_emr PARTITION (dt='${bizdate}')
    SELECT uid
      , MAX(region)
      , MAX(device)
      , COUNT(0) AS pv
      , MAX(gender)
      , MAX(age_range)
      , MAX(zodiac)
    FROM dws_user_info_all_di_emr
    WHERE dt = '${bizdate}'
    GROUP BY uid;
  2. Configure debugging parameters.

    In the right-side navigation pane of the configuration tab of the node, click Debugging Configurations. On the Debugging Configurations tab, configure the following parameters. These parameters are used to test the workflow in Step 4.

    Parameter

    Description

    Computing Resource

    Select the EMR computing resource that is associated with the workspace when you prepare environments.

    Resource Group

    Select the serverless resource group that you purchase when you prepare environments.

    Script Parameters

    You do not need to configure this parameter. In the sample code provided in this tutorial, the ${bizdate} variable is used to represent the data timestamp. When you debug the workflow by following Step 4, you can set the value of the variable to a constant, such as 20250223. When the workflow is run, the variables defined for the inner nodes of the workflow are replaced with the constant.

  3. After the configuration is complete, click the image icon in the top toolbar to save the node.

Step 4: Process data

  1. Process data.

    In the top toolbar of the configuration tab of the workflow, click Run. In the Enter runtime parameters dialog box, specify a value that is used for scheduling parameters defined for each node in this run, and click OK. In this tutorial, 20250223 is specified. You can specify a value based on your business requirements.

  2. Query the data processing result.

    1. Go to the Workspaces page in the DataWorks console. In the top navigation bar, select a desired region. Find the desired workspace and choose Shortcuts > Data Studio in the Actions column.

    2. In the left-side navigation pane of the Data Studio page, click the image icon to go to the DATA STUDIO pane. The Workspace Directories section appears.

    3. In the Workspace Directories section, find the work directory that you create, right-click the directory name, and then choose Create Node > EMR > EMR Hive. In the Create Node dialog box, configure the Name parameter based on your business requirements and click OK.

    4. On the configuration tab of the EMR Hive node, enter the following SQL statement in the code editor, replace the data timestamp with the data timestamp of the ads_user_info_1d_emr node, and then execute the SQL statement to query the data synchronization result and view the number of data records that are imported into the ods_raw_log_d_emr and ods_user_info_d_emr tables.

      Note

      You must change the value of the partition key column dt to the data timestamp of the ads_user_info_1d_emr node. For example, if a node is scheduled to run on February 23, 2025, the data timestamp of the node is 20250222, which is one day earlier than the scheduling time of the node.

      SELECT * FROM ads_user_info_1d_emr WHERE dt=The data timestamp; 
      • If the result returned after you execute the preceding statement shows that data exists, the data processing is complete.

      • If the results returned after you execute the preceding statements show that data does not exist in the destination, you must make sure that the values specified for the scheduling parameters defined for the inner nodes of the workflow in this run are the same as the value of the dt field in the preceding statements when you run the workflow. You can click Running History in the right-side navigation pane of the configuration tab of the workflow, and then click View in the Actions column of the running record generated for this run to view the data timestamp that is used when the workflow is run in the run logs of the workflow. The data timestamp is in the partition=[pt=xxx] format.

Step 5: Deploy the workflow

An auto triggered node can be automatically scheduled to run only after you deploy the node to the production environment. You can refer to the following steps to deploy the workflow to the production environment:

Note

In this tutorial, scheduling parameters are configured for the workflow when you configure scheduling properties for the workflow. You do not need to separately configure scheduling parameters for each node in the workflow.

  1. In the left-side navigation pane of the Data Studio page, click the image icon. In the Workspace Directories section of the DATA STUDIO pane, find the created workflow and click the workflow name to go to the configuration tab of the workflow.

  2. In the top toolbar of the configuration tab, click Deploy.

  3. On the DEPLOY tab, click Start Deployment to Production Environment to deploy the workflow by following the on-screen instructions.

Step 6: Run the nodes in the production environment

After you deploy the nodes on a day, the instances generated for the nodes can be scheduled to run on the next day. You can use the data backfill feature to backfill data for nodes in a workflow that is deployed, which allows you to check whether the nodes can be run in the production environment. For more information, see Backfill data and view data backfill instances (new version).

  1. After the nodes are deployed, click Operation Center in the upper-right corner of the Data Studio page.

    You can also click the 图标 icon in the upper-left corner of the Data Studio page and choose All Products > Data Development And Task Operation > Operation Center.

  2. In the left-side navigation pane of the Operation Center page, choose Auto Triggered Node O&M > Auto Triggered Nodes. On the Auto Triggered Nodes page, find the zero load node workshop_start_emr and click the node name.

  3. In the direct acyclic graph (DAG) of the node, right-click the workshop_start_emr node and choose Run > Current and Descendant Nodes Retroactively.

  4. In the Backfill Data panel, select the nodes for which you want to backfill data, configure the Data Timestamp parameter, and then click Submit and Redirect.

  5. In the upper part of the Data Backfill page, click Refresh to check whether all nodes are successfully run.

Note

To prevent excessive fees from being generated after the tutorial is complete, you can configure the Effective Period parameter for all nodes in the workflow or freeze the zero load node workshop_start_emr.

What to do next

  • Visualize data on a dashboard: After you complete user profile analysis, use DataAnalysis to display the processed data in charts. This helps you quickly extract key information to gain insights into the business trends behind the data.

  • Monitor data quality: Configure monitoring rules for tables that are generated after data processing to help identify and intercept dirty data in advance to prevent the impacts of dirty data from escalating.

  • Manage data: Data tables are generated in EMR Hive nodes after user profile analysis is complete. You can view the generated data tables in Data Map and view the relationships between the tables based on data lineages.

  • Use an API to provide data services: After you obtain the final processed data, use standardized APIs in DataService Studio to share data and to provide data for other business modules that use APIs to receive data.