This topic describes how to use MirrorMaker 2 on Kafka Connect (Kafka MM2) to synchronize data across clusters.

Background information

Scenarios

Kafka MM2 is suitable for the following scenarios:
  • Remote data synchronization: You can use Kafka MM2 to synchronize data among clusters in different regions.
  • Disaster recovery: You can use Kafka MM2 to build a disaster recovery architecture that consists of primary and secondary clusters in different data centers. Data in the two clusters can be synchronized in real time. If one cluster becomes unavailable, you can switch its applications to the other cluster, which provides geo-disaster recovery.
  • Data migration: In scenarios such as cloud migration of businesses, hybrid clouds, and cluster upgrades, data needs to be migrated from the original cluster to a new cluster. You can use Kafka MM2 to migrate data to ensure business continuity.
  • Data aggregation: You can use Kafka MM2 to synchronize data from multiple Kafka sub-clusters to a Kafka central cluster. This way, data can be aggregated.

Features

As a data replication tool, Kafka MM2 provides the following features:
  • Replicates the data and configuration information of topics.
  • Replicates the offset information of consumer groups and the consumed topics.
  • Replicates access control lists (ACLs).
  • Automatically detects new topics and partitions.
  • Provides Kafka MM2 metrics.
  • Provides high-availability architectures that are horizontally scalable.

Job execution methods

You can run Kafka MM2 jobs by using one of the following three methods:
  • Run Kafka MM2 jobs in an existing Kafka Connect cluster in Distributed mode. This method is recommended. You can use the features described in this topic to manage Kafka MM2 jobs.
  • Use the driver program to manage all Kafka MM2 jobs. For more information, see Deploy MM2 on a dedicated cluster to synchronize data across clusters.
  • Run a single MirrorSourceConnector job, which is suitable for test scenarios.
Note We recommend that you use the first method, which lets you manage Kafka MM2 jobs through the REST service provided by the Kafka Connect cluster.

For more information about Kafka MM2, see Apache Kafka documentation.

Prerequisites

The source Kafka cluster named emrsource and the destination Kafka cluster named emrdest are created. Kafka is selected for Optional Services (Select One At Least) when the clusters are created. For more information, see Create a cluster.
Note In this topic, the E-MapReduce (EMR) versions of the source and destination clusters are V3.42.0. Both clusters are of the DataFlow type and reside in the same virtual private cloud (VPC).

Limits

The Kafka version of the destination cluster must be 2.12_2.4.1 or later.

Procedure

  1. Step 1: Create a Kafka Connect cluster in the destination Kafka cluster
  2. Step 2: Use the Kafka MM2 connector

Step 1: Create a Kafka Connect cluster in the destination Kafka cluster

  1. Create a node group.
    On the Nodes page of the emrdest cluster in the EMR console, create a node group.
    1. Click Create Node Group.
    2. In the Add machine group panel, configure the parameters described in the following table. Configure other parameters as required.
      Parameter               Description
      Node Group Type         Select TASK (Task Node Group).
      Node Group Name         In this example, the name is emr-task.
      Storage Configuration   Select a data disk.
  2. Scale out the node group.
    1. On the Nodes page, find the emr-task node group and click Scale Out in the Actions column.
    2. In the dialog box that appears, specify the number of instances that you want to add, and then read and select Terms of Service.
      In this example, one instance is added. You can specify the number of instances as required. If you require a high-availability Kafka Connect cluster, we recommend that you add more than two instances.
    3. Click OK.
  3. Check the status of the Kafka Connect cluster to ensure that the cluster is running.
    1. In the upper part of the page, click Services.
    2. In the section about Kafka details, click Status.
    3. In the Components section, check the status of KafkaConnect to ensure that the component is running.
  4. Use Secure Shell (SSH) to log on to the emrdest cluster. For more information, see Log on to a cluster.
  5. Run the following command to check the status of the REST service provided by the Kafka Connect cluster:
    curl -X GET http://task-1-1:8083 | jq .
    Information similar to the following output is returned:
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100    91  100    91    0     0  13407      0 --:--:-- --:--:-- --:--:-- 15166
    {
      "version": "2.4.1",
      "commit": "42ce056344c5625a",
      "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"
    }
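
The check in the last step can also be scripted. The following Python sketch parses the JSON body returned by the REST service and compares the reported version with the minimum Kafka version from the Limits section. It assumes that the version field reflects the Kafka version installed on the cluster and that the 2.12 prefix in 2.12_2.4.1 denotes the Scala build. The function names are illustrative and are not part of EMR or Kafka.

```python
import json
import re

# Minimum Kafka version from the Limits section (2.12_2.4.1).
# Assumption: "2.12" is the Scala build, so only 2.4.1 is compared.
MIN_KAFKA_VERSION = (2, 4, 1)


def parse_version(text):
    """Turn a version string such as '2.4.1' into a tuple of integers."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"unrecognized version string: {text!r}")
    return tuple(int(part) for part in match.groups())


def worker_meets_minimum(response_body, minimum=MIN_KAFKA_VERSION):
    """Return True if the 'version' field of the REST response is new enough."""
    info = json.loads(response_body)
    return parse_version(info["version"]) >= minimum


# Body copied from the sample output in this topic.
sample = (
    '{"version": "2.4.1", "commit": "42ce056344c5625a", '
    '"kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"}'
)
print(worker_meets_minimum(sample))
```

To use the sketch against a live cluster, pass it the body that the curl command returns, for example by saving the output to a file and reading the file.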

Step 2: Use the Kafka MM2 connector

  1. Prepare the configuration files of the Kafka MM2 connector.
    The following files are included:
    • The configuration file of MirrorSourceConnector
      In this example, the file name is mm2-source-connector.json. The following sample code describes the content of the file. Modify the parameters in the file as required. For more information, see KIP-382: MirrorMaker 2.0.
      {
        "name": "mm2-source-connector",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "clusters": "emrsource,emrdest",
        "source.cluster.alias": "emrsource",
        "target.cluster.alias": "emrdest",
        "target.cluster.bootstrap.servers": "core-1-1:9092,core-1-2:9092,core-1-3:9092",
        "source.cluster.bootstrap.servers": "10.0.**.**:9092",
        "topics": "^foo.*",
        "tasks.max": "4",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "replication.factor": "3",
        "offset-syncs.topic.replication.factor": "3",
        "sync.topic.acls.interval.seconds": "20",
        "sync.topic.configs.interval.seconds": "20",
        "refresh.topics.interval.seconds": "20",
        "refresh.groups.interval.seconds": "20",
        "consumer.group.id": "mm2-mirror-source-consumer-group",
        "producer.enable.idempotence": "true",
        "source.cluster.security.protocol": "PLAINTEXT",
        "target.cluster.security.protocol": "PLAINTEXT"
      }
      Note Parameters in the sample code:
      • source.cluster.bootstrap.servers: the Kafka service endpoint of the emrsource cluster. Make sure that the Kafka Connect cluster can reach the emrsource cluster over the network.
      • topics: the topics to be replicated, specified as topic names or regular expressions. In this example, topics whose names start with foo are replicated.
    • The configuration file of MirrorCheckpointConnector
      In this example, the file name is mm2-checkpoint-connector.json. The following sample code describes the content of the file. Modify the parameters in the file as required. For more information, see KIP-382: MirrorMaker 2.0.
      {
        "name": "mm2-checkpoint-connector",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "clusters": "emrsource,emrdest",
        "source.cluster.alias": "emrsource",
        "target.cluster.alias": "emrdest",
        "target.cluster.bootstrap.servers": "core-1-1:9092,core-1-2:9092,core-1-3:9092",
        "source.cluster.bootstrap.servers": "10.0.**.**:9092",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "replication.factor": "3",
        "checkpoints.topic.replication.factor": "3",
        "emit.checkpoints.interval.seconds": "20",
        "source.cluster.security.protocol": "PLAINTEXT",
        "target.cluster.security.protocol": "PLAINTEXT"
      }
    • The configuration file of MirrorHeartbeatConnector
      In this example, the file name is mm2-heartbeat-connector.json. The following sample code describes the content of the file. Modify the parameters in the file as required. For more information, see KIP-382: MirrorMaker 2.0.
      {
        "name": "mm2-heartbeat-connector",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "clusters": "emrsource,emrdest",
        "source.cluster.alias": "emrsource",
        "target.cluster.alias": "emrdest",
        "target.cluster.bootstrap.servers": "core-1-1:9092,core-1-2:9092,core-1-3:9092",
        "source.cluster.bootstrap.servers": "10.0.**.**:9092",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "replication.factor": "3",
        "heartbeats.topic.replication.factor": "3",
        "emit.heartbeats.interval.seconds": "20",
        "source.cluster.security.protocol": "PLAINTEXT",
        "target.cluster.security.protocol": "PLAINTEXT"
      }
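
The three sample files share most of their settings, so they can be generated from one place to keep them consistent. The following Python sketch is one possible way to do that. The helper names (build_config, render_all) are hypothetical, the values are based on the samples in this step, and the masked source endpoint must be replaced with the real one before use.

```python
import json

MIRROR_PACKAGE = "org.apache.kafka.connect.mirror"
BYTE_ARRAY = "org.apache.kafka.connect.converters.ByteArrayConverter"

# Settings that the three sample files have in common.
COMMON = {
    "clusters": "emrsource,emrdest",
    "source.cluster.alias": "emrsource",
    "target.cluster.alias": "emrdest",
    # Masked in this topic; replace with the real emrsource endpoint.
    "source.cluster.bootstrap.servers": "10.0.**.**:9092",
    # Kafka expects a comma-separated list of host:port pairs here.
    "target.cluster.bootstrap.servers": "core-1-1:9092,core-1-2:9092,core-1-3:9092",
    "key.converter": BYTE_ARRAY,
    "value.converter": BYTE_ARRAY,
    "replication.factor": "3",
    "source.cluster.security.protocol": "PLAINTEXT",
    "target.cluster.security.protocol": "PLAINTEXT",
}

# Settings that differ per connector: (class name, connector-specific settings).
CONNECTORS = {
    "mm2-source-connector": ("MirrorSourceConnector", {
        "topics": "^foo.*",
        "tasks.max": "4",
        "offset-syncs.topic.replication.factor": "3",
        "sync.topic.acls.interval.seconds": "20",
        "sync.topic.configs.interval.seconds": "20",
        "refresh.topics.interval.seconds": "20",
        "refresh.groups.interval.seconds": "20",
        "consumer.group.id": "mm2-mirror-source-consumer-group",
        "producer.enable.idempotence": "true",
    }),
    "mm2-checkpoint-connector": ("MirrorCheckpointConnector", {
        "tasks.max": "1",
        "checkpoints.topic.replication.factor": "3",
        "emit.checkpoints.interval.seconds": "20",
    }),
    "mm2-heartbeat-connector": ("MirrorHeartbeatConnector", {
        "tasks.max": "1",
        "heartbeats.topic.replication.factor": "3",
        "emit.heartbeats.interval.seconds": "20",
    }),
}


def build_config(name):
    """Merge the shared settings with the settings of one connector."""
    class_name, specific = CONNECTORS[name]
    config = {"name": name, "connector.class": f"{MIRROR_PACKAGE}.{class_name}"}
    config.update(COMMON)
    config.update(specific)
    return config


def render_all():
    """Return a mapping of file name to JSON text for the three connectors."""
    return {
        f"{name}.json": json.dumps(build_config(name), indent=2)
        for name in CONNECTORS
    }


if __name__ == "__main__":
    for file_name, text in render_all().items():
        print(file_name)
        print(text)
```

Writing each rendered string to the matching file name produces files that can be submitted with the curl commands in the following steps.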
  2. Use MirrorSourceConnector.
    1. Use the REST service to create a MirrorSourceConnector job based on the mm2-source-connector.json file.
      curl -X PUT -H "Content-Type: application/json" --data @mm2-source-connector.json http://task-1-1:8083/connectors/mm2-source-connector/config
    2. View the status of MirrorSourceConnector.
      curl -s task-1-1:8083/connectors/mm2-source-connector/status | jq .
  3. Use MirrorCheckpointConnector.
    1. Use the REST service to create a MirrorCheckpointConnector job based on the mm2-checkpoint-connector.json file.
      curl -X PUT -H "Content-Type: application/json" --data @mm2-checkpoint-connector.json http://task-1-1:8083/connectors/mm2-checkpoint-connector/config
    2. View the status of MirrorCheckpointConnector.
      curl -s task-1-1:8083/connectors/mm2-checkpoint-connector/status | jq .
  4. Use MirrorHeartbeatConnector.
    1. Use the REST service to create a MirrorHeartbeatConnector job based on the mm2-heartbeat-connector.json file.
      curl -X PUT -H "Content-Type: application/json" --data @mm2-heartbeat-connector.json http://task-1-1:8083/connectors/mm2-heartbeat-connector/config
    2. View the status of MirrorHeartbeatConnector.
      curl -s task-1-1:8083/connectors/mm2-heartbeat-connector/status | jq .
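
The create and status calls in steps 2 to 4 follow the same pattern for each connector. The following Python sketch builds the same PUT request as the curl commands and inspects a status response for parts that are not running. It assumes the status layout of the Kafka Connect REST API, in which a connector object and a list of tasks each carry a state field. The function names are hypothetical, and nothing is sent over the network until you pass the request to urllib.request.urlopen.

```python
import json
import urllib.request

CONNECT_URL = "http://task-1-1:8083"  # REST endpoint used in this topic


def build_put_request(name, config, base_url=CONNECT_URL):
    """Build the PUT /connectors/<name>/config request used by the curl commands."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def parts_not_running(status):
    """List (part, state) pairs for the connector and tasks that are not RUNNING."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(("connector", status["connector"]["state"]))
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append((f"task-{task['id']}", task["state"]))
    return problems


# Hypothetical status body for illustration; real responses also include worker IDs.
example_status = {
    "name": "mm2-source-connector",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}, {"id": 1, "state": "FAILED"}],
}
print(parts_not_running(example_status))
```

An empty result means that every reported part is running. A connector that was just created may report no tasks yet, so checking again after a short wait is reasonable before treating an empty task list as healthy.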
  5. Run the following command to view topics related to Kafka MM2 in the emrdest cluster.
    kafka-topics.sh --list --bootstrap-server core-1-1:9092
    In this example, you can view the following topics:
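    • Topics replicated from the emrsource cluster. The source topics are existing topics in the emrsource cluster whose names start with foo. MirrorSourceConnector creates the replicas in the emrdest cluster and, under the default replication policy, prefixes each name with the source cluster alias (emrsource.<topic name>).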

    • The emrsource.checkpoints.internal topic that is created by MirrorCheckpointConnector and used to store the offset information.
    • The heartbeats topic created by MirrorHeartbeatConnector.
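
To compare the output of kafka-topics.sh with what you expect, you can predict the topic names in advance. The following Python sketch assumes the default replication policy, which prefixes replicated topics with the source cluster alias, and assumes that the topics pattern must match the whole topic name. The function name and the source topic names (foo-orders, foobar, bar) are hypothetical, and the result is a minimum set of names to look for rather than an exhaustive list.

```python
import re


def expected_target_topics(source_topics, topics_regex, source_alias):
    """Predict MM2-related topic names to look for in the destination cluster."""
    pattern = re.compile(topics_regex)
    # Assumption: the pattern is applied to the full topic name.
    replicated = sorted(
        f"{source_alias}.{topic}"
        for topic in source_topics
        if pattern.fullmatch(topic)
    )
    # Topics named in this topic: checkpoints from MirrorCheckpointConnector
    # and heartbeats from MirrorHeartbeatConnector.
    return replicated + [f"{source_alias}.checkpoints.internal", "heartbeats"]


# Hypothetical topic names in the emrsource cluster.
names = expected_target_topics(["foo-orders", "foobar", "bar"], "^foo.*", "emrsource")
for name in names:
    print(name)
```

Each printed name should appear in the output of the kafka-topics.sh command once the connectors have been running for at least one refresh interval.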