DataWorks: Use Data Integration to synchronize data from a LogHub data source to a destination

Last Updated: Aug 17, 2023

This topic describes how to use Data Integration to synchronize data in LogHub data sources of Log Service to destinations that are supported by Data Integration, such as MaxCompute, Object Storage Service (OSS), Tablestore, relational database management systems (RDBMSs), and DataHub. In this topic, MaxCompute is used as a destination.

Prerequisites

  • A MaxCompute data source is added. For more information, see Add a MaxCompute data source.

  • The Logstore from which you want to read data and the MaxCompute table to which you want to write the data are prepared.

Background information

You can synchronize data from a LogHub data source to a destination in the following scenarios:

  • Synchronize data from a LogHub data source to data sources, such as a MaxCompute data source, across regions.

  • Synchronize data from a LogHub data source to data sources, such as a MaxCompute data source, across Alibaba Cloud accounts.

  • Synchronize data from a LogHub data source to data sources, such as a MaxCompute data source, within the same Alibaba Cloud account.

  • Synchronize data from a LogHub data source to data sources, such as a MaxCompute data source, across the Alibaba Cloud public cloud and Alibaba Finance Cloud.

For example, if you have Alibaba Cloud accounts A and B, you can use Account B to create a synchronization node in Data Integration and use the node to synchronize LogHub data within Account A to a MaxCompute data source within Account B. You can add the LogHub data source in one of the following ways:

  1. Use the AccessKey ID and AccessKey secret of Account A to create a LogHub data source.

    Account B can be used to synchronize data in all Log Service projects created by using Account A.

  2. Use the AccessKey ID and AccessKey secret of RAM user A1 within Account A to add a LogHub data source.

    • Use Account A to attach the AliyunLogFullAccess and AliyunLogReadOnlyAccess system policies on Log Service to RAM user A1. For more information, see Create a RAM user and authorize the RAM user to access Simple Log Service.

    • Use Account A to grant custom permissions on Log Service to RAM user A1.

      Use Account A to log on to the RAM console. In the left-side navigation pane, choose Permissions > Policies. On the Policies page, click Create Policy.

      For more information about how to grant custom permissions on Log Service to a RAM user, see the Authorize a RAM user to access Log Service section in Overview.

      If the following policy is attached to RAM user A1, Account B can be used to synchronize data only from the project_name1 and project_name2 projects in Log Service.

      {
        "Version": "1",
        "Statement": [
          {
            "Action": [
              "log:Get*",
              "log:List*",
              "log:CreateConsumerGroup",
              "log:UpdateConsumerGroup",
              "log:DeleteConsumerGroup",
              "log:ListConsumerGroup",
              "log:ConsumerGroupUpdateCheckPoint",
              "log:ConsumerGroupHeartBeat",
              "log:GetConsumerGroupCheckPoint"
            ],
            "Resource": [
              "acs:log:*:*:project/project_name1",
              "acs:log:*:*:project/project_name1/*",
              "acs:log:*:*:project/project_name2",
              "acs:log:*:*:project/project_name2/*"
            ],
            "Effect": "Allow"
          }
        ]
      }
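
      If you want Account B to be able to synchronize data from all Log Service projects within Account A through RAM user A1, the Resource list can use wildcards instead of individual project names. The following policy is a minimal sketch that relies on standard RAM wildcard matching; narrow the resources to specific projects whenever possible:

      {
        "Version": "1",
        "Statement": [
          {
            "Action": [
              "log:Get*",
              "log:List*",
              "log:CreateConsumerGroup",
              "log:UpdateConsumerGroup",
              "log:DeleteConsumerGroup",
              "log:ListConsumerGroup",
              "log:ConsumerGroupUpdateCheckPoint",
              "log:ConsumerGroupHeartBeat",
              "log:GetConsumerGroupCheckPoint"
            ],
            "Resource": [
              "acs:log:*:*:project/*",
              "acs:log:*:*:project/*/*"
            ],
            "Effect": "Allow"
          }
        ]
      }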

Add a LogHub data source

  1. Log on to the DataWorks console. In the left-side navigation pane, click Data Integration. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Integration.

  2. On the Data Integration page, click Data Source in the left-side navigation pane. The Data Source page appears.

  3. On the Data Source page, click Add data source in the upper-right corner.

  4. In the Add data source dialog box, click LogHub in the Message Queue section.

  5. In the Add LogHub data source dialog box, configure the parameters.

    • Data Source Name: The name of the data source. The name can contain only letters, digits, and underscores (_) and must start with a letter.

    • Data Source Description: The description of the data source. The description cannot exceed 80 characters in length.

    • LogHub Endpoint: The URL that is used to access the Log Service project, in the format of http://example.com, where example.com is the endpoint of the Log Service project. For more information about how to obtain the endpoint of a Log Service project, see Endpoints. An example endpoint is provided after these steps.

    • Project: The name of the Log Service project.

    • AccessKey ID: The AccessKey ID of the Alibaba Cloud account that is used to connect to the Log Service project. You can copy the AccessKey ID on the AccessKey Pair page.

    • AccessKey Secret: The AccessKey secret of the Alibaba Cloud account that is used to connect to the Log Service project.

  6. Find the resource group that you want to use and click Test connectivity in the Actions column to test the connectivity between the LogHub data source and the resource group.

  7. If the connectivity test is successful, click Complete.
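
The value of LogHub Endpoint depends on the region and network environment of the Log Service project. For example, assuming a project in the China (Hangzhou) region that is accessed over the public endpoint, the value would typically look like the following; confirm the actual endpoint of your project as described in Endpoints:

    http://cn-hangzhou.log.aliyuncs.com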

Create a batch synchronization node

  1. On the Data Source page, click the icon in the upper-left corner and choose All Products > DataStudio. The DataStudio page appears.

  2. On the DataStudio page, move the pointer over the Create icon and select Create Workflow.

  3. In the Create Workflow dialog box, configure the Workflow Name and Description parameters and click Create.

  4. Click the name of the created workflow in the Scheduled Workflow pane, right-click Data Integration, and then choose Create Node > Offline synchronization.

  5. In the Create Node dialog box, configure the Name and Path parameters.

  6. Click Commit. The configuration tab of the node appears.

Configure the batch synchronization node on the codeless UI

  1. In the Connections step, configure the parameters in the Data source section.

    • Data source: The name of the LogHub data source. Select LogHub from the drop-down list on the left and select the LogHub data source that you added from the drop-down list on the right.

    • Logstore: The name of the Logstore from which you want to read data.

    • Log start time: The start time of data consumption. This parameter defines the left boundary of a time range (left-closed and right-open) in the format of yyyyMMddHHmmss. Example: 20180111013000. The parameter can work with the scheduling parameters in DataWorks.

    • Log end time: The end time of data consumption. This parameter defines the right boundary of a time range (left-closed and right-open) in the format of yyyyMMddHHmmss. Example: 20180111013010. The parameter can work with the scheduling parameters in DataWorks.

    • Number of batches: The number of data entries to read at a time. Default value: 256.

    Note

    You can click Data preview to preview data. Only a small number of LogHub data entries are displayed. The data entries that are displayed may be different from the actual data to be synchronized due to the start time and end time that you specified.

  2. In the Connections step, select the MaxCompute data source that you added and configure the remaining parameters in the Data Destination section.

  3. In the Mappings step, configure field mappings between the source and destination.

  4. In the Channel step, configure the parameters.

  5. Verify that the preceding configuration is correct and click the Save icon in the upper-left corner of the configuration tab.

  6. Run the batch synchronization node.

    You can use one of the following methods to run the batch synchronization node:

    • Run the node only once.

      Click the Run icon in the top toolbar to run the node on the node configuration tab.

      Note

      Before you run the node, you must configure custom parameters for the node.

    • Run the node based on the scheduling configurations of the node.

      Click the Submit icon in the top toolbar to commit the node to the scheduling system. The scheduling system automatically runs the node from the next day based on the scheduling properties that you configured.

      Click the Properties tab in the right-side navigation pane of the configuration tab of the node. In the Parameters section of the Properties tab, enter startTime=$[yyyymmddhh24miss-10/24/60] and endTime=$[yyyymmddhh24miss-5/24/60]. The values indicate that the start time of the node is 10 minutes earlier than the system time and that the end time is 5 minutes earlier than the system time.

      In the Schedule section, set Scheduling Cycle to Minute, Start From to 00:00, Interval to 05, and End At to 23:59. The node is then scheduled to run every 5 minutes from 00:00 to 23:59.
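
The following minimal sketch summarizes how the node fields and the scheduling parameters in this example fit together. The startTime and endTime variable names follow the example above; adjust the time offsets to match how quickly your logs become available:

    # Node configuration (Log start time and Log end time fields)
    Log start time: ${startTime}
    Log end time:   ${endTime}

    # Properties tab > Parameters section
    startTime=$[yyyymmddhh24miss-10/24/60] endTime=$[yyyymmddhh24miss-5/24/60]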

Configure the batch synchronization node in the code editor

  1. On the configuration tab of the batch synchronization node, click the Conversion script icon in the top toolbar.

  2. In the Tips message, click OK to switch to the code editor.

  3. Click the Import Template icon in the top toolbar.

  4. In the Import Template dialog box, configure the Source type, Data source, Target type, and Data source parameters and click OK to apply the template.

  5. Edit the code based on your business requirements in the code editor. Sample code:

    {
      "type": "job",
      "version": "1.0",
      "configuration": {
        "reader": {
          "plugin": "loghub",
          "parameter": {
            "datasource": "loghub_lzz",// The name of the LogHub data source from which you want to read data. The name must be the same as the name of the data source that you added.
            "logstore": "logstore-ut2",// The name of the Logstore from which you want to read data. A Logstore is a Log Service unit for collecting, storing, and querying log data.
            "beginDateTime": "${startTime}",// The start time of data consumption. This parameter defines the left boundary of a time range (left-closed and right-open).
            "endDateTime": "${endTime}",// The end time of data consumption. This parameter defines the right boundary of a time range (left-closed and right-open).
            "batchSize": 256,// The number of data entries to read at a time. Default value: 256.
            "splitPk": "",
            "column": [
              "key1",
              "key2",
              "key3"
            ]
          }
        },
        "writer": {
          "plugin": "odps",
          "parameter": {
            "datasource": "odps_first",// The name of the data source to which you want to write data. The name must be the same as the name of the data source that you added.
            "table": "ok",// The name of the table to which you want to write data.
            "truncate": true,
            "partition": "",// The partition information in the destination table.
            "column": [// The names of the columns to which you want to write data.
              "key1",
              "key2",
              "key3"
            ]
          }
        },
        "setting": {
          "speed": {
            "mbps": 8,// The maximum transmission rate.
            "concurrent": 7// The maximum number of parallel threads.
          }
        }
      }
    }
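
    In this sample, ${startTime} and ${endTime} are replaced at run time with the values that you assign in the Parameters section of the Properties tab, such as the startTime and endTime assignments shown in the codeless UI section. The columns listed in the reader are mapped to the columns listed in the writer in order, so keep the two lists aligned. If the destination MaxCompute table is partitioned, set the partition parameter to the target partition, for example a value such as pt=${bdp.system.bizdate}, assuming the table uses a pt partition column and the built-in bdp.system.bizdate scheduling parameter; the empty value shown above applies only to a non-partitioned table.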