This topic describes how to use the Data Integration service of DataWorks to ship data from LogHub to MaxCompute. You can also ship data from LogHub to other supported destinations, such as Object Storage Service (OSS), Tablestore, relational database management systems (RDBMS), and DataHub.
Background information
The Data Integration service of DataWorks is a stable, efficient, and scalable data synchronization platform provided by Alibaba Cloud. This platform transmits data in batches for Alibaba Cloud big data services, such as MaxCompute and AnalyticDB.
Scenarios
- Data synchronization between LogHub and MaxCompute across regions
- Data synchronization between LogHub and MaxCompute across Alibaba Cloud accounts
- Data synchronization between LogHub and MaxCompute of the same Alibaba Cloud account
- Data synchronization between LogHub of an Alibaba Cloud account and MaxCompute of a Finance Cloud account
Additional considerations for data synchronization across Alibaba Cloud accounts
If you have two Alibaba Cloud accounts (Account A and Account B), you can use Account B to create a data integration task that synchronizes LogHub data under Account A to MaxCompute under Account B. You can create the LogHub data source in one of the following ways:
- Use the AccessKey ID and AccessKey secret of Account A to create a LogHub data source.
  In this case, Account B has permission to access all Log Service projects created by Account A.
- Use the AccessKey ID and AccessKey secret of RAM User A1 under Account A to create a LogHub data source.
  To use this method, first grant RAM User A1 the required Log Service permissions in one of the following ways:
  - Use Account A to grant the AliyunLogFullAccess and AliyunLogReadOnlyAccess system permissions on Log Service to RAM User A1. For more information, see Grant a RAM user the permissions to access Log Service.
  - Use Account A to grant custom Log Service permissions to RAM User A1.
    To do so, use Account A to log on to the RAM console, create a custom policy, and then attach it to RAM User A1. For more information about authorization, see Authorization - Overview.
    If the following policy is attached to RAM User A1, Account B can read data from project_name1 and project_name2 in Log Service through RAM User A1.

    {
        "Version": "1",
        "Statement": [
            {
                "Action": [
                    "log:Get*",
                    "log:List*",
                    "log:CreateConsumerGroup",
                    "log:UpdateConsumerGroup",
                    "log:DeleteConsumerGroup",
                    "log:ListConsumerGroup",
                    "log:ConsumerGroupUpdateCheckPoint",
                    "log:ConsumerGroupHeartBeat",
                    "log:GetConsumerGroupCheckPoint"
                ],
                "Resource": [
                    "acs:log:*:*:project/project_name1",
                    "acs:log:*:*:project/project_name1/*",
                    "acs:log:*:*:project/project_name2",
                    "acs:log:*:*:project/project_name2/*"
                ],
                "Effect": "Allow"
            }
        ]
    }
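If you prefer to script this grant instead of using the RAM console, the following is a minimal sketch based on the aliyun-python-sdk-ram package (RAM API version 2015-05-01). The policy name log-read-for-dataworks and the user name A1 are hypothetical placeholders, not values required by DataWorks:

# Minimal sketch: create the custom policy above with Account A's credentials
# and attach it to RAM User A1.
# Assumes aliyun-python-sdk-core and aliyun-python-sdk-ram are installed;
# the policy name and user name below are hypothetical.
import json

from aliyunsdkcore.client import AcsClient
from aliyunsdkram.request.v20150501.CreatePolicyRequest import CreatePolicyRequest
from aliyunsdkram.request.v20150501.AttachPolicyToUserRequest import AttachPolicyToUserRequest

policy_document = {
    "Version": "1",
    "Statement": [{
        "Action": [
            "log:Get*", "log:List*",
            "log:CreateConsumerGroup", "log:UpdateConsumerGroup",
            "log:DeleteConsumerGroup", "log:ListConsumerGroup",
            "log:ConsumerGroupUpdateCheckPoint", "log:ConsumerGroupHeartBeat",
            "log:GetConsumerGroupCheckPoint"
        ],
        "Resource": [
            "acs:log:*:*:project/project_name1", "acs:log:*:*:project/project_name1/*",
            "acs:log:*:*:project/project_name2", "acs:log:*:*:project/project_name2/*"
        ],
        "Effect": "Allow"
    }]
}

# RAM operations must be performed with Account A's credentials.
client = AcsClient('<AccessKeyId-of-Account-A>', '<AccessKeySecret-of-Account-A>', 'cn-hangzhou')

# Create the custom policy.
create = CreatePolicyRequest()
create.set_PolicyName('log-read-for-dataworks')
create.set_PolicyDocument(json.dumps(policy_document))
client.do_action_with_exception(create)

# Attach the custom policy to RAM User A1.
attach = AttachPolicyToUserRequest()
attach.set_PolicyType('Custom')
attach.set_PolicyName('log-read-for-dataworks')
attach.set_UserName('A1')
client.do_action_with_exception(attach)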
Procedure
Create the synchronization task by using the code editor. The following sample configuration reads data from a LogHub Logstore and writes it to a MaxCompute table:

{
    "type": "job",
    "version": "1.0",
    "configuration": {
        "reader": {
            "plugin": "loghub",
            "parameter": {
                "datasource": "loghub_lzz",// The name of the source connection that you have added.
                "logstore": "logstore-ut2",// The name of the source Logstore.
                "beginDateTime": "${startTime}",// The start time of data consumption, which is the left boundary of the interval (left-closed and right-open).
                "endDateTime": "${endTime}",// The end time of data consumption, which is the right boundary of the interval (left-closed and right-open).
                "batchSize": 256,// The number of data entries that are read at a time. Default value: 256.
                "splitPk": "",
                "column": [
                    "key1",
                    "key2",
                    "key3"
                ]
            }
        },
        "writer": {
            "plugin": "odps",
            "parameter": {
                "datasource": "odps_first",// The name of the destination connection that you have added.
                "table": "ok",// The name of the destination table.
                "truncate": true,// Specifies whether to clear the existing data before the import.
                "partition": "",// The partition information.
                "column": [// The names of the destination columns.
                    "key1",
                    "key2",
                    "key3"
                ]
            }
        },
        "setting": {
            "speed": {
                "mbps": 8,// The maximum transmission rate. Unit: Mbit/s.
                "concurrent": 1// The maximum number of concurrent jobs.
            }
        }
    }
}
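The ${startTime} and ${endTime} variables are scheduling parameters that DataWorks replaces at run time, and beginDateTime and endDateTime expect timestamps in the yyyymmddhhmmss format. As an illustrative assignment (the offsets here are examples, not required values), the node's scheduling parameters could be set as follows so that each run consumes the log data written between 10 minutes and 5 minutes before the scheduled time:

startTime=$[yyyymmddhh24miss-10/24/60] endTime=$[yyyymmddhh24miss-5/24/60]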