このトピックでは、Kafka Connect(Kafka MM2)で MirrorMaker 2 を使用してクラスター間でデータを同期する方法について説明します。
背景情報
シナリオ
Kafka MM2 は、以下のシナリオに適しています。
- リモートデータ同期:Kafka MM2 を使用して、異なるリージョンにあるクラスター間でデータを同期できます。
- ディザスタリカバリ:Kafka MM2 を使用して、異なるデータセンターにあるプライマリクラスターとセカンダリクラスターで構成されるディザスタリカバリアーキテクチャを構築できます。 2 つのクラスターのデータはリアルタイムで同期できます。 1 つのクラスターが使用できなくなった場合は、そのクラスター内のアプリケーションを別のクラスターに転送できます。 これにより、地理的なディザスタリカバリが保証されます。
- データ移行:ビジネスのクラウド移行、ハイブリッドクラウド、クラスターのアップグレードなどのシナリオでは、元のクラスターから新しいクラスターにデータを移行する必要があります。 Kafka MM2 を使用してデータを移行し、ビジネスの継続性を確保できます。
- データ集約:Kafka MM2 を使用して、複数の Kafka サブクラスターから Kafka 中央クラスターにデータを同期できます。 このようにして、データを集約できます。
機能
データレプリケーションツールとして、Kafka MM2 は以下の機能を提供します。
- トピックのデータと構成情報をレプリケートします。
- コンシューマーグループのオフセット情報と消費されたトピックをレプリケートします。
- アクセス制御リスト(ACL)をレプリケートします。
- 新しいトピックとパーティションを自動的に検出します。
- Kafka MM2 メトリックを提供します。
- 水平方向にスケーラブルな高可用性アーキテクチャを提供します。
ジョブ実行方法
Kafka MM2 ジョブは、次の 3 つの方法で実行できます。
- 分散モードで既存の Kafka Connect クラスターで Kafka MM2 ジョブを実行します。 この方法をお勧めします。 このトピックで説明されている機能を使用して、Kafka MM2 ジョブを管理できます。
- ドライバプログラムを使用してすべての Kafka MM2 ジョブを管理します。 詳細については、「専用クラスターに MM2 をデプロイしてクラスター間でデータを同期する」をご参照ください。
- 単一の MirrorSourceConnector ジョブを実行します。これはテストシナリオに適しています。
説明 最初の方法を使用することをお勧めします。 この方法を使用すると、Kafka Connect クラスターによって提供されるリセットサービスを使用して Kafka MM2 ジョブを管理できます。
Kafka MM2 の詳細については、「Apache Kafka ドキュメント」をご参照ください。
前提条件
emrsource という名前のソース Kafka クラスターと emrdest という名前の宛先 Kafka クラスターが作成されています。 クラスターの作成時に、オプションサービス(少なくとも 1 つ選択)として Kafka が選択されています。 詳細については、「クラスターを作成する」をご参照ください。説明 このトピックでは、ソースクラスターと宛先クラスターの E-MapReduce(EMR)バージョンは V3.42.0 です。 両方のクラスターは DataFlow タイプであり、同じ仮想プライベートクラウド(VPC)にあります。
制限事項
宛先クラスターの Kafka バージョンは 2.12_2.4.1 以降である必要があります。
手順
手順 1:宛先 Kafka クラスターに Kafka Connect クラスターを作成する
- ノードグループを作成します。EMR コンソールの emrdest クラスターの [ノード] ページで、ノードグループを作成します。
- [ノードグループの作成] をクリックします。
- [マシン グループの追加] パネルで、次の表に示すパラメーターを構成します。 必要に応じて他のパラメーターも構成します。
パラメーター 説明 ノードグループタイプ [TASK(タスクノードグループ)] を選択します。 ノードグループ名 この例では、名前は emr-task です。 ストレージ構成 データディスクを選択します。
- ノードグループをスケールアウトします。
- [ノード] ページで、emr-task ノードグループを見つけ、[アクション] 列の [スケールアウト] をクリックします。
- 表示されるダイアログボックスで、追加するインスタンスの数と [サービス規約] を入力して選択します。この例では、1 つのインスタンスが追加されます。 必要に応じてインスタンスの数を指定できます。 高可用性 Kafka Connect クラスターが必要な場合は、2 つ以上のインスタンスを追加することをお勧めします。
- [OK] をクリックします。
- Kafka Connect クラスターのステータスを確認して、クラスターが実行されていることを確認します。
- ページの上部にある [サービス] をクリックします。
- Kafka 詳細に関するセクションで、[ステータス] をクリックします。
- [コンポーネント] セクションで、[kafkaconnect] のステータスを確認して、コンポーネントが実行されていることを確認します。

- セキュアシェル(SSH)を使用して emrdest クラスターにログオンします。 詳細については、「クラスターにログオンする」をご参照ください。
- 次のコマンドを実行して、Kafka Connect クラスターによって提供されるリセットサービスのステータスを確認します。
curl -X GET http://task-1-1:8083| jq .次のような情報が返されます。% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 91 100 91 0 0 13407 0 --:--:-- --:--:-- --:--:-- 15166 { "version": "2.4.1", "commit": "42ce056344c5625a", "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****" }
手順 2:Kafka MM2 コネクタを使用する
- Kafka MM2 コネクタの構成ファイルを準備します。次のファイルが含まれます。
- MirrorSourceConnector の構成ファイルこの例では、ファイル名は mm2-source-connector.json です。 次のサンプルコードは、ファイルの内容を示しています。 必要に応じてファイル内のパラメーターを変更します。 詳細については、「KIP-382: MirrorMaker 2.0」をご参照ください。
{ "name": "mm2-source-connector", "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "clusters": "emrsource,emrdest", "source.cluster.alias": "emrsource", "target.cluster.alias": "emrdest", "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092", "source.cluster.bootstrap.servers": "10.0.**.**:9092", "topics": "^foo.*", "tasks.max": "4", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor": "3", "offset-syncs.topic.replication.factor": "3", "sync.topic.acls.interval.seconds": "20", "sync.topic.configs.interval.seconds": "20", "refresh.topics.interval.seconds": "20", "refresh.groups.interval.seconds": "20", "consumer.group.id": "mm2-mirror-source-consumer-group", "producer.enable.idempotence":"true", "source.cluster.security.protocol": "PLAINTEXT", "target.cluster.security.protocol": "PLAINTEXT" }説明 サンプルコードのパラメーター:source.cluster.bootstrap.servers:emrsource クラスターの Kafka サービスエンドポイント。 emrsource クラスターと Kafka Connect クラスターが接続できることを確認してください。topics:レプリケートするトピック。 この例では、名前が foo で始まるトピックがレプリケートされます。
- MirrorCheckpointConnector の構成ファイルこの例では、ファイル名は mm2-checkpoint-connector.json です。 次のサンプルコードは、ファイルの内容を示しています。 必要に応じてファイル内のパラメーターを変更します。 詳細については、「KIP-382: MirrorMaker 2.0」をご参照ください。
{ "name": "mm2-checkpoint-connector", "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "clusters": "emrsource,emrdest", "source.cluster.alias": "emrsource", "target.cluster.alias": "emrdest", "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092", "source.cluster.bootstrap.servers": "10.0.**.**:9092", "tasks.max": "1", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor": "3", "checkpoints.topic.replication.factor": "3", "emit.checkpoints.interval.seconds": "20", "source.cluster.security.protocol": "PLAINTEXT", "target.cluster.security.protocol": "PLAINTEXT" } - MirrorHeartbeatConnector の構成ファイルこの例では、ファイル名は mm2-heartbeat-connector.json です。 次のサンプルコードは、ファイルの内容を示しています。 必要に応じてファイル内のパラメーターを変更します。 詳細については、「KIP-382: MirrorMaker 2.0」をご参照ください。
{ "name": "mm2-heartbeat-connector", "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "clusters": "emrsource,emrdest", "source.cluster.alias": "emrsource", "target.cluster.alias": "emrdest", "target.cluster.bootstrap.servers": "core-1-1:9092;core-1-2:9092;core-1-3:9092", "source.cluster.bootstrap.servers": "10.0.**.**:9092", "tasks.max": "1", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "replication.factor": "3", "heartbeats.topic.replication.factor": "3", "emit.heartbeats.interval.seconds": "20", "source.cluster.security.protocol": "PLAINTEXT", "target.cluster.security.protocol": "PLAINTEXT" }
- MirrorSourceConnector の構成ファイル
- MirrorSourceConnector を使用します。
- mm2-source-connector.json ファイルに基づいて、REST サービスを使用して MirrorSourceConnector ジョブを作成します。
curl -X PUT -H "Content-Type: application/json" --data @mm2-source-connector.json http://task-1-1:8083/connectors/mm2-source-connector/config - MirrorSourceConnector のステータスを表示します。
curl -s task-1-1:8083/connectors/mm2-source-connector/status | jq .
- mm2-source-connector.json ファイルに基づいて、REST サービスを使用して MirrorSourceConnector ジョブを作成します。
- MirrorCheckpointConnector を使用します。
- mm2-checkpoint-connector.json ファイルに基づいて、REST サービスを使用して MirrorCheckpointConnector ジョブを作成します。
curl -X PUT -H "Content-Type: application/json" --data @mm2-checkpoint-connector.json http://task-1-1:8083/connectors/mm2-checkpoint-connector/config - MirrorCheckpointConnector のステータスを表示します。
curl -s task-1-1:8083/connectors/mm2-checkpoint-connector/status | jq .
- mm2-checkpoint-connector.json ファイルに基づいて、REST サービスを使用して MirrorCheckpointConnector ジョブを作成します。
- MirrorHeartbeatConnector を使用します。
- mm2-heartbeat-connector.json ファイルに基づいて、REST サービスを使用して MirrorHeartbeatConnector ジョブを作成します。
curl -X PUT -H "Content-Type: application/json" --data @mm2-heartbeat-connector.json http://task-1-1:8083/connectors/mm2-heartbeat-connector/config - MirrorHeartbeatConnector のステータスを表示します。
curl -s task-1-1:8083/connectors/mm2-heartbeat-connector/status | jq .
- mm2-heartbeat-connector.json ファイルに基づいて、REST サービスを使用して MirrorHeartbeatConnector ジョブを作成します。
- 次のコマンドを実行して、emrdest クラスター内の Kafka MM2 に関連するトピックを表示します。
kafka-topics.sh --list --bootstrap-server core-1-1:9092この例では、次のトピックを表示できます。- 名前が foo で始まり、emrsource クラスターにあるトピック。 トピックは MirrorSourceConnector によって作成されます。
トピックは emrsource クラスター内の既存のトピックであり、レプリケートされます。
- MirrorCheckpointConnector によって作成され、オフセット情報の保存に使用される emrsource.checkpoints.internal トピック。
- MirrorHeartbeatConnector によって作成された heartbeats トピック。
- 名前が foo で始まり、emrsource クラスターにあるトピック。 トピックは MirrorSourceConnector によって作成されます。