本文介绍如何使用MirrorMaker将自建Kafka集群的数据迁移到消息队列Kafka版实例。

前提条件

您已完成以下操作:

背景信息

Kafka的镜像特性可实现Kafka集群的数据备份。实现这一特性的工具就是MirrorMaker。您可以使用MirrorMaker将源集群中的数据镜像拷贝到目标集群。如下图所示,Mirror Maker使用一个内置的Consumer从源自建Kafka集群消费消息,然后再使用一个内置的Producer将这些消息重新发送到目标消息队列Kafka版集群。

dg_data_migration

更多信息请参见Apache Kafka MirrorMaker

注意事项

  • Topic名称必须一致。
  • 分区数量可以不一致。
  • 迁移后原先在同一个分区中的数据并不保证迁移到同一个分区中。
  • 默认情况下,Key相同的消息会分布在同一分区中。
  • 普通消息在宕机时可能会乱序,分区顺序消息在宕机时依然保持顺序。

VPC接入

  1. 配置consumer.properties
    ## 自建Kafka集群的接入点
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## 消费者分区分配策略
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    
    ## Consumer Group的名称
    group.id=test-consumer-group
  2. 配置producer.properties
    ## 消息队列Kafka版集群的默认接入点(可在消息队列Kafka版控制台获取)
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## 数据压缩方式
    compression.type=none                                
  3. 执行以下命令开启迁移进程。
    sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName

公网接入

  1. 下载kafka.client.truststore.jks
  2. 配置kakfa_client_jaas.conf
    KafkaClient {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="your username"
       password="your password";
    };
  3. 配置consumer.properties
    ## 自建Kafka集群的接入点
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## 消费者分区分配策略
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    
    ## Consumer Group名称
    group.id=test-consumer-group
  4. 配置producer.properties
    ## 消息队列Kafka版集群的SSL接入点(可在消息队列Kafka版控制台获取)
    bootstrap.servers=XXX.XXX.XXX.XXX:9093
    
    ## 数据压缩方式
    compression.type=none
    
    ## truststore(使用步骤1下载的文件)
    ssl.truststore.location=kafka.client.truststore.jks
    ssl.truststore.password=KafkaOnsClient
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    
    ## 消息队列Kafka版2.X版本在配置SASL接入时需要做以下配置,2.X以下版本不需要配置。
    ssl.endpoint.identification.algorithm=
  5. 设置java.security.auth.login.config
    export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"                              
  6. 执行以下命令开启迁移进程。
    sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName

结果验证

您可通过以下任一方法验证MirrorMaker是否运行成功。

  • 通过kafka-consumer-groups.sh查看自建集群消费进度。

    bin/kafka-consumer-groups.sh --new-consumer --describe --bootstrap-server自建集群接入点 --group test-consumer-group

  • 往自建集群中发送消息,在消息队列Kafka版控制台中查看Topic的分区状态,确认当前服务器上消息总量是否正确。您还可以通过消息队列Kafka版控制台来查看具体消息内容。详情请参见查询消息