When you process streaming data, you often need to synchronize data between Kafka
and other systems or migrate data between Kafka clusters. This topic describes how
to use Kafka Connect in E-MapReduce (EMR) to migrate data between Kafka clusters.
Prerequisites
- An Alibaba Cloud account is created.
- EMR is activated.
- The Alibaba Cloud account is authorized. For more information, see Authorize roles.
Background information
Kafka Connect is a scalable and reliable tool used to synchronize data between Kafka
and other systems and to transmit streaming data between Kafka clusters. For example,
you can use Kafka Connect to obtain binlog data from a database and migrate the data
of the database to a Kafka cluster. This way, the database is indirectly connected
to the downstream stream processing systems of the Kafka cluster. Kafka Connect
also provides a RESTful API to help you create and manage Kafka Connect connectors.
Kafka Connect can run in standalone or distributed mode. In standalone mode, all work
is performed in a single process. Distributed mode is more scalable and fault-tolerant
than standalone mode. It is the most commonly used mode and the recommended mode
for production environments.
This topic describes how to call the RESTful API of Kafka Connect to migrate data
between Kafka clusters, where Kafka Connect runs in distributed mode.
Step 1: Create Kafka clusters
Create a source Kafka cluster and a destination Kafka cluster in the EMR console.
Kafka Connect is installed on task nodes. Therefore, when you create the destination
Kafka cluster, you must add a task node to it. After the destination cluster is
created, Kafka Connect starts on the task node and listens on port 8083.
- Log on to the EMR console.
- Create a source Kafka cluster and a destination Kafka cluster. For more information,
see Create a cluster.
Step 2: Create a topic used to store the data you want to migrate
Create a topic named connect in the source Kafka cluster.
- Log on to the master node of the source Kafka cluster by using SSH. In this example,
the master node is emr-header-1.
- Run the following command as the root user to create a topic named connect:
kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 2 --partitions 10 --topic connect

Note After you complete the preceding operations, do not close the logon window. The logon
window is required in a later step.
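As an optional sanity check, you can describe the topic and confirm the partition count before you wire up Kafka Connect. The kafka-topics.sh call below must run on the source master node against the live cluster; the sample line and the grep that follows only illustrate what to look for, and the exact output format varies across Kafka versions.

```shell
# Run on the source master node to inspect the topic (requires the cluster):
#   kafka-topics.sh --describe --zookeeper emr-header-1:2181 --topic connect
# Illustrative summary line of the output (format varies by Kafka version):
sample='Topic:connect  PartitionCount:10  ReplicationFactor:2  Configs:'
# Pull out the partition count to compare against the --partitions value:
echo "$sample" | grep -o 'PartitionCount:[0-9]*'   # prints: PartitionCount:10
```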
Step 3: Create a Kafka Connect connector
On the task node of the destination Kafka cluster, run the curl
command to create a Kafka Connect connector by using JSON data.
- Customize Kafka Connect configurations.
Go to the Configure tab of the Kafka service under the destination Kafka cluster in the Alibaba Cloud EMR console. Specify the offset.storage.topic, config.storage.topic, and status.storage.topic parameters on the connect-distributed.properties subtab.
Kafka Connect saves the offsets, configurations, and task status in the topics specified
by the offset.storage.topic, config.storage.topic, and status.storage.topic parameters, respectively. Kafka Connect automatically creates these topics by using
the default settings of partitions and replication-factor that are saved in /etc/ecm/kafka-conf/connect-distributed.properties.
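To see which storage topics your installation will use, you can grep the properties file on the task node. The sample file below is illustrative only: the topic names shown are common Kafka Connect defaults, not necessarily the values that EMR ships. On the task node, run the same grep against /etc/ecm/kafka-conf/connect-distributed.properties instead.

```shell
# Illustrative sample of the relevant lines in connect-distributed.properties
# (the topic names here are common Kafka Connect defaults, not guaranteed EMR values):
cat <<'EOF' > /tmp/connect-distributed.sample
bootstrap.servers=emr-worker-3:9092
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
EOF
# On the task node, point this at /etc/ecm/kafka-conf/connect-distributed.properties:
grep 'storage.topic' /tmp/connect-distributed.sample
```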
- Log on to the master node of the destination Kafka cluster. In this example, the master
node is emr-header-1.
- Switch to the task node named emr-worker-3 in this example.
- Run the following command as the root user to create a Kafka Connect connector:
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "connect-test",
  "config": {
    "connector.class": "EMRReplicatorSourceConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092",
    "src.zookeeper.connect": "${src-kafka-curator-ip}:2181",
    "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181",
    "topic.whitelist": "${source-topic}",
    "topic.rename.format": "${dest-topic}",
    "src.kafka.max.poll.records": "300"
  }
}' http://emr-worker-3:8083/connectors
In the JSON data, the name field specifies the name of the Kafka Connect connector that you want to create. In this example, the name is connect-test. Configure the config field based on your needs. The following table describes the key variables of the config field.
| Variable | Description |
| --- | --- |
| ${source-topic} | The topics in the source Kafka cluster that store the data to be migrated, for example, connect. Separate multiple topics with commas (,). |
| ${dest-topic} | The topics to which the data is migrated in the destination Kafka cluster, for example, connect.replica. |
| ${src-kafka-ip} | The internal IP address of a broker node in the source Kafka cluster. |
| ${src-kafka-curator-ip} | The internal IP address of the node where the ZooKeeper service is deployed in the source Kafka cluster. |
| ${dest-kafka-curator-ip} | The internal IP address of the node where the ZooKeeper service is deployed in the destination Kafka cluster. |
Note After you complete the preceding operations, do not close the logon window. The logon
window is required in a later step.
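As a sketch of an alternative to inlining the JSON in the curl command, you can keep the connector configuration in a file and validate it before posting, which avoids shell-quoting mistakes. The topic names below use the examples from this topic (connect and connect.replica); the ${...} placeholders are kept literal and must be replaced with your real values before you post the file.

```shell
# Write the connector configuration to a file. The quoted 'EOF' keeps the
# ${...} placeholders literal; replace them with real values before posting.
cat <<'EOF' > /tmp/connect-test.json
{
  "name": "connect-test",
  "config": {
    "connector.class": "EMRReplicatorSourceConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092",
    "src.zookeeper.connect": "${src-kafka-curator-ip}:2181",
    "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181",
    "topic.whitelist": "connect",
    "topic.rename.format": "connect.replica",
    "src.kafka.max.poll.records": "300"
  }
}
EOF
# Validate the JSON before posting it:
python3 -m json.tool /tmp/connect-test.json > /dev/null && echo "JSON OK"   # prints: JSON OK
# Then create the connector on the task node:
#   curl -X POST -H "Content-Type: application/json" --data @/tmp/connect-test.json http://emr-worker-3:8083/connectors
```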
Step 4: View the status of the Kafka Connect connector and its tasks
View the status of the Kafka Connect connector and its tasks and make sure that they
are normal.
- Return to the logon window of the task node of the destination Kafka cluster. In this
example, the task node is emr-worker-3.
- Run the following command as the root user to view all Kafka Connect connectors:
curl emr-worker-3:8083/connectors

- Run the following command as the root user to view the status of the Kafka Connect
connector created in this example, that is, connect-test:
curl emr-worker-3:8083/connectors/connect-test/status

Make sure that the Kafka Connect connector, connect-test in this example, is in the RUNNING state.
- Run the following command as the root user to view the details of the connector tasks:
curl emr-worker-3:8083/connectors/connect-test/tasks

Make sure that no error message is returned for any task.
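The status endpoint returns JSON, so this check can be scripted instead of read by eye. The payload below is a hand-written sample of the shape the endpoint returns; in practice you would pipe the output of the curl command above into the same one-liner.

```shell
# Sample status payload of the shape the REST API returns; the real one comes from:
#   curl -s emr-worker-3:8083/connectors/connect-test/status
status='{"name":"connect-test","connector":{"state":"RUNNING","worker_id":"emr-worker-3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"emr-worker-3:8083"}],"type":"source"}'
# Print the connector state and each task state; all should be RUNNING:
echo "$status" | python3 -c 'import json,sys; d=json.load(sys.stdin); print(d["connector"]["state"]); print(*[t["state"] for t in d["tasks"]])'
```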
Step 5: Generate the data to be migrated
Send the data to be migrated to the connect topic in the source Kafka cluster.
- Return to the logon window of the master node of the source Kafka cluster. In this
example, the master node is emr-header-1.
- Run the following command as the root user to send data to the connect topic:
kafka-producer-perf-test.sh --topic connect --num-records 100000 --throughput 5000 --record-size 1000 --producer-props bootstrap.servers=emr-header-1:9092
When the command output reports that all 100,000 records were sent, the data to be
migrated has been generated.

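kafka-producer-perf-test.sh prints periodic progress lines and a final summary. The summary line below is illustrative, not a captured output; the first field is the total number of records sent and should equal the --num-records value.

```shell
# Illustrative final summary line of kafka-producer-perf-test.sh:
line='100000 records sent, 4999.750012 records/sec (4.77 MB/sec), 1.30 ms avg latency, 120.00 ms max latency.'
# The first field is the total record count, which should match --num-records:
echo "$line" | awk '{print $1}'   # prints: 100000
```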
Step 6: View the data migration results
After the data to be migrated is generated, Kafka Connect automatically migrates the
data to the corresponding topic in the destination Kafka cluster. In this example,
the topic is connect.replica.
- Return to the logon window of the task node of the destination Kafka cluster. In this
example, the task node is emr-worker-3.
- Run the following command as the root user to check whether the data is migrated:
kafka-consumer-perf-test.sh --topic connect.replica --broker-list emr-header-1:9092 --messages 100000

The command output shows that the 100,000 messages sent to the source Kafka cluster
have been migrated to the destination Kafka cluster.
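kafka-consumer-perf-test.sh prints a comma-separated header row followed by one data row. The data row below is illustrative, not a captured output; the fifth column (data.consumed.in.nMsg) is the number of messages consumed and should match the number of records you produced.

```shell
# Illustrative data row of kafka-consumer-perf-test.sh output. Columns:
# start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, ...
row='2024-01-01 10:00:00:000, 2024-01-01 10:00:20:000, 95.3674, 4.7684, 100000, 5000.0000, 300, 19700, 4.8410, 5076.1421'
# Column 5 is the number of messages consumed:
echo "$row" | awk -F', ' '{print $5}'   # prints: 100000
```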
Summary
This topic describes how to use Kafka Connect to migrate data between Kafka clusters.
For more information about how to use Kafka Connect, see the Kafka Connect documentation and the Kafka Connect REST API reference on the Apache Kafka official website.