すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:専用クラスターに MM2 をデプロイしてクラスター間でデータを同期する

最終更新日:Jan 11, 2025

このトピックでは、E-MapReduce(EMR)のクラスタースクリプト機能を使用して、MirrorMaker 2.0(MM2)サービスを迅速にデプロイおよび使用してデータを同期する方法について説明します。

背景情報

このトピックでは、EMR Dataflow クラスターをデータの同期先クラスターとして使用し、MM2 を専用モードでクラスターにデプロイします。このように、EMR Dataflow クラスターは、宛先クラスターと専用の MirrorMaker クラスターの両方として使用されます。実際のビジネスシナリオでは、MirrorMaker クラスターを別のサーバーにデプロイできます。

Kafka MM2 は、次のシナリオに適しています。
  • リモートデータ同期:Kafka MM2 を使用して、異なるリージョンのクラスター間でデータを同期できます。
  • ディザスタリカバリ:Kafka MM2 を使用して、異なるデータセンターにあるプライマリクラスターとセカンダリクラスターで構成されるディザスタリカバリアーキテクチャを構築できます。 2 つのクラスターのデータはリアルタイムで同期できます。 1 つのクラスターが使用できなくなった場合は、そのクラスター内のアプリケーションを別のクラスターに転送できます。これにより、地理的なディザスタリカバリが保証されます。
  • データ移行:ビジネスのクラウド移行、ハイブリッドクラウド、クラスターのアップグレードなどのシナリオでは、データを元のクラスターから新しいクラスターに移行する必要があります。 Kafka MM2 を使用してデータを移行し、ビジネスの継続性を確保できます。
  • データ集約:Kafka MM2 を使用して、複数の Kafka サブクラスターから Kafka 中央クラスターにデータを同期できます。このようにして、データを集約できます。
データレプリケーションツールとして、Kafka MM2 は次の機能を提供します。
  • トピックのデータと構成情報をレプリケートします。
  • コンシューマーグループのオフセット情報と消費されたトピックをレプリケートします。
  • アクセス制御リスト(ACL)をレプリケートします。
  • 新しいトピックとパーティションを自動的に検出します。
  • Kafka MM2 メトリックを提供します。
  • 水平方向にスケーラブルな高可用性アーキテクチャを提供します。
MM2 タスクは、次の 3 つの方法のいずれかを使用して実行できます。
  • 方法 1(推奨):既存の分散 Kafka Connect クラスターで MM2 コネクタタスクを実行します。詳細については、「Kafka MM2 を使用してクラスター間でデータを同期する」をご参照ください。
  • 方法 2:専用の MirrorMaker クラスターをデプロイします。このようにして、ドライバプログラムを実行してすべての MM2 タスクを管理できます。

    このトピックを参照して、ドライバプログラムを実行して MM2 タスクを管理できます。

  • 方法 3:単一の Connect ワーカーで MirrorSourceConnector タスクを実行します。この方法は、テストシナリオに適しています。
説明 分散 Kafka Connect クラスターで MM2 コネクタタスクを実行することをお勧めします。 Connect クラスターの REST サービスを使用して MM2 タスクを管理できます。

前提条件

  • 2 つのクラスターが作成され、クラスター作成中にオプションサービスから Kafka サービスが選択されています。1 つはソースクラスター、もう 1 つは宛先の EMR Dataflow クラスターです。 Dataflow クラスターの作成方法の詳細については、「クラスターを作成する」をご参照ください。
    説明 この例では、ソースクラスターと宛先クラスターはどちらも EMR V3.42.0 の Dataflow クラスターです。
  • オブジェクトストレージサービス(OSS)にバケットが作成されています。詳細については、「バケットを作成する」をご参照ください。

制限事項

EMR Dataflow クラスター用に選択されている Kafka サービスのバージョンは 2.12_2.4.1 以降である必要があります。

手順

  1. MM2 の mm2.properties 構成ファイルを準備し、構成ファイルを OSS バケットにアップロードします。
    次の構成は参照用です。src.bootstrap.servers パラメーターと dest.bootstrap.servers パラメーターの値をソースクラスターと宛先クラスターに変更し、実際のビジネス要件に基づいて他のパラメーターを構成する必要があります。 MM2 構成の詳細については、「Configuring Geo-Replication」をご参照ください。
    # see org.apache.kafka.clients.consumer.ConsumerConfig for more details
    
    # Sample MirrorMaker 2.0 top-level configuration file
    # Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
    
    # specify any number of cluster aliases
    clusters = src, dest
    
    # connection information for each cluster
    src.bootstrap.servers = <your source kafka cluster servers>
    dest.bootstrap.servers = <your destination kafka cluster servers>
    
    # enable and configure individual replication flows
    src->dest.enabled = true
    src->dest.topics = foo-.*
    groups=.*
    topics.blacklist="__.*"
    
    # customize as needed
    replication.factor=3
  2. kafka_mm2_deploy.sh デプロイメントスクリプトを準備し、スクリプトを OSS バケットにアップロードします。
    #!/bin/bash
    SIGNAL=${SIGNAL:-TERM}
    PIDS=$(ps ax | grep -i 'org.apache.kafka.connect.mirror.MirrorMaker' | grep java | grep -v grep | awk '{print $1}')
    if [ -n "$PIDS" ]; then
      echo "stop the exist mirror maker server."
      kill -s $SIGNAL $PIDS
    fi
    KAFKA_CONF=/etc/taihao-apps/kafka-conf/kafka-conf
    TAIHAO_EXECUTOR=/usr/local/taihao-executor-all/executor/1.0.1
    cd $KAFKA_CONF
    if [ -e "./mm2.properties" ]; then
      mv mm2.properties mm2.properties.bak
    fi
    ${TAIHAO_EXECUTOR}/ossutil64 cp oss://<yourBuket>/mm2.properties ./ -e <yourEndpoint> -i <yourAccessKeyId> -k <yourAccessKeySecret>
    su - kafka <<EOF
    exec connect-mirror-maker.sh -daemon $KAFKA_CONF/mm2.properties
    exit;
    EOF
    次の表は、値を変更する必要があるパラメーターを示しています。変数。ストレージの場所が正しいかどうかを確認します。値が正しくない場合は、値を実際の場所に 変更する必要があります。
    パラメーター説明
    KAFKA_CONF変数。ストレージの場所が正しいかどうかを確認します。値が正しくない場合は、値を実際の場所に 변경する必要があります。
    TAIHAO_EXECUTOR
    oss://<yourBucket>/mm2.propertiesmm2.properties 構成ファイルのストレージパス。値を mm2.properties 構成ファイルの実際のストレージパスに置き換えます。
    <yourEndpoint>OSS サービスのエンドポイント。
    <yourAccessKeyId>Alibaba Cloud アカウントの AccessKey ID。
    <yourAccessKeySecret>Alibaba Cloud アカウントの AccessKey シークレット。
  3. EMR コンソールでスクリプトを実行します。詳細については、「手動でスクリプトを実行する」をご参照ください。
    説明 実行スクリプトを作成するときは、スクリプトが実行される正しいノードを選択する必要があります。ほとんどの場合、すべてのブローカーが選択されます。
    実行が完了すると、Kafka クラスター間でデータが同期されます。