すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Kafka MM2 を使用してクラスター間でデータを同期する

最終更新日:Jan 11, 2025

このトピックでは、Kafka Connect(Kafka MM2)で MirrorMaker 2 を使用してクラスター間でデータを同期する方法について説明します。

背景情報

シナリオ

Kafka MM2 は、以下のシナリオに適しています。
  • リモートデータ同期:Kafka MM2 を使用して、異なるリージョンにあるクラスター間でデータを同期できます。
  • ディザスタリカバリ:Kafka MM2 を使用して、異なるデータセンターにあるプライマリクラスターとセカンダリクラスターで構成されるディザスタリカバリアーキテクチャを構築できます。 2 つのクラスターのデータはリアルタイムで同期できます。 1 つのクラスターが使用できなくなった場合は、そのクラスター内のアプリケーションを別のクラスターに転送できます。 これにより、地理的なディザスタリカバリが保証されます。
  • データ移行:ビジネスのクラウド移行、ハイブリッドクラウド、クラスターのアップグレードなどのシナリオでは、元のクラスターから新しいクラスターにデータを移行する必要があります。 Kafka MM2 を使用してデータを移行し、ビジネスの継続性を確保できます。
  • データ集約:Kafka MM2 を使用して、複数の Kafka サブクラスターから Kafka 中央クラスターにデータを同期できます。 このようにして、データを集約できます。

機能

データレプリケーションツールとして、Kafka MM2 は以下の機能を提供します。
  • トピックのデータと構成情報をレプリケートします。
  • コンシューマーグループのオフセット情報と消費されたトピックをレプリケートします。
  • アクセス制御リスト(ACL)をレプリケートします。
  • 新しいトピックとパーティションを自動的に検出します。
  • Kafka MM2 メトリックを提供します。
  • 水平方向にスケーラブルな高可用性アーキテクチャを提供します。

ジョブ実行方法

Kafka MM2 ジョブは、次の 3 つの方法で実行できます。
  • 分散モードで既存の Kafka Connect クラスターで Kafka MM2 ジョブを実行します。 この方法をお勧めします。 このトピックで説明されている機能を使用して、Kafka MM2 ジョブを管理できます。
  • ドライバプログラムを使用してすべての Kafka MM2 ジョブを管理します。 詳細については、「専用クラスターに MM2 をデプロイしてクラスター間でデータを同期する」をご参照ください。
  • 単一の MirrorSourceConnector ジョブを実行します。これはテストシナリオに適しています。
説明 最初の方法を使用することをお勧めします。 この方法を使用すると、Kafka Connect クラスターによって提供されるリセットサービスを使用して Kafka MM2 ジョブを管理できます。

Kafka MM2 の詳細については、「Apache Kafka ドキュメント」をご参照ください。

前提条件

emrsource という名前のソース Kafka クラスターと emrdest という名前の宛先 Kafka クラスターが作成されています。 クラスターの作成時に、オプションサービス(少なくとも 1 つ選択)として Kafka が選択されています。 詳細については、「クラスターを作成する」をご参照ください。
説明 このトピックでは、ソースクラスターと宛先クラスターの E-MapReduce(EMR)バージョンは V3.42.0 です。 両方のクラスターは DataFlow タイプであり、同じ仮想プライベートクラウド(VPC)にあります。

制限事項

宛先クラスターの Kafka バージョンは 2.12_2.4.1 以降である必要があります。

手順

  1. 手順 1:宛先 Kafka クラスターに Kafka Connect クラスターを作成する
  2. 手順 2:Kafka MM2 コネクタを使用する

手順 1:宛先 Kafka クラスターに Kafka Connect クラスターを作成する

  1. ノードグループを作成します。
    EMR コンソールの emrdest クラスターの [ノード] ページで、ノードグループを作成します。
    1. [ノードグループの作成] をクリックします。
    2. [マシン グループの追加] パネルで、次の表に示すパラメーターを構成します。 必要に応じて他のパラメーターも構成します。
      パラメーター説明
      ノードグループタイプ[TASK(タスクノードグループ)] を選択します。
      ノードグループ名この例では、名前は emr-task です。
      ストレージ構成データディスクを選択します。
  2. ノードグループをスケールアウトします。
    1. [ノード] ページで、emr-task ノードグループを見つけ、[アクション] 列の [スケールアウト] をクリックします。
    2. 表示されるダイアログボックスで、追加するインスタンスの数と [サービス規約] を入力して選択します。
      この例では、1 つのインスタンスが追加されます。 必要に応じてインスタンスの数を指定できます。 高可用性 Kafka Connect クラスターが必要な場合は、2 つ以上のインスタンスを追加することをお勧めします。
    3. [OK] をクリックします。
  3. Kafka Connect クラスターのステータスを確認して、クラスターが実行されていることを確認します。
    1. ページの上部にある [サービス] をクリックします。
    2. Kafka 詳細に関するセクションで、[ステータス] をクリックします。
    3. [コンポーネント] セクションで、[kafkaconnect] のステータスを確認して、コンポーネントが実行されていることを確認します。
      KafkaConnect
  4. セキュアシェル(SSH)を使用して emrdest クラスターにログオンします。 詳細については、「クラスターにログオンする」をご参照ください。
  5. 次のコマンドを実行して、Kafka Connect クラスターによって提供されるリセットサービスのステータスを確認します。
    curl -X GET http://task-1-1:8083| jq .
    次のような情報が返されます。
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100    91  100    91    0     0  13407      0 --:--:-- --:--:-- --:--:-- 15166
    {
      "version": "2.4.1",
      "commit": "42ce056344c5625a",
      "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****"
    }

手順 2:Kafka MM2 コネクタを使用する

  1. Kafka MM2 コネクタの構成ファイルを準備します。
    次のファイルが含まれます。
    • MirrorSourceConnector の構成ファイル
      この例では、ファイル名は mm2-source-connector.json です。 次のサンプルコードは、ファイルの内容を示しています。 必要に応じてファイル内のパラメーターを変更します。 詳細については、「KIP-382: MirrorMaker 2.0」をご参照ください。
      {
        "name": "mm2-source-connector",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "clusters": "emrsource,emrdest",
        "source.cluster.alias": "emrsource",
        "target.cluster.alias": "emrdest",
        "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
        "source.cluster.bootstrap.servers": "10.0.**.**:9092",
        "topics": "^foo.*",
        "tasks.max": "4",
        "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "replication.factor": "3",
        "offset-syncs.topic.replication.factor": "3",
        "sync.topic.acls.interval.seconds": "20",
        "sync.topic.configs.interval.seconds": "20",
        "refresh.topics.interval.seconds": "20",
        "refresh.groups.interval.seconds": "20",
        "consumer.group.id": "mm2-mirror-source-consumer-group",
        "producer.enable.idempotence":"true",
        "source.cluster.security.protocol": "PLAINTEXT",
        "target.cluster.security.protocol": "PLAINTEXT"
      }
      説明 サンプルコードのパラメーター:
      • source.cluster.bootstrap.servers:emrsource クラスターの Kafka サービスエンドポイント。 emrsource クラスターと Kafka Connect クラスターが接続できることを確認してください。
      • topics:レプリケートするトピック。 この例では、名前が foo で始まるトピックがレプリケートされます。
    • MirrorCheckpointConnector の構成ファイル
      この例では、ファイル名は mm2-checkpoint-connector.json です。 次のサンプルコードは、ファイルの内容を示しています。 必要に応じてファイル内のパラメーターを変更します。 詳細については、「KIP-382: MirrorMaker 2.0」をご参照ください。
      {
          "name": "mm2-checkpoint-connector",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
          "clusters": "emrsource,emrdest",
          "source.cluster.alias": "emrsource",
          "target.cluster.alias": "emrdest",
          "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
          "source.cluster.bootstrap.servers": "10.0.**.**:9092",
          "tasks.max": "1",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "replication.factor": "3",
          "checkpoints.topic.replication.factor": "3",
          "emit.checkpoints.interval.seconds": "20",
          "source.cluster.security.protocol": "PLAINTEXT",
          "target.cluster.security.protocol": "PLAINTEXT"
        }
    • MirrorHeartbeatConnector の構成ファイル
      この例では、ファイル名は mm2-heartbeat-connector.json です。 次のサンプルコードは、ファイルの内容を示しています。 必要に応じてファイル内のパラメーターを変更します。 詳細については、「KIP-382: MirrorMaker 2.0」をご参照ください。
      {
          "name": "mm2-heartbeat-connector",
          "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
          "clusters": "emrsource,emrdest",
          "source.cluster.alias": "emrsource",
          "target.cluster.alias": "emrdest",
          "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092",
          "source.cluster.bootstrap.servers": "10.0.**.**:9092",
          "tasks.max": "1",
          "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "replication.factor": "3",
          "heartbeats.topic.replication.factor": "3",
          "emit.heartbeats.interval.seconds": "20",
          "source.cluster.security.protocol": "PLAINTEXT",
          "target.cluster.security.protocol": "PLAINTEXT"
        }
  2. MirrorSourceConnector を使用します。
    1. mm2-source-connector.json ファイルに基づいて、REST サービスを使用して MirrorSourceConnector ジョブを作成します。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-source-connector.json http://task-1-1:8083/connectors/mm2-source-connector/config
    2. MirrorSourceConnector のステータスを表示します。
      curl -s task-1-1:8083/connectors/mm2-source-connector/status | jq .
  3. MirrorCheckpointConnector を使用します。
    1. mm2-checkpoint-connector.json ファイルに基づいて、REST サービスを使用して MirrorCheckpointConnector ジョブを作成します。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-checkpoint-connector.json http://task-1-1:8083/connectors/mm2-checkpoint-connector/config
    2. MirrorCheckpointConnector のステータスを表示します。
      curl -s task-1-1:8083/connectors/mm2-checkpoint-connector/status | jq .
  4. MirrorHeartbeatConnector を使用します。
    1. mm2-heartbeat-connector.json ファイルに基づいて、REST サービスを使用して MirrorHeartbeatConnector ジョブを作成します。
      curl -X PUT -H "Content-Type: application/json" --data @mm2-heartbeat-connector.json http://task-1-1:8083/connectors/mm2-heartbeat-connector/config
    2. MirrorHeartbeatConnector のステータスを表示します。
      curl -s task-1-1:8083/connectors/mm2-heartbeat-connector/status | jq .
  5. 次のコマンドを実行して、emrdest クラスター内の Kafka MM2 に関連するトピックを表示します。
    kafka-topics.sh --list --bootstrap-server core-1-1:9092
    この例では、次のトピックを表示できます。
    • 名前が foo で始まり、emrsource クラスターにあるトピック。 トピックは MirrorSourceConnector によって作成されます。

      トピックは emrsource クラスター内の既存のトピックであり、レプリケートされます。

    • MirrorCheckpointConnector によって作成され、オフセット情報の保存に使用される emrsource.checkpoints.internal トピック。
    • MirrorHeartbeatConnector によって作成された heartbeats トピック。