このトピックでは、E-MapReduce(EMR)のクラスタースクリプト機能を使用して、MirrorMaker 2.0(MM2)サービスを迅速にデプロイおよび使用してデータを同期する方法について説明します。
背景情報
このトピックでは、EMR Dataflow クラスターをデータの同期先クラスターとして使用し、MM2 を専用モードでクラスターにデプロイします。このように、EMR Dataflow クラスターは、宛先クラスターと専用の MirrorMaker クラスターの両方として使用されます。実際のビジネスシナリオでは、MirrorMaker クラスターを別のサーバーにデプロイできます。
Kafka MM2 は、次のシナリオに適しています。
- リモートデータ同期:Kafka MM2 を使用して、異なるリージョンのクラスター間でデータを同期できます。
- ディザスタリカバリ:Kafka MM2 を使用して、異なるデータセンターにあるプライマリクラスターとセカンダリクラスターで構成されるディザスタリカバリアーキテクチャを構築できます。 2 つのクラスターのデータはリアルタイムで同期できます。 1 つのクラスターが使用できなくなった場合は、そのクラスター内のアプリケーションを別のクラスターに転送できます。これにより、地理的なディザスタリカバリが保証されます。
- データ移行:ビジネスのクラウド移行、ハイブリッドクラウド、クラスターのアップグレードなどのシナリオでは、データを元のクラスターから新しいクラスターに移行する必要があります。 Kafka MM2 を使用してデータを移行し、ビジネスの継続性を確保できます。
- データ集約:Kafka MM2 を使用して、複数の Kafka サブクラスターから Kafka 中央クラスターにデータを同期できます。このようにして、データを集約できます。
データレプリケーションツールとして、Kafka MM2 は次の機能を提供します。
- トピックのデータと構成情報をレプリケートします。
- コンシューマーグループのオフセット情報と消費されたトピックをレプリケートします。
- アクセス制御リスト(ACL)をレプリケートします。
- 新しいトピックとパーティションを自動的に検出します。
- Kafka MM2 メトリックを提供します。
- 水平方向にスケーラブルな高可用性アーキテクチャを提供します。
MM2 タスクは、次の 3 つの方法のいずれかを使用して実行できます。
- 方法 1(推奨):既存の分散 Kafka Connect クラスターで MM2 コネクタタスクを実行します。詳細については、「Kafka MM2 を使用してクラスター間でデータを同期する」をご参照ください。
- 方法 2:専用の MirrorMaker クラスターをデプロイします。このようにして、ドライバプログラムを実行してすべての MM2 タスクを管理できます。
このトピックを参照して、ドライバプログラムを実行して MM2 タスクを管理できます。
- 方法 3:単一の Connect ワーカーで MirrorSourceConnector タスクを実行します。この方法は、テストシナリオに適しています。
説明 分散 Kafka Connect クラスターで MM2 コネクタタスクを実行することをお勧めします。 Connect クラスターの REST サービスを使用して MM2 タスクを管理できます。
前提条件
- 2 つのクラスターが作成され、クラスター作成中にオプションサービスから Kafka サービスが選択されています。1 つはソースクラスター、もう 1 つは宛先の EMR Dataflow クラスターです。 Dataflow クラスターの作成方法の詳細については、「クラスターを作成する」をご参照ください。説明 この例では、ソースクラスターと宛先クラスターはどちらも EMR V3.42.0 の Dataflow クラスターです。
- オブジェクトストレージサービス(OSS)にバケットが作成されています。詳細については、「バケットを作成する」をご参照ください。
制限事項
EMR Dataflow クラスター用に選択されている Kafka サービスのバージョンは 2.12_2.4.1 以降である必要があります。
手順
- MM2 の mm2.properties 構成ファイルを準備し、構成ファイルを OSS バケットにアップロードします。次の構成は参照用です。src.bootstrap.servers パラメーターと dest.bootstrap.servers パラメーターの値をソースクラスターと宛先クラスターに変更し、実際のビジネス要件に基づいて他のパラメーターを構成する必要があります。 MM2 構成の詳細については、「Configuring Geo-Replication」をご参照ください。
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details # Sample MirrorMaker 2.0 top-level configuration file # Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties # specify any number of cluster aliases clusters = src, dest # connection information for each cluster src.bootstrap.servers = <your source kafka cluster servers> dest.bootstrap.servers = <your destination kafka cluster servers> # enable and configure individual replication flows src->dest.enabled = true src->dest.topics = foo-.* groups=.* topics.blacklist="__.*" # customize as needed replication.factor=3 - kafka_mm2_deploy.sh デプロイメントスクリプトを準備し、スクリプトを OSS バケットにアップロードします。
#!/bin/bash SIGNAL=${SIGNAL:-TERM} PIDS=$(ps ax | grep -i 'org.apache.kafka.connect.mirror.MirrorMaker' | grep java | grep -v grep | awk '{print $1}') if [ -n "$PIDS" ]; then echo "stop the exist mirror maker server." kill -s $SIGNAL $PIDS fi KAFKA_CONF=/etc/taihao-apps/kafka-conf/kafka-conf TAIHAO_EXECUTOR=/usr/local/taihao-executor-all/executor/1.0.1 cd $KAFKA_CONF if [ -e "./mm2.properties" ]; then mv mm2.properties mm2.properties.bak fi ${TAIHAO_EXECUTOR}/ossutil64 cp oss://<yourBuket>/mm2.properties ./ -e <yourEndpoint> -i <yourAccessKeyId> -k <yourAccessKeySecret> su - kafka <<EOF exec connect-mirror-maker.sh -daemon $KAFKA_CONF/mm2.properties exit; EOF次の表は、値を変更する必要があるパラメーターを示しています。パラメーター 説明 KAFKA_CONF 変数。ストレージの場所が正しいかどうかを確認します。値が正しくない場合は、値を実際の場所に 변경する必要があります。 TAIHAO_EXECUTOR 変数。ストレージの場所が正しいかどうかを確認します。値が正しくない場合は、値を実際の場所に 変更する必要があります。oss://<yourBucket>/mm2.properties mm2.properties 構成ファイルのストレージパス。値を mm2.properties 構成ファイルの実際のストレージパスに置き換えます。 <yourEndpoint> OSS サービスのエンドポイント。 <yourAccessKeyId> Alibaba Cloud アカウントの AccessKey ID。 <yourAccessKeySecret> Alibaba Cloud アカウントの AccessKey シークレット。 - EMR コンソールでスクリプトを実行します。詳細については、「手動でスクリプトを実行する」をご参照ください。説明 実行スクリプトを作成するときは、スクリプトが実行される正しいノードを選択する必要があります。ほとんどの場合、すべてのブローカーが選択されます。実行が完了すると、Kafka クラスター間でデータが同期されます。