Cluster Linking is a Confluent Platform feature that connects Kafka clusters so that data can be mirrored from one Kafka cluster to another. A cluster link is created on the destination cluster and replicates data from the source cluster to the destination cluster. This topic describes how to use Cluster Linking in ApsaraMQ for Confluent, including how to create a cluster link from a remote machine by using the Confluent Platform command-line tools and how to manage the cluster link.
Prerequisites
A source cluster and a destination cluster are created.
A machine that can access both the source cluster and the destination cluster is prepared. In this topic, an Elastic Compute Service (ECS) instance is used. For more information, see Create and manage an ECS instance in the console (express version).
Confluent Platform 7.0.0 or later is installed. For more information, see Install Confluent Platform On-Premises.
Java 8 or 11 is installed. For more information, see Java Downloads.
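Before you continue, you can optionally confirm that the tools used in this topic are on the PATH of the ECS instance. The following is a minimal sketch: missing_tools is a hypothetical helper name, and the tool list assumes the default Confluent Platform command names used in the commands below.

```shell
#!/usr/bin/env bash
# Sketch: print each named tool that cannot be found on PATH.
# missing_tools is a hypothetical helper, not part of Confluent Platform.
missing_tools() {
  local tool
  for tool in "$@"; do
    # command -v succeeds for executables on PATH (and for shell builtins/functions).
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "$tool"
    fi
  done
}

# Tools used in this topic; any name printed here still needs to be installed
# or added to PATH (for example, the bin directory of Confluent Platform).
missing_tools java kafka-topics kafka-console-producer kafka-console-consumer \
  kafka-consumer-groups kafka-acls kafka-cluster-links kafka-mirrors kafka-configs
```

Empty output means every listed tool was found.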
Create configuration files
Create the following configuration files on the ECS instance to connect to the source cluster and the destination cluster. Replace <username>, <password>, and <source-cluster-address:port> in the following sample code with the actual values of the corresponding cluster.
Create a configuration file named /tmp/source.config to connect to the source cluster. The file also enables the automatic creation of mirror topics, the synchronization of consumer offsets, and the synchronization of access control lists (ACLs).
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
bootstrap.servers=<source-cluster-address:port>
auto.create.mirror.topics.enable=true
consumer.offset.sync.enable=true
acl.sync.enable=true
Create a configuration file named /tmp/destination.config to connect to the destination cluster. The destination cluster address is passed on the command line with --bootstrap-server, so this file does not set bootstrap.servers.
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
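If you prefer to generate both files from a script, the following sketch writes the same properties shown above. The environment variable names SOURCE_USER, SOURCE_PASSWORD, SOURCE_BOOTSTRAP, DEST_USER, and DEST_PASSWORD are hypothetical; when a variable is unset, the placeholder from this topic is written instead.

```shell
#!/usr/bin/env bash
# Sketch: write /tmp/source.config and /tmp/destination.config in one step.
# The variable names below are hypothetical; unset variables fall back to the
# placeholders used in this topic, which you must still replace.
SOURCE_USER="${SOURCE_USER:-<username>}"
SOURCE_PASSWORD="${SOURCE_PASSWORD:-<password>}"
SOURCE_BOOTSTRAP="${SOURCE_BOOTSTRAP:-<source-cluster-address:port>}"
DEST_USER="${DEST_USER:-<username>}"
DEST_PASSWORD="${DEST_PASSWORD:-<password>}"

# The files contain passwords, so newly created files are readable only by the current user.
umask 077

# Source cluster: connection settings plus the Cluster Linking sync options.
cat > /tmp/source.config <<EOF
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${SOURCE_USER}" password="${SOURCE_PASSWORD}";
bootstrap.servers=${SOURCE_BOOTSTRAP}
auto.create.mirror.topics.enable=true
consumer.offset.sync.enable=true
acl.sync.enable=true
EOF

# Destination cluster: credentials only; the address is passed with --bootstrap-server.
cat > /tmp/destination.config <<EOF
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${DEST_USER}" password="${DEST_PASSWORD}";
EOF
```

Keep the sasl.jaas.config value on a single line, including the trailing semicolon.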
Prepare test data
Run the following Confluent Platform commands on the ECS instance to prepare test data in the source cluster. Replace <source-cluster-address:port> in the sample code with the actual address of the source cluster.
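Every command in the following steps repeats the same bootstrap address and client configuration file, but the flag name differs by tool: kafka-console-producer takes --producer.config, kafka-console-consumer takes --consumer.config, and the admin tools take --command-config. The following sketch wraps that rule in a hypothetical helper named on_cluster; it is a convenience, not part of Confluent Platform.

```shell
#!/usr/bin/env bash
# Sketch: run a Confluent Platform tool against one cluster, appending the
# bootstrap address and the client-config flag that this tool expects.
# on_cluster is a hypothetical helper name.
# Usage: on_cluster <bootstrap> <config-file> <tool> [tool arguments...]
on_cluster() {
  local bootstrap="$1" config="$2" tool="$3"
  shift 3
  local config_flag
  case "$tool" in
    kafka-console-producer) config_flag=--producer.config ;;
    kafka-console-consumer) config_flag=--consumer.config ;;
    *)                      config_flag=--command-config ;;  # kafka-topics, kafka-acls, ...
  esac
  "$tool" "$@" --bootstrap-server "$bootstrap" "$config_flag" "$config"
}

# Example usage (replace the placeholder address first):
#   SRC='<source-cluster-address:port>'
#   on_cluster "$SRC" /tmp/source.config kafka-topics --list
#   seq 1 5 | on_cluster "$SRC" /tmp/source.config kafka-console-producer --topic test-topic
```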
Run the following command to create a topic named test-topic that has a single partition in the source cluster. Because a single partition preserves message order, you can observe the order in which messages are replicated.
kafka-topics --create --topic test-topic --partitions 1 \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config
Run the kafka-topics --list and kafka-topics --describe commands to view the topic details.
# list topic
kafka-topics --list --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config
# describe topic
kafka-topics --describe --topic test-topic \
  --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config
Run the following command to send the messages 1 through 5 to the test-topic topic in the source cluster:
seq 1 5 | kafka-console-producer --topic test-topic \
  --bootstrap-server <source-cluster-address:port> \
  --producer.config /tmp/source.config
Consume messages from the test-topic topic in the source cluster as the consumer group test-group, and then view the consumer group and its offsets. The console consumer keeps running until you press Ctrl+C.
# consume
kafka-console-consumer --topic test-topic --from-beginning \
  --bootstrap-server <source-cluster-address:port> --group test-group \
  --consumer.config /tmp/source.config
# list consumer groups
kafka-consumer-groups --bootstrap-server <source-cluster-address:port> --list \
  --command-config /tmp/source.config
# describe offsets of consumer groups
kafka-consumer-groups --bootstrap-server <source-cluster-address:port> \
  --group test-group --describe --offsets \
  --command-config /tmp/source.config
If the messages are consumed, the following output is displayed:
1
2
3
4
5
Add an ACL that allows the principal User:test-user to read the test-topic topic, and then list the ACLs.
# add an ACL that grants READ on test-topic to test-user
kafka-acls --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config --add --allow-principal User:test-user \
  --operation READ --topic test-topic
# list
kafka-acls --list --bootstrap-server <source-cluster-address:port> \
  --command-config /tmp/source.config
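To check the consume step above without reading the output by eye, you can pipe the consumer output into a small comparison. The following sketch uses a hypothetical helper named expect_sequence that succeeds only if standard input is exactly the lines 1 through N in order. In the usage comment, --max-messages makes the console consumer exit after N messages instead of waiting for Ctrl+C.

```shell
#!/usr/bin/env bash
# Sketch: succeed only if stdin contains exactly the lines 1..N, in order.
# expect_sequence is a hypothetical helper, not a Confluent tool.
expect_sequence() {
  local n="$1"
  # Command substitution strips trailing newlines on both sides before comparing.
  if [ "$(cat)" = "$(seq 1 "$n")" ]; then
    echo "OK: received 1..$n in order"
  else
    echo "MISMATCH: expected 1..$n in order" >&2
    return 1
  fi
}

# Example usage against the source cluster:
#   kafka-console-consumer --topic test-topic --from-beginning --max-messages 5 \
#     --bootstrap-server <source-cluster-address:port> \
#     --consumer.config /tmp/source.config | expect_sequence 5
```

The consumer prints its "Processed a total of ... messages" summary to standard error, so it does not interfere with the comparison.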
Synchronize data
In this example, clients connect to the source and destination clusters over SASL_SSL with the PLAIN mechanism, and the server certificate is used to verify the domain name of each cluster when a client connects. Replace <source-cluster-address:port> and <destination-cluster-address:port> in the sample code with your actual values.
Create a configuration file named /tmp/topic_filter.json to select the topics to migrate.
{
  "topicFilters": [
    {
      "name": "test-topic",
      "patternType": "LITERAL",
      "filterType": "INCLUDE"
    }
  ]
}
Create a configuration file named /tmp/group.json to select the consumer groups whose offsets are migrated.
{
  "groupFilters": [
    {
      "name": "test-group",
      "patternType": "LITERAL",
      "filterType": "INCLUDE"
    }
  ]
}
Create a configuration file named /tmp/acl.json to select the ACLs to migrate. The following filter matches all ACLs.
{
  "aclFilters": [
    {
      "resourceFilter": {
        "resourceType": "any",
        "patternType": "any"
      },
      "accessFilter": {
        "operation": "any",
        "permissionType": "any"
      }
    }
  ]
}
Create the cluster link on the destination cluster to replicate the topics, consumer group offsets, and ACLs.
kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config --create --link test-cluster-link \
  --config-file /tmp/source.config \
  --topic-filters-json-file /tmp/topic_filter.json \
  --consumer-group-filters-json-file /tmp/group.json \
  --acl-filters-json-file /tmp/acl.json
After data synchronization is complete, promote the mirror topic. A mirror topic is read-only; after it is promoted, it accepts writes and no longer replicates messages from the source topic.
kafka-mirrors --promote --topics test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
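A syntax error in any of the three filter files causes the kafka-cluster-links --create command to fail. The following sketch writes the same JSON shown in the steps above and, assuming python3 is installed on the ECS instance, checks that each file parses as JSON; the check is skipped if python3 is not found.

```shell
#!/usr/bin/env bash
# Sketch: write the three filter files used by kafka-cluster-links --create
# and validate them. The python3 check is an assumption about the machine.
cat > /tmp/topic_filter.json <<'EOF'
{
  "topicFilters": [
    { "name": "test-topic", "patternType": "LITERAL", "filterType": "INCLUDE" }
  ]
}
EOF

cat > /tmp/group.json <<'EOF'
{
  "groupFilters": [
    { "name": "test-group", "patternType": "LITERAL", "filterType": "INCLUDE" }
  ]
}
EOF

# Matches all ACLs: any resource type and pattern, any operation and permission.
cat > /tmp/acl.json <<'EOF'
{
  "aclFilters": [
    {
      "resourceFilter": { "resourceType": "any", "patternType": "any" },
      "accessFilter": { "operation": "any", "permissionType": "any" }
    }
  ]
}
EOF

if command -v python3 >/dev/null 2>&1; then
  for f in /tmp/topic_filter.json /tmp/group.json /tmp/acl.json; do
    # json.tool exits non-zero and prints the parse error if the file is invalid.
    python3 -m json.tool "$f" >/dev/null && echo "valid JSON: $f"
  done
fi
```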
Test migration
After you complete the preceding data synchronization steps, perform the following steps to verify that the migration is successful:
Check whether the topics, consumer groups, and ACLs are synchronized to the destination cluster.
# list topic
kafka-topics --list --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
# list consumer group
kafka-consumer-groups --bootstrap-server <destination-cluster-address:port> \
  --list --command-config /tmp/destination.config
# list acl
kafka-acls --list --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
Check whether messages can be produced to and consumed from the topic on the destination cluster as expected.
# produce
kafka-console-producer --topic test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --producer.config /tmp/destination.config
# consume
kafka-console-consumer --topic test-topic --from-beginning \
  --bootstrap-server <destination-cluster-address:port> \
  --consumer.config /tmp/destination.config
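The list commands in the first verification step can be turned into pass/fail checks by searching their output for the names created earlier. The following sketch uses a hypothetical helper named require_name. It matches a fixed substring rather than a whole line because kafka-acls prints each ACL inside a longer description; as a result, a name such as test-topic would also match test-topic2.

```shell
#!/usr/bin/env bash
# Sketch: succeed only if some line of stdin contains the given fixed string.
# require_name is a hypothetical helper, not part of Confluent Platform.
require_name() {
  if grep -qF -- "$1"; then
    echo "found: $1"
  else
    echo "missing: $1" >&2
    return 1
  fi
}

# Example usage against the destination cluster:
#   kafka-topics --list --bootstrap-server <destination-cluster-address:port> \
#     --command-config /tmp/destination.config | require_name test-topic
#   kafka-consumer-groups --list --bootstrap-server <destination-cluster-address:port> \
#     --command-config /tmp/destination.config | require_name test-group
#   kafka-acls --list --bootstrap-server <destination-cluster-address:port> \
#     --command-config /tmp/destination.config | require_name User:test-user
```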
Manage Cluster Linking
This section describes how to manage the cluster link that you created. Replace <destination-cluster-address:port> in the sample code with the actual address of the destination cluster.
Run the following command to list the cluster links on the destination cluster:
kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
  --list --command-config /tmp/destination.config
Run the following command to view the configuration of a cluster link:
kafka-configs --describe --cluster-link test-cluster-link \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
Run the following command to convert a mirror topic to a regular topic:
kafka-mirrors --promote --topics test-topic \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
Expected output:
Calculating max offset and ms lag for mirror topics: [test-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [test-topic]
Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
Run the following command to delete a cluster link:
kafka-cluster-links --delete --link test-cluster-link \
  --bootstrap-server <destination-cluster-address:port> \
  --command-config /tmp/destination.config
Expected output:
Cluster link 'test-cluster-link' deletion successfully completed.
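When you clean up from a script, you can delete the cluster link only if it still exists, so that a rerun does not fail on a missing link. The following sketch assumes that the output of kafka-cluster-links --list includes each link name; delete_link_if_present is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch: delete a cluster link only if it appears in the --list output.
# delete_link_if_present is a hypothetical helper; it assumes the link name
# is printed by "kafka-cluster-links --list".
# Usage: delete_link_if_present <destination-bootstrap> <link-name> <config-file>
delete_link_if_present() {
  local dest="$1" link="$2" config="$3"
  if kafka-cluster-links --list --bootstrap-server "$dest" \
       --command-config "$config" | grep -qF -- "$link"; then
    kafka-cluster-links --delete --link "$link" \
      --bootstrap-server "$dest" --command-config "$config"
  else
    echo "cluster link $link not found; nothing to delete"
  fi
}

# Example usage:
#   delete_link_if_present '<destination-cluster-address:port>' test-cluster-link \
#     /tmp/destination.config
```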
References
For more information about Cluster Linking, see Cluster Linking for Confluent Platform.
For information about how to check whether a cluster can use Cluster Linking, see Supported cluster types.