本文通过示例为您介绍如何使用MirrorMaker 2(简称MM2)on Kafka Connect跨集群同步数据。

背景信息

使用场景

Kafka MM2适用于下列场景:
  • 远程数据同步:通过MM2,Kafka数据可以在不同地域的集群进行传输复制。
  • 灾备场景:通过MM2,可以构建不同数据中心的主备两个集群容灾架构,MM2实时同步两个集群的数据。当其中一个集群不可用时,可以将上面的应用程序切换到另一个集群,从而实现异地容灾功能。
  • 数据迁移场景:在业务上云、混合云、集群升级等场景,存在数据从旧集群迁移到新集群的需求。此时,您可以使用MM2实现新旧数据的迁移,保证业务的连续性。
  • 聚合数据中心场景:通过MM2,可以将多个Kafka子集群的数据同步到一个中心Kafka集群,实现数据的汇聚。

功能

Kafka MM2作为数据复制工具,具有以下功能:
  • 复制topics数据以及配置信息。
  • 复制consumer groups及其消费topic的offset信息。
  • 复制ACLs。
  • 自动检测新的topic以及partition。
  • 提供MM2的metrics。
  • 高可用以及可水平扩展的框架。

任务执行方式

MM2任务有以下执行方式:
  • Distributed Connect集群的connector方式(推荐):在已有Connect集群执行MM2 connector任务的方式。您可以参照本文使用Connect集群服务的功能来管理MM2任务。
  • Dedicated MirrorMaker集群方式:不需要使用Connect集群执行MM2 connector任务,而是直接通过Driver程序管理MM2的所有任务。具体操作,请参见使用MirrorMaker 2(Dedicated)跨集群同步数据
  • Standalone Connect的worker方式:执行单个MirrorSourceConnector任务,适合在测试场景下使用。
说明 推荐在Distributed Connect集群上启动MM2 connector任务,可以借助Connect集群的Rest服务管理MM2任务。

MM2的详细信息,请参见Apache Kafka

前提条件

已创建两个Kafka集群,一个为源集群emrsource,一个为目标集群emrdest,并选择了Kafka服务,创建DataFlow集群的具体操作,请参见创建集群
说明 本文示例的源集群和目标集群都以EMR-3.42.0版本,且在同一VPC下的DataFlow集群为例。

使用限制

目标集群的Kafka软件版本为2.12_2.4.1及以上。

操作流程

  1. 步骤一:在目标集群创建Kafka Connect集群
  2. 步骤二:使用MirrorMaker2 connector

步骤一:在目标集群创建Kafka Connect集群

  1. 新增EMR Task机器组。
    在EMR控制台目标集群emrdest的节点管理页面,创建Task机器组。
    1. 单击新增机器组
    2. 新增机器组面板,配置以下参数,其余参数请根据实际情况配置。
      参数说明
      节点组类型选择TASK(任务实例组)
      节点组名称本文示例为emr-task。
      存储配置选择一块数据盘。
  2. 扩容Task机器组。
    1. 节点管理页面,单击新增的emr-task节点组操作列的扩容
    2. 在弹出的对话框中,选择待增加的数量,勾选服务协议
      本示例增加的实例数量为1台。您可以根据实际需要扩容Task实例的数量,如果需要高可用Connect集群,则建议扩容两个以上实例。
    3. 单击确定
  3. 查看KafkaConnect服务状态,确保Kafka Connect集群已经启动。
    1. 单击上方的集群服务
    2. 单击Kafka服务区域的状态
    3. 组件列表区域,查看KafkaConnect的组件状态,确保组件在运行中。
      KafkaConnect
  4. 使用SSH方式登录目标集群emrdest,详情请参见登录集群
  5. 执行以下命令,检查Kafka Connect Rest服务状态。
    curl -X GET http://task-1-1:8083| jq .
    返回以下类似信息。
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100    91  100    91    0     0  13407      0 --:--:-- --:--:-- --:--:-- 15166
    {
      "version": "2.4.1",
      "commit": "42ce056344c5625a",
      "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"
    }

步骤二:使用MirrorMaker2 connector

  1. 准备MM2 connector配置文件。
    您需要准备以下文件:
    • 准备MirrorSourceConnector配置文件
      本文示例MirrorSourceConnector配置文件命名为mm2-source-connector.json。按照如下示例并根据实际情况修改相应的参数值。更多配置项详情,请参见KIP-382的相关章节。
      {
        "name": "mm2-source-connector",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "clusters": "emrsource,emrdest",
        "source.cluster.alias": "emrsource",
        "target.cluster.alias": "emrdest",
        "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
        "source.cluster.bootstrap.servers": "10.0.**.**:9092",
        "topics": "^foo.*",
        "tasks.max": "4",
        "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "replication.factor": "3",
        "offset-syncs.topic.replication.factor": "3",
        "sync.topic.acls.interval.seconds": "20",
        "sync.topic.configs.interval.seconds": "20",
        "refresh.topics.interval.seconds": "20",
        "refresh.groups.interval.seconds": "20",
        "consumer.group.id": "mm2-mirror-source-consumer-group",
        "producer.enable.idempotence":"true",
        "source.cluster.security.protocol": "PLAINTEXT",
        "target.cluster.security.protocol": "PLAINTEXT"
      }
      说明 本文示例代码中参数:
      • source.cluster.bootstrap.servers:该参数值的IP地址,需要替换为您实际环境源集群emrsource中Kafka服务的访问地址,并且需要确保源Kafka集群和Kafka Connect集群的联通性。
      • topics:该参数值表示会复制您源集群中以foo开头的Topic。
    • 准备MirrorCheckpointConnector配置文件
      本文示例MirrorCheckpointConnector配置文件命名为mm2-checkpoint-connector.json。按照如下示例并根据实际情况修改相应的参数值。更多配置项详情,请参见KIP-382的相关章节。
      {
          "name": "mm2-checkpoint-connector",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
          "clusters": "emrsource,emrdest",
          "source.cluster.alias": "emrsource",
          "target.cluster.alias": "emrdest",
          "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
          "source.cluster.bootstrap.servers": "10.0.**.**:9092",
          "tasks.max": "1",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "replication.factor": "3",
          "checkpoints.topic.replication.factor": "3",
          "emit.checkpoints.interval.seconds": "20",
          "source.cluster.security.protocol": "PLAINTEXT",
          "target.cluster.security.protocol": "PLAINTEXT"
        }
    • 准备MirrorHeartbeatConnector配置文件
      本文示例MirrorHeartbeatConnector配置文件命名为mm2-heartbeat-connector.json。按照如下示例并根据实际情况修改相应的参数值。更多配置项详情,请参见KIP-382的相关章节。
      {
          "name": "mm2-heartbeat-connector",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
          "clusters": "emrsource,emrdest",
          "source.cluster.alias": "emrsource",
          "target.cluster.alias": "emrdest",
          "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
          "source.cluster.bootstrap.servers": "10.0.**.**:9092",
          "tasks.max": "1",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "replication.factor": "3",
          "heartbeats.topic.replication.factor": "3",
          "emit.heartbeats.interval.seconds": "20",
          "source.cluster.security.protocol": "PLAINTEXT",
          "target.cluster.security.protocol": "PLAINTEXT"
        }
  2. 使用MirrorSourceConnector。
    1. 通过Connect rest服务,使用mm2-source-connector.json文件,创建MirrorSourceConnector任务。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-source-connector.json http://task-1-1:8083/connectors/mm2-source-connector/config
    2. 执行以下命令,查看mm2-source-connector状态。
      curl -s task-1-1:8083/connectors/mm2-source-connector/status | jq .
  3. 使用MirrorCheckpointConnector。
    1. 通过Connect rest服务,使用mm2-checkpoint-connector.json文件,创建MirrorCheckpointConnector任务。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-checkpoint-connector.json http://task-1-1:8083/connectors/mm2-checkpoint-connector/config
    2. 执行以下命令,查看mm2-checkpoint-connector状态。
      curl -s task-1-1:8083/connectors/mm2-checkpoint-connector/status | jq .
  4. 使用MirrorHeartbeatConnector。
    1. 通过Connect rest服务,使用mm2-heartbeat-connector.json文件,创建MirrorHeartbeatConnector任务。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-heartbeat-connector.json http://task-1-1:8083/connectors/mm2-heartbeat-connector/config
    2. 执行以下命令,查看mm2-heartbeat-connector状态。
      curl -s task-1-1:8083/connectors/mm2-heartbeat-connector/status | jq .
  5. 在目标集群执行以下命令,查看MM2相关topic。
    kafka-topics.sh --list --bootstrap-server core-1-1:9092
    此时,在目标集群中,您可以看到以下topic已经创建:
    • emrsource.foo开头的topic:由MirrorSourceConnector创建。

      foo开头的topic是您源集群上已有的,需要复制的topic。

    • emrsource.checkpoints.internal:由MirrorCheckpointConnector创建,用于存储offset等信息。
    • heartbeats:由MirrorHeartbeatConnector创建。