DataWorks:Collect data

Last Updated: Jun 07, 2024

This topic describes how to use DataWorks Data Integration to synchronize data between heterogeneous data sources. You can use this method to collect data to data warehouses. In the example in this topic, a batch synchronization task in Data Integration is used to synchronize basic user information stored in an ApsaraDB RDS for MySQL table named ods_user_info_d to a MaxCompute table named ods_user_info_d and synchronize website access logs stored in an Object Storage Service (OSS) object named user_log.txt to another MaxCompute table named ods_raw_log_d.

Prerequisites

The basic user information and website access logs are already prepared and stored in an ApsaraDB RDS for MySQL instance and an OSS bucket. You can register and use the prepared data directly in DataWorks without activating ApsaraDB RDS for MySQL or OSS and without preparing test data. You only need to make sure that the following requirements are met:

  • A DataWorks workspace is created.

    In this example, a workspace in standard mode is used. The name of the workspace is WorkShop2024_01. You can specify a name based on your business requirements.

  • A MaxCompute data source is added.

    In this example, a MaxCompute data source named odps_first is added. The name of the MaxCompute project used in the production environment is workshop2024_01. The name of the MaxCompute project used in the development environment is workshop2024_01_dev.

  • Optional. If you perform the operations in this example as a RAM user, make sure that the AliyunBSSOrderAccess and AliyunDataWorksFullAccess policies are attached to the RAM user. For information about authorization, see Grant permissions to a RAM user.

Background information

Data Integration is a stable, efficient, and scalable data synchronization service. It can efficiently transmit and synchronize data between heterogeneous data sources in complex network environments. Data Integration provides different types of synchronization solutions such as batch synchronization, incremental data synchronization, and real-time full and incremental data synchronization.

In this example, a batch synchronization solution is used. DataWorks encapsulates the batch synchronization capabilities of Data Integration into batch synchronization nodes. Each batch synchronization node represents a synchronization task. You can configure a source and a destination for a node to define data transmission between the data sources and configure field mappings to define the read and write relationships between source fields and destination fields.

Important
  • In this example, the required test data and data sources are already prepared. To access the test data from your workspace, you only need to add the data source information to your workspace.

  • The data in this experiment is manually generated mock data. It can be used only for experimental operations in DataWorks and can only be read in Data Integration.

Information about source data and destination tables

The Data Integration service is used to synchronize basic user information stored in ApsaraDB RDS for MySQL and website access logs stored in OSS to MaxCompute. The following table describes information about source data and destination tables.

Source: MySQL

Table: ods_user_info_d

  • uid

  • gender

  • age_range

  • zodiac

Destination (MaxCompute)

Table: ods_user_info_d

  • uid string

  • gender string

  • age_range string

  • zodiac string

  • Partition field: dt

  • Lifecycle: 7 days

Source: OSS

Object: user_log.txt

Log format:

$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];

Destination (MaxCompute)

Table: ods_raw_log_d

  • col string

  • Partition field: dt

  • Lifecycle: 7 days

Step 1: Purchase and configure an exclusive resource group for Data Integration

In this example, synchronization tasks are used to synchronize website access logs stored in OSS and basic user information stored in ApsaraDB RDS for MySQL to MaxCompute. The synchronization tasks run on an exclusive resource group for Data Integration. Therefore, you need to purchase and configure an exclusive resource group for Data Integration.

  1. Purchase an exclusive resource group for Data Integration.

    Log on to the DataWorks console and access the buy page of exclusive resources. Set the Region parameter to China (Shanghai), set the exclusive resources type parameter to exclusive data integration resource, configure other parameters as prompted, and then pay for the exclusive resource. For information about the billing details of exclusive resource groups for Data Integration, see Billing of exclusive resource groups for Data Integration (subscription).

    Note

    In this example, an exclusive resource group for Data Integration that is deployed in the China (Shanghai) region is used. Note that exclusive resource groups for Data Integration do not support cross-region operations.

  2. Configure the exclusive resource group for Data Integration.

    1. Log on to the DataWorks console. In the left-side navigation pane, click Resource Groups.

    2. On the Exclusive Resource Groups tab of the Resource Groups page, find the exclusive resource group for Data Integration that you purchased, click Change Workspace in the Actions column, and then associate the resource group with the DataWorks workspace that you created as prompted.

For more information about how to create and use an exclusive resource group for Data Integration, see Create and use an exclusive resource group for Data Integration.

Step 2: Add data sources

In this example, you must add an HttpFile data source named user_behavior_analysis_httpfile and an ApsaraDB RDS for MySQL data source named user_behavior_analysis_mysql to your workspace so that you can access the test data. The basic information about the data sources used for the test is provided in this step.

Note
  • Before you configure a Data Integration synchronization task, you can add and configure the source and destination databases or data warehouses on the Data Source page in the DataWorks console. This allows you to search for the data sources by name when you configure the synchronization task to determine the source and destination databases or data warehouses that you want to use.

  • The data in this experiment is manually generated mock data. It can be used only for experimental operations in DataWorks and can only be read in Data Integration.

Add the HttpFile data source named user_behavior_analysis_httpfile

Add the HttpFile data source to your workspace. Then, test whether a network connection is established between the data source and the resource group you want to use for data synchronization. The HttpFile data source is used to read the website access test data of users that is stored in OSS and can be accessed from DataWorks.

  1. Go to the Data Source page.

    1. Log on to the DataWorks console. In the left-side navigation pane, click Management Center. On the page that appears, select the desired workspace from the drop-down list and click Go to Management Center.

    2. In the left-side navigation pane of the page that appears, click Data Sources > Data Sources. The Data Source page appears.

  2. Add the HttpFile data source.

    1. In the left-side navigation pane of the SettingCenter page, choose Data Sources > Data Sources. On the Data Sources page, click Add Data Source.

    2. In the Add Data Source dialog box, click HttpFile.

    3. On the Add HttpFile Data Source page, configure the parameters. The following table describes the parameters.

      Parameter

      Description

      Data Source Name

      The name of the data source. It is the identifier of the data source in your workspace. In this example, the parameter is set to user_behavior_analysis_httpfile.

      Data Source Description

      The description of the data source. The data source is dedicated to the example in this topic. You can access the test data provided by DataWorks if you configure a batch synchronization task to read data from this data source. You can add this data source only on the Data Integration page. You cannot add the data source in other modules.

      Environment

      Select Development Environment and Production Environment.

      Note

      You must add a data source in the development environment and a data source in the production environment. Otherwise, an error is reported when the related task is run to produce data.

      URL Domain

      The URL of the OSS bucket. Enter https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com.

      Resource group connectivity

      In the table that displays resource group information, find the exclusive resource group for Data Integration that you purchased and click Test Network Connectivity in the Connection Status column. You need to separately test the network connections between the resource group and the data sources in the development and production environments. After the system returns a message indicating that the test is successful, the connectivity status changes to Connected.
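
To illustrate how the URL Domain value relates to the object that the synchronization task reads, the following Python sketch joins the domain with an object path. The path /user_log.txt is an assumption: this topic names the object user_log.txt but does not state where it is located in the bucket. The object_url function is a hypothetical helper and is not part of DataWorks.

```python
from urllib.parse import urljoin

# URL Domain value from the parameter table above.
URL_DOMAIN = "https://dataworks-workshop-2024.oss-cn-shanghai.aliyuncs.com"


def object_url(domain: str, object_path: str) -> str:
    """Join a URL domain and an object path into one absolute URL (hypothetical helper)."""
    # Normalize slashes so that exactly one slash separates the domain and the path.
    return urljoin(domain.rstrip("/") + "/", object_path.lstrip("/"))


# Assumption: the object is stored at the root of the bucket.
print(object_url(URL_DOMAIN, "/user_log.txt"))
```

The data source stores only the domain. The object to read is specified later, when you configure the source of the synchronization task.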

Add the ApsaraDB RDS for MySQL data source named user_behavior_analysis_mysql

Add the ApsaraDB RDS for MySQL data source to your workspace. Then, test whether a network connection is established between the data source and the resource group that you want to use for data synchronization. The ApsaraDB RDS for MySQL data source is used to read the basic user information that is stored in ApsaraDB RDS for MySQL and can be accessed from DataWorks.

  1. Go to the Data Source page.

    1. Log on to the DataWorks console. In the left-side navigation pane, click Management Center. On the page that appears, select the desired workspace from the drop-down list and click Go to Management Center.

    2. In the left-side navigation pane of the page that appears, click Data Sources > Data Sources. The Data Source page appears.

  2. Add the ApsaraDB RDS for MySQL data source.

    1. In the left-side navigation pane of the SettingCenter page, choose Data Sources > Data Sources. On the Data Sources page, click Add Data Source.

    2. In the Add Data Source dialog box, click MySQL.

    3. On the Add MySQL Data Source page, configure the parameters. The following table describes the parameters.

      Parameter

      Description

      Configuration Mode

      Set this parameter to Connection String Mode.

      Data Source Name

      The name of the data source. Enter user_behavior_analysis_mysql.

      Data Source Description

      The description of the data source. The data source is dedicated to the example in this topic. You can access the test data provided by DataWorks if you configure a batch synchronization task to read data from this data source. You can add this data source only on the Data Integration page. You cannot add the data source in other modules.

      Environment

      Select Development Environment and Production Environment.

      Note

      You must add a data source in the development environment and a data source in the production environment. Otherwise, an error is reported when the related task is run to produce data.

      JDBC URL

      Set this parameter to jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/workshop.

      Username

      The username that is used to connect to the ApsaraDB RDS database. Enter workshop.

      Password

      The password that is used to connect to the ApsaraDB RDS database. Enter workshop#2017.

      Authentication Options

      Set this parameter to No Authentication.

      Resource group connectivity

      In the table that displays resource group information, find the exclusive resource group for Data Integration that you purchased and click Test Network Connectivity in the Connection Status column. You need to separately test the network connections between the resource group and the data sources in the development and production environments. After the system returns a message indicating that the test is successful, the connectivity status changes to Connected.
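
The JDBC URL above packs the host, port, and database name into one string. The following Python sketch shows one way to split it into its parts, which can help when you check the values against the Username and Password fields. The parse_jdbc_mysql_url function is a hypothetical helper and is not part of DataWorks.

```python
from urllib.parse import urlparse

# JDBC URL value from the parameter table above.
JDBC_URL = "jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/workshop"


def parse_jdbc_mysql_url(jdbc_url: str) -> dict:
    """Split a jdbc:mysql:// URL into host, port, and database name (hypothetical helper)."""
    prefix = "jdbc:"
    if not jdbc_url.startswith(prefix + "mysql://"):
        raise ValueError(f"not a MySQL JDBC URL: {jdbc_url!r}")
    # Drop the "jdbc:" prefix so that the rest parses like an ordinary URL.
    parsed = urlparse(jdbc_url[len(prefix):])
    return {
        "host": parsed.hostname,
        "port": parsed.port or 3306,  # 3306 is the default MySQL port
        "database": parsed.path.lstrip("/"),
    }


info = parse_jdbc_mysql_url(JDBC_URL)
print(info)
```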

Step 3: Create a workflow

Design a workflow based on requirement analysis. Create and use two batch synchronization nodes named ods_raw_log_d and ods_user_info_d to synchronize basic user information from ApsaraDB RDS for MySQL and website access logs from OSS. Then, create a zero load node named workshop_start to manage nodes in the workflow in a centralized manner. This topic describes only the data collection procedure. Specific task configurations are not included.

1. Create a workflow

By default, DataWorks provides a workflow named workflow. You can skip the workflow creation steps and directly use the provided workflow.

  1. Go to the DataStudio page.

    Log on to the DataWorks console. In the left-side navigation pane, choose Data Modeling and Development > DataStudio. On the page that appears, select the desired workspace from the drop-down list and click Go to DataStudio.

  2. Create a workflow.

    In the Scheduled Workflow pane, right-click Business Flow and select Create Workflow. In the Create Workflow dialog box, configure the Workflow Name parameter based on your business requirements. In this example, WorkShop is used.

2. Design the workflow

  1. Go to the configuration tab of the workflow.

    Double-click the name of the workflow created in the previous step to go to the configuration tab of the workflow.

  2. Create nodes.

    You can design the workflow by performing drag-and-drop operations on components on the configuration tab of the workflow. Click Create Node. Find the node type based on which you want to create a node and drag it to the workflow canvas on the right.

    In this example, you need to create a zero load node named workshop_start and two batch synchronization nodes named ods_raw_log_d and ods_user_info_d. The ods_raw_log_d node is used to synchronize website access logs from OSS and the ods_user_info_d node is used to synchronize basic user information from ApsaraDB RDS for MySQL.

  3. Configure scheduling dependencies for nodes.

    Configure the workshop_start node as the ancestor node of the two batch synchronization nodes. In this example, no data lineage exists between the zero load node and batch synchronization nodes. Therefore, you can draw lines to configure the scheduling dependencies between the zero load node workshop_start and batch synchronization nodes ods_raw_log_d and ods_user_info_d in the workflow. For more information about the methods for configuring scheduling dependencies, see Scheduling dependency configuration guide.
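
The scheduling dependencies in this workflow form a small directed acyclic graph. The following Python sketch (Python 3.9 or later) models the three nodes and derives an order in which they can run. It is only an illustration of the dependency structure and does not reflect how the DataWorks scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Map each node to the set of ancestor nodes that it depends on.
dependencies = {
    "workshop_start": set(),
    "ods_raw_log_d": {"workshop_start"},
    "ods_user_info_d": {"workshop_start"},
}

# static_order() yields every node after all of its ancestors.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

The workshop_start node always comes first. The two batch synchronization nodes do not depend on each other, so their relative order is not fixed.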

Step 4: Create MaxCompute tables

Before you synchronize data, you must create the MaxCompute tables that store the data synchronized by Data Integration. In this example, the tables are created by using DDL statements. For more information about MaxCompute table-related operations, see Create and manage MaxCompute tables.

  1. Go to the entry point for creating tables.

  2. Create a table named ods_raw_log_d.

    In the Create Table dialog box, enter ods_raw_log_d in the Name field. In the upper part of the table configuration tab, click DDL, enter the following table creation statement, and then click Generate Table Schema. In the Confirm dialog box, click Confirmation to overwrite the original configurations.

    CREATE TABLE IF NOT EXISTS ods_raw_log_d
    (
        col STRING
    )
    PARTITIONED BY
    (
        dt STRING
    )
    LIFECYCLE 7;

  3. Create a table named ods_user_info_d.

    In the Create Table dialog box, enter ods_user_info_d in the Name field. In the upper part of the table configuration tab, click DDL, enter the following table creation statement, and then click Generate Table Schema. In the Confirm dialog box, click Confirmation to overwrite the original configurations.

    CREATE TABLE IF NOT EXISTS ods_user_info_d (
        uid STRING COMMENT 'User ID',
        gender STRING COMMENT 'Gender',
        age_range STRING COMMENT 'Age range',
        zodiac STRING COMMENT 'Zodiac sign'
    )
    PARTITIONED BY (
        dt STRING
    )
    LIFECYCLE 7;

  4. Commit and deploy the tables.

    After you confirm that the table information is valid, click Commit to Development Environment and then Commit to Production Environment on the configuration tab of each table (ods_user_info_d and ods_raw_log_d). The system then creates the physical tables, based on the table configurations, in the MaxCompute projects that are associated with the workspace in the development and production environments.

    Note

    After you define the schema of a table, you can commit the table to the development and production environments. After the table is committed, you can view the table in the MaxCompute project in a specific environment.

    • If you commit the tables to the development environment of the workspace, the tables are created in the MaxCompute project that is associated with the workspace in the development environment.

    • If you commit the tables to the production environment of the workspace, the tables are created in the MaxCompute project that is associated with the workspace in the production environment.
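
As a small illustration of the note above, the following Python sketch builds the fully qualified table name for each environment. The project names come from the Prerequisites section of this topic. The qualified_table_name function is a hypothetical helper and is not part of DataWorks or MaxCompute.

```python
# MaxCompute project names from the Prerequisites section of this topic.
PROJECTS = {
    "development": "workshop2024_01_dev",
    "production": "workshop2024_01",
}


def qualified_table_name(environment: str, table: str) -> str:
    """Return project.table for the given environment (hypothetical helper)."""
    try:
        project = PROJECTS[environment]
    except KeyError:
        raise ValueError(f"unknown environment: {environment!r}") from None
    return f"{project}.{table}"


for env in PROJECTS:
    for table in ("ods_raw_log_d", "ods_user_info_d"):
        print(env, "->", qualified_table_name(env, table))
```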

Step 5: Configure a zero load node named workshop_start

In this example, the workshop_start node is run at 00:15 every day to trigger the running of the user profile analysis workflow. The workshop_start node is a management and control node. You do not need to write code for the node. The following content describes the scheduling configurations of the node:

  1. Go to the configuration tab of the workshop_start node.

    On the DataStudio page, double-click the name of the workflow created in the previous step to go to the configuration tab of the workflow. Double-click the zero load node workshop_start. On the configuration tab of the zero load node, click Properties in the right-side navigation pane.

  2. Configure the scheduling properties for the workshop_start node.

    Configure the following settings for the workshop_start node to allow the root node of the workspace to trigger the running of the workshop_start node at 00:15 every day:

    • Configure the scheduling time: Set Scheduling Cycle to Day and Scheduled time to 00:15.

    • Configure the rerun properties: Set Rerun to Allow Regardless of Running Status.

    • Configure scheduling dependencies: Configure the root node of the workspace as the ancestor node of the workshop_start node.

      In this example, the workshop_start node is a management and control node and does not depend on other nodes. Therefore, you can configure the workshop_start node to depend on the root node of the workspace. This way, the root node of the workspace can be used to trigger the running of the current user profile analysis workflow.

      Note

      By default, the root node of a workspace is automatically generated after you create the workspace. In most cases, all nodes in a workspace depend on the root node of the workspace. By default, the root node of a workspace starts to trigger all its level-1 descendant nodes to run at 00:00. The configurations of the root node cannot be changed.
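
The interaction between the scheduling time of a node and its ancestor nodes can be sketched as follows. This Python example (Python 3.9 or later) is a simplified model, not the actual DataWorks scheduler: it assumes that a node starts as soon as its own scheduled time has arrived and all of its ancestor nodes have finished. The earliest_start function and the assumption that the root node finishes immediately at 00:00 are illustrative only.

```python
from datetime import datetime


def earliest_start(scheduled_at: datetime, ancestor_finish_times: list[datetime]) -> datetime:
    """Return the earliest time at which a node can start in this simplified model."""
    # The node waits for its own scheduled time and for every ancestor to finish.
    return max([scheduled_at, *ancestor_finish_times])


day = datetime(2023, 6, 21)
# Assumption: the workspace root node triggers its descendants at 00:00 and finishes immediately.
root_finished = day.replace(hour=0, minute=0)
workshop_start_at = earliest_start(day.replace(hour=0, minute=15), [root_finished])
print(workshop_start_at)
```

In this model, the workshop_start node starts at 00:15 even though the root node finishes at 00:00, because the scheduled time of the workshop_start node is later.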

Step 6: Configure the ods_raw_log_d node

In this step, you can configure the ods_raw_log_d node to synchronize website access logs of users from the OSS object user_log.txt to the MaxCompute table ods_raw_log_d.

On the DataStudio page, double-click the ods_raw_log_d node in the workflow. On the configuration tab of the node, configure the node.

1. Establish network connections between the data sources and the resource group that you want to use

In this step, an exclusive resource group for Data Integration is used. You need to test the network connectivity between the resource group and the source user_behavior_analysis_httpfile and the destination MaxCompute data source.

  • Set Source to HttpFile and Data Source Name to user_behavior_analysis_httpfile, which is the data source that you added in Step 2: Add data sources.

  • In the Resource Group step, select the exclusive resource group for Data Integration that you purchased from the drop-down list.

  • Set Destination to MaxCompute and select the data source that you added.

2. Configure a synchronization task

  1. Configure basic settings of the synchronization task.

    • For the source, you need to configure the user_log.txt object as the object from which you want to read data.

    • For the destination, you need to configure the ods_raw_log_d table as the table to which you want to write data. You also need to configure the Partition information parameter. In the Partition information field, the bizdate variable is referenced in the ${} format, such as ${bizdate}. The value of the variable is assigned in 3. Configure scheduling properties.

  2. Configure field mappings and general settings.

    DataWorks allows you to configure mappings between source fields and destination fields so that data is read from the specified source fields and written to the mapped destination fields. In the Channel Control section, you can also configure settings such as the parallelism for data reads and writes, the maximum transmission rate (which prevents data synchronization from affecting database performance), the policy for dirty data records, and distributed execution. In this example, the default settings are used. For information about other configuration items for a synchronization task, see Configure a batch synchronization task by using the codeless UI.

3. Configure scheduling properties

With the scheduling properties described in this section, DataWorks Data Integration synchronizes data from the OSS object user_log.txt to the time-based partition in the MaxCompute table ods_raw_log_d at 00:15 every day.

  • In the Parameters section, enter bizdate for Parameter Name and $bizdate for Parameter Value, which is used to query the date of the previous day. The format of the parameter values is yyyymmdd.

  • In the Schedule section, set Scheduling Cycle to Day. You do not need to separately configure the Scheduled time parameter for the current node. The time at which the current node runs every day is determined by the scheduling time of the workshop_start node, which is the start node of the workflow. The workshop_start node is scheduled to run at 00:15 every day.

  • Configure the parameters in the Dependencies section:

    • Check the ancestor nodes of the current node: Check whether the workshop_start node is displayed in Parent Nodes for the current node. A node that you specified as an ancestor node of the current node by drawing lines is displayed there. If the workshop_start node is not displayed, check whether you completed the workflow design described in 2. Design the workflow.

      In this example, when the scheduling time of the workshop_start node arrives and the node finishes running, the current node is triggered to run.

    • Check the output of the current node: Check whether the current node has an output named in the format <MaxCompute project name in the production environment>.ods_raw_log_d. You can go to the Workspace page in SettingCenter to view the MaxCompute project name in the production environment.

      Note

      In DataWorks, the output of a node is used to configure scheduling dependencies between the node and its descendant nodes. If an SQL node depends on a synchronization node, when the SQL node starts to process the output table of the synchronization node, DataWorks uses the automatic parsing feature to quickly configure the synchronization node as the ancestor node of the SQL node based on the table lineage. You need to confirm whether the node output that has the same name as the node output table ods_raw_log_d exists.
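
The relationship between the run date, the $bizdate value, and the destination partition can be sketched in Python as follows. The sketch assumes that the Partition information field contains dt=${bizdate}. This topic states only that the partition field is dt and that the bizdate variable is referenced in the ${} format, so the exact expression is an assumption. The bizdate_for function is a hypothetical helper and is not part of DataWorks.

```python
from datetime import date, timedelta
from string import Template


def bizdate_for(run_date: date) -> str:
    """Return the data timestamp (the day before the run date) in yyyymmdd format."""
    return (run_date - timedelta(days=1)).strftime("%Y%m%d")


# Assumption: the Partition information field contains dt=${bizdate}.
partition_template = Template("dt=${bizdate}")

run_date = date(2023, 6, 21)
partition = partition_template.substitute(bizdate=bizdate_for(run_date))
print(partition)  # dt=20230620
```

A node that runs on June 21, 2023 therefore writes to the partition dt=20230620, which matches the data timestamp example used later in this topic when the synchronization results are queried.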

Step 7: Configure the ods_user_info_d node

In this step, you can configure the ods_user_info_d node to synchronize basic user information from the ApsaraDB RDS for MySQL table ods_user_info_d to the MaxCompute table ods_user_info_d.

On the DataStudio page, double-click the ods_user_info_d node in the workflow. On the configuration tab of the node, configure the node.

1. Establish network connections between the data sources and the resource group that you want to use

In this step, the exclusive resource group for Data Integration is used. You need to test the network connectivity between the resource group and the source user_behavior_analysis_mysql and the destination MaxCompute data source.

  • Set Source to MySQL and Data Source Name to user_behavior_analysis_mysql.

  • In the Resource Group step, select the exclusive resource group for Data Integration that you purchased from the drop-down list.

  • Set Destination to MaxCompute and select the data source that you added.

2. Configure a synchronization task

  1. Configure basic settings of the synchronization task.

    • For the source, you need to configure the ods_user_info_d table as the table from which you want to read data.

    • For the destination, you need to configure the ods_user_info_d table as the table to which you want to write data. You also need to configure the Partition information parameter. In the Partition information field, the bizdate variable is referenced in the ${} format, such as ${bizdate}. The value of the variable is assigned in 3. Configure scheduling properties.

      Note

      In this example, full data is read from the ApsaraDB RDS for MySQL table and written to the specified time-based partition of the MaxCompute table by default.

  2. Configure field mappings and general settings.

    DataWorks allows you to configure mappings between source fields and destination fields so that data is read from the specified source fields and written to the mapped destination fields. In the Channel Control section, you can also configure settings such as the parallelism for data reads and writes, the maximum transmission rate (which prevents data synchronization from affecting database performance), the policy for dirty data records, and distributed execution. In this example, the default settings are used. For information about other configuration items for a synchronization task, see Configure a batch synchronization task by using the codeless UI.
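
For the ods_user_info_d node, the source and destination tables use the same column names, so the field mapping is one to one. The following Python sketch (Python 3.9 or later) shows the idea of mapping fields by name. It assumes that the default mapping pairs fields that have the same name, which this topic does not state explicitly. The map_by_name function is a hypothetical helper and is not part of DataWorks.

```python
# Column names from the source and destination table definitions in this topic.
source_columns = ["uid", "gender", "age_range", "zodiac"]
destination_columns = ["uid", "gender", "age_range", "zodiac"]


def map_by_name(source: list[str], destination: list[str]) -> dict[str, str]:
    """Pair each source column with the destination column of the same name."""
    missing = [name for name in source if name not in destination]
    if missing:
        raise ValueError(f"no destination column for: {missing}")
    return {name: name for name in source}


mapping = map_by_name(source_columns, destination_columns)
for src, dst in mapping.items():
    print(f"{src} -> {dst}")
```

The dt partition column does not appear in the mapping because its value comes from the Partition information parameter rather than from a source field.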

3. Configure scheduling properties

With the scheduling properties described in this section, DataWorks Data Integration synchronizes data from the ApsaraDB RDS for MySQL table ods_user_info_d to the time-based partition in the MaxCompute table ods_user_info_d at 00:15 every day.

  • In the Parameters section, enter bizdate for Parameter Name and $bizdate for Parameter Value, which is used to query the date of the previous day. The format of the parameter values is yyyymmdd.

  • In the Schedule section, set Scheduling Cycle to Day. You do not need to separately configure the Scheduled time parameter for the current node. The time at which the current node runs every day is determined by the scheduling time of the workshop_start node, which is the start node of the workflow. The workshop_start node is scheduled to run at 00:15 every day.

  • Configure the parameters in the Dependencies section:

    • Check the ancestor nodes of the current node: Check whether the workshop_start node is displayed in Parent Nodes for the current node. A node that you specified as an ancestor node of the current node by drawing lines is displayed there. If the workshop_start node is not displayed, check whether you completed the workflow design described in 2. Design the workflow.

      In this example, when the scheduling time of the workshop_start node arrives and the node finishes running, the current node is triggered to run.

    • Check the output of the current node: Check whether the current node has an output named in the format <MaxCompute project name in the production environment>.ods_user_info_d. You can go to the Workspace page in SettingCenter to view the MaxCompute project name in the production environment.

      Note

      In DataWorks, the output of a node is used to configure scheduling dependencies between the node and its descendant nodes. If an SQL node depends on a synchronization node, when the SQL node starts to process the output table of the synchronization node, DataWorks uses the automatic parsing feature to quickly add the synchronization node as the ancestor node of the SQL node based on the table lineage. You need to confirm whether the node output that has the same name as the node output table ods_user_info_d exists.

Step 8: Run the nodes in the WorkShop workflow and view the result

Run the nodes in the current workflow to write the basic user information in ApsaraDB RDS for MySQL and the website access logs of users in OSS to the related MaxCompute tables.

Run the workflow

  1. On the DataStudio page, double-click the WorkShop workflow under Business Flow. On the configuration tab of the workflow, click the Run icon in the top toolbar to run the nodes in the workflow based on the scheduling dependencies between the nodes.

  2. Confirm the status.

    • View the node status: If a node is in the successful state, the synchronization process is normal.

    • View the node running logs: For example, right-click the ods_user_info_d or ods_raw_log_d node and select View Logs. If the information shown in the following figure appears in the logs, the node is run and data is synchronized.

      image

Query synchronization results

If the nodes in the workflow are run as expected, all basic user information in the ApsaraDB RDS for MySQL table ods_user_info_d is synchronized to the partition of the previous day in the output table workshop2024_01_dev.ods_user_info_d, and all website access logs of users in the OSS object user_log.txt are synchronized to the partition of the previous day in the output table workshop2024_01_dev.ods_raw_log_d. You do not need to deploy query SQL statements to the production environment for execution. Therefore, you can query synchronization results by creating an ad hoc query.

  1. Create an ad hoc query.

    In the left-side navigation pane of the DataStudio page, click the Ad Hoc Query icon. In the Ad Hoc Query pane, right-click Ad Hoc Query and choose Create Node > ODPS SQL.

  2. Query synchronization result tables.

    Execute the following SQL statements to confirm the data write results. View the number of records that are imported into the ods_raw_log_d and ods_user_info_d tables.

    -- Use the data timestamp of the run as the partition filter. The data timestamp is one day
    -- earlier than the date on which the node runs. For example, if a node is scheduled to run
    -- on June 21, 2023, the data timestamp is 20230620. Replace 20230620 with your own value.
    SELECT COUNT(*) FROM ods_user_info_d WHERE dt = '20230620';
    SELECT COUNT(*) FROM ods_raw_log_d WHERE dt = '20230620';

    Note

    In this example, nodes are run in DataStudio, which is the development environment. Therefore, data is written to the specified tables in the MaxCompute project workshop2024_01_dev that is associated with the workspace in the development environment by default.

Step 9: Commit the workflow

After you debug the nodes, commit the user profile analysis workflow to the scheduling system for periodic scheduling. This way, the raw business data is periodically synchronized to the MaxCompute destination tables.

  1. Go to the configuration tab of the workflow.

    On the DataStudio page, double-click the workflow name WorkShop to go to the configuration tab of the workflow.

  2. Commit the workflow.

    On the workflow canvas, click the Commit icon in the top toolbar to commit the workflow.

  3. Confirm the commit operation.

    In the Commit dialog box, select all nodes in the current workflow and select Ignore I/O Inconsistency Alerts. Confirm the configurations and click Confirm to commit all nodes in the workflow. On the Deploy page, select and deploy the nodes to the production environment.

What to do next

Now that you understand how to collect and synchronize data, you can proceed to the next tutorial, in which you learn how to compute and analyze the collected data. For more information, see Process data.