This topic describes how to use the Data Integration service of DataWorks to ship data from LogHub to MaxCompute. You can also ship data from LogHub to other supported destinations, such as Object Storage Service (OSS), Tablestore, relational database management systems (RDBMS), and DataHub.

Background information

The Data Integration service of DataWorks is a stable, efficient, and scalable data synchronization platform provided by Alibaba Cloud. This platform transmits data in batches for Alibaba Cloud big data services, such as MaxCompute and AnalyticDB.

Scenarios

  • Data synchronization between LogHub and MaxCompute across regions
  • Data synchronization between LogHub and MaxCompute across Alibaba Cloud accounts
  • Data synchronization between LogHub and MaxCompute of the same Alibaba Cloud account
  • Data synchronization between LogHub of an Alibaba Cloud account and MaxCompute of an Alibaba Finance Cloud account

Additional considerations for data synchronization across Alibaba Cloud accounts

If you have two Alibaba Cloud accounts (Accounts A and B), you can use Account B to create a data integration task. Then, you can synchronize LogHub data under Account A to MaxCompute under Account B. The procedure is as follows:

  • Use the AccessKey ID and AccessKey secret of Account A to create a LogHub data source.

    In this case, Account B can access all Log Service projects created by Account A.

  • Use the AccessKey ID and AccessKey secret of RAM User A1 under Account A to create a LogHub data source.
    • Use Account A to grant the AliyunLogFullAccess and AliyunLogReadOnlyAccess Log Service permissions to A1. For more information, see Grant a RAM user the permissions to access Log Service.
    • Use Account A to grant custom Log Service permissions to RAM User A1.

      Use Account A to log on to the RAM console. Choose Permissions > Policies, and then click Create Policy.

      For more information about authorization, see Authorization - Overview and Overview.

      If the following policy is attached to RAM User A1, Account B can read data from project_name1 and project_name2 in Log Service through RAM User A1.
      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "log:Get*",
                      "log:List*",
                      "log:CreateConsumerGroup",
                      "log:UpdateConsumerGroup",
                      "log:DeleteConsumerGroup",
                      "log:ListConsumerGroup",
                      "log:ConsumerGroupUpdateCheckPoint",
                      "log:ConsumerGroupHeartBeat",
                      "log:GetConsumerGroupCheckPoint"
                  ],
                  "Resource": [
                      "acs:log:*:*:project/project_name1",
                      "acs:log:*:*:project/project_name1/*",
                      "acs:log:*:*:project/project_name2",
                      "acs:log:*:*:project/project_name2/*"
                  ],
                  "Effect": "Allow"
              }
          ]
      }
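
If you prefer to script this authorization step, the following Python sketch uses the Alibaba Cloud SDK for RAM (the aliyun-python-sdk-core and aliyun-python-sdk-ram packages) to create the custom policy under Account A and attach it to RAM User A1. The policy name, region, and AccessKey placeholders are assumptions that you would replace with your own values; this is a sketch, not the only supported way to grant the permissions.

# Hypothetical sketch: create the custom Log Service policy under Account A and attach it to RAM User A1.
# The AccessKey pair must belong to Account A; the policy name is a placeholder.
import json

from aliyunsdkcore.client import AcsClient
from aliyunsdkram.request.v20150501.CreatePolicyRequest import CreatePolicyRequest
from aliyunsdkram.request.v20150501.AttachPolicyToUserRequest import AttachPolicyToUserRequest

client = AcsClient("<account-a-access-key-id>", "<account-a-access-key-secret>", "cn-hangzhou")

policy_document = {
    "Version": "1",
    "Statement": [{
        "Action": [
            "log:Get*", "log:List*", "log:CreateConsumerGroup", "log:UpdateConsumerGroup",
            "log:DeleteConsumerGroup", "log:ListConsumerGroup", "log:ConsumerGroupUpdateCheckPoint",
            "log:ConsumerGroupHeartBeat", "log:GetConsumerGroupCheckPoint"
        ],
        "Resource": ["acs:log:*:*:project/project_name1", "acs:log:*:*:project/project_name1/*"],
        "Effect": "Allow"
    }]
}

# Create the custom policy under Account A.
create_request = CreatePolicyRequest()
create_request.set_PolicyName("loghub-read-for-dataworks")  # placeholder policy name
create_request.set_PolicyDocument(json.dumps(policy_document))
client.do_action_with_exception(create_request)

# Attach the custom policy to RAM User A1.
attach_request = AttachPolicyToUserRequest()
attach_request.set_PolicyType("Custom")
attach_request.set_PolicyName("loghub-read-for-dataworks")
attach_request.set_UserName("A1")
client.do_action_with_exception(attach_request)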

Procedure

  1. Create a data source.
    For more information, see Configure a LogHub connection.
  2. Create a workflow.
    1. Log on to the DataWorks console as a developer, find the target workspace, and then click Data Analytics.
    2. On the Data Analytics page, move the pointer over the Workflow icon and click Workflow.
    3. In the Create Workflow dialog box, set the Workflow Name and Description parameters.
    4. Click Create.
  3. Create an offline data synchronization node.
    1. Expand the workflow and right-click Data Integration.
    2. Choose Create > Batch Synchronization.
    3. In the Create Node dialog box that appears, set the Node Name and Location parameters.
    4. Click Commit.
  4. Select a source connection.
    After you configure an offline data synchronization node, you must select a source connection for the node.
    • Connection: Select LogHub and the name of the LogHub connection.
    • Logstore: The name of the source Logstore.
    • Start Timestamp: The start time of data consumption, that is, the time when log data reaches LogHub. This parameter defines the left boundary of a left-closed, right-open interval in the format of yyyyMMddHHmmss, for example, 20180111013000. The parameter corresponds to the scheduling time parameters in DataWorks.
    • End Timestamp: The end time of data consumption. This parameter defines the right boundary of a left-closed, right-open interval in the format of yyyyMMddHHmmss, for example, 20180111013010. The parameter corresponds to the scheduling time parameters in DataWorks.
    • Batch Size: The number of data entries that are read at a time. Default value: 256.
    For a sketch that computes such a timestamp window, see the example after this procedure.
    You can click Preview to preview the data.
    Note Data preview shows only a small number of LogHub data entries in a preview box. Depending on the start and end timestamps, these entries may differ from the data that is actually synchronized.
  5. Select a target connection.
    Select MaxCompute and a table. In this example, select the ok table. For a sketch that creates this table and verifies the synchronized data with PyODPS, see the example at the end of this topic.
    • Connection: Select the name of the MaxCompute connection that you have configured.
    • Table: Select the table to which you want to synchronize data.
    • Partition Key Column: In this example, the table to be synchronized is an unpartitioned table, so no partition key column is displayed.
    • Writing Rule:
      • Write with Original Data Deleted: All data in the table or partition is cleared before new data is imported. This rule is equivalent to the INSERT OVERWRITE statement.
      • Write with Original Data Retained: No data is cleared before the import. New data is appended to the table, which is equivalent to the INSERT INTO statement.
    • Compression: Specifies whether to compress data. Default value: Disable.
    • Convert Empty Strings to Null: Specifies whether to convert empty strings to null. Default value: No.
  6. Configure field mappings.
    Map the fields between the source and target tables. The fields in the source table have a one-to-one correspondence with the fields in the target table.
    Note When log fields are shipped, the __time__ field is mapped to the C_LogTime field, and the __source__ field is mapped to the C_Source field. For the Tag field, the content after the colon is used for mapping.
  7. Configure the channel.
    Set the bandwidth throttling and the maximum number of dirty data records allowed.
    • Expected Maximum Concurrency: The maximum number of concurrent threads that the synchronization task uses to read data from the source and write data to the destination. You can configure the concurrency for the node on the codeless UI.
    • Bandwidth Throttling: Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and set a maximum transmission rate to limit the speed at which data is read from the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate based on the configurations of the source.
    • Dirty Data Records Allowed: The maximum number of dirty data records allowed.
    • Resource Group: The resource group in which the synchronization task runs. By default, the task runs in the default resource group. If you need to run a large number of tasks and the resources of the default resource group are insufficient, you can purchase exclusive resources for Data Integration or add a custom resource group. For more information, see DataWorks exclusive resources and Add a custom resource group.
  8. Run the synchronization task.
    You can run the synchronization task by using either of the following methods:
    • Run the task only once.

      Click Run in the toolbar, set the parameters, and then run the task on the Data Integration page.

    • Schedule the synchronization task.

      Click Submit to submit the synchronization task to the scheduling system.
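
The Start Timestamp and End Timestamp values in step 4, and the ${startTime} and ${endTime} variables in the code editor example below, describe a left-closed, right-open window in the yyyyMMddHHmmss format. The following Python sketch shows one way to compute such a window for the previous 10 minutes; the 10-minute window length and the variable names are assumptions for illustration only.

# Minimal sketch: compute a left-closed, right-open 10-minute window in yyyyMMddHHmmss format.
# The window length and the variable names are illustrative assumptions.
from datetime import datetime, timedelta

end = datetime.now().replace(second=0, microsecond=0)
start = end - timedelta(minutes=10)

start_timestamp = start.strftime("%Y%m%d%H%M%S")  # value for Start Timestamp / ${startTime}
end_timestamp = end.strftime("%Y%m%d%H%M%S")      # value for End Timestamp / ${endTime}

# A log entry whose arrival time is t is consumed if start_timestamp <= t < end_timestamp.
print(start_timestamp, end_timestamp)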

Create the synchronization task by using the code editor

After you create an offline data synchronization node, click Switch to Code Editor.
Set the parameters based on your business requirements. The sample code is as follows:
{
    "type": "job",
    "version": "1.0",
    "configuration": {
        "reader": {
            "plugin": "loghub",
            "parameter": {
                "datasource": "loghub_lzz",// The name of the source connection that you have added.
                "logstore": "logstore-ut2",// The name of the source Logstore.
                "beginDateTime": "${startTime}",// The start time of data consumption. This parameter defines the left boundary of a left-closed, right-open interval.
                "endDateTime": "${endTime}",// The end time of data consumption. This parameter defines the right boundary of a left-closed, right-open interval.
                "batchSize": 256,// The number of data entries that are read at a time. Default value: 256.
                "splitPk": "",
                "column": [
                    "key1",
                    "key2",
                    "key3"
                ]
            }
        },
        "writer": {
            "plugin": "odps",
            "parameter": {
                "datasource": "odps_first",// The name of the destination connection that you have added.
                "table": "ok",// The name of the destination table.
                "truncate": true,// Specifies whether to clear existing data before the import.
                "partition": "",// The partition information.
                "column": [// The names of the destination columns.
                    "key1",
                    "key2",
                    "key3"
                ]
            }
        },
        "setting": {
            "speed": {
                "mbps": 8,// The maximum transmission rate.
                "concurrent": 1// The maximum number of concurrent jobs.
            }
        }
    }
}
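
To confirm that the destination table exists and that data has arrived after a run, you can use PyODPS, the MaxCompute SDK for Python. The following sketch is only an illustration: the AccessKey pair, project name, and endpoint are placeholders, and the schema of the ok table (three STRING columns) merely mirrors the sample configuration above.

# Hypothetical sketch: create the unpartitioned destination table with PyODPS and
# check the row count after the synchronization task has run.
from odps import ODPS

o = ODPS(
    "<your-access-key-id>",        # placeholder AccessKey ID
    "<your-access-key-secret>",    # placeholder AccessKey secret
    "<your-maxcompute-project>",   # placeholder project name
    endpoint="<your-maxcompute-endpoint>",
)

# Create the destination table if it does not exist; the schema mirrors the sample columns.
o.create_table("ok", "key1 string, key2 string, key3 string", if_not_exists=True)

# Count the rows written by the synchronization task.
with o.execute_sql("SELECT COUNT(*) FROM ok;").open_reader() as reader:
    for record in reader:
        print(record[0])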