During streaming data processing, data synchronization between Kafka and other systems or data migration between Kafka clusters is often required. This topic describes how to use Kafka Connect to migrate data between Kafka clusters.

Background information

Kafka Connect is a scalable and reliable tool for streaming data transfer between Kafka and other systems. For example, you can use Kafka Connect to obtain binlog data from a database and migrate the data of the database to a Kafka cluster. This not only migrates the data of the database but also indirectly connects the database to a downstream streaming data processing system. Kafka Connect also provides a Representational State Transfer (REST) application programming interface (API) that helps you create and manage Kafka Connect connectors.

Kafka Connect can run in standalone or distributed mode. In standalone mode, all workers run in the same process. The distributed mode is more scalable and fault-tolerant than the standalone mode, and it is the most commonly used mode and the recommended choice for production environments.
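
For reference, both modes are started with the standard scripts that ship with Kafka. The following sketch is illustrative: the standalone configuration file names are placeholders, and the distributed worker configuration path matches the one used on E-MapReduce later in this topic:

    # Standalone mode: a single process runs all connectors and tasks.
    # worker.properties and connector1.properties are illustrative names.
    connect-standalone.sh worker.properties connector1.properties

    # Distributed mode: each worker process that shares the same group.id in
    # its configuration joins the same Kafka Connect cluster.
    connect-distributed.sh /etc/ecm/kafka-conf/connect-distributed.properties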

This topic describes how to call the REST API of Kafka Connect to migrate data between Kafka clusters, where Kafka Connect runs in distributed mode.
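
For example, after a Connect worker is running, you can probe its REST API with curl. This sketch assumes the worker is reachable at emr-worker-3:8083, which matches the clusters created later in this topic:

    # Returns the Kafka Connect version and commit ID of the worker.
    curl http://emr-worker-3:8083/

    # Lists the connector plugins installed on the worker.
    curl http://emr-worker-3:8083/connector-plugins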

Step 1: Create Kafka clusters

Create a source Kafka cluster and a target Kafka cluster in E-MapReduce. Kafka Connect is installed on the task node, so the target Kafka cluster must contain a task node. Kafka Connect is started on the task node by default after the cluster is created and listens on port 8083.

We recommend that you add the source Kafka cluster and the target Kafka cluster to the same security group. If the source Kafka cluster and the target Kafka cluster belong to different security groups, the two clusters are not accessible to each other by default. You must modify the required settings of the security groups to allow mutual access.
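
To verify that the clusters can reach each other, you can run a simple reachability check from a node in one cluster against a broker port in the other. This optional sketch assumes the nc (netcat) utility is available; the IP address is a placeholder for the internal IP address of a broker node:

    # Checks whether TCP port 9092 on a broker of the other cluster is reachable.
    nc -vz 192.168.xx.xx 9092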

  1. Log on to the Alibaba Cloud E-MapReduce console.
  2. Create the source Kafka cluster and the target Kafka cluster. For more information, see Create a cluster.
    Note When creating the target Kafka cluster, you must configure a task instance, that is, a task node.

Step 2: Create a topic for storing the data to be migrated

Create a topic named connect in the source Kafka cluster.

  1. Use Secure Shell (SSH) to log on to the header node of the source Kafka cluster. In this example, the header node is emr-header-1.
  2. Run the following command as the root user to create a topic named connect:
    kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 2 --partitions 10 --topic connect

    Note After performing the preceding operations, keep the logon window for later use.
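
Optionally, you can confirm that the topic was created with the expected number of partitions and replication factor by describing it in the same logon window:

    kafka-topics.sh --describe --zookeeper emr-header-1:2181 --topic connect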

Step 3: Create a Kafka Connect connector

On the task node of the target Kafka cluster, run the curl command to create a Kafka Connect connector by submitting JavaScript Object Notation (JSON) data.

  1. Use SSH to log on to the task node of the target Kafka cluster. In this example, the task node is emr-worker-3.
  2. Optional: Customize Kafka Connect configuration.

    Go to the Configuration page of the Kafka service under the target Kafka cluster. Customize the offset.storage.topic, config.storage.topic, and status.storage.topic parameters in connect-distributed.properties. For more information, see Configure parameters.

    Kafka Connect saves offsets, connector configurations, and task status in the topics specified by the offset.storage.topic, config.storage.topic, and status.storage.topic parameters, respectively. Kafka Connect automatically creates these topics by using the default number of partitions and replication factor that are specified in /etc/ecm/kafka-conf/connect-distributed.properties. Example settings are shown after this procedure.

  3. Run the following command as the root user to create a Kafka Connect connector:
    curl -X POST -H "Content-Type: application/json" --data '{
      "name": "connect-test",
      "config": {
        "connector.class": "EMRReplicatorSourceConnector",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092",
        "src.zookeeper.connect": "${src-kafka-curator-ip}:2181",
        "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181",
        "topic.whitelist": "${source-topic}",
        "topic.rename.format": "${dest-topic}",
        "src.kafka.max.poll.records": "300"
      }
    }' http://emr-worker-3:8083/connectors

    In the JSON data, the name field specifies the name of the Kafka Connect connector to create, which is connect-test in this example. Set the config field based on your actual requirements. The following table describes the key variables of the config field; a filled-in example request is shown after this procedure.

    Variable                   Description
    ${source-topic}            The topics in the source Kafka cluster that store the data to be migrated, for example, connect. Separate multiple topics with commas (,).
    ${dest-topic}              The topics in the target Kafka cluster to which the data is migrated, for example, connect.replica.
    ${src-kafka-ip}            The internal IP address of a broker node in the source Kafka cluster.
    ${src-kafka-curator-ip}    The internal IP address of the node where the ZooKeeper service is installed in the source Kafka cluster.
    ${dest-kafka-curator-ip}   The internal IP address of the node where the ZooKeeper service is installed in the target Kafka cluster.
    Note After performing the preceding operations, keep the logon window for later use.
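
If you customize the storage topics in step 2 of this procedure, the corresponding entries in connect-distributed.properties look like the following sketch. The topic names here are illustrative values, not defaults:

    # Topics in which Kafka Connect stores offsets, connector configurations,
    # and task status, respectively. The names are illustrative.
    offset.storage.topic=connect-offsets
    config.storage.topic=connect-configs
    status.storage.topic=connect-status

For reference, a filled-in version of the request in step 3 of this procedure might look like the following. The IP addresses are placeholders (192.168.0.10 for a source-cluster node that runs both a broker and ZooKeeper, 192.168.0.20 for the ZooKeeper node of the target cluster); substitute the addresses of your own clusters:

    curl -X POST -H "Content-Type: application/json" --data '{
      "name": "connect-test",
      "config": {
        "connector.class": "EMRReplicatorSourceConnector",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "src.kafka.bootstrap.servers": "192.168.0.10:9092",
        "src.zookeeper.connect": "192.168.0.10:2181",
        "dest.zookeeper.connect": "192.168.0.20:2181",
        "topic.whitelist": "connect",
        "topic.rename.format": "connect.replica",
        "src.kafka.max.poll.records": "300"
      }
    }' http://emr-worker-3:8083/connectors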

Step 4: View the status of the Kafka Connect connector and its tasks

View the status of the Kafka Connect connector and its tasks, and make sure that they are working as expected.

  1. Return to the logon window on the task node of the target Kafka cluster. In this example, the task node is emr-worker-3.
  2. Run the following command as the root user to view all Kafka Connect connectors:
    curl emr-worker-3:8083/connectors

  3. Run the following command as the root user to view the status of the Kafka Connect connector created in this example, that is, connect-test:
    curl emr-worker-3:8083/connectors/connect-test/status

    Make sure that the Kafka Connect connector (connect-test in this example) is in the RUNNING state. Sample responses for this substep and the previous one are shown after this procedure.

  4. Run the following command as the root user to view the details of the connector's tasks:
    curl emr-worker-3:8083/connectors/connect-test/tasks

    Make sure that no error message is returned for any task.
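
For reference, the commands in steps 2 and 3 of this procedure return JSON responses that are roughly of the following shape; worker IDs and the number of tasks will differ in your environment:

    # Response of GET /connectors: the names of all connectors on the Connect cluster.
    ["connect-test"]

    # Response of GET /connectors/connect-test/status, formatted for readability.
    {
      "name": "connect-test",
      "connector": { "state": "RUNNING", "worker_id": "emr-worker-3:8083" },
      "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "emr-worker-3:8083" } ]
    }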

Step 5: Generate the data to be migrated

Send the data to be migrated to the connect topic in the source Kafka cluster.

  1. Return to the logon window on the header node of the source Kafka cluster. In this example, the header node is emr-header-1.
  2. Run the following command as the root user to send data to the connect topic:
    kafka-producer-perf-test.sh --topic connect --num-records 100000 --throughput 5000 --record-size 1000 --producer-props bootstrap.servers=emr-header-1:9092

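Optionally, before checking the target cluster, you can consume a few of the generated records from the source cluster to confirm that they were produced:

    kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic connect --from-beginning --max-messages 5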

Step 6: View the result of data migration

After the data to be migrated is generated, Kafka Connect automatically migrates the data to the corresponding topic in the target Kafka cluster. In this example, the topic is connect.replica.

  1. Return to the logon window on the task node of the target Kafka cluster. In this example, the task node is emr-worker-3.
  2. Run the following command as the root user to check whether the data is migrated. In this command, emr-header-1 refers to the header node of the target Kafka cluster:
    kafka-consumer-perf-test.sh --topic connect.replica --broker-list emr-header-1:9092 --messages 100000

    The command output shows that the 100,000 messages sent to the source Kafka cluster have been migrated to the target Kafka cluster.
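
As an additional check, you can compare the end offsets of the target topic with the number of records that were produced. The following sketch uses the GetOffsetShell tool that ships with Kafka; its class name varies across Kafka versions, so verify it against your installation:

    # Prints the latest offset of each partition of the target topic.
    # The per-partition offsets should sum to 100,000 in this example.
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic connect.replica --time -1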

Summary

This topic describes and demonstrates how to use Kafka Connect to migrate data between Kafka clusters. For more information about how to use Kafka Connect, see the Kafka official website and the REST API documentation.
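
When the migration is complete, the connector can be removed through the same REST API:

    # Stops the connector's tasks and deletes its configuration.
    curl -X DELETE http://emr-worker-3:8083/connectors/connect-test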