All Products
Search
Document Center

ApsaraMQ for Kafka:Replicate data across Kafka clusters with Cluster Linking

Last Updated:Mar 17, 2026

When you operate multiple ApsaraMQ for Confluent clusters across regions, keeping topics, consumer group offsets, and access control lists (ACLs) in sync requires manual effort and custom tooling. Cluster Linking eliminates this overhead by connecting two clusters and replicating data from a source cluster to a destination cluster in real time.

Cluster Linking works through two abstractions:

  • A cluster link, initiated on the destination cluster, that establishes a connection to the source cluster.

  • Mirror topics on the destination cluster that replicate data from their source topics.

Beyond topic data, Cluster Linking also synchronizes consumer group offsets and ACLs. This makes it suitable for cross-region replication, disaster recovery, and cluster migration.

This guide walks through the end-to-end process: creating configuration files, preparing test data, setting up synchronization filters, creating a cluster link, and verifying the migration. All steps use the Confluent Platform command-line interface (CLI).

Prerequisites

Before you begin, make sure you have:

  • A source cluster and a destination cluster running ApsaraMQ for Confluent

  • An Elastic Compute Service (ECS) instance or other machine with network access to both clusters. For more information about how to create an instance, see Create and manage an ECS instance.

  • Confluent Platform 7.0.0 or later installed on the machine. For installation instructions, see Confluent Platform installation overview.

  • Java 8 or 11. For installation instructions, see Install JDK.

Step 1: Create configuration files

Create configuration files on the ECS instance to authenticate with the source and destination clusters.

Replace the following placeholders with your actual values:

PlaceholderDescriptionExample
<username>SASL username for the clusteradmin
<password>SASL password for the clusterpa$$w0rd
<source-cluster-address:port>Bootstrap server address of the source clustersource-kafka.example.com:9093

Source cluster configuration

Create a file named /tmp/source.config with the following content. This configuration enables automatic mirror topic creation, consumer offset synchronization, and ACL synchronization.

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
bootstrap.servers=<source-cluster-address:port>
auto.create.mirror.topics.enable=true
consumer.offset.sync.enable=true
acl.sync.enable=true
PropertyPurpose
auto.create.mirror.topics.enableAutomatically creates mirror topics on the destination cluster for matched source topics
consumer.offset.sync.enableSynchronizes consumer group offsets from the source to the destination cluster
acl.sync.enableSynchronizes ACL entries from the source to the destination cluster

Destination cluster configuration

Create a file named /tmp/destination.config with the following content:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
Important

The /tmp directory is used here for simplicity. In production, store configuration files in a secure, persistent directory because /tmp files may be deleted during system maintenance, which can leave cluster links in an inconsistent state.

Step 2: Prepare test data on the source cluster

Set up a topic with sample data, a consumer group, and an ACL entry on the source cluster before creating the cluster link.

Replace <source-cluster-address:port> with the bootstrap server address of your source cluster.

Create a test topic

Create a single-partition topic named test-topic. A single partition makes it easier to observe message replication order.

kafka-topics --create --topic test-topic --partitions 1 \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config

To verify the topic was created, list and describe it:

# List all topics
kafka-topics --list \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config

# Describe the test topic
kafka-topics --describe --topic test-topic \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config

Produce test messages

Send five numbered messages to test-topic:

seq 1 5 | kafka-console-producer --topic test-topic \
  --bootstrap-server <source-cluster-address:port> \
  --producer.config /tmp/source.config

Consume and verify messages

Consume messages from the beginning and assign the consumer to a group named test-group:

kafka-console-consumer --topic test-topic --from-beginning \
  --bootstrap-server <source-cluster-address:port> \
  --group test-group \
  --consumer.config /tmp/source.config

Expected output:

1
2
3
4
5

To inspect the consumer group offset details:

# List consumer groups
kafka-consumer-groups --list \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config

# Describe offsets for test-group
kafka-consumer-groups --describe --offsets \
  --bootstrap-server <source-cluster-address:port> \
  --group test-group \
  --command-config /tmp/source.config

Add an ACL entry

Grant the READ permission on test-topic to user test-user:

kafka-acls --add \
  --allow-principal User:test-user \
  --operation READ --topic test-topic \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config

To verify the ACL was created:

kafka-acls --list \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config

Step 3: Configure data synchronization filters

This guide assumes that you use SASL_SSL to connect to the source and destination clusters, and that a certificate is used for domain name verification.

Create JSON filter files that specify which topics, consumer groups, and ACLs to replicate. These filters are passed to the cluster link creation command in the next step.

Topic filter

Create /tmp/topic_filter.json to select test-topic for replication:

{
  "topicFilters": [
    {
      "name": "test-topic",
      "patternType": "LITERAL",
      "filterType": "INCLUDE"
    }
  ]
}

Consumer group filter

Create /tmp/group.json to select test-group for offset synchronization:

{
  "groupFilters": [
    {
      "name": "test-group",
      "patternType": "LITERAL",
      "filterType": "INCLUDE"
    }
  ]
}

ACL filter

Create /tmp/acl.json to synchronize all ACL entries:

{
  "aclFilters": [
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "operation": "any",
        "permissionType": "any"
      }
    }
  ]
}

Step 4: Create the cluster link and synchronize data

Run the following command to create a cluster link named test-cluster-link on the destination cluster. This command applies the topic, consumer group, and ACL filters from the previous step.

Replace <destination-cluster-address:port> with the bootstrap server address of your destination cluster.

kafka-cluster-links --create --link test-cluster-link \
  --config-file /tmp/source.config \
  --topic-filters-json-file /tmp/topic_filter.json \
  --consumer-group-filters-json-file /tmp/group.json \
  --acl-filters-json-file /tmp/acl.json \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config

Once the cluster link is created, the destination cluster starts replicating the matching topics, consumer group offsets, and ACLs from the source cluster.

Promote the mirror topic

After synchronization completes, promote the mirror topic to make it a regular, writable topic. Once promoted, the topic stops synchronizing with the source cluster.

kafka-mirrors --promote --topics test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config

Expected output:

Calculating max offset and ms lag for mirror topics: [test-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [test-topic]
Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.

Step 5: Verify the migration

With synchronization and promotion complete, confirm that all data was replicated to the destination cluster.

Replace <destination-cluster-address:port> with the bootstrap server address of your destination cluster.

Check topics, consumer groups, and ACLs

# List topics on the destination cluster
kafka-topics --list \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config

# List consumer groups
kafka-consumer-groups --list \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config

# List ACLs
kafka-acls --list \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config

Confirm that test-topic, test-group, and the ACL entry for test-user appear in the output.

Test produce and consume on the destination cluster

Verify that the promoted topic accepts new messages and that consumption works correctly:

# Produce a message
kafka-console-producer --topic test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --producer.config /tmp/destination.config

# Consume messages
kafka-console-consumer --topic test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --consumer.config /tmp/destination.config

Manage cluster links

After the initial setup, use the following commands to manage cluster links on the destination cluster.

Replace <destination-cluster-address:port> with the bootstrap server address of your destination cluster.

List all cluster links

kafka-cluster-links --list \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config

View cluster link details

kafka-configs --describe --cluster-link test-cluster-link \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config

Promote a mirror topic

Convert a mirror topic into a regular, writable topic. After promotion, the topic stops receiving updates from the source cluster.

kafka-mirrors --promote --topics test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config

Expected output:

Calculating max offset and ms lag for mirror topics: [test-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [test-topic]
Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.

Delete a cluster link

kafka-cluster-links --delete --link test-cluster-link \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config

Expected output:

Cluster link 'test-cluster-link' deletion successfully completed.

References