ApsaraMQ for Kafka: Cluster Linking

Last Updated: Dec 02, 2024

Cluster Linking is a Confluent Platform feature that connects Kafka clusters and mirrors data from one cluster to another. A cluster link is created on the destination cluster and replicates data from the source cluster to the destination cluster. This topic describes how to use Cluster Linking in ApsaraMQ for Confluent, including how to create a cluster link remotely by using the Confluent CLI client and how to manage the cluster link.

Prerequisites

Create configuration files

On the ECS instance, create the configuration files that are used to connect to the source cluster and the destination cluster. Replace <username>, <password>, and <source-cluster-address:port> in the following sample code with your actual values.

  1. Create a configuration file named /tmp/source.config to connect to the source cluster and enable the features for the automatic creation of mirror topics, the synchronization of consumer offsets, and the synchronization of access control list (ACL) users.

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    bootstrap.servers=<source-cluster-address:port>
    auto.create.mirror.topics.enable=true
    consumer.offset.sync.enable=true
    acl.sync.enable=true
  2. Create a configuration file named /tmp/destination.config to connect to the destination cluster.

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
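
The two files above can also be generated by a script, so that the credentials are typed only once. The following is a minimal sketch, not part of the product tooling. The variable names KAFKA_USERNAME, KAFKA_PASSWORD, SOURCE_BOOTSTRAP, and CONFIG_DIR are assumptions chosen for this illustration. The generated content is the same as the sample files above.

```shell
#!/bin/sh
set -eu

# Hypothetical variable names; export real values before you run the script.
KAFKA_USERNAME="${KAFKA_USERNAME:-<username>}"
KAFKA_PASSWORD="${KAFKA_PASSWORD:-<password>}"
SOURCE_BOOTSTRAP="${SOURCE_BOOTSTRAP:-<source-cluster-address:port>}"
CONFIG_DIR="${CONFIG_DIR:-/tmp}"

# Newly created files are readable by the current user only,
# because they contain a plaintext password.
umask 077

# Prints the SASL_SSL settings shared by both configuration files.
write_auth_settings() {
  cat <<EOF
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}";
EOF
}

# source.config: shared settings, the source address, and the sync switches.
{
  write_auth_settings
  cat <<EOF
bootstrap.servers=${SOURCE_BOOTSTRAP}
auto.create.mirror.topics.enable=true
consumer.offset.sync.enable=true
acl.sync.enable=true
EOF
} > "${CONFIG_DIR}/source.config"

# destination.config: shared settings only.
write_auth_settings > "${CONFIG_DIR}/destination.config"
```

If the source and destination clusters use different credentials, set the variables to the destination values and generate destination.config in a second run.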

Prepare test data

Run the following Confluent Platform CLI commands to prepare test data in the source cluster. Replace <source-cluster-address:port> in the sample code with your actual value.

  1. Run the following command to create a topic named test-topic that has a single partition in the source cluster. A single partition makes it easy to observe the order in which messages are replicated.

    kafka-topics --create --topic test-topic --partitions 1 \
    --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config

    Run the following commands to list the topics and view the topic details.

    #list topic
    kafka-topics --list --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config
    
    #describe topic
    kafka-topics --describe --topic test-topic \
    --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config
  2. Run the following command to send the messages 1 through 5 to the test-topic topic in the source cluster:

    seq 1 5 | kafka-console-producer --topic test-topic \
    --bootstrap-server <source-cluster-address:port> \
    --producer.config /tmp/source.config
  3. Consume messages from the test-topic topic in the source cluster and specify a consumer group.

    # consume
    kafka-console-consumer --topic test-topic --from-beginning \
    --bootstrap-server <source-cluster-address:port> --group test-group \
    --consumer.config /tmp/source.config
    
    # list consumer groups
    kafka-consumer-groups --bootstrap-server <source-cluster-address:port> --list \
    --command-config /tmp/source.config
    
    # describe offsets of consumer groups
    kafka-consumer-groups --bootstrap-server <source-cluster-address:port> \
    --group test-group --describe --offsets \
    --command-config /tmp/source.config

    If messages are consumed, the following output is displayed:

    1
    2
    3
    4
    5

  4. Add an ACL for a user and grant the user the read permission on the topic.

    # add user and read permission
    kafka-acls --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config  --add --allow-principal User:test-user \
    --operation READ --topic test-topic
      
    # list
    kafka-acls --list --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config
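
The commands in this section repeat the same connection flags, but the flag that passes the configuration file differs by tool: kafka-console-producer uses --producer.config, kafka-console-consumer uses --consumer.config, and the other tools shown here use --command-config. The following sketch wraps that rule in a helper. The function name connection_flags is hypothetical and is not part of the Confluent CLI.

```shell
# Hypothetical helper: prints the connection flags for a given Kafka CLI tool.
# Usage: connection_flags TOOL BOOTSTRAP_ADDRESS CONFIG_FILE
connection_flags() {
  case "$1" in
    kafka-console-producer) flag="--producer.config" ;;
    kafka-console-consumer) flag="--consumer.config" ;;
    *)                      flag="--command-config" ;;
  esac
  printf '%s\n' "--bootstrap-server $2 $flag $3"
}

# Example call with a placeholder address:
connection_flags kafka-console-consumer broker.example:9092 /tmp/source.config
# prints: --bootstrap-server broker.example:9092 --consumer.config /tmp/source.config
```

The output can be spliced into a command with an unquoted command substitution, for example `kafka-topics --list $(connection_flags kafka-topics "$SOURCE" /tmp/source.config)`, where SOURCE is an assumed variable that holds the source address. This relies on word splitting, so it works only when the address and the file path contain no spaces.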

Synchronize data

In this example, SASL_SSL is used to authenticate to the source and destination clusters, and the server certificate is used to verify the domain name when a client connects to the clusters. Replace <source-cluster-address:port> and <destination-cluster-address:port> in the sample code with your actual values.

  1. Create a configuration file named /tmp/topic_filter.json to select the topics to be migrated.

    { 
      "topicFilters": [ 
        {
          "name": "test-topic",  
          "patternType": "LITERAL",  
          "filterType": "INCLUDE"
        } 
      ]
    }
  2. Create a configuration file named /tmp/group.json to select the consumer groups to be migrated.

    {
      "groupFilters": [
        {
          "name": "test-group",
          "patternType": "LITERAL",
          "filterType": "INCLUDE"
        }
      ]
    }
  3. Create a configuration file named /tmp/acl.json to select the ACL permissions to be migrated.

    {
      "aclFilters": [
        {
          "resourceFilter": {
            "resourceType": "any",
            "patternType": "any"
          },
          "accessFilter": {
            "operation": "any",
            "permissionType": "any"
          }
        }
      ]
    }
  4. Create a cluster link named test-cluster-link to replicate the topics, consumer groups, and ACL permissions.

    kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config --create --link test-cluster-link \
    --config-file /tmp/source.config \
    --topic-filters-json-file /tmp/topic_filter.json \
    --consumer-group-filters-json-file /tmp/group.json \
    --acl-filters-json-file /tmp/acl.json
  5. After data synchronization is complete, promote the mirror topic to convert it to a regular topic that supports both reads and writes. After promotion, the topic no longer synchronizes messages from the source topic.

    kafka-mirrors --promote --topics test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
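
When more than one topic or consumer group is migrated, the filter files from steps 1 and 2 grow by one entry per name. The following sketch generates such a file from a list of names, using the same LITERAL and INCLUDE values as the samples above. The function name make_filter_json is hypothetical. The sketch does not escape the names, so it assumes that they contain only letters, digits, periods, underscores, and hyphens.

```shell
# Hypothetical helper: prints a filter document with one LITERAL/INCLUDE
# entry per name.
# Usage: make_filter_json KEY NAME [NAME...]
#   KEY is the top-level field, such as topicFilters or groupFilters.
make_filter_json() {
  key="$1"
  shift
  printf '{\n  "%s": [\n' "$key"
  sep=""
  for name in "$@"; do
    # Each entry is preceded by a comma and a line break, except the first.
    printf '%s    {"name": "%s", "patternType": "LITERAL", "filterType": "INCLUDE"}' "$sep" "$name"
    sep=",
"
  done
  printf '\n  ]\n}\n'
}

# Example: two topics, where "orders" is a made-up topic name.
make_filter_json topicFilters test-topic orders
```

To use the result in step 4, redirect the output to the file, for example `make_filter_json groupFilters test-group > /tmp/group.json`.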

Test migration

After you perform the preceding steps for data synchronization, you can perform the following steps to verify whether the migration is successful:

  1. Check whether the topics, consumer groups, and ACL permissions of the destination cluster are synchronized.

    # list topic
    kafka-topics --list --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
    
    # list consumer group
    kafka-consumer-groups --bootstrap-server <destination-cluster-address:port> \
    --list --command-config /tmp/destination.config
    
    # list acl
    kafka-acls --list --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
  2. Check whether messages can be produced to and consumed from the topic as expected.

    # produce
    kafka-console-producer --topic test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --producer.config /tmp/destination.config
    
    # consume
    kafka-console-consumer --topic test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --consumer.config /tmp/destination.config
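
Because the test topic has a single partition and the source messages were the numbers 1 through 5, the replicated order can be checked mechanically instead of by eye. The following sketch compares the lines it reads with the expected sequence. The function name check_sequence is hypothetical.

```shell
# Hypothetical helper: reads lines from standard input and succeeds only if
# they are exactly the numbers 1..COUNT in ascending order.
# Usage: <command that prints messages> | check_sequence COUNT
check_sequence() {
  expected="$(seq 1 "$1")"
  actual="$(cat)"
  if [ "$actual" = "$expected" ]; then
    echo "OK: received 1..$1 in order"
  else
    echo "MISMATCH: messages are missing, duplicated, or out of order" >&2
    return 1
  fi
}

# Self-check with the same data that was produced to the source topic:
seq 1 5 | check_sequence 5
# prints: OK: received 1..5 in order
```

To check the destination cluster, pipe the consume command above into the function and add the --from-beginning and --max-messages 5 options so that the consumer reads from the start and exits after five messages. Run the check before you produce new messages to the destination topic. Otherwise, the extra messages cause a mismatch.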

Manage Cluster Linking

This section describes how to manage the cluster link that you created. Replace <destination-cluster-address:port> in the sample code with your actual value.

  1. Run the following command to list the cluster links on the destination cluster.

    kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
    --list --command-config /tmp/destination.config
  2. Run the following command to view the configuration of a cluster link:

    kafka-configs --describe --cluster-link test-cluster-link \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
  3. Run the following command to convert a mirror topic to a regular topic:

    kafka-mirrors --promote --topics test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config

    Expected output:

    Calculating max offset and ms lag for mirror topics: [test-topic]
    Finished calculating max offset lag and max lag ms for mirror topics: [test-topic]
    Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
  4. Run the following command to delete a cluster link:

    kafka-cluster-links --delete --link test-cluster-link \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config

    Expected output:

    Cluster link 'test-cluster-link' deletion successfully completed.
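
The promote request in step 3 is only scheduled when the command returns, and its output suggests monitoring progress with the describe command. A generic polling helper can automate that wait. The following sketch is an illustration under assumptions: the function name wait_until is hypothetical, and the check that decides whether promotion has finished is left to you, because the describe output is not shown in this topic.

```shell
# Hypothetical helper: runs a command repeatedly until it succeeds.
# Usage: wait_until ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Returns 0 as soon as COMMAND succeeds, or 1 after ATTEMPTS failures.
wait_until() {
  _wu_attempts="$1"
  _wu_delay="$2"
  shift 2
  _wu_i=1
  while [ "$_wu_i" -le "$_wu_attempts" ]; do
    if "$@"; then
      return 0
    fi
    # Sleep between attempts, but not after the last one.
    [ "$_wu_i" -lt "$_wu_attempts" ] && sleep "$_wu_delay"
    _wu_i=$((_wu_i + 1))
  done
  return 1
}
```

For example, `wait_until 30 10 mirror_is_stopped` polls every 10 seconds for up to 30 attempts, where mirror_is_stopped is a function that you write around the describe command and that returns 0 when the topic is no longer pending. Deleting the cluster link in step 4 only after this wait succeeds avoids removing the link while a promotion is still in progress.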