This topic describes how to create E-MapReduce Hive nodes to process collected logs in DataWorks.

Prerequisites

The data is collected. For more information, see Collect data.

Upload resources in the OSS console

  1. Download the ip2region-emr.jar package and store it in a local directory.
  2. Log on to the OSS console.
  3. In the left-side navigation pane, click Buckets. On the Buckets page that appears, click the target bucket, for example, dw-emr-demo.
  4. On the bucket page, click Files in the left-side navigation pane. On the Files page, click the folder that is created in the Prepare the environment topic for storing JAR resources, for example, ip2region.
  5. Click Upload in the upper-left corner. In the Upload dialog box that appears, set parameters for uploading the ip2region-emr.jar package.
    Upload To: The folder to which the package is uploaded. Set the value to Current. In this example, the folder is oss://dw-emr-demo/ip2region/.
    File ACL: The access control list (ACL) of the package. The default value is Inherited from Bucket, which indicates that the ACL of each object is the same as that of the bucket.
    Upload: Click Upload and select the downloaded ip2region-emr.jar package.

Design the workflow

For more information about how to configure the dependencies among nodes of a workflow, see Collect data.

In the DataStudio console, double-click the created workflow in the left-side navigation pane. On the workflow editing tab that appears, click and hold EMR Hive on the left and drag it to the editing section on the right. In the Create Node dialog box that appears, set Node Name and click Commit.

Create three E-MapReduce Hive nodes in total and name them ods_log_info_d, dw_user_info_all_d, and rpt_user_info_d respectively. Then, configure the dependencies among the nodes.

Configure the E-MapReduce Hive nodes

  1. Configure the ods_log_info_d node.
    1. Double-click the ods_log_info_d node.
    2. On the node editing tab that appears, enter the following statements:
      Note If the current workspace is bound to multiple E-MapReduce compute engine instances, you must select one. If the workspace is bound to only one E-MapReduce compute engine instance, you do not need to select it.
      -- Create a table at the ODS layer.
      CREATE TABLE IF NOT EXISTS ods_log_info_d (
        ip STRING COMMENT 'The IP address of the client that sends the request',
        uid STRING COMMENT 'The ID of the client user',
        `time` STRING COMMENT 'The time when the user accessed the webpage, in the format of yyyymmddhh:mi:ss',
        status STRING COMMENT 'The status code returned by the server',
        bytes STRING COMMENT 'The number of bytes returned to the client',
        region STRING COMMENT 'The region where the user resides, which is obtained based on the IP address',
        method STRING COMMENT 'The type of the HTTP request',
        url STRING COMMENT 'The URL of the webpage accessed by the user',
        protocol STRING COMMENT 'The version number of HTTP',
        referer STRING COMMENT 'The URL of the webpage linked to the resource being requested',
        device STRING COMMENT 'The terminal type',
        identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown'
      )
      PARTITIONED BY (
        dt STRING
      );
      
      CREATE FUNCTION getregion AS 'org.alidata.emr.udf.Ip2Region'
      USING JAR 'oss://dw-emr-demo/ip2region/ip2region-emr.jar';
      
      ALTER TABLE ods_log_info_d ADD IF NOT EXISTS PARTITION (dt=${bizdate});
      
      set hive.vectorized.execution.enabled = false;
      INSERT OVERWRITE TABLE ods_log_info_d PARTITION (dt=${bizdate})
      SELECT ip
        , uid
        , tm
        , status
        , bytes 
        , getregion(ip) AS region -- Obtain the region based on the IP address by using the user-defined function (UDF). 
        , regexp_extract(request, '(^[^ ]+) .*') AS method -- Use regular expressions to extract three fields from the request.
        , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
        , regexp_extract(request, '.* ([^ ]+$)') AS protocol 
        , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer  -- Use a regular expression to clean the HTTP referrer and obtain a more accurate URL.
        , CASE
          WHEN lower(agent) RLIKE 'android' THEN 'android' -- Obtain the terminal and access types from the value of the agent parameter.
          WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
          WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
          WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
          WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
          WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
          ELSE 'unknown'
        END AS device
        , CASE
          WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
          WHEN lower(agent) RLIKE 'feed'
          OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
          WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
          AND agent RLIKE '^(Mozilla|Opera)'
          AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
          ELSE 'unknown'
        END AS identity
        FROM (
          SELECT SPLIT(col, '##@@')[0] AS ip
          , SPLIT(col, '##@@')[1] AS uid
          , SPLIT(col, '##@@')[2] AS tm
          , SPLIT(col, '##@@')[3] AS request
          , SPLIT(col, '##@@')[4] AS status
          , SPLIT(col, '##@@')[5] AS bytes
          , SPLIT(col, '##@@')[6] AS referer
          , SPLIT(col, '##@@')[7] AS agent
          FROM ods_raw_log_d
        WHERE dt = ${bizdate}
      ) a;
    3. Click the Save icon in the toolbar.
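      The three regexp_extract calls in the node carve the HTTP request line into its method, URL, and protocol fields: everything before the first space, everything between the first and last spaces, and everything after the last space. As a rough sanity check, equivalent patterns can be exercised in Python against a sample request line (the sample value is hypothetical):

      ```python
      import re

      # Patterns equivalent to the three regexp_extract calls in the node.
      METHOD_RE = r'(^[^ ]+) .*'        # everything before the first space
      URL_RE = r'^[^ ]+ (.*) [^ ]+$'    # everything between the first and last spaces
      PROTOCOL_RE = r'.* ([^ ]+$)'      # everything after the last space

      def parse_request(request):
          """Split an HTTP request line such as 'GET /index.html HTTP/1.1'."""
          method = re.match(METHOD_RE, request).group(1)
          url = re.match(URL_RE, request).group(1)
          protocol = re.match(PROTOCOL_RE, request).group(1)
          return method, url, protocol

      print(parse_request('GET /index.html HTTP/1.1'))
      # → ('GET', '/index.html', 'HTTP/1.1')
      ```

      Because the URL pattern is greedy, a URL that itself contains spaces still parses correctly: only the final token is taken as the protocol.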
  2. Configure the dw_user_info_all_d node.
    1. Double-click the dw_user_info_all_d node.
    2. On the node editing tab that appears, enter the following statements:
      Note If the current workspace is bound to multiple E-MapReduce compute engine instances, you must select one. If the workspace is bound to only one E-MapReduce compute engine instance, you do not need to select it.
      -- Create a table at the DW layer.
      CREATE TABLE IF NOT EXISTS dw_user_info_all_d (
        uid STRING COMMENT 'The ID of the client user',
        gender STRING COMMENT 'The gender of the user',
        age_range STRING COMMENT 'The age range of the user',
        zodiac STRING COMMENT 'The zodiac sign of the user',
        region STRING COMMENT 'The region where the user resides, which is obtained based on the IP address',
        device STRING COMMENT 'The terminal type',
        identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown',
        method STRING COMMENT 'The type of the HTTP request',
        url STRING COMMENT 'The URL of the webpage accessed by the user',
        referer STRING COMMENT 'The URL of the webpage linked to the resource being requested',
        `time` STRING COMMENT 'The time when the user accessed the webpage, in the format of yyyymmddhh:mi:ss'
      )
      PARTITIONED BY (
        dt STRING
      );
      
      ALTER TABLE dw_user_info_all_d ADD IF NOT EXISTS PARTITION (dt = ${bizdate});
      
      INSERT OVERWRITE TABLE dw_user_info_all_d PARTITION (dt=${bizdate})
      SELECT COALESCE(a.uid, b.uid) AS uid
        , b.gender
        , b.age_range
        , b.zodiac
        , a.region
        , a.device
        , a.identity
        , a.method
        , a.url
        , a.referer
        , a.`time`
      FROM (
        SELECT *
        FROM ods_log_info_d
        WHERE dt = ${bizdate}
      ) a
      LEFT OUTER JOIN (
        SELECT *
        FROM ods_user_info_d
        WHERE dt = ${bizdate}
      ) b
      ON a.uid = b.uid;
    3. Click the Save icon in the toolbar.
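      The LEFT OUTER JOIN keeps every log record for the day even when no matching user record exists, and COALESCE(a.uid, b.uid) picks the first non-NULL uid across the two sides. A minimal Python sketch of that join semantics, using hypothetical sample rows:

      ```python
      def left_join_users(log_rows, user_rows):
          """LEFT OUTER JOIN of log rows (a) to user rows (b) on uid.
          Unmatched log rows keep None for the user-side attributes."""
          users_by_uid = {u['uid']: u for u in user_rows}
          joined = []
          for a in log_rows:
              b = users_by_uid.get(a['uid'], {})  # {} stands in for a NULL b-side row
              joined.append({
                  # COALESCE(a.uid, b.uid): fall back to the b side if a.uid is NULL
                  'uid': a['uid'] if a['uid'] is not None else b.get('uid'),
                  'region': a['region'],
                  'gender': b.get('gender'),
              })
          return joined

      # Hypothetical rows: u002 has a log record but no user record.
      logs = [{'uid': 'u001', 'region': 'hangzhou'}, {'uid': 'u002', 'region': 'beijing'}]
      users = [{'uid': 'u001', 'gender': 'female'}]
      print(left_join_users(logs, users))
      # → [{'uid': 'u001', 'region': 'hangzhou', 'gender': 'female'},
      #    {'uid': 'u002', 'region': 'beijing', 'gender': None}]
      ```

      An inner join would instead drop u002 entirely, losing page views from users who have no profile record.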
  3. Configure the rpt_user_info_d node.
    1. Double-click the rpt_user_info_d node.
    2. On the node editing tab that appears, enter the following statements:
      Note If the current workspace is bound to multiple E-MapReduce compute engine instances, you must select one. If the workspace is bound to only one E-MapReduce compute engine instance, you do not need to select it.
      -- Create a table at the RPT layer.
      CREATE TABLE IF NOT EXISTS rpt_user_info_d (
        uid STRING COMMENT 'The ID of the client user',
        region STRING COMMENT 'The region where the user resides, which is obtained based on the IP address',
        device STRING COMMENT 'The terminal type',
        pv BIGINT COMMENT 'The number of times that the user viewed the webpage',
        gender STRING COMMENT 'The gender of the user', 
        age_range STRING COMMENT 'The age range of the user',
        zodiac STRING COMMENT 'The zodiac sign of the user'
      )
      PARTITIONED BY (
        dt STRING
      );
      
      ALTER TABLE rpt_user_info_d ADD IF NOT EXISTS PARTITION (dt=${bizdate});
      
      INSERT OVERWRITE TABLE rpt_user_info_d PARTITION (dt=${bizdate})
      SELECT uid
        , MAX(region)
        , MAX(device)
        , COUNT(0) AS pv
        , MAX(gender)
        , MAX(age_range)
        , MAX(zodiac)
      FROM dw_user_info_all_d
      WHERE dt = ${bizdate}
      GROUP BY uid;
    3. Click the Save icon in the toolbar.
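      The aggregation counts one page view per log row and collapses per-user attributes with MAX, which simply picks one value when the attribute is constant within a uid. A Python sketch of the same GROUP BY, on hypothetical rows:

      ```python
      from collections import defaultdict

      def aggregate_report(rows):
          """Per-uid aggregation mirroring the GROUP BY in the node:
          COUNT(0) becomes pv; MAX() collapses each attribute to one value."""
          groups = defaultdict(list)
          for r in rows:
              groups[r['uid']].append(r)
          return {
              uid: {
                  'pv': len(rs),                           # COUNT(0)
                  'region': max(r['region'] for r in rs),  # MAX(region)
                  'device': max(r['device'] for r in rs),  # MAX(device)
              }
              for uid, rs in groups.items()
          }

      rows = [
          {'uid': 'u001', 'region': 'hangzhou', 'device': 'iphone'},
          {'uid': 'u001', 'region': 'hangzhou', 'device': 'iphone'},
          {'uid': 'u002', 'region': 'beijing', 'device': 'android'},
      ]
      print(aggregate_report(rows))
      # → {'u001': {'pv': 2, 'region': 'hangzhou', 'device': 'iphone'},
      #    'u002': {'pv': 1, 'region': 'beijing', 'device': 'android'}}
      ```

      Note that MAX is only a deterministic way to pick a representative value; if an attribute varied within a uid on the same day, it would keep the lexicographically largest value.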

Commit the workflow

  1. On the workflow editing tab, click the Run icon in the toolbar to run the workflow.
  2. After the Success icon appears next to each node on the workflow editing tab, click the Submit icon to commit the workflow.
  3. In the Commit dialog box that appears, select the nodes to be committed, and then select Ignore I/O Inconsistency Alerts.
  4. Click Commit.

Run the nodes in the production environment

  1. After you commit the workflow, click Operation Center in the upper-right corner.
    You can also click Go to Operation Center in the toolbar on the workflow editing tab to go to the Operation Center page.
  2. On the Operation Center page, choose Cycle Task Maintenance > Cycle Task in the left-side navigation pane. On the Cycle Task page, click the zero load node workstart.
  3. In the directed acyclic graph (DAG) on the right, right-click the workstart node and choose Run > Current and Descendant Nodes Retroactively.
  4. In the Patch Data dialog box that appears, select a node to generate retroactive data, specify the data timestamp, and then click OK. The Patch Data page appears.
  5. Click Refresh until the instance status is Successful.

What to do next

Now, you have learned how to create E-MapReduce Hive nodes and process raw logs. You can proceed with the next tutorial to learn how to collect metadata and view table information in Data Map. For more information, see Collect and view metadata.