When you operate multiple ApsaraMQ for Confluent clusters across regions, keeping topics, consumer group offsets, and access control lists (ACLs) in sync requires manual effort and custom tooling. Cluster Linking eliminates this overhead by connecting two clusters and replicating data from a source cluster to a destination cluster in real time.
Cluster Linking works through two abstractions:
A cluster link, initiated on the destination cluster, that establishes a connection to the source cluster.
Mirror topics on the destination cluster that replicate data from their source topics.
Beyond topic data, Cluster Linking also synchronizes consumer group offsets and ACLs. This makes it suitable for cross-region replication, disaster recovery, and cluster migration.
This guide walks through the end-to-end process: creating configuration files, preparing test data, setting up synchronization filters, creating a cluster link, and verifying the migration. All steps use the Confluent Platform command-line interface (CLI).
Prerequisites
Before you begin, make sure you have:
A source cluster and a destination cluster running ApsaraMQ for Confluent
An Elastic Compute Service (ECS) instance or other machine with network access to both clusters. For more information about how to create an instance, see Create and manage an ECS instance.
Confluent Platform 7.0.0 or later installed on the machine. For installation instructions, see Confluent Platform installation overview.
Java 8 or 11. For installation instructions, see Install JDK.
Step 1: Create configuration files
Create configuration files on the ECS instance to authenticate with the source and destination clusters.
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
<username> | SASL username for the cluster | admin |
<password> | SASL password for the cluster | pa$$w0rd |
<source-cluster-address:port> | Bootstrap server address of the source cluster | source-kafka.example.com:9093 |
Source cluster configuration
Create a file named /tmp/source.config with the following content. This configuration enables automatic mirror topic creation, consumer offset synchronization, and ACL synchronization.
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
bootstrap.servers=<source-cluster-address:port>
auto.create.mirror.topics.enable=true
consumer.offset.sync.enable=true
acl.sync.enable=true| Property | Purpose |
|---|---|
auto.create.mirror.topics.enable | Automatically creates mirror topics on the destination cluster for matched source topics |
consumer.offset.sync.enable | Synchronizes consumer group offsets from the source to the destination cluster |
acl.sync.enable | Synchronizes ACL entries from the source to the destination cluster |
Destination cluster configuration
Create a file named /tmp/destination.config with the following content:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";The /tmp directory is used here for simplicity. In production, store configuration files in a secure, persistent directory because /tmp files may be deleted during system maintenance, which can leave cluster links in an inconsistent state.
Step 2: Prepare test data on the source cluster
Set up a topic with sample data, a consumer group, and an ACL entry on the source cluster before creating the cluster link.
Replace <source-cluster-address:port> with the bootstrap server address of your source cluster.
Create a test topic
Create a single-partition topic named test-topic. A single partition makes it easier to observe message replication order.
kafka-topics --create --topic test-topic --partitions 1 \
--bootstrap-server <source-cluster-address:port> \
--command-config /tmp/source.configTo verify the topic was created, list and describe it:
# List all topics
kafka-topics --list \
--bootstrap-server <source-cluster-address:port> \
--command-config /tmp/source.config
# Describe the test topic
kafka-topics --describe --topic test-topic \
--bootstrap-server <source-cluster-address:port> \
--command-config /tmp/source.configProduce test messages
Send five numbered messages to test-topic:
seq 1 5 | kafka-console-producer --topic test-topic \
--bootstrap-server <source-cluster-address:port> \
--producer.config /tmp/source.configConsume and verify messages
Consume messages from the beginning and assign the consumer to a group named test-group:
kafka-console-consumer --topic test-topic --from-beginning \
--bootstrap-server <source-cluster-address:port> \
--group test-group \
--consumer.config /tmp/source.configExpected output:
1
2
3
4
5To inspect the consumer group offset details:
# List consumer groups
kafka-consumer-groups --list \
--bootstrap-server <source-cluster-address:port> \
--command-config /tmp/source.config
# Describe offsets for test-group
kafka-consumer-groups --describe --offsets \
--bootstrap-server <source-cluster-address:port> \
--group test-group \
--command-config /tmp/source.configAdd an ACL entry
Grant the READ permission on test-topic to user test-user:
kafka-acls --add \
--allow-principal User:test-user \
--operation READ --topic test-topic \
--bootstrap-server <source-cluster-address:port> \
--command-config /tmp/source.configTo verify the ACL was created:
kafka-acls --list \
--bootstrap-server <source-cluster-address:port> \
--command-config /tmp/source.configStep 3: Configure data synchronization filters
This guide assumes that you use SASL_SSL to connect to the source and destination clusters, and that a certificate is used for domain name verification.
Create JSON filter files that specify which topics, consumer groups, and ACLs to replicate. These filters are passed to the cluster link creation command in the next step.
Topic filter
Create /tmp/topic_filter.json to select test-topic for replication:
{
"topicFilters": [
{
"name": "test-topic",
"patternType": "LITERAL",
"filterType": "INCLUDE"
}
]
}Consumer group filter
Create /tmp/group.json to select test-group for offset synchronization:
{
"groupFilters": [
{
"name": "test-group",
"patternType": "LITERAL",
"filterType": "INCLUDE"
}
]
}ACL filter
Create /tmp/acl.json to synchronize all ACL entries:
{
"aclFilters": [
{
"resourceFilter": {
"resourceType": "any",
"patternType": "any"
},
"accessFilter": {
"operation": "any",
"permissionType": "any"
}
}
]
}Step 4: Create the cluster link and synchronize data
Run the following command to create a cluster link named test-cluster-link on the destination cluster. This command applies the topic, consumer group, and ACL filters from the previous step.
Replace <destination-cluster-address:port> with the bootstrap server address of your destination cluster.
kafka-cluster-links --create --link test-cluster-link \
--config-file /tmp/source.config \
--topic-filters-json-file /tmp/topic_filter.json \
--consumer-group-filters-json-file /tmp/group.json \
--acl-filters-json-file /tmp/acl.json \
--bootstrap-server <destination-cluster-address:port> \
--command-config /tmp/destination.configOnce the cluster link is created, the destination cluster starts replicating the matching topics, consumer group offsets, and ACLs from the source cluster.
Promote the mirror topic
After synchronization completes, promote the mirror topic to make it a regular, writable topic. Once promoted, the topic stops synchronizing with the source cluster.
kafka-mirrors --promote --topics test-topic \
--bootstrap-server <destination-cluster-address:port> \
--command-config /tmp/destination.configExpected output:
Calculating max offset and ms lag for mirror topics: [test-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [test-topic]
Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.Step 5: Verify the migration
With synchronization and promotion complete, confirm that all data was replicated to the destination cluster.
Replace <destination-cluster-address:port> with the bootstrap server address of your destination cluster.
Check topics, consumer groups, and ACLs
# List topics on the destination cluster
kafka-topics --list \
--bootstrap-server <destination-cluster-address:port> \
--command-config /tmp/destination.config
# List consumer groups
kafka-consumer-groups --list \
--bootstrap-server <destination-cluster-address:port> \
--command-config /tmp/destination.config
# List ACLs
kafka-acls --list \
--bootstrap-server <destination-cluster-address:port> \
--command-config /tmp/destination.configConfirm that test-topic, test-group, and the ACL entry for test-user appear in the output.
Test produce and consume on the destination cluster
Verify that the promoted topic accepts new messages and that consumption works correctly:
# Produce a message
kafka-console-producer --topic test-topic \
--bootstrap-server <destination-cluster-address:port> \
--producer.config /tmp/destination.config
# Consume messages
kafka-console-consumer --topic test-topic \
--bootstrap-server <destination-cluster-address:port> \
--consumer.config /tmp/destination.configManage cluster links
After the initial setup, use the following commands to manage cluster links on the destination cluster.
Replace <destination-cluster-address:port> with the bootstrap server address of your destination cluster.
List all cluster links
kafka-cluster-links --list \
--bootstrap-server <destination-cluster-address:port> \
--command-config /tmp/destination.configView cluster link details
kafka-configs --describe --cluster-link test-cluster-link \
--bootstrap-server <destination-cluster-address:port> \
--command-config /tmp/destination.configPromote a mirror topic
Convert a mirror topic into a regular, writable topic. After promotion, the topic stops receiving updates from the source cluster.
kafka-mirrors --promote --topics test-topic \
--bootstrap-server <destination-cluster-address:port> \
--command-config /tmp/destination.configExpected output:
Calculating max offset and ms lag for mirror topics: [test-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [test-topic]
Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.Delete a cluster link
kafka-cluster-links --delete --link test-cluster-link \
--bootstrap-server <destination-cluster-address:port> \
--command-config /tmp/destination.configExpected output:
Cluster link 'test-cluster-link' deletion successfully completed.