
DataWorks: Process data

Last Updated: Feb 07, 2024

This topic describes how to use ODPS SQL nodes in DataWorks to process the basic user information and website access logs that are synchronized to the ods_user_info_d and ods_raw_log_d tables in MaxCompute, and to obtain user profile data from them. This topic also describes how to use DataWorks together with MaxCompute to compute and analyze the collected data and complete simple data processing in data warehousing scenarios.

Prerequisites

The required data is collected. For more information, see Collect data.

  • The basic user information stored in the ApsaraDB RDS for MySQL table ods_user_info_d is synchronized to the MaxCompute table ods_user_info_d by using Data Integration.

  • The website access logs of users stored in the Object Storage Service (OSS) object user_log.txt are synchronized to the MaxCompute table ods_raw_log_d by using Data Integration.

Background information

DataStudio provides a variety of node types and encapsulates the capabilities of compute engines. The example in this topic describes how to use ODPS SQL nodes to process, layer by layer, the basic user information and website access logs that are synchronized to MaxCompute tables. The following figure shows the logic.

(Figure: the layered data processing flow. The ods_raw_log_d table is processed into the dwd_log_info_di table, which is joined with the ods_user_info_d table into the dws_user_info_all_di table, and finally summarized into the ads_user_info_1d table.)

  • Manage a workflow.

    Use the zero load node of a workflow to manage the workflow. For example, you can use the zero load node of a user behavior analysis workflow to specify the time at which the workflow is triggered and whether to run the nodes in the workflow. In this example, the data processing nodes are scheduled by day, and the WorkShop_Start node triggers the workflow to start running at 00:15 every day.

  • Process incremental data.

    Use scheduling parameters and dynamic partition names to write incremental data to the time-based partition of the destination table every day in the scheduling scenario. A minimal sketch of this pattern is provided after this list.

  • Process data.

    Upload resources and register a user-defined function (UDF) named getregion in a visualized manner. Use the function to convert IP addresses in system log data into regions.

  • Configure scheduling dependencies.

    Use the automatic parsing feature to implement automatic configuration of scheduling dependencies for nodes based on data lineages in the node code. This ensures that descendant nodes can obtain valid data from ancestor nodes.

    Important

    We recommend that you strictly comply with the following node development specifications during development. This can facilitate scheduling dependency parsing and prevent unexpected errors. For more information about the principle of scheduling dependencies, see Scheduling dependency configuration guide.

    • Nodes and output tables have one-to-one mappings.

    • The name of a node must be consistent with the name of the output table of the node.
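
The following minimal sketch illustrates the incremental write pattern that is described in the Process incremental data item. The demo_target and demo_source tables are hypothetical and used only for illustration. In this tutorial, the ${bizdate} variable is assigned the scheduling parameter $[yyyymmdd-1] on the Properties tab of each node.

-- Minimal sketch of the daily incremental write pattern. demo_target and
-- demo_source are hypothetical partitioned tables. Each daily run overwrites
-- only the partition that matches the data timestamp of that run.
INSERT OVERWRITE TABLE demo_target PARTITION (dt='${bizdate}')
SELECT uid
  , region
FROM demo_source
WHERE dt = '${bizdate}';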

Go to the DataStudio page

Log on to the DataWorks console. In the left-side navigation pane, choose Data Modeling and Development > DataStudio. On the page that appears, select the desired workspace from the drop-down list and click Go to DataStudio.

Step 1: Create MaxCompute tables

Create the dwd_log_info_di, dws_user_info_all_di, and ads_user_info_1d tables that are used to store processed data at each layer. In this example, the tables are created in a quick manner. For more information about MaxCompute table-related operations, see Create and manage MaxCompute tables.

  1. Go to the entry point of creating tables.

    On the DataStudio page, open the WorkShop workflow that you created in the data collection phase. Right-click MaxCompute and select Create Table.

  2. Define the schemas of MaxCompute tables.

    In the Create Table dialog box, enter a table name and click Create. In this example, you need to create three tables named dwd_log_info_di, dws_user_info_all_di, and ads_user_info_1d. Then, go to the DDL tab and execute a CREATE TABLE statement to create a table. You can view the statements that are used to create the preceding tables in the following content.

  3. Commit the tables to the compute engine.

    After you define the schemas of the tables, click Commit to Development Environment and Commit to Production Environment in sequence on the configuration tab of each table. The system then creates the physical tables in the MaxCompute projects that are associated with your workspace in the development and production environments based on the configurations.

    • If you commit the tables to the development environment of the workspace, the tables are created in the MaxCompute project that is associated with the workspace in the development environment.

    • If you commit the tables to the production environment of the workspace, the tables are created in the MaxCompute project that is associated with the workspace in the production environment.

1. Create the dwd_log_info_di table

Double-click the dwd_log_info_di table. On the table configuration tab that appears, click DDL and enter the following CREATE TABLE statement:

CREATE TABLE IF NOT EXISTS dwd_log_info_di (
 ip STRING COMMENT 'The IP address',
 uid STRING COMMENT 'The user ID',
 time STRING COMMENT 'The time in the format of yyyymmddhh:mi:ss',
 status STRING COMMENT 'The status code that is returned by the server',
 bytes STRING COMMENT 'The number of bytes that are returned to the client',
 region STRING COMMENT 'The region, which is obtained based on the IP address',
 method STRING COMMENT 'The HTTP request type',
 url STRING COMMENT 'url',
 protocol STRING COMMENT 'The version number of HTTP',
 referer STRING COMMENT 'The source URL',
 device STRING COMMENT 'The terminal type',
 identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown'
)
PARTITIONED BY (
 dt STRING
)
LIFECYCLE 14;

2. Create the dws_user_info_all_di table

Double-click the dws_user_info_all_di table. On the table configuration tab that appears, click DDL and enter the following CREATE TABLE statement:

CREATE TABLE IF NOT EXISTS dws_user_info_all_di (
 uid STRING COMMENT 'The user ID',
 gender STRING COMMENT 'The gender',
 age_range STRING COMMENT 'The age range',
 zodiac STRING COMMENT 'The zodiac sign',
 region STRING COMMENT 'The region, which is obtained based on the IP address',
 device STRING COMMENT 'The terminal type',
 identity STRING COMMENT 'The access type, which can be crawler, feed, user, or unknown',
 method STRING COMMENT 'The HTTP request type',
 url STRING COMMENT 'url',
 referer STRING COMMENT 'The source URL',
 time STRING COMMENT 'The time in the format of yyyymmddhh:mi:ss'
)
PARTITIONED BY (
 dt STRING
)
LIFECYCLE 14;

3. Create the ads_user_info_1d table

Double-click the ads_user_info_1d table. On the table configuration tab that appears, click DDL and enter the following CREATE TABLE statement:

CREATE TABLE IF NOT EXISTS ads_user_info_1d (
 uid STRING COMMENT 'The user ID',
 region STRING COMMENT 'The region, which is obtained based on the IP address',
 device STRING COMMENT 'The terminal type',
 pv BIGINT COMMENT 'pv',
 gender STRING COMMENT 'The gender',
 age_range STRING COMMENT 'The age range',
 zodiac STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (
 dt STRING
)
LIFECYCLE 14;    
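
After the three tables are committed, you can optionally confirm that the physical tables exist, for example, by running DESC statements in an ad hoc query:

-- Optional check: describe each table to confirm that it was created.
DESC dwd_log_info_di;
DESC dws_user_info_all_di;
DESC ads_user_info_1d;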

Step 2: Create a function named getregion

You can use methods such as a function to convert log data in the original format into data in the desired format. In this example, the required resources are provided for the function that is used to convert IP addresses into regions. You only need to download the resources to your on-premises machine and upload them to your workspace before you register the function in DataWorks.

1. Upload the resource file ip2region.jar

  1. Download the ip2region.jar file.

  2. On the DataStudio page, open the WorkShop workflow. Right-click MaxCompute and choose Create Resource > JAR.

  3. Click Upload, select the ip2region.jar file that is downloaded to your on-premises machine, and then click Open.

    Note
    • Select Upload to MaxCompute.

    • The resource name can be different from the name of the uploaded file.

  4. Click the Commit icon in the top toolbar to commit the resource to the MaxCompute project that is associated with the workspace in the development environment.

2. Register the function getregion

  1. Go to the function registration page.

    On the DataStudio page, open the WorkShop workflow, right-click MaxCompute, and then select Create Function.

  2. Enter a function name.

    In the Create Function dialog box, set the Name parameter to getregion and click Create.

  3. In the Register Function section of the configuration tab that appears, configure the following parameters:

    • Function Type: The type of the function.

    • Engine Instance MaxCompute: The MaxCompute compute engine instance. The value of this parameter cannot be changed.

    • Function Name: The name of the function.

    • Owner: The owner of the function.

    • Class Name: Set this parameter to org.alidata.odps.udf.Ip2Region.

    • Resources: Set this parameter to ip2region.jar.

    • Description: Set this parameter to Region conversion based on the IP address.

    • Expression Syntax: Set this parameter to getregion('ip').

    • Parameter Description: Set this parameter to IP address.

  4. Commit the function.

    Click the Commit icon in the top toolbar to commit the function to the compute engine that is associated with the workspace in the development environment.
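
After the function is committed, you can optionally verify it with an ad hoc query before you use it in the data processing nodes. The following sketch calls getregion on the sample IP address from the appendix of this topic. The exact region that is returned depends on the data that is packaged in ip2region.jar.

-- Optional check: convert the sample IP address from the appendix into a region.
-- The returned value depends on the data that is packaged in ip2region.jar.
SELECT getregion(ip) AS region
FROM (
  SELECT '58.246.10.82' AS ip
) t;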

Step 3: Configure ODPS SQL nodes

In this example, you need to use ODPS SQL nodes to implement data processing logic at each layer. Strong data lineages exist between ODPS SQL nodes at different layers. In the data collection phase, the output tables of the synchronization nodes have been manually added to the Output section on the Properties tab for the synchronization nodes. Therefore, the scheduling dependencies of the ODPS SQL nodes used to process data in this example can be automatically configured based on data lineages by using the automatic parsing feature.

Note

Create ODPS SQL nodes in sequence to prevent unexpected errors.

  1. Open the workflow.

    On the DataStudio page, double-click the name of the workflow that you created in the data collection phase. In this example, the WorkShop workflow is used.

  2. Create a node.

    In the workflow, right-click MaxCompute and choose Create Node > ODPS SQL. In this example, you need to create the following nodes in sequence: dwd_log_info_di, dws_user_info_all_di, and ads_user_info_1d.

1. Configure the dwd_log_info_di node

Use the getregion function created in Step 2 to parse IP addresses in the ods_raw_log_d table, use methods such as regular expressions to split the parsed data into analyzable fields, and then write the fields into the dwd_log_info_di table. For information about the comparison between data before and after processing in the dwd_log_info_di table, see the Appendix: Data processing example section in this topic.

1. Edit node code

On the configuration tab of the workflow, double-click the name of the dwd_log_info_di node. On the configuration tab of the node, enter the following code. In DataWorks, variables in the node code are defined in the format of ${Variable name}. ${bizdate} is a variable in the node code. The value of the variable is assigned in the next step.

-- Scenario: The following SQL statement uses the getregion function to convert IP addresses in the raw log data into regions, uses methods such as regular expressions to split the raw data into analyzable fields, and then writes the fields to the dwd_log_info_di table. 
-- In this example, the getregion function that is used to convert IP addresses into regions is prepared. 
-- Note:
-- 1. Before you can use a function in a DataWorks node, you must upload the resources required for registering the function to DataWorks and then register the function by using the resources in a visualized manner. For more information, visit https://www.alibabacloud.com/help/zh/dataworks/user-guide/create-and-use-maxcompute-resources.
-- In this example, the resource used to register the getregion function is ip2region.jar. 
-- 2. You can configure scheduling parameters for nodes in DataWorks to write incremental data to the related partition in the desired table every day in the scheduling scenario. 
-- In actual development scenarios, you can define variables in the code of a node in the format of ${Variable name} and assign scheduling parameters to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters can be dynamically replaced in the node code based on the configurations of the scheduling parameters. 
INSERT OVERWRITE TABLE dwd_log_info_di PARTITION (dt='${bizdate}')
SELECT ip 
  , uid
  , time
  , status
  , bytes 
  , getregion(ip) AS region -- Obtain the region based on the IP address by using the UDF. 
  , regexp_substr(request, '(^[^ ]+ )') AS method -- Use the regular expression to extract three fields from the request. 
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
  , regexp_substr(request, '([^ ]+$)') AS protocol 
  , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer -- Use the regular expression to clean the referer and obtain a more accurate source URL. 
  , CASE
    WHEN TOLOWER(agent) RLIKE 'android' THEN 'android' -- Obtain the terminal information and access type based on the agent parameter. 
    WHEN TOLOWER(agent) RLIKE 'iphone' THEN 'iphone'
    WHEN TOLOWER(agent) RLIKE 'ipad' THEN 'ipad'
    WHEN TOLOWER(agent) RLIKE 'macintosh' THEN 'macintosh'
    WHEN TOLOWER(agent) RLIKE 'windows phone' THEN 'windows_phone'
    WHEN TOLOWER(agent) RLIKE 'windows' THEN 'windows_pc'
    ELSE 'unknown'
  END AS device
  , CASE
    WHEN TOLOWER(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN TOLOWER(agent) RLIKE 'feed'
    OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
    WHEN TOLOWER(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
    AND agent RLIKE '^(Mozilla|Opera)'
    AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
    ELSE 'unknown'
  END AS identity
  FROM (
    SELECT SPLIT(col, '##@@')[0] AS ip
    , SPLIT(col, '##@@')[1] AS uid
    , SPLIT(col, '##@@')[2] AS time
    , SPLIT(col, '##@@')[3] AS request
    , SPLIT(col, '##@@')[4] AS status
    , SPLIT(col, '##@@')[5] AS bytes
    , SPLIT(col, '##@@')[6] AS referer
    , SPLIT(col, '##@@')[7] AS agent
  FROM ods_raw_log_d 
  WHERE dt ='${bizdate}'
) a;

2. Configure scheduling properties

If you configure the scheduling properties of the dwd_log_info_di node as shown in the following content, the dwd_log_info_di node is triggered after the ods_raw_log_d node synchronizes data from the OSS object user_log.txt to the MaxCompute table ods_raw_log_d at 00:15 every day in the scheduling scenario. The dwd_log_info_di node then processes the data in the ods_raw_log_d table and writes the processed data to the data timestamp-based partition of the dwd_log_info_di table.

  • In the Parameters section, enter bizdate for Parameter Name and $[yyyymmdd-1] for Parameter Value, which is used to query the date of the previous day. A concrete substitution example is provided after this list.

  • In the Schedule section, set Scheduling Cycle to Day. You do not need to separately configure the Scheduled time parameter for the current node. The time at which the current node runs every day is determined by the root node of the WorkShop workflow, which is scheduled to run at 00:15 every day. Therefore, the current node runs after 00:15 every day.

  • In the Dependencies section, set Automatic Parsing From Code Before Node Committing to Yes. The system then configures the ods_raw_log_d node, which generates the ods_raw_log_d table, as the ancestor node of the dwd_log_info_di node, and uses the dwd_log_info_di table as the output of the dwd_log_info_di node. This way, when other nodes query the table data generated by the dwd_log_info_di node, the dwd_log_info_di node can be automatically configured as their ancestor node.

    Note

    The output of a DataWorks node is used to establish scheduling dependencies between other nodes and the current node. In DataWorks, the output of a node must be configured as the input of its descendant nodes, which forms scheduling dependencies between nodes.

    • The system automatically generates two output names for each node in the formats of projectName.randomNumber_out and projectName.nodeName_out, respectively.

    • If the automatic parsing feature is enabled, the system generates an output name for a node based on the code parsing result in the format of projectName.tableName.
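
To make the effect of the bizdate parameter concrete, assume that the dwd_log_info_di node runs on February 22, 2023. The data timestamp is then 20230221, $[yyyymmdd-1] resolves to 20230221, and every occurrence of ${bizdate} in the node code is replaced with that value before the code is run. The following sketch, which is shown only for illustration, counts the rows in the source partition that the node reads for that run:

-- Illustration of parameter substitution for a run on February 22, 2023:
-- ${bizdate} is replaced with 20230221, so the node reads the 20230221 partition
-- of ods_raw_log_d and writes to the 20230221 partition of dwd_log_info_di.
SELECT COUNT(*)
FROM ods_raw_log_d
WHERE dt = '20230221';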

3. Save the configurations

In this example, configure other configuration items based on your business requirements. After the configuration is complete, click the Save icon in the top toolbar on the configuration tab of the node to save the node configurations.

2. Configure the dws_user_info_all_di node

Aggregate the basic user information in the MaxCompute table ods_user_info_d and the preliminarily processed log data in the MaxCompute table dwd_log_info_di into the dws_user_info_all_di aggregate table that stores user access information.

1. Edit node code

On the configuration tab of the workflow, double-click the name of the dws_user_info_all_di node. On the configuration tab of the node, enter the following code. In DataWorks, variables in the node code are defined in the format of ${Variable name}. ${bizdate} is a variable in the node code. The value of the variable is assigned in the next step.

-- Scenario: Aggregate the processed log data in the dwd_log_info_di table and the basic user information in the ods_user_info_d table and write the aggregated data to the dws_user_info_all_di table. 
-- Note: You can configure scheduling parameters for nodes in DataWorks to write incremental data to the related partition in the desired table every day in the scheduling scenario. 
-- In actual development scenarios, you can define variables in the code of a node in the format of ${Variable name} and assign scheduling parameters to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters can be dynamically replaced in the node code based on the configurations of the scheduling parameters. 
INSERT OVERWRITE TABLE dws_user_info_all_di PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
  , b.gender
  , b.age_range
  , b.zodiac
  , a.region
  , a.device
  , a.identity
  , a.method
  , a.url
  , a.referer
  , a.time
FROM (
  SELECT *
  FROM dwd_log_info_di 
  WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT *
  FROM ods_user_info_d
  WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;

2. Configure scheduling properties

If you configure the scheduling properties of the dws_user_info_all_di node as shown in the following content, the dws_user_info_all_di node is triggered after the ods_user_info_d node synchronizes data from the ApsaraDB RDS for MySQL table ods_user_info_d to the MaxCompute table ods_user_info_d and the dwd_log_info_di node processes the data in the ods_raw_log_d table, starting at 00:15 every day in the scheduling scenario. The dws_user_info_all_di node then aggregates the data and writes it to the related partition of the dws_user_info_all_di table.

  • In the Parameters section, enter bizdate for Parameter Name and $[yyyymmdd-1] for Parameter Value, which is used to query the date of the previous day.


  • In the Schedule section, set Scheduling Cycle to Day. You do not need to separately configure the Scheduled time parameter for the current node. The time at which the current node runs every day is determined by the root node of the WorkShop workflow, which is scheduled to run at 00:15 every day. Therefore, the current node runs after 00:15 every day.

  • In the Dependencies section, set Automatic Parsing From Code Before Node Committing to Yes. The system then configures the dwd_log_info_di and ods_user_info_d nodes, which generate the dwd_log_info_di and ods_user_info_d tables, as the ancestor nodes of the dws_user_info_all_di node, and uses the dws_user_info_all_di table as the output of the dws_user_info_all_di node. This way, when other nodes query the table data generated by the dws_user_info_all_di node, the dws_user_info_all_di node can be automatically configured as their ancestor node.


3. Save the configurations

In this example, configure other configuration items based on your business requirements. After the configuration is complete, click the Save icon in the top toolbar on the configuration tab of the node to save the node configurations.

3. Configure the ads_user_info_1d node

Further process the data in the dws_user_info_all_di aggregate table, which stores user access information, to generate the ads_user_info_1d table, which stores basic user profile data.

1. Edit node code

On the configuration tab of the workflow, double-click the name of the ads_user_info_1d node. On the configuration tab of the node, enter the following code. In DataWorks, variables in the node code are defined in the format of ${Variable name}. ${bizdate} is a variable in the node code. The value of the variable is assigned in the next step.

-- Scenario: The following SQL statement is used to further process the dws_user_info_all_di wide table that is used to store user access information into the ads_user_info_1d table that is used to store basic user profile data. 
-- Note: You can configure scheduling parameters for nodes in DataWorks to write incremental data to the related partition in the desired table every day in the scheduling scenario. 
-- In actual development scenarios, you can define variables in the code of a node in the format of ${Variable name} and assign scheduling parameters to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters can be dynamically replaced in the node code based on the configurations of the scheduling parameters. 
INSERT OVERWRITE TABLE ads_user_info_1d PARTITION (dt='${bizdate}')
SELECT uid
  , MAX(region)
  , MAX(device)
  , COUNT(0) AS pv
  , MAX(gender)
  , MAX(age_range)
  , MAX(zodiac)
FROM dws_user_info_all_di
WHERE dt = '${bizdate}'
GROUP BY uid; 

2. Configure scheduling properties

To implement periodic scheduling, you need to define the properties that are relevant to periodic scheduling of nodes.

  • In the Parameters section, enter bizdate for Parameter Name and $[yyyymmdd-1] for Parameter Value, which is used to query the date of the previous day.


  • In the Schedule section, you do not need to separately configure the Scheduled time parameter for the current node. The time at which the current node runs every day is determined by the root node of the WorkShop workflow, which is scheduled to run at 00:15 every day. Therefore, the current node runs after 00:15 every day.

  • In the Dependencies section, set Automatic Parsing From Code Before Node Committing to Yes. The system then configures the dws_user_info_all_di node, which generates the dws_user_info_all_di table, as the ancestor node of the ads_user_info_1d node, and uses the ads_user_info_1d table as the output of the ads_user_info_1d node. This way, when other nodes query the table data generated by the ads_user_info_1d node, the ads_user_info_1d node can be automatically configured as their ancestor node.

3. Save the configurations

In this example, configure other configuration items based on your business requirements. After the configuration is complete, click the Save icon in the top toolbar on the configuration tab of the node to save the node configurations.

Step 4: Run the workflow

Before you deploy the nodes in the workflow to the production environment, you can run the entire workflow to test the code of nodes to ensure accuracy.

Run the workflow

On the configuration tab of the WorkShop workflow, confirm that the scheduling dependencies that are automatically configured for the nodes in the workflow based on the automatic parsing feature are consistent with those in the following figure. After you confirm that the scheduling dependencies are valid, click the Run icon in the top toolbar to run the workflow.

(Figure: the scheduling dependencies of the nodes in the WorkShop workflow.)

View the result

After all nodes in the workflow are run and enter the successful state, you can query the final result table.

  1. In the left-side navigation pane of the DataStudio page, click Ad Hoc Query.

  2. In the Ad Hoc Query pane, right-click Ad Hoc Query and choose Create Node > ODPS SQL.

    Execute the following SQL statement in the ODPS SQL node to confirm the final result table in this example.

    -- You must specify the data timestamp of the data that you read or write as the filter condition for partitions. For example, if a node is scheduled to run on February 22, 2023, the data timestamp of the node is 20230221, which is one day earlier than the date on which the node runs. 
    select count(*) from ads_user_info_1d where dt='Data timestamp';
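
    For example, if the workflow ran with the data timestamp 20230221 (a run on February 22, 2023, as in the preceding comment), the query would be:

    select count(*) from ads_user_info_1d where dt='20230221';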

Step 5: Commit and deploy the workflow

The nodes can be automatically scheduled and run only after they are deployed to the production environment. For more information, see the following content.

Commit the workflow to the development environment

In the top toolbar of the configuration tab of the workflow, click the Commit icon to commit all nodes in the workflow. In the Commit dialog box, configure the parameters and click Confirm.

Commit the workflow to the production environment

After you commit the workflow, the nodes in the workflow enter the development environment. You must deploy the nodes to the production environment because the nodes in the development environment cannot be automatically scheduled.

  1. In the top toolbar of the configuration tab of the workflow, click the Deploy icon. Alternatively, go to the configuration tab of one of the nodes in DataStudio and click the Deploy icon in the upper-right corner to go to the Create Deploy Task page.

  2. Deploy the desired nodes at the same time. The deployed content includes the resources and functions that are involved in the workflow.


Step 6: Run the nodes in the production environment

In actual development scenarios, you can use the data backfill feature in the production environment to backfill data of a historical period of time or a period of time in the future.

  1. Go to the Operation Center page.

    After the nodes are deployed, click Operation Center in the upper-right corner of the configuration tab of the node.

    You can also click Operation Center in the top toolbar on the configuration tab of the workflow to go to the Operation Center page.

  2. Backfill data for auto triggered nodes.

    1. In the left-side navigation pane, choose Cycle Task Maintenance > Cycle Task. On the page that appears, click the root node WorkShop_Start of the WorkShop workflow.

    2. Right-click the WorkShop_Start node and choose Run > Current and Descendant Nodes Retroactively.

    3. On the Patch Data page that appears, select all descendant nodes of the WorkShop_Start node, enter the data timestamps, and then click OK.

  3. Click Refresh until all ODPS SQL nodes are successfully run.

Note

After the test is complete, you can configure the Validity Period parameter for the nodes or freeze the root node of the workflow to which the nodes belong to prevent fees that are caused by long-term node scheduling. The root node is the zero load node named WorkShop_Start.

What to do next

To ensure that the table data generated by nodes in the periodic scheduling scenario meets your business requirements, you can configure monitoring rules to monitor the data quality of tables generated by the nodes. For more information, see Configure rules to monitor data quality.

Appendix: Data processing example

  • Before data processing

    58.246.10.82##@@2d24d94f14784##@@2014-02-12 13:12:25##@@GET /wp-content/themes/inove/img/feeds.gif HTTP/1.1##@@200##@@2572##@@http://coolshell.cn/articles/10975.html##@@Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36
  • After data processing

    (Figure: the processed record, split into fields such as ip, uid, time, status, bytes, region, method, url, protocol, referer, device, and identity.)
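
The following sketch, which you can run as an ad hoc query, shows how the SPLIT(col, '##@@') expressions in the dwd_log_info_di node break the sample record above into separate fields. Only a few representative fields are selected; the node code extracts all eight.

-- Sketch: split the sample log record from the appendix into fields,
-- in the same way as the subquery in the dwd_log_info_di node.
SELECT SPLIT(col, '##@@')[0] AS ip       -- 58.246.10.82
  , SPLIT(col, '##@@')[1] AS uid         -- 2d24d94f14784
  , SPLIT(col, '##@@')[2] AS time        -- 2014-02-12 13:12:25
  , SPLIT(col, '##@@')[3] AS request     -- GET /wp-content/themes/inove/img/feeds.gif HTTP/1.1
  , SPLIT(col, '##@@')[6] AS referer     -- http://coolshell.cn/articles/10975.html
FROM (
  SELECT '58.246.10.82##@@2d24d94f14784##@@2014-02-12 13:12:25##@@GET /wp-content/themes/inove/img/feeds.gif HTTP/1.1##@@200##@@2572##@@http://coolshell.cn/articles/10975.html##@@Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36' AS col
) t;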