
ApsaraMQ for Kafka: Replicate data between Apache Kafka clusters with Replicator

Last Updated: Mar 11, 2026

When you run multiple Apache Kafka clusters, you often need to copy topic data from one cluster to another for disaster recovery, data migration, or aggregation. Replicator handles this by continuously reading from an origin cluster and writing to a destination cluster. This tutorial walks you through setting up two single-node clusters and running Replicator to copy topic data between them.

Instructions for both KRaft mode and ZooKeeper mode are included.

Note

As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Use Apache Kafka Raft (KRaft) mode instead. See KRaft Overview and Quick Start for Confluent Platform.

The KRaft examples use combined mode, where a single Kafka node acts as both broker and KRaft controller. Combined mode simplifies the setup but is not supported for production workloads. To run brokers and controllers on separate nodes, use isolated mode. See KRaft Overview and KRaft mode.
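For reference, combined mode is what the etc/kafka/kraft/server.properties template configures out of the box. The key settings look roughly like the following (values are taken from the stock Apache Kafka KRaft template and may differ slightly in your installation):

# Combined mode: this node acts as both broker and KRaft controller
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
log.dirs=/tmp/kraft-combined-logs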

Prerequisites

Before you begin, ensure that you have Confluent Platform installed locally (the replicator, Connect, and Control Center components ship with it) and that $CONFLUENT_HOME points to the installation directory.

Port assignments

This tutorial runs both clusters on the same host. Each cluster uses different ports to avoid conflicts.

KRaft mode

Component                                  Origin    Destination
Kafka brokers                              9082      9092
KRaft controllers                          9071      9093
Metadata server listeners (in brokers)     8091      8090
Connect Replicator worker                  --        8083
Control Center                             --        9021

ZooKeeper mode

Component                                  Origin    Destination
Kafka brokers                              9082      9092
ZooKeeper                                  2171      2181
Metadata server listeners (in brokers)     8091      8090
Connect Replicator worker                  --        8083
Control Center                             --        9021
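Before you start either cluster, you can optionally confirm that none of these ports is already in use. A minimal check, assuming lsof is installed (a free port produces no output):

for port in 2171 2181 8083 8090 8091 9021 9071 9082 9092 9093; do
  lsof -iTCP:$port -sTCP:LISTEN
done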

Step 1: Start the destination cluster

The destination cluster uses default ports.

KRaft mode

  1. Go to the Confluent Platform installation directory and create a working directory for the example files:

       cd $CONFLUENT_HOME
       mkdir my-examples
  2. Copy the KRaft server properties template and rename it:

       cp etc/kafka/kraft/server.properties my-examples/server_destination.properties
  3. Generate a random cluster ID:

    Note

    The kafka-storage command runs only once per broker and controller. It cannot update an existing cluster. If you make a configuration mistake, delete the log directories and start over; a sketch for this appears after these steps.

       KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
  4. Format the log directories for the destination cluster:

       ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_destination.properties
  5. Start the destination cluster:

       ./bin/kafka-server-start my-examples/server_destination.properties

For details, see Quick Start for Confluent Platform.
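If you do need to re-run kafka-storage format, remove the previously formatted log directories first. A minimal sketch, assuming the default log.dirs path from the KRaft template (check the log.dirs line in your properties file before deleting anything):

# Confirm where the log directories live
grep 'log.dirs' my-examples/server_destination.properties

# Remove them (default path assumed), then format again
rm -rf /tmp/kraft-combined-logs
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_destination.properties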

ZooKeeper mode

  1. Go to the Confluent Platform installation directory and create a working directory:

       cd $CONFLUENT_HOME
       mkdir my-examples
  2. Copy the ZooKeeper and Kafka server properties templates:

       cp etc/kafka/zookeeper.properties my-examples/zookeeper_destination.properties
       cp etc/kafka/server.properties my-examples/server_destination.properties
  3. Start ZooKeeper in its own terminal:

       ./bin/zookeeper-server-start my-examples/zookeeper_destination.properties
  4. Start the Kafka broker in its own terminal:

       ./bin/kafka-server-start my-examples/server_destination.properties

Step 2: Start the origin cluster

Open a new terminal window. The origin cluster uses different ports to avoid conflicts with the destination.

KRaft mode

  1. Go to the Confluent Platform installation directory:

       cd $CONFLUENT_HOME
  2. Copy the KRaft server properties template:

       cp etc/kafka/kraft/server.properties my-examples/server_origin.properties
  3. Update port numbers and data directories to avoid conflicts with the destination. These commands remap the origin broker to port 9082, the KRaft controller to 9071, and the metadata server listener to 8091 (see Port assignments for the full mapping). The sed -i '' syntax is for macOS; on Linux, omit the empty quotes after -i:

       sed -i '' -e "s/9093/9071/g" my-examples/server_origin.properties
       sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties
       sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties
       sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties
       sed -i '' -e "s/confluent.metrics.reporter.bootstrap.servers=localhost:9092/confluent.metrics.reporter.bootstrap.servers=localhost:9082/g" my-examples/server_origin.properties
       sed -i '' -e "s/kraft-combined-logs/kraft-combined-logs-origin/g" my-examples/server_origin.properties
  4. Generate a new cluster ID and format the log directories:

       KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
       ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c my-examples/server_origin.properties
  5. Start the origin broker in its own terminal:

       ./bin/kafka-server-start my-examples/server_origin.properties

ZooKeeper mode

  1. Copy the ZooKeeper and Kafka server properties templates:

       cp etc/kafka/zookeeper.properties my-examples/zookeeper_origin.properties
       cp etc/kafka/server.properties my-examples/server_origin.properties
  2. Update port numbers, the broker ID, and data directories. These commands remap the origin broker to port 9082 and ZooKeeper to 2171 (see Port assignments for the full mapping). As in the KRaft steps, omit the empty quotes after -i if you are on Linux:

       sed -i '' -e "s/2181/2171/g" my-examples/zookeeper_origin.properties
       sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties
       sed -i '' -e "s/2181/2171/g" my-examples/server_origin.properties
       sed -i '' -e "s/#listeners/listeners/g" my-examples/server_origin.properties
       sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties
       sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties
       sed -i '' -e "s/broker.id=0/broker.id=1/g" my-examples/server_origin.properties
       sed -i '' -e "s/zookeeper/zookeeper_origin/g" my-examples/zookeeper_origin.properties
       sed -i '' -e "s/kafka-logs/kafka-logs-origin/g" my-examples/server_origin.properties
  3. Start ZooKeeper for the origin cluster in its own terminal:

       ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
  4. Start the origin Kafka broker in its own terminal:

       ./bin/kafka-server-start my-examples/server_origin.properties
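Regardless of mode, it is worth confirming that both brokers respond before you continue. One way to check, assuming both start commands completed without errors, is the kafka-broker-api-versions tool that ships with Kafka:

# Origin broker
./bin/kafka-broker-api-versions --bootstrap-server localhost:9082 | head -1

# Destination broker
./bin/kafka-broker-api-versions --bootstrap-server localhost:9092 | head -1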

Step 3: Create a test topic

Open a new terminal window.

  1. Create a topic named test-topic on the origin cluster:

       kafka-topics --create --topic test-topic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9082
  2. Verify that the topic exists:

       kafka-topics --list --bootstrap-server localhost:9082

     Expected output:

       __confluent.support.metrics
       _confluent-command
       test-topic

     Topics prefixed with _confluent are internal. The test-topic entry confirms successful creation.

Step 4: Configure and run Replicator

Create three configuration files in $CONFLUENT_HOME/my-examples/ to define the origin, destination, and replication settings.

Consumer properties (origin cluster)

Copy the default consumer configuration and update it to point to the origin cluster:

cp etc/kafka/consumer.properties my-examples/.

Edit the copied file so that it contains:

# Origin cluster connection
bootstrap.servers=localhost:9082
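If you prefer to make the edit from the command line, a one-liner along these lines works (same macOS/Linux sed caveat as in Step 2):

sed -i '' -e "s/bootstrap.servers=localhost:9092/bootstrap.servers=localhost:9082/g" my-examples/consumer.properties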

Producer properties (destination cluster)

Copy the default producer configuration, which already points to the destination cluster:

cp etc/kafka/producer.properties my-examples/.

Verify that the file contains:

# Destination cluster connection
bootstrap.servers=localhost:9092

Replication properties

Create my-examples/replication.properties with the following content:

# Replication configuration
topic.rename.format=${topic}.replica
replication.factor=1
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
confluent.topic.replication.factor=1
Note
  • If no port is defined in replication.properties, the Connect worker runs on its default port 8083.

  • All replication factors are set to 1 for this test environment. For production, use a minimum replication factor of 3, as sketched below.
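For a production deployment, the same keys would carry values of at least 3, for example (illustrative values only, not part of this tutorial's setup):

replication.factor=3
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
confluent.topic.replication.factor=3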

Start Replicator

Start the Replicator executable in its own terminal:

./bin/replicator \
  --cluster.id replicator \
  --consumer.config my-examples/consumer.properties \
  --producer.config my-examples/producer.properties \
  --replication.config my-examples/replication.properties \
  --whitelist 'test-topic'

Parameter               Description
--cluster.id            Identifier for the Replicator cluster. Instances with the same ID work together.
--consumer.config       Path to the origin cluster configuration.
--producer.config       Path to the destination cluster configuration.
--replication.config    Path to non-connection-specific configuration. Command-line arguments override these settings.
--whitelist             Topics to replicate from origin to destination.

Log messages about starting the source task and creating the replicated topic confirm that Replicator is running. For the full list of command-line options, see Command line parameters of Replicator executable.
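Because the Replicator executable runs a Connect worker on port 8083, you can also confirm it is up through the standard Connect REST API. A quick check, assuming curl is available and the worker is on its default port:

# Lists the connectors the embedded worker is running
curl -s http://localhost:8083/connectors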

Step 5: Verify topic replication

After Replicator starts, it checks the origin cluster for topics to replicate.

  1. Verify that the test-topic.replica topic exists on the destination cluster:

       ./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092

     Expected output:

       Topic: test-topic.replica    PartitionCount: 1       ReplicationFactor: 1    Configs: message.timestamp.type=CreateTime,segment.bytes=1073741824
             Topic: test-topic.replica      Partition: 0    Leader: 0       Replicas: 0     Isr: 0  Offline: 0
  2. List all topics on the destination cluster:

    Note
    • To list topics on the origin cluster, use --bootstrap-server localhost:9082.

    • To view details of the original topic, run kafka-topics --describe --topic test-topic --bootstrap-server localhost:9082.

       ./bin/kafka-topics --list --bootstrap-server localhost:9092
  3. Produce test data to the origin cluster. In a new terminal, pipe a sequence of 10,000 numbers into the console producer:

       seq 10000 | ./bin/kafka-console-producer --topic test-topic --bootstrap-server localhost:9082
  4. Consume from the replicated topic on the destination cluster in a separate terminal. If the numbers 1 through 10,000 appear in the consumer output, cross-cluster replication is working; a sketch for counting the records follows these steps:

       ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092
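To confirm that every record arrived without scanning the output by eye, you can count the consumed messages. A sketch using the consumer's --timeout-ms option, which makes it exit once no further records arrive within the given interval:

# Should print 10000 once replication has caught up
./bin/kafka-console-consumer --from-beginning --topic test-topic.replica \
  --bootstrap-server localhost:9092 --timeout-ms 10000 | wc -l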

Step 6: Monitor Replicator with Control Center

Set up monitoring

  1. Stop Replicator and brokers on both clusters, then stop any ZooKeeper instances.

  2. Activate the Replicator monitoring extension (see Replicator monitoring extension):

    • Add the full path of replicator-rest-extension-<version>.jar to your CLASSPATH (see the example after these steps).

    • Add the following line to my-examples/replication.properties:

      rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
  3. Add or uncomment the following lines in both my-examples/server_destination.properties and my-examples/server_origin.properties. Set confluent.metrics.reporter.bootstrap.servers to localhost:9092 in both files:

       # Development mode -- replication factor of 1
       confluent.metrics.reporter.topic.replicas=1
       # Enable metrics reporting for Control Center
       metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
       confluent.metrics.reporter.bootstrap.servers=localhost:9092
  4. Add the monitoring interceptor to my-examples/producer.properties:

       interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
  5. Add the monitoring interceptor to my-examples/consumer.properties:

       interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
  6. Add the following lines to etc/confluent-control-center/control-center-dev.properties to specify the bootstrap servers for both clusters:

       # Multi-cluster monitoring
       confluent.controlcenter.kafka.origin.bootstrap.servers=localhost:9082
       confluent.controlcenter.kafka.destination.bootstrap.servers=localhost:9092

     Note
     • Control Center requires the Connect REST endpoint to find Replicator monitoring metrics. In control-center-dev.properties, the default is configured as:

       confluent.controlcenter.connect.cluster=http://localhost:8083

       Separate multiple host names with commas.

     • If you use control-center-production.properties, the default value is commented out. Uncomment it or specify your Connect endpoints. For details, see General settings.

     • If you run both Replicator and Connect clusters, specify them separately:

       • Connect

         # Connect cluster
         confluent.controlcenter.connect.<connect-cluster-name>.cluster=http://connect-host-1:8083

       • Replicator

         confluent.controlcenter.connect.<replicator-name>.cluster=http://replicator-host:8083
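For step 2 above, adding the monitoring extension jar to the classpath can be done with an export along these lines (the path and version are placeholders; substitute the actual location of the jar in your installation):

# Placeholder path and version -- adjust for your installation
export CLASSPATH=$CONFLUENT_HOME/share/java/kafka-connect-replicator/replicator-rest-extension-<version>.jar:$CLASSPATH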

Restart services and launch Control Center

  1. Restart the ZooKeeper instances, each in its own terminal (ZooKeeper mode only):

       ./bin/zookeeper-server-start my-examples/zookeeper_destination.properties
       ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
  2. Restart the brokers on both clusters, each in its own terminal:

       ./bin/kafka-server-start my-examples/server_destination.properties
       ./bin/kafka-server-start my-examples/server_origin.properties
  3. Restart Replicator:

       ./bin/replicator \
         --cluster.id replicator \
         --consumer.config my-examples/consumer.properties \
         --producer.config my-examples/producer.properties \
         --replication.config my-examples/replication.properties \
         --whitelist 'test-topic'
  4. Launch Control Center:

        ./bin/control-center-start etc/confluent-control-center/control-center-dev.properties
  5. Open Control Center in your browser at http://localhost:9021. The clusters appear with auto-generated names based on your configuration.

    Control Center cluster view

  6. (Optional) Edit the cluster names to match your use case.

  7. Select the destination cluster, then click Replicators in the left navigation pane to monitor replication throughput and drill down into source and replicated topics.

    Replicators monitoring view

Generate test data for monitoring

To view message flow on Control Center, generate test data with kafka-producer-perf-test:

kafka-producer-perf-test \
  --producer-props bootstrap.servers=localhost:9082 \
  --topic test-topic \
  --record-size 1000 \
  --throughput 1000 \
  --num-records 3600000

Expected output:

4999 records sent, 999.8 records/sec (0.95 MB/sec), 1.1 ms avg latency, 240.0 ms max latency.
5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.5 ms avg latency, 4.0 ms max latency.
5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 5.0 ms max latency.
...

Confirm that the replicated topic receives the messages:

./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

Replica topic messages on Control Center

For more about monitoring, see Manage Replicator Using Control Center for Confluent Platform.

Clean up

After you finish the tutorial:

  1. Stop producers and consumers by pressing Ctrl+C in each terminal window.

  2. Stop services in reverse order: Control Center, then Replicator, then Kafka brokers, then ZooKeeper (if applicable), as shown below.
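A minimal sketch using the stop scripts that ship with Confluent Platform (each script signals every matching process on the host, which covers both clusters in this tutorial; stop Replicator itself with Ctrl+C in its terminal):

./bin/control-center-stop
./bin/kafka-server-stop          # stops both brokers
./bin/zookeeper-server-stop      # ZooKeeper mode only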