This topic describes how to use DataWorks Data Integration to migrate data from a
Kafka cluster to MaxCompute.
Prerequisites
- MaxCompute is activated. For more information, see Activate MaxCompute.
- DataWorks is activated.
- A workflow is created in DataWorks. In this example, a DataWorks workspace in basic
mode is used. For more information, see Create a workflow.
- A Kafka cluster is created.
Before data migration, make sure that your Kafka cluster works as expected. In this
example, Alibaba Cloud E-MapReduce (EMR) is used to automatically create a Kafka cluster.
For more information, see Kafka quick start.
In this example, the following version of EMR Kafka is used:
- EMR version: V3.12.1
- Cluster type: Kafka
- Software: Ganglia 3.7.2, ZooKeeper 3.4.12, Kafka 2.11-1.0.1, and Kafka Manager 1.3.3.16
The Kafka cluster is deployed in a virtual private cloud (VPC) in the China (Hangzhou)
region. The Elastic Compute Service (ECS) instances in the primary instance group
of the Kafka cluster are configured with public and private IP addresses.
Background information
Kafka is distributed middleware that is used to publish and subscribe to messages. It is widely used because of its high performance and high throughput, and it can process millions of messages per second. Kafka is suitable for streaming data processing and is used in scenarios such as user behavior tracing and log collection.
A typical Kafka cluster contains several producers, brokers, consumers, and a ZooKeeper
cluster. A Kafka cluster uses ZooKeeper to manage configurations and coordinate services
in the cluster.
A topic is the most commonly used collection of messages in a Kafka cluster and is a logical concept for message storage. A topic is not stored on physical disks as a whole. Instead, the messages in a topic are stored by partition on the disks of the cluster nodes.
Multiple producers can publish messages to a topic, and multiple consumers can subscribe
to messages in a topic.
When a message is stored to a partition, the message is allocated an offset. The offset
is the unique ID of the message in the partition. The offsets of messages in each
partition start from 0.
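For example, after a topic is created, you can check from a broker host how its messages are distributed across partitions. The following command is a minimal sketch that assumes the EMR ZooKeeper address used later in this topic; <topic-name> is a placeholder:
# Show the partition count, replication factor, and leader broker of each partition.
kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --describe --topic <topic-name>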
Step 1: Prepare Kafka data
You must prepare test data in the Kafka cluster. Configure a security group rule for the header node of the EMR cluster to allow inbound requests on TCP ports 22 and 9092. This way, you can log on to the header node over SSH (port 22), and MaxCompute and DataWorks can communicate with the Kafka broker on the header node (port 9092).
- Log on to the header node of the EMR cluster.
- Log on to the EMR console.
- In the top navigation bar, click Cluster Management.
- On the page that appears, find the cluster for which you want to prepare test data
and go to the details page of the cluster.
- On the details page of the cluster, click Instances. Find the public IP address of the header node of the E-MapReduce cluster and use it to remotely log on to the header node over Secure Shell (SSH), as shown in the following example.
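A minimal sketch of the logon command, assuming that you connect as the root user; 47.xxx.xxx.xxx is a placeholder for the public IP address of the header node:
# Replace 47.xxx.xxx.xxx with the public IP address of emr-header-1.
ssh root@47.xxx.xxx.xxx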
- Create a test topic.
Run the following command to create a test topic named testkafka:
[root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka --create
Created topic "testkafka".
- Write test data.
Run the following command to start a console producer and write data to the testkafka topic. Kafka processes streaming data, so you can continuously write data to the topic. To ensure that the test results are valid, we recommend that you write more than 10 records.
[root@emr-header-1 ~]# kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka
>123
>abc
>
To check whether the data is written to the topic, open another SSH window and run the following command to start a console consumer. If the records that you wrote appear in the output, the data has been written to the topic.
[root@emr-header-1 ~]# kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
123
abc
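Optionally, you can check how many records each partition of the topic now holds. The following command is a sketch that assumes the kafka-run-class.sh wrapper is available on the header node, as the other Kafka scripts are; --time -1 returns the latest offset of each partition:
# Print the latest offset (log end offset) of each partition of the testkafka topic.
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic testkafka --time -1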
Step 2: Create a destination table in DataWorks
Create a destination table in DataWorks to receive data from Kafka.
- Go to the DataStudio page.
- Log on to the DataWorks console.
- In the left-side navigation pane, click Workspaces.
- In the top navigation bar, select the region where the target workspace resides. Find
the target workspace and click Data Analytics in the Actions column.
- Right-click the workflow and choose to create a MaxCompute table.
- On the Create Table page, select the engine type and enter a table name.
- Click DDL Statement. In the DDL Statement dialog box, enter the following CREATE TABLE statement and click Generate Table Schema:
CREATE TABLE testkafka
(
    key        string,
    value      string,
    partition1 string,
    timestamp1 string,
    offset     string,
    t123       string,
    event_id   string,
    tag        string
);
The columns in the statement are designed to receive the columns that Kafka Reader of DataWorks Data Integration reads. Kafka Reader provides the following default columns:
- __key__: the key of the message.
- __value__: the complete content of the message.
- __partition__: the partition where the message resides.
- __headers__: the header of the message.
- __offset__: the offset of the message.
- __timestamp__: the timestamp of the message.
You can customize a column. For more information, see Kafka Reader.
- Click Commit to Production Environment and confirm the operation.
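Optionally, you can confirm that the table schema is what you expect by running a DESC statement, for example in an ad hoc query. This is only a sketch that assumes the testkafka table created above:
-- Show the schema of the destination MaxCompute table.
DESC testkafka;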
Step 3: Synchronize the data
- Create an exclusive resource group for Data Integration.
- In the top navigation bar, click the icon to switch to script mode.
- In script mode, click the icon in the toolbar.
- Configure the script. In this example, enter the following code:
{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "47.xxx.xxx.xxx:9092",
                "kafkaConfig": {
                    "group.id": "console-consumer-83505"
                },
                "valueType": "ByteArray",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__timestamp__",
                    "__offset__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "topic": "testkafka",
                "keyType": "ByteArray",
                "waitTime": "10",
                "beginOffset": "0",
                "endOffset": "3"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "",
                "truncate": true,
                "compress": false,
                "datasource": "odps_first",
                "column": [
                    "key",
                    "value",
                    "partition1",
                    "timestamp1",
                    "offset",
                    "t123",
                    "event_id",
                    "tag"
                ],
                "emptyAsNull": false,
                "table": "testkafka"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "throttle": false,
            "concurrent": 1
        }
    }
}
To view the consumer group names that you can specify for the group.id parameter, run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list command on the header node.
[root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
_emr-client-metrics-handler-group
console-consumer-69493
console-consumer-83505
console-consumer-21030
console-consumer-45322
console-consumer-14773
In this example,
console-consumer-83505 is used. Run the
kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505 command on the header node to obtain the values of the
beginOffset and
endOffset parameters.
[root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'console-consumer-83505' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testkafka 6 0 0 0 - - -
test 6 3 3 0 - - -
testkafka 0 0 0 0 - - -
testkafka 1 1 1 0 - - -
testkafka 5 0 0 0 - - -
- Configure a resource group for scheduling.
- On the node configuration tab, click the Properties tab in the right-side navigation pane.
- In the Resource Group section, set the Resource Group parameter to the exclusive resource group for Data Integration that you have created.
Note If you want to write Kafka data to MaxCompute at a regular interval, for example on an hourly basis, you can use the beginDateTime and endDateTime parameters to limit each run to one hour of data and schedule the data integration node to run once per hour, as shown in the sketch below. For more information, see Kafka Reader.
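The following snippet is only a sketch of how the reader parameters might look for hourly scheduling. It assumes that startTime and endTime are defined as scheduling parameters of the node, for example startTime=$[yyyymmddhh24miss-1/24] and endTime=$[yyyymmddhh24miss], and that they are used instead of the beginOffset and endOffset parameters shown in this example:
"beginDateTime": "${startTime}",
"endDateTime": "${endTime}"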
- Click the icon to run the code.
- You can view the results in the operational logs.
What to do next
You can create a data development job and run SQL statements to check whether the data has been synchronized from the Kafka cluster to the destination table. This topic uses the select * from testkafka statement as an example. Perform the following steps:
- In the left-side navigation pane of the DataStudio page, find the target workflow.
- Right-click the workflow and choose to create an ODPS SQL node.
- In the Create Node dialog box, enter a node name and click Submit.
- On the configuration tab of the node, enter the select * from testkafka statement and click the Run icon.