ApsaraMQ for Kafka:Use Replicator to replicate data between Apache Kafka clusters

Last Updated: Mar 18, 2024

This topic describes how to start two Apache Kafka clusters and then start a Replicator process to replicate data between the clusters.

Background information

As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends that you deploy Confluent Platform in Apache Kafka Raft (KRaft) mode. For information about how to run Apache Kafka in KRaft mode, see KRaft Overview and Quick Start for Confluent Platform.

This topic provides sample code for deployment in both KRaft mode and ZooKeeper mode.

For KRaft, the sample code uses combined mode, in which a Kafka node acts as both a broker and a KRaft controller. Combined mode is not supported for production workloads; it is used in this topic only to simplify the procedure. If you want to run the broker and the controller on separate Kafka nodes, use isolated mode in Confluent Platform. To learn more, see KRaft Overview and KRaft mode.
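
For reference, the combined-mode settings in etc/kafka/kraft/server.properties look similar to the following sketch. The values are illustrative and based on the Apache Kafka defaults; your release may differ.

    # The node acts as both a broker and a KRaft controller (combined mode).
    process.roles=broker,controller
    node.id=1
    # In combined mode, the controller quorum points at this same node.
    controller.quorum.voters=1@localhost:9093
    # Broker listener on 9092, controller listener on 9093.
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093
    controller.listener.names=CONTROLLER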

Prerequisites

Confluent Platform is installed, and the $CONFLUENT_HOME environment variable points to the installation directory.

Ports for Kafka brokers and Confluent Platform components

The following ports are used in this example and must be available on localhost.

KRaft mode

Component                                 Origin    Destination
Kafka brokers                             9082      9092
KRaft controllers                         9071      9093
Metadata server listeners (in brokers)    8091      8090
Connect Replicator worker                 8083 (single instance)
Control Center                            9021 (single instance)

ZooKeeper mode

Component                                 Origin    Destination
Kafka brokers                             9082      9092
ZooKeeper                                 2171      2181
Metadata server listeners (in brokers)    8091      8090
Connect Replicator worker                 8083 (single instance)
Control Center                            9021 (single instance)

Start the destination cluster

KRaft mode

  1. Run the following command to switch to the path where Confluent Platform is installed:

    cd $CONFLUENT_HOME
  2. Run the following command to create a directory that is used to store all example files:

    mkdir my-examples
  3. Run the following command to copy the etc/kafka/kraft/server.properties file to the examples directory. Then, rename the file to match its purpose.

    cp etc/kafka/kraft/server.properties my-examples/server_destination.properties
  4. Run the following command to use the kafka-storage tool to generate a random Universally Unique Identifier (UUID):

    KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
    Note

    The kafka-storage command is run only once per broker and controller. You cannot use this command to update an existing cluster. If you make a configuration mistake in this step, you must recreate the directories from scratch and perform the steps again.

  5. Run the following command to format the log directories for the destination cluster:

    ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_destination.properties
  6. Run the following command to start the destination cluster:

    ./bin/kafka-server-start my-examples/server_destination.properties

    For more information, see Quick Start for Confluent Platform.
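
To confirm that the destination broker is reachable, you can query its API versions from another terminal. This is a quick check; any client command against localhost:9092 serves the same purpose.

./bin/kafka-broker-api-versions --bootstrap-server localhost:9092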

ZooKeeper mode

  1. Run the following command to switch to the path where Confluent Platform is installed:

    cd $CONFLUENT_HOME
  2. Run the following command to create a directory to store all example files:

    mkdir my-examples
  3. Run the following command to copy the etc/kafka/zookeeper.properties file to the examples directory. Then, rename the file to match its purpose.

    cp etc/kafka/zookeeper.properties my-examples/zookeeper_destination.properties
  4. Run the following command to copy the etc/kafka/server.properties file to the examples directory. Then, rename the file to match its purpose.

    cp etc/kafka/server.properties my-examples/server_destination.properties
  5. Start a ZooKeeper server. In this example, services run on localhost. Run the following command to start ZooKeeper in its own terminal:

    ./bin/zookeeper-server-start my-examples/zookeeper_destination.properties
  6. Start a Kafka broker to serve as the single node Kafka cluster for the destination. Run the following command to start Kafka in its own terminal:

    ./bin/kafka-server-start my-examples/server_destination.properties
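
To confirm that the destination broker registered with ZooKeeper, you can list the broker IDs in another terminal by using the zookeeper-shell tool that ships with Confluent Platform. The output should include [0], the default broker ID.

./bin/zookeeper-shell localhost:2181 ls /brokers/ids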

Start the origin cluster

Configure and start the origin cluster in a new terminal window.

KRaft mode

Note

When you configure the destination node to run on the default ports, you must run the origin node on different ports to prevent conflicts. For example, if the Kafka broker on the origin node is configured on port 9082, the controller can be configured on port 9071. For information about port mappings, see the "Ports for Kafka brokers and Confluent Platform components" section of this topic. You can copy the configuration files of the origin cluster to your examples directory and modify them in the following steps to prevent conflicts with the destination cluster.

  1. Run the following command to switch to the path where Confluent Platform is installed:

    cd $CONFLUENT_HOME
  2. Run the following command to copy the etc/kafka/kraft/server.properties file to the examples directory. Then, rename the file to match its purpose.

    cp etc/kafka/kraft/server.properties my-examples/server_origin.properties
  3. Run the following commands to update the port numbers:

    sed -i '' -e "s/9093/9071/g" my-examples/server_origin.properties
    sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties
    sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties
    sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties
    sed -i '' -e "s/confluent.metrics.reporter.bootstrap.servers=localhost:9092/confluent.metrics.reporter.bootstrap.servers=localhost:9082/g" my-examples/server_origin.properties
  4. Run the following command to update data directories:

    sed -i '' -e "s/kraft-combined-logs/kraft-combined-logs-origin/g" my-examples/server_origin.properties
  5. Use the kafka-storage tool to generate a random UUID.

    KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
  6. Run the following command to format the log directories for the origin cluster:

    ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_origin.properties
  7. Start a Kafka broker to serve as the single node Kafka cluster for the origin. Run the following command to start Kafka in its own terminal:

    ./bin/kafka-server-start my-examples/server_origin.properties
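
To confirm that the origin node's combined broker and controller are healthy, you can describe the KRaft metadata quorum in another terminal. The kafka-metadata-quorum tool is available in Apache Kafka 3.3 and later.

./bin/kafka-metadata-quorum --bootstrap-server localhost:9082 describe --status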

ZooKeeper mode

Note

When you configure the destination cluster to run on the default ports, you must run the origin cluster on different ports to prevent conflicts. For example, if the Kafka broker in the origin cluster is configured on port 9082, ZooKeeper can be configured on port 2171. For information about port mappings, see the "Ports for Kafka brokers and Confluent Platform components" section of this topic. You can copy the configuration files of the origin cluster to your examples directory and modify them in the following steps to prevent conflicts with the destination cluster.

  1. Run the following commands to copy the configuration files to my-examples:

    cp etc/kafka/zookeeper.properties my-examples/zookeeper_origin.properties
    cp etc/kafka/server.properties my-examples/server_origin.properties
  2. Run the following commands to update the port numbers:

    sed -i '' -e "s/2181/2171/g" my-examples/zookeeper_origin.properties
    sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties
    sed -i '' -e "s/2181/2171/g" my-examples/server_origin.properties
    sed -i '' -e "s/#listeners/listeners/g" my-examples/server_origin.properties
    sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties
    sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties
  3. Run the following command to update the broker ID of the origin cluster:

    sed -i '' -e "s/broker.id=0/broker.id=1/g" my-examples/server_origin.properties
  4. Run the following commands to update data directories:

    sed -i '' -e "s/zookeeper/zookeeper_origin/g" my-examples/zookeeper_origin.properties
    sed -i '' -e "s/kafka-logs/kafka-logs-origin/g" my-examples/server_origin.properties
  5. Start the origin cluster.

    • Run the following command to start ZooKeeper in its own terminal:

      ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
    • Run the following command to start Kafka in its own terminal:

      ./bin/kafka-server-start my-examples/server_origin.properties
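
If either process fails to start, you can confirm that the sed edits took effect. The following check should print broker.id=1, a listener on port 9082, and a ZooKeeper connection on port 2171:

grep -E "^(broker.id|listeners|zookeeper.connect)=" my-examples/server_origin.properties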

Create a topic

Open a new command window to run Kafka commands.

  1. Run the following command to create a topic named test-topic in the origin cluster:

    ./bin/kafka-topics --create --topic test-topic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9082
  2. Run the following command to check whether the topic is created:

    ./bin/kafka-topics --list --bootstrap-server localhost:9082

    If output similar to the following is displayed, the topic is created. Topics whose names start with _confluent are internal topics.

    __confluent.support.metrics
    _confluent-command
    test-topic

    When you configure and run Replicator, the test-topic topic is replicated to the destination cluster, whose broker runs on port 9092.

Configure and run Replicator

Create the following files in $CONFLUENT_HOME/my-examples/.

  1. Run the following command to configure the origin cluster in a file named consumer.properties:

    cp etc/kafka/consumer.properties my-examples/.

    Edit the file and make sure it contains the addresses of brokers from the origin cluster. The default broker list in consumer.properties points to localhost:9092; change it to the origin cluster as follows:

    # Origin cluster connection configuration
    bootstrap.servers=localhost:9082
  2. Run the following command to configure the destination cluster in a new file named producer.properties:

    cp etc/kafka/producer.properties my-examples/.

    Edit the file and make sure it contains the addresses of brokers from the destination cluster. The default broker list matches the destination cluster that you started earlier.

    # Destination cluster connection configuration
    bootstrap.servers=localhost:9092
  3. Create a new file named my-examples/replication.properties with the following content to define the Replicator configuration for the Connect worker:

    # Replication configuration
    topic.rename.format=${topic}.replica
    replication.factor=1
    config.storage.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    confluent.topic.replication.factor=1
    Note
    • If no port is defined in the replication.properties file, the worker runs on its default port 8083, which is the desired configuration for this deployment. A sketch of how you could override the port follows this list.

    • The replication factor properties are all set to 1 because these test clusters are small. In production environments, run clusters of at least three brokers and set these replication factors to at least 3.
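
If you needed to move the embedded Connect worker off port 8083, for example to avoid a conflict with another service, you could add the standard Kafka Connect listeners setting to replication.properties. This is a sketch; port 8084 is an arbitrary example.

    # Hypothetical override: serve the Connect REST interface on port 8084 instead of 8083.
    listeners=http://localhost:8084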

Start Replicator

Run the following command to start the Replicator executable in its own terminal:

./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'
  • --cluster.id: an identifier that is used to determine which Replicator cluster this executable joins. Multiple Replicator executable instances with the same cluster.id work together.

  • --consumer.config: the path to the origin cluster configuration.

  • --producer.config: the path to the destination cluster configuration.

  • --replication.config: the path to a file that contains any configuration that is not connection-specific. Command-line arguments override these settings.

  • --whitelist: a list of topics to replicate from origin to destination.

For a full list of command-line options, see Command line parameters of Replicator executable. Watch the log for messages about starting the source task and creating the replicated topic. These messages indicate that Replicator is up and running and copying topics.
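
Because the Replicator executable runs an embedded Connect worker whose REST interface listens on port 8083 in this deployment, you can also confirm that the connector is running from another terminal by using the standard Connect REST API. The response is a JSON array that lists the running connector.

curl http://localhost:8083/connectors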

Verify topic replication between clusters

After Replicator starts, it checks the origin cluster for topics that need to be replicated.

Run the following command to check whether the test-topic.replica topic exists:

./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092

Expected output:

Topic: test-topic.replica    PartitionCount: 1       ReplicationFactor: 1    Configs: message.timestamp.type=CreateTime,segment.bytes=1073741824
      Topic: test-topic.replica      Partition: 0    Leader: 0       Replicas: 0     Isr: 0  Offline: 0

You can also list the topics on the destination cluster and verify that test-topic.replica is included:

./bin/kafka-topics --list --bootstrap-server localhost:9092
Note
  • To list topics on the origin cluster, run kafka-topics --list against localhost:9082.

  • To view the description of the original topic, run kafka-topics --describe for test-topic against localhost:9082.

Now that the topic exists in the origin cluster, you can send data to the test-topic topic by using a Kafka producer, and then confirm that the data is replicated by consuming from the test-topic.replica topic in the destination cluster. For example, to send a sequence of numbers by using the Kafka console producer, run the following command in a new terminal window:

seq 10000 | ./bin/kafka-console-producer --topic test-topic --bootstrap-server localhost:9082

You can confirm delivery in the destination cluster by using the Kafka console consumer in its own terminal window:

./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

If the numbers 1 to 10,000 appear in the consumer output, multi-cluster replication is working.
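
To count the replicated records non-interactively, you can cap the console consumer at the expected number of messages. This is a quick sanity check; --max-messages is a standard kafka-console-consumer option.

./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092 --max-messages 10000 | wc -l

If the command prints 10000, all records arrived in the replica topic.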

Use Control Center to monitor Replicator

  1. Stop Replicator and the brokers on both the origin and destination clusters. Then, stop the ZooKeeper instances (ZooKeeper mode only).

  2. For information about how to activate the monitoring extension for Replicator, see Replicator monitoring extension.

    • Add the full path of replicator-rest-extension-<version>.jar to your CLASSPATH.

    • Add rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension to my-examples/replication.properties.

  3. Uncomment or add the following lines to the Kafka configuration files for both the destination and the origin: my-examples/server_destination.properties and my-examples/server_origin.properties. In both files, confluent.metrics.reporter.bootstrap.servers must point to localhost on port 9092, so you must edit this port number in one or both files.

    # This deployment is in development mode, so a replication factor of 1 is used.
    confluent.metrics.reporter.topic.replicas=1
    # Enable metrics reporting on Control Center and provide access to the Confluent internal topic that collects and stores the monitoring data. 
    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.bootstrap.servers=localhost:9092
  4. Edit my-examples/producer.properties to add the monitoring interceptor for the producer.

    # Monitoring interceptor for producer
    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
  5. Edit my-examples/consumer.properties to add the monitoring interceptor for the consumer.

    # Monitoring interceptor for consumer
    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
  6. Edit etc/confluent-control-center/control-center-dev.properties to add the following two lines that specify origin and destination bootstrap servers for Control Center.

    # multi-cluster monitoring
    confluent.controlcenter.kafka.origin.bootstrap.servers=localhost:9082
    confluent.controlcenter.kafka.destination.bootstrap.servers=localhost:9092
    Note
    • Control Center requires the host and port of the Connect REST endpoint so that it knows where to look for Replicator monitoring metrics. In the control-center-dev.properties configuration file that is used in this example, the location is configured on the default port.

      # Separate multiple host names with commas (,). 
      confluent.controlcenter.connect.cluster=http://localhost:8083
    • In the production-ready configuration file control-center-production.properties, this setting is commented out by default. If you use this file, have multiple connectors, or want to configure Connect clusters in different ways, you must specify the Connect endpoints: uncomment the default or specify hosts for your own Connect clusters. For more information, see General settings.

    • If you run both Replicator and Connect clusters in your deployment, you must specify Replicator and Connect clusters separately.

      • Connect:

        confluent.controlcenter.connect.<connect-cluster-name>.cluster=http://connect-host-1:8083
      • Replicator:

        confluent.controlcenter.connect.<replicator-name>.cluster=http://replicator-host:8083
  7. Run the following commands to restart the ZooKeeper instances on the destination and origin clusters:

    ./bin/zookeeper-server-start my-examples/zookeeper_destination.properties
    ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
  8. Run the following commands to restart the brokers on the destination and origin clusters:

    ./bin/kafka-server-start my-examples/server_destination.properties
    ./bin/kafka-server-start my-examples/server_origin.properties
  9. Run the following command to restart Replicator and its Connect worker:

    ./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'
  10. Run the following command to launch Control Center:

    ./bin/control-center-start etc/confluent-control-center/control-center-dev.properties
  11. Open Control Center in your browser. The clusters are rendered in Control Center with auto-generated names based on your configuration.

  12. (Optional) In Control Center, edit the cluster names to suit your use case.

  13. In Control Center, select the destination cluster. In the left-side navigation pane, click Replicators. Then, use Control Center to monitor replication performance and drill down into source and replicated topics.

    To view messages produced to both the original and replicated topics in Control Center, run the kafka-producer-perf-test command in a command window to auto-generate test data to the test-topic topic.

    ./bin/kafka-producer-perf-test \
       --producer-props bootstrap.servers=localhost:9082 \
       --topic test-topic \
       --record-size 1000 \
       --throughput 1000 \
       --num-records 3600000

    Expected output:

    4999 records sent, 999.8 records/sec (0.95 MB/sec), 1.1 ms avg latency, 240.0 ms max latency.
    5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.5 ms avg latency, 4.0 ms max latency.
    5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 5.0 ms max latency.
    5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.3 ms avg latency, 3.0 ms max latency.
    5001 records sent, 1000.0 records/sec (0.95 MB/sec), 0.3 ms avg latency, 4.0 ms max latency.
    5000 records sent, 1000.0 records/sec (0.95 MB/sec), 0.8 ms avg latency, 24.0 ms max latency.
    5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 3.0 ms max latency.
    ...

    Run the kafka-console-consumer command to consume these messages from the command line and verify that the replica topic is receiving them.

    ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

    Verify in Control Center that the replica topic is receiving the messages.

For more information about monitoring Replicator in Control Center, see Manage Replicator Using Control Center for Confluent Platform.

After you complete the experiments in this topic, perform the following steps to clean up:

  • Stop the producers and consumers by pressing Ctrl+C in each command window.

  • Stop each service by pressing Ctrl+C in its command window, in the reverse of the order in which you started the services: first stop Control Center, then Replicator and the Kafka brokers, and finally ZooKeeper.