Data migration through Kafka Connect

In streaming data processing scenarios, E-MapReduce often needs to synchronize data between Kafka and other systems, or migrate data from one Kafka cluster to another. This section describes how to quickly synchronize or migrate data between Kafka clusters by using Kafka Connect on E-MapReduce.

Prerequisites

• A cloud account has been registered. For details, see Registering a Cloud Account.
• The E-MapReduce service has been activated.
• The cloud account has been authorized. For details, see Role Authorization.

Background Information

Kafka Connect is a scalable, reliable tool for streaming data between Kafka and other systems. For example, Kafka Connect can capture the binlog data of a database and synchronize the database's data to a Kafka cluster, thereby migrating the database data. Because the Kafka cluster can in turn be connected to a stream processing system, this also indirectly connects the database to downstream stream processing systems. In addition, Kafka Connect provides a REST API that makes it easy to create and manage connectors.
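For example, assuming a Kafka Connect worker is running and listening on its default port 8083, you can query the worker's basic information through the REST API (a minimal sketch; the host name depends on your environment):

curl http://localhost:8083/

The response is JSON similar to {"version":"...","commit":"..."}.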

Kafka Connect has two operating modes: standalone and distributed. In standalone mode, all work is performed in a single process. The distributed mode is more scalable and fault tolerant than the standalone mode; it is the most commonly used mode and the one recommended for production environments.
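For reference, a stock Kafka distribution starts the two modes with the following scripts (shown only for illustration; on E-MapReduce, the distributed Kafka Connect worker is started for you):

connect-standalone.sh config/connect-standalone.properties config/my-connector.properties
connect-distributed.sh config/connect-distributed.properties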

This article describes how to use the Kafka Connect REST API on E-MapReduce to migrate data between Kafka clusters, with Kafka Connect running in distributed mode.

Step 1 Create a Kafka cluster

Create a source Kafka cluster and a destination Kafka cluster on EMR. Kafka Connect is installed on the Task node, so the destination Kafka cluster must contain a Task node. After the cluster is created, the Kafka Connect service on the Task node is started by default and listens on port 8083.

We recommend that you create the source and destination Kafka clusters in the same security group. If they are in different security groups, their networks cannot communicate with each other by default, and you must configure both security groups to allow access between the two networks.
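After the security groups are configured, you can verify that the destination cluster can reach the source cluster's broker port, for example (run on the destination cluster's Task node; ${src-kafka-ip} is a placeholder for the internal IP address of a broker in the source cluster):

nc -zv ${src-kafka-ip} 9092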

Log in to the Alibaba Cloud E-MapReduce console.

Create a source Kafka cluster and a destination Kafka cluster. For details, see Creating Clusters.
Note When creating the destination Kafka cluster, you must enable a Task instance; that is, the cluster must contain a Task node.

Step 2 Prepare the data topic to be migrated

Create a topic named connect on the source Kafka cluster.

Log in to the header node of the source Kafka cluster (emr-header-1 in this example) in SSH mode.
Run the following command as the root user to create a topic named connect.
kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 2 --partitions 10 --topic connect
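To confirm that the topic was created with the expected partition count and replication factor, you can describe it:

kafka-topics.sh --describe --zookeeper emr-header-1:2181 --topic connect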

Note After completing the above operations, keep the login window open; it will be used again later.

Step 3 Create a Kafka Connect connector

On the Task node of the destination Kafka cluster, use the curl command to create a Kafka Connect connector by submitting JSON data.

Log in to the Task node (emr-worker-3 in this example) of the destination Kafka cluster in SSH mode.
Optional: Customize Kafka Connect configuration.
Go to the Kafka service configuration page of the destination Kafka cluster, and customize the three configuration items offset.storage.topic, config.storage.topic, and status.storage.topic in connect-distributed.properties. For details, see Component Parameter Configuration.
Kafka Connect saves offsets, configs, and task statuses in topics whose names are specified by the offset.storage.topic, config.storage.topic, and status.storage.topic configuration items, respectively. Kafka Connect automatically creates these three topics with the default partition count and replication factor, which are saved in the /etc/ecm/kafka-conf/connect-distributed.properties file.
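If you customize these items, the configuration entries look similar to the following (the topic names here are illustrative; choose names that do not collide with your data topics):

offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status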

Run the following command as the root user to create a Kafka Connect connector.
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "connect-test",
  "config": {
    "connector.class": "EMRReplicatorSourceConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092",
    "src.zookeeper.connect": "${src-kafka-curator-ip}:2181",
    "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181",
    "topic.whitelist": "${source-topic}",
    "topic.rename.format": "${dest-topic}",
    "src.kafka.max.poll.records": "300"
  }
}' http://emr-worker-3:8083/connectors
In the JSON data, the name field is the name of the connector to be created, which is connect-test in this example. The config field must be set according to your actual environment; the key variables are described as follows:
• ${src-kafka-ip}: the internal IP address of a broker in the source Kafka cluster.
• ${src-kafka-curator-ip}: the internal IP address of the ZooKeeper service of the source Kafka cluster.
• ${dest-kafka-curator-ip}: the internal IP address of the ZooKeeper service of the destination Kafka cluster.
• ${source-topic}: the topic whose data is to be migrated, connect in this example.
• ${dest-topic}: the renamed topic in the destination cluster, connect.replica in this example.

Note After completing the above operations, keep the login window open; it will be used again later.

Step 4 View the status of the Kafka Connect connector and its tasks

Check the connector and task information to ensure that both are in a normal state.

Return to the login window of the Task node (emr-worker-3 in this example) of the destination Kafka cluster.
Run the following command as the root user to view all Kafka Connect connectors.
curl emr-worker-3:8083/connectors
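If the connector was created successfully, the returned list contains its name, similar to:

["connect-test"]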

Run the following command as the root user to check the status of the connector created in this example (connect-test).
curl emr-worker-3:8083/connectors/connect-test/status
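The response is JSON similar to the following (the worker IDs depend on your environment):

{"name":"connect-test","connector":{"state":"RUNNING","worker_id":"emr-worker-3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"emr-worker-3:8083"}]}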
Make sure that the status of the connector (connect-test in this example) is RUNNING.

Run the following command as the root user to view the connector's task information.
curl emr-worker-3:8083/connectors/connect-test/tasks
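The response lists each task together with its configuration, similar to the following (abridged):

[{"id":{"connector":"connect-test","task":0},"config":{"connector.class":"EMRReplicatorSourceConnector", ...}}]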

Make sure that the returned task information contains no error messages.

Step 5 Generate data to be migrated

Use the command line to send the data to be migrated to the connect topic of the source cluster.

Return to the login window of the header node (emr-header-1 in this example) of the source Kafka cluster.
Run the following command as the root user to send data to the connect topic.
kafka-producer-perf-test.sh --topic connect --num-records 100000 --throughput 5000 --record-size 1000 --producer-props bootstrap.servers=emr-header-1:9092
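The tool prints progress lines and a final summary similar to the following (throughput and latency figures will vary with your environment):

100000 records sent, 4999.75 records/sec (4.77 MB/sec), 1.30 ms avg latency, 127.00 ms max latency, 1 ms 50th, 2 ms 95th, 3 ms 99th, 12 ms 99.9th.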

Step 6 View data migration results

After the data to be migrated has been generated, Kafka Connect automatically migrates it to the corresponding topic in the destination cluster (connect.replica in this example).

Return to the login window of the Task node (emr-worker-3 in this example) of the destination Kafka cluster.

Run the following command as the root user to check whether the data migration succeeded.
Note In this command, emr-header-1 refers to the header node of the destination cluster, because the command is run from within that cluster.
kafka-consumer-perf-test.sh --topic connect.replica --broker-list emr-header-1:9092 --messages 100000
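The tool prints a header line followed by the measured results, similar to the following (figures will vary; the data.consumed.in.nMsg column shows the number of records consumed):

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2019-05-21 10:00:00:000, 2019-05-21 10:00:20:000, 95.3674, 4.7684, 100000, 5000.0000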

From the returned results, you can see that the 100,000 records sent from the source Kafka cluster have been migrated to the destination Kafka cluster.

Summary

This article introduced and demonstrated how to use Kafka Connect to migrate data between Kafka clusters. For more detailed usage of Kafka Connect, see the official Kafka documentation and the Kafka Connect REST API reference.
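For example, once a migration task is finished and the connector is no longer needed, it can be removed through the same REST API:

curl -X DELETE http://emr-worker-3:8083/connectors/connect-test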
