This topic describes how to use Data Integration to synchronize data in LogHub data
sources of Log Service to destinations that are supported by Data Integration, such
as MaxCompute, Object Storage Service (OSS), Tablestore, relational database management
systems (RDBMSs), and DataHub. In this topic, MaxCompute is used as a destination.
Prerequisites
- A MaxCompute data source is added. For more information, see Add a MaxCompute data source.
- The Logstore from which you want to read data and the MaxCompute table to which you
want to write the data are prepared. A sketch of creating such a table appears after this list.
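You can prepare the destination table in any way you prefer. The following is a minimal, hypothetical sketch that uses PyODPS (pip install pyodps) to create a table whose columns match the Logstore keys used later in this topic. The table name, column names, project, endpoint, and credentials are all placeholders.
from odps import ODPS

# Placeholders: supply your own AccessKey pair, MaxCompute project, and endpoint.
o = ODPS("<AccessKeyId>", "<AccessKeySecret>",
         project="<maxcompute_project>",
         endpoint="<maxcompute_endpoint>")

# Three STRING columns matching the keys to be read from the Logstore.
o.create_table("ok", "key1 string, key2 string, key3 string",
               if_not_exists=True)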
Background information
You can synchronize data from a LogHub data source to a destination in the following
scenarios:
- Synchronize data from a LogHub data source to data sources, such as a MaxCompute data
source, across regions.
- Synchronize data from a LogHub data source to data sources, such as a MaxCompute data
source, across Alibaba Cloud accounts.
- Synchronize data from a LogHub data source to data sources, such as a MaxCompute data
source, within the same Alibaba Cloud account.
- Synchronize data from a LogHub data source to data sources, such as a MaxCompute data
source, across the Alibaba Cloud public cloud and Alibaba Finance Cloud.
If you have Alibaba Cloud accounts A and B, you can use Account B to create a synchronization
node in Data Integration. Then, you can use the synchronization node to synchronize
LogHub data within Account A to a MaxCompute data source within Account B. The following
descriptions provide detailed information:
- Use the AccessKey ID and AccessKey secret of Account A to create a LogHub data source.
Account B can be used to synchronize data in all Log Service projects created by using
Account A.
- Use the AccessKey ID and AccessKey secret of RAM user A1 within Account A to add a
LogHub data source.
  - Use Account A to attach the AliyunLogFullAccess and AliyunLogReadOnlyAccess system
policies on Log Service to RAM user A1. For more information, see Create a RAM user and authorize the RAM user to access Log Service.
  - Use Account A to grant custom permissions on Log Service to RAM user A1.
    Use Account A to log on to the RAM console, go to the Policies page from the left-side
navigation pane, and then click Create Policy.
    For more information about how to grant custom permissions on Log Service to a RAM
user, see Overview and the Authorize a RAM user to access Log Service section.
If the following policy is attached to RAM user A1, Account B can be used to synchronize
only the data of project_name1 and project_name2 that are created by using RAM user A1 in Log Service.
{
  "Version": "1",
  "Statement": [
    {
      "Action": [
        "log:Get*",
        "log:List*",
        "log:CreateConsumerGroup",
        "log:UpdateConsumerGroup",
        "log:DeleteConsumerGroup",
        "log:ListConsumerGroup",
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": [
        "acs:log:*:*:project/project_name1",
        "acs:log:*:*:project/project_name1/*",
        "acs:log:*:*:project/project_name2",
        "acs:log:*:*:project/project_name2/*"
      ],
      "Effect": "Allow"
    }
  ]
}
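To check that the policy scopes access as intended, you can call Log Service with the credentials of RAM user A1. The following is a minimal sketch that assumes the aliyun-log-python-sdk package (pip install aliyun-log-python-sdk); the endpoint and credentials are placeholders.
from aliyun.log import LogClient

# Placeholders: use the endpoint of the region where project_name1 resides
# and the AccessKey pair of RAM user A1.
client = LogClient("cn-hangzhou.log.aliyuncs.com",
                   "<AccessKeyId-of-A1>", "<AccessKeySecret-of-A1>")

# Succeeds only if the policy grants log:List* on the project.
print(client.list_logstore("project_name1").get_logstores())
# The same call against a project outside the policy should be denied.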
Add a LogHub data source
- Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. On the Workspaces page, find
the desired workspace and click Data Integration in the Actions column.
- On the Data Integration page, click Data Source in the left-side navigation pane. The Data Source page appears.
- On the Data Source page, click Add data source in the upper-right corner.
- In the Add data source dialog box, click LogHub in the Message Queue section.
- In the Add LogHub data source dialog box, configure the parameters.
- Data Source Name: The name of the data source. The name can contain only letters, digits,
and underscores (_) and must start with a letter.
- Data Source Description: The description of the data source. The description cannot exceed
80 characters in length.
- LogHub Endpoint: The URL that is used to access the Log Service project, in the format of
http://example.com, where example.com indicates the endpoint of the Log Service project.
For more information about how to obtain the endpoint of a Log Service project, see Endpoints.
- Project: The name of the Log Service project.
- AccessKey ID: The AccessKey ID of the Alibaba Cloud account that is used to connect to the
Log Service project. You can copy the AccessKey ID on the AccessKey Pair page.
- AccessKey Secret: The AccessKey secret of the Alibaba Cloud account that is used to connect
to the Log Service project.
- Find the resource group that you want to use and click Test connectivity in the Actions
column to test the connectivity between the LogHub data source and the resource group.
- If the connectivity test is successful, click Complete.
Create a batch synchronization node
- On the Data Source page, click the icon in the upper-left corner and go to DataStudio. The DataStudio page appears.
- On the DataStudio page, move the pointer over the Create icon and select Create Workflow.
- In the Create Workflow dialog box, configure the Workflow Name and Description parameters and click Create.
- Click the name of the created workflow in the Scheduled Workflow pane, right-click
Data Integration, and then choose Create Node > Batch Synchronization.
- In the Create Node dialog box, configure the Name and Path parameters.
- Click Commit. The configuration tab of the node appears.
Configure the batch synchronization node on the codeless UI
- In the Connections step, configure the parameters in the Data source section.
- Data source: The name of the LogHub data source. Select LogHub from the drop-down list on the left
and select the LogHub data source that you added from the drop-down list on the right.
- Logstore: The name of the Logstore from which you want to read data.
- Log start time: The start time of data consumption. This parameter defines the left boundary of a
time range (left-closed and right-open) in the format of yyyyMMddHHmmss. Example: 20180111013000. The parameter can work with the scheduling parameters in DataWorks.
- Log end time: The end time of data consumption. This parameter defines the right boundary of a time
range (left-closed and right-open) in the format of yyyyMMddHHmmss. Example: 20180111013010. The parameter can work with the scheduling parameters in DataWorks. For an illustration of this format, see the sketch after the note below.
- Number of batches: The number of data entries to read at a time. Default value: 256.
Note You can click Data preview to preview data. Only a small number of LogHub data entries
are displayed, and they may differ from the actual data to be synchronized because of the
start time and end time that you specified.
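As an illustration of the yyyyMMddHHmmss format and the left-closed, right-open interval, the following Python sketch reproduces the example timestamps above; the times themselves are hypothetical.
from datetime import datetime, timedelta

end = datetime(2018, 1, 11, 1, 30, 10)
start = end - timedelta(seconds=10)
print(start.strftime("%Y%m%d%H%M%S"))  # 20180111013000, included in the range
print(end.strftime("%Y%m%d%H%M%S"))    # 20180111013010, excluded from the range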
- In the Connections step, select the MaxCompute data source that you added and configure
the remaining parameters in the Data Destination section.
- In the Mappings step, configure field mappings between the source and destination.
- In the Channel step, configure the parameters.
- Verify that the preceding configuration is correct and click the Save icon in the upper-left corner of the configuration tab.
- Run the batch synchronization node.
You can use one of the following methods to run the batch synchronization node:
- Run the node only once.
Click the Run icon in the top toolbar of the node configuration tab to run the node.
Note Before you run the node, you must configure the custom parameters for the node.
- Run the node based on the scheduling configurations of the node.
Click the Submit icon in the top toolbar to commit the node to the scheduling system. The scheduling
system automatically runs the node from the next day based on the scheduling properties
that you configured.
Click the Properties tab in the right-side navigation pane of the configuration tab
of the node. In the Parameters section of the Properties tab, enter startTime=$[yyyymmddhh24miss-10/24/60] and endTime=$[yyyymmddhh24miss-5/24/60]. These values indicate that the start time
of the node is 10 minutes earlier than the system time and the end time is 5 minutes
earlier than the system time.
In the Schedule section, set Scheduling Cycle to Minute, Start From to 00:00, Interval
to 05, and End At to 23:59. The node then runs every 5 minutes from 00:00 to 23:59, so
each run reads one 5-minute window of log data. A sketch of how these expressions
evaluate follows.
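For clarity, here is a small Python sketch of what these custom parameters evaluate to at run time; the scheduled time is hypothetical. $[yyyymmddhh24miss-10/24/60] subtracts 10/24/60 of a day, that is, 10 minutes, from the scheduled time, and -5/24/60 subtracts 5 minutes.
from datetime import datetime, timedelta

scheduled = datetime(2018, 1, 11, 1, 40, 0)  # hypothetical scheduled time
start_time = (scheduled - timedelta(minutes=10)).strftime("%Y%m%d%H%M%S")
end_time = (scheduled - timedelta(minutes=5)).strftime("%Y%m%d%H%M%S")
print(start_time, end_time)  # 20180111013000 20180111013500
With a 5-minute scheduling cycle, consecutive runs read back-to-back windows with no gaps or overlaps.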
Configure the batch synchronization node in the code editor
- On the configuration tab of the batch synchronization node, click the Conversion script icon in the top toolbar.
- In the Tips message, click OK to switch to the code editor.
- Click the Import Template icon in the top toolbar.
- In the Import Template dialog box, configure the Source type, Data source, Target type, and Data source
parameters and click OK to apply the template.
- Edit the code based on your business requirements in the code editor. Sample code:
{
  "type": "job",
  "version": "1.0",
  "configuration": {
    "reader": {
      "plugin": "loghub",
      "parameter": {
        "datasource": "loghub_lzz", // The name of the LogHub data source from which you want to read data. The name must be the same as the name of the data source that you added.
        "logstore": "logstore-ut2", // The name of the Logstore from which you want to read data. A Logstore is a Log Service unit for collecting, storing, and querying log data.
        "beginDateTime": "${startTime}", // The start time of data consumption. This parameter defines the left boundary of a time range (left-closed and right-open).
        "endDateTime": "${endTime}", // The end time of data consumption. This parameter defines the right boundary of a time range (left-closed and right-open).
        "batchSize": 256, // The number of data entries to read at a time. Default value: 256.
        "splitPk": "", // Leave this parameter empty for the LogHub Reader.
        "column": [ // The names of the keys to read from the Logstore.
          "key1",
          "key2",
          "key3"
        ]
      }
    },
    "writer": {
      "plugin": "odps",
      "parameter": {
        "datasource": "odps_first", // The name of the data source to which you want to write data. The name must be the same as the name of the data source that you added.
        "table": "ok", // The name of the table to which you want to write data.
        "truncate": true, // Specifies whether to clear existing data in the destination table or partition before writing.
        "partition": "", // The partition information in the destination table.
        "column": [ // The names of the columns to which you want to write data.
          "key1",
          "key2",
          "key3"
        ]
      }
    },
    "setting": {
      "speed": {
        "mbps": 8, // The maximum transmission rate.
        "concurrent": 7 // The maximum number of parallel threads.
      }
    }
  }
}
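Before you run the node, it can help to confirm that the keys in the reader's column list actually appear in the Logstore. The following hedged sketch uses the GetLogs API of aliyun-log-python-sdk for a spot check; the endpoint, project, and credentials are placeholders.
import time
from aliyun.log import LogClient

client = LogClient("cn-hangzhou.log.aliyuncs.com",
                   "<AccessKeyId>", "<AccessKeySecret>")
now = int(time.time())
# Read up to 5 entries from the last 10 minutes.
res = client.get_log("<project_name>", "logstore-ut2",
                     from_time=now - 600, to_time=now, size=5)
for log in res.get_logs():
    print(log.get_contents())  # The keys here should match the reader's "column" list.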