This topic describes how to use Data Integration to ship data collected by LogHub of Log Service to destinations that Data Integration supports, such as MaxCompute, Object Storage Service (OSS), Table Store, relational database management systems (RDBMSs), and DataHub. In this topic, MaxCompute is used as an example.

Background information

You may need to synchronize data collected by LogHub with destinations in the following scenarios:
  • Synchronize data between LogHub and connections such as MaxCompute across regions.
  • Synchronize data between LogHub and connections such as MaxCompute across Alibaba Cloud accounts.
  • Synchronize data between LogHub and connections such as MaxCompute under the same Alibaba Cloud account.
  • Synchronize data between LogHub and connections such as MaxCompute across an Alibaba Cloud account and a Finance Cloud account.
For example, assume that you have two Alibaba Cloud accounts, A and B. You can use account B to create a sync node in Data Integration and synchronize LogHub data under account A to MaxCompute under account B. You can create the LogHub connection in either of the following ways:
  1. Use the AccessKey ID and AccessKey secret of account A to create a LogHub connection.

    In this case, account B has permission to access all Log Service projects created by account A.

  2. Use the AccessKey ID and AccessKey secret of Resource Access Management (RAM) user A1 under account A to create a LogHub connection.
    • Use account A to grant RAM user A1 the AliyunLogFullAccess and AliyunLogReadOnlyAccess permissions on Log Service. For more information, see Authorize a RAM user to connect to Log Service.
    • Use account A to grant RAM user A1 custom permissions on Log Service.

      Use account A to log on to the RAM console. In the left-side navigation pane, choose Permissions > Policies. On the Policies page that appears, click Create Policy.

      For more information about authorization, see the Authorization - Overview topic.

      If the following policy is applied to RAM user A1, account B can only synchronize data of project_name1 and project_name2 in Log Service through RAM user A1:
      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "log:Get*",
                      "log:List*",
                      "log:CreateConsumerGroup",
                      "log:UpdateConsumerGroup",
                      "log:DeleteConsumerGroup",
                      "log:ListConsumerGroup",
                      "log:ConsumerGroupUpdateCheckPoint",
                      "log:ConsumerGroupHeartBeat",
                      "log:GetConsumerGroupCheckPoint"
                  ],
                  "Resource": [
                      "acs:log:*:*:project/project_name1",
                      "acs:log:*:*:project/project_name1/*",
                      "acs:log:*:*:project/project_name2",
                      "acs:log:*:*:project/project_name2/*"
                  ],
                  "Effect": "Allow"
              }
          ]
      }

Add a LogHub connection

  1. Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. On the Workspaces page, find the target workspace and click Data Integration in the Actions column.
  2. On the Data Integration page, click Connection in the left-side navigation pane. The Data Source page appears.
  3. On the Data Source page, click Add Connection in the upper-right corner.
  4. In the Add Connection dialog box that appears, click LogHub.
  5. In the Add LogHub Connection dialog box, set parameters for the LogHub connection.
    • Connection Name: The name of the connection. The name can contain letters, digits, and underscores (_) and must start with a letter.
    • Description: The description of the connection. The description can be up to 80 characters in length.
    • LogHub Endpoint: The endpoint of LogHub, in the format of http://yyy.com. For more information, see Service endpoint.
    • Project: The name of the Log Service project.
    • AccessKey ID: The AccessKey ID used as the logon credential. You can copy the AccessKey ID on the Security Management page.
    • AccessKey Secret: The AccessKey secret used as the logon credential.
    For a sample set of values, see the sketch after this procedure.
  6. Click Test Connection.
  7. After the connectivity test is passed, click Complete.
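
For reference, the following sketch shows one possible set of values for the Add LogHub Connection dialog box. The connection name, description, project, and endpoint are assumptions for illustration only; replace the AccessKey pair with the real credentials of account A or RAM user A1.

  Connection Name: loghub_source
  Description: LogHub connection for syncing log data to MaxCompute
  LogHub Endpoint: http://cn-hangzhou.log.aliyuncs.com
  Project: project_name1
  AccessKey ID: <AccessKey ID of account A or RAM user A1>
  AccessKey Secret: <AccessKey secret of account A or RAM user A1>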

Create a batch sync node

  1. On the Data Source page, click the DataWorks icon in the upper-left corner and choose All Products > DataStudio. The DataStudio page appears.
  2. In the Data Analytics section, move the pointer over the Create icon and select Workflow.
  3. In the Create Workflow dialog box that appears, set Workflow Name and Description and click Create.
  4. Expand the created workflow in the left-side workflow list, right-click Data Integration, and then choose Create > Batch Synchronization.
  5. In the Create Node dialog box that appears, set Node Name and Location.
  6. Click Commit. The node editing tab appears.

Configure the batch sync node on the codeless user interface (UI)

  1. In the Connections step, set parameters in the Source section.
    • Connection: The source connection. Select LogHub as the connection type and then select the name of the LogHub connection that you added.
    • Logstore: The name of the source Logstore from which data is read. A Logstore is a Log Service unit for collecting, storing, and querying log data.
    • Start Timestamp: The start time of data consumption. This parameter defines the left boundary of a left-closed, right-open interval, in the format of yyyyMMddHHmmss, for example, 20180111013000. The parameter can work with the scheduling time parameters in DataWorks (see the worked example after this procedure).
    • End Timestamp: The end time of data consumption. This parameter defines the right boundary of a left-closed, right-open interval, in the format of yyyyMMddHHmmss, for example, 20180111013010. The parameter can work with the scheduling time parameters in DataWorks.
    • Records per Batch: The number of data entries read at a time. The default value is 256.
    Note: You can click Preview to preview a small number of LogHub data entries in the preview box. Based on the start time and end time that you specify for data synchronization, these entries may differ from the actual data to be synchronized.
  2. Select a MaxCompute connection and a destination table in the Target section.
  3. In the Mappings step, configure the mapping between the fields in the source and destination tables.
  4. In the Channel step, configure the maximum transmission rate and dirty data check rules.
  5. Verify that the preceding configuration is correct and click the Save icon in the upper-left corner of the tab.
  6. Run the batch sync node.
    You can run the batch sync node in one of the following ways:
    • One-time running
      Click the Run icon in the toolbar to run the node on the node editing tab.
      Note: After you click the Run icon, set the bizdate parameter in the dialog box that appears.
    • Scheduled running
      Click the Submit icon in the toolbar to submit the node to the scheduling system. The scheduling system then automatically runs the node from the next day based on the scheduling time parameters.

      In the Properties dialog box that appears, set Arguments in the General section to startTime=$[yyyymmddhh24miss-10/24/60] endTime=$[yyyymmddhh24miss-5/24/60]. These expressions set the start time of each run to 10 minutes before the scheduling time and the end time to 5 minutes before the scheduling time. A worked example follows this procedure.

      In the Schedule section, set Instance Recurrence to Minute, Start From to 00:00, Interval to 05, and End At to 23:59. Then the node is scheduled to run every 5 minutes from 00:00 to 23:59.
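
The following worked example shows how these settings fit together at run time. It assumes that Start Timestamp and End Timestamp in the Source section are set to the variables ${startTime} and ${endTime}; the scheduling time 20180111013000 is used for illustration only.

  Start Timestamp: ${startTime}
  End Timestamp: ${endTime}
  Arguments (General section of Properties): startTime=$[yyyymmddhh24miss-10/24/60] endTime=$[yyyymmddhh24miss-5/24/60]

  For the instance whose scheduling time is 20180111013000 (01:30:00 on January 11, 2018), the parameters resolve to:
  startTime = 20180111012000 (the scheduling time minus 10 minutes)
  endTime = 20180111012500 (the scheduling time minus 5 minutes)
  The instance reads the LogHub data that falls in the left-closed, right-open interval [startTime, endTime).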

Configure the batch sync node in the code editor

  1. After creating the batch sync node, click the Switch to Code Editor icon in the toolbar on the node editing tab.
  2. In the Confirm dialog box that appears, click OK to switch to the code editor.
  3. Click the Apply Template icon in the toolbar.
  4. In the Apply Template dialog box that appears, set Source Connection Type to LogHub and Target Connection Type to ODPS, select the source and destination connections, and then click OK to apply a template.
  5. Edit code as required in the code editor. The sample code is as follows:
    {
        "type": "job",
        "version": "1.0",
        "configuration": {
            "reader": {
                "plugin": "loghub",
                "parameter": {
                    "datasource": "loghub_lzz", // The name of the source connection. Use the name of the connection that you have added.
                    "logstore": "logstore-ut2", // The name of the source Logstore. A Logstore is a Log Service unit for collecting, storing, and querying log data.
                    "beginDateTime": "${startTime}", // The start time of data consumption. This parameter defines the left boundary of an interval (left-closed and right-open).
                    "endDateTime": "${endTime}", // The end time of data consumption. This parameter defines the right boundary of an interval (left-closed and right-open).
                    "batchSize": 256, // The number of data entries read at a time. The default value is 256.
                    "splitPk": "",
                    "column": [
                        "key1",
                        "key2",
                        "key3"
                    ]
                }
            },
            "writer": {
                "plugin": "odps",
                "parameter": {
                    "datasource": "odps_first", // The name of the destination connection. Use the name of the connection that you have added.
                    "table": "ok", // The name of the destination table.
                    "truncate": true,
                    "partition": "", // The partition information.
                    "column": [ // The names of the destination columns.
                        "key1",
                        "key2",
                        "key3"
                    ]
                }
            },
            "setting": {
                "speed": {
                    "mbps": 8, // The maximum transmission rate.
                    "concurrent": 7 // The maximum number of concurrent threads.
                }
            }
        }
    }
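
In the code editor, dirty data limits can also be expressed in the setting section. The following is a minimal sketch under that assumption; the errorLimit block and its threshold of 0 dirty data records are only an example:

    "setting": {
        "errorLimit": {
            "record": "0" // The maximum number of dirty data records allowed.
        },
        "speed": {
            "mbps": 8, // The maximum transmission rate.
            "concurrent": 7 // The maximum number of concurrent threads.
        }
    }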