Process data

Last Updated: Mar 15, 2024

This topic describes how to create E-MapReduce (EMR) Hive nodes in DataWorks and use them to process the collected logs.

Prerequisites

The operations described in Collect data are complete.

Upload resources in the OSS console

  1. Download the ip2region-emr.jar package and store the package on your on-premises machine.
  2. Log on to the Object Storage Service (OSS) console.
  3. In the left-side navigation pane, click Buckets. On the Buckets page, click the bucket that you want to manage. In this example, dw-emr-demo is used.
  4. On the page that appears, choose Files > Files in the left-side navigation pane. On the Files page, click the folder that is created in the Prepare the environment topic for storing JAR resources. In this example, the folder ip2region is used.
  5. Click Upload. On the page that appears, configure the parameters that are required to upload the ip2region-emr.jar package.
    Upload To: The folder to which you want to upload the package. Set the value to Current. In this example, the folder is oss://dw-emr-demo/ip2region/.
    File ACL: The access control list (ACL) of the package. The default value is Inherited from Bucket, which indicates that the ACL of each object is the same as that of the bucket.
    Files to Upload: Click Select Files, select the downloaded ip2region-emr.jar package, and then click Open to upload the package.

Design a workflow

For more information about how to configure dependencies among nodes in a workflow, see Collect data.

On the DataStudio page, double-click the created workflow in the Scheduled Workflow pane. On the workflow editing tab that appears, click EMR Hive in the EMR section. In the Create Node dialog box, configure the Name parameter and click Commit.

Create three EMR Hive nodes named ods_log_info_d, dw_user_info_d, and rpt_user_info_d. Then, configure the dependencies among the nodes. The dependencies follow the table lineage of the SQL statements below: ods_log_info_d reads the raw logs in ods_raw_log_d, dw_user_info_d joins ods_log_info_d with the user information in ods_user_info_d, and rpt_user_info_d aggregates dw_user_info_all_d.

Configure the EMR Hive nodes

  1. Configure the ods_log_info_d node.
    1. Double-click the ods_log_info_d node.
    2. On the configuration tab that appears, enter the following statements:
      Note If multiple EMR compute engine instances are associated with your workspace, you must select an EMR compute engine instance. If only one EMR compute engine instance is associated with your workspace, you do not need to select a compute engine instance.
      -- Create a table at the ODS layer.
      CREATE TABLE IF NOT EXISTS ods_log_info_d (
        ip STRING COMMENT 'The IP address of the client that is used to send the request',
        uid STRING COMMENT 'The ID of the client user',
        `time` STRING COMMENT 'The time when the user accessed the webpage, in the format of yyyymmddhh:mi:ss',
        status STRING COMMENT 'The status code that is returned by the server',
        bytes STRING COMMENT 'The number of bytes that are returned to the client',
        region STRING COMMENT 'The region in which the user resides, which is obtained based on the IP address',
        method STRING COMMENT 'The type of the HTTP request',
        url STRING COMMENT 'The URL of the request',
        protocol STRING COMMENT 'The version number of HTTP',
        referer STRING COMMENT 'The URL of the webpage linked to the resource that is being requested',
        device STRING COMMENT 'The terminal type',
        identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown'
      )
      PARTITIONED BY (
        dt STRING
      );
      
      CREATE FUNCTION getregion AS 'org.alidata.emr.udf.Ip2Region'
      USING JAR 'oss://dw-emr-demo/ip2region/ip2region-emr.jar';
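      
      -- Optional sanity check (not part of the original tutorial): confirm that
      -- the UDF resolves before running the full job. The IP address below is a
      -- hypothetical sample; substitute one from your own logs.
      -- SELECT getregion('1.2.3.4');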
      
      ALTER TABLE ods_log_info_d ADD IF NOT EXISTS PARTITION (dt=${bizdate});
      
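      -- Vectorized execution is turned off below, typically to avoid
      -- incompatibilities between vectorized readers and custom Java UDFs
      -- such as getregion (an assumption; the tutorial does not state the reason).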
      set hive.vectorized.execution.enabled = false;
      INSERT OVERWRITE TABLE ods_log_info_d PARTITION (dt=${bizdate})
      SELECT ip
        , uid
        , tm
        , status
        , bytes 
        , getregion(ip) AS region -- Use the user-defined function (UDF) to obtain the region from the IP address.
        , regexp_substr(request, '(^[^ ]+ )') AS method -- Use regular expressions to extract the method, URL, and protocol fields from the request.
        , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
        , regexp_extract(request, '.* ([^ ]+$)') AS protocol 
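        -- For example, a request line of 'GET /index.html HTTP/1.1' yields the
        -- method 'GET ', the url '/index.html', and the protocol 'HTTP/1.1'
        -- (an illustrative sample, not from the tutorial data).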
        , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  -- Use the regular expression to clean the HTTP referrer to obtain a more accurate URL. 
        , CASE
          WHEN lower(agent) RLIKE 'android' THEN 'android' -- Obtain the terminal and access types from the value of the agent parameter.
          WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
          WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
          WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
          WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
          WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
          ELSE 'unknown'
        END AS device
        , CASE
          WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
          WHEN lower(agent) RLIKE 'feed'
          OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
          WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
          AND agent RLIKE '^(Mozilla|Opera)'
          AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
          ELSE 'unknown'
        END AS identity
        FROM (
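          -- ods_raw_log_d stores each raw log entry in a single column named col;
          -- the fields are delimited by the string ##@@.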
          SELECT SPLIT(col, '##@@')[0] AS ip
          , SPLIT(col, '##@@')[1] AS uid
          , SPLIT(col, '##@@')[2] AS tm
          , SPLIT(col, '##@@')[3] AS request
          , SPLIT(col, '##@@')[4] AS status
          , SPLIT(col, '##@@')[5] AS bytes
          , SPLIT(col, '##@@')[6] AS referer
          , SPLIT(col, '##@@')[7] AS agent
          FROM ods_raw_log_d
        WHERE dt = ${bizdate}
      ) a;
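      
      -- A quick check after the node succeeds (a minimal sketch, not part of the
      -- original tutorial): preview a few cleaned rows for the partition.
      -- SELECT * FROM ods_log_info_d WHERE dt = ${bizdate} LIMIT 10;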
    3. Click the Save icon in the top toolbar.
  2. Configure the dw_user_info_d node.
    1. Double-click the dw_user_info_d node.
    2. On the configuration tab that appears, enter the following statements:
      Note If multiple EMR compute engine instances are associated with your workspace, you must select an EMR compute engine instance. If only one EMR compute engine instance is associated with your workspace, you do not need to select a compute engine instance.
      -- Create a table at the DW layer.
      CREATE TABLE IF NOT EXISTS dw_user_info_all_d (
        uid STRING COMMENT 'The ID of the client user',
        gender STRING COMMENT 'The gender of the user',
        age_range STRING COMMENT 'The age range of the user',
        zodiac STRING COMMENT 'The zodiac sign of the user',
        region STRING COMMENT 'The region in which the user resides, which is obtained based on the IP address',
        device STRING COMMENT 'The terminal type',
        identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown',
        method STRING COMMENT 'The type of the HTTP request',
        url STRING COMMENT 'The URL of the request',
        referer STRING COMMENT 'The URL of the webpage linked to the resource that is being requested',
        `time` STRING COMMENT 'The time when the user accessed the webpage, in the format of yyyymmddhh:mi:ss'
      )
      PARTITIONED BY (
        dt STRING
      );
      
      ALTER TABLE dw_user_info_all_d ADD IF NOT EXISTS PARTITION (dt = ${bizdate});
      
      INSERT OVERWRITE TABLE dw_user_info_all_d PARTITION (dt=${bizdate})
      SELECT COALESCE(a.uid, b.uid) AS uid
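        -- COALESCE keeps the uid from the logs when no matching user profile
        -- exists in ods_user_info_d.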
        , b.gender
        , b.age_range
        , b.zodiac
        , a.region
        , a.device
        , a.identity
        , a.method
        , a.url
        , a.referer
        , a.`time`
      FROM (
        SELECT *
        FROM ods_log_info_d
        WHERE dt = ${bizdate}
      ) a
      LEFT OUTER JOIN (
        SELECT *
        FROM ods_user_info_d
        WHERE dt = ${bizdate}
      ) b
      ON a.uid = b.uid;
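      
      -- Optional check (a sketch, not part of the original tutorial): count log
      -- records in the partition that have no matching user profile.
      -- SELECT COUNT(*) FROM dw_user_info_all_d WHERE dt = ${bizdate} AND gender IS NULL;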
    3. Click the Save icon in the top toolbar.
  3. Configure the rpt_user_info_d node.
    1. Double-click the rpt_user_info_d node.
    2. On the configuration tab that appears, enter the following statements:
      Note If multiple EMR compute engine instances are associated with your workspace, you must select an EMR compute engine instance. If only one EMR compute engine instance is associated with your workspace, you do not need to select a compute engine instance.
      -- Create a table at the RPT layer.
      CREATE TABLE IF NOT EXISTS rpt_user_info_d (
        uid STRING COMMENT 'The ID of the client user',
        region STRING COMMENT 'The region in which the user resides, which is obtained based on the IP address',
        device STRING COMMENT 'The terminal type',
        pv BIGINT COMMENT 'The number of page views',
        gender STRING COMMENT 'The gender of the user',
        age_range STRING COMMENT 'The age range of the user',
        zodiac STRING COMMENT 'The zodiac sign of the user'
      )
      PARTITIONED BY (
        dt STRING
      );
      
      ALTER TABLE rpt_user_info_d ADD IF NOT EXISTS PARTITION (dt=${bizdate});
      
      INSERT OVERWRITE TABLE rpt_user_info_d PARTITION (dt=${bizdate})
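      -- MAX() collapses each attribute to a single value per uid, and COUNT(0)
      -- counts the records per uid as page views.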
      SELECT uid
        , MAX(region)
        , MAX(device)
        , COUNT(0) AS pv
        , MAX(gender)
        , MAX(age_range)
        , MAX(zodiac)
      FROM dw_user_info_all_d
      WHERE dt = ${bizdate}
      GROUP BY uid;
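      
      -- Optional check (a sketch, not part of the original tutorial): inspect the
      -- most active users in the partition.
      -- SELECT uid, pv FROM rpt_user_info_d WHERE dt = ${bizdate} ORDER BY pv DESC LIMIT 10;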
    3. Click the Save icon in the top toolbar.

Commit the workflow

  1. On the workflow editing tab, click the Run icon to run the workflow.
  2. After the Successful icon appears next to all nodes on the workflow editing tab, click the Submit icon to commit the workflow.
  3. In the Commit dialog box, select the nodes that you want to commit, enter a description, and then select Ignore I/O Inconsistency Alerts.
  4. Click Commit.

Run the nodes in the production environment

  1. After the nodes are deployed, click Operation Center in the upper-right corner.
    You can also click Operation Center in the upper part of the DataStudio page to go to the Operation Center page.
  2. On the Operation Center page, choose Cycle Task Maintenance > Cycle Task in the left-side navigation pane. On the Cycle Task page, click the workstart zero-load node.
  3. In the directed acyclic graph (DAG) that is displayed on the right side, right-click the workstart node and choose Run > Current and Descendent Nodes Retroactively.
  4. In the Backfill Data dialog box, select the nodes for which you want to backfill data, specify the data timestamp, and then click OK. The Patch Data page appears.
  5. Click Refresh until all the nodes are successfully run.

Subsequent steps

Now that you have learned how to create EMR Hive nodes and use them to process raw logs, you can proceed to the next tutorial to learn how to collect metadata and view table information in DataMap. For more information, see Collect metadata and view table information.