すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Document

最終更新日:Mar 26, 2026

パーティション再割り当て、ブローカー内レプリカ移行、およびブローカー回復は、いずれも通常のプロデューサーおよびコンシューマーのトラフィックを圧迫する可能性のあるレプリケーション I/O を発生させます。このメンテナンス I/O を速度制限することで、ビジネストラフィックへの影響を防止してください。

本ドキュメントは、E-MapReduce (EMR) Kafka 2.4.1 に適用されます。

速度制限パラメーター

レプリケーションの速度制限を制御するパラメーターは 4 つあります。そのうち 2 つはトピックレベルで設定され、対象とするレプリカを選択します。残りの 2 つはブローカーレベルで設定され、スループットを上限値で制限します。

パラメータースコープ説明
leader.replication.throttled.replicasトピックメンテナンストラフィックの速度制限を適用するリーダーレプリカです。フォーマット: [パーティションID]:[ブローカーID],[パーティションID]:[ブローカーID],... または、トピック内のすべてのリーダーレプリカに適用する場合は * を指定します。
follower.replication.throttled.replicasトピックメンテナンストラフィックの速度制限を適用するフォロワーレプリカです。上記と同じフォーマットです。
leader.replication.throttled.rateブローカーブローカー上のリーダーレプリカに対する読み取りスループットの上限値(バイト/秒)です。
follower.replication.throttled.rateブローカーブローカー上のフォロワーレプリカに対する書き込みスループットの上限値(バイト/秒)です。

注意事項

  • Kafka のトラフィック構成、ビジネスシナリオ、および O&M シナリオに基づき、O&M トラフィックの速度制限を実施するかどうかを判断してください。

  • 速度制限のしきい値は、実際のビジネストラフィック量、許容されるレイテンシー、Kafka サービスの一時的な中断可否、およびディスクおよびネットワークの I/O 帯域幅を踏まえて設定してください。

  • しきい値が低すぎると、メンテナンス操作全体が停止する可能性があります。一方、しきい値が高すぎると、I/O 競合や帯域幅の飽和などの問題が発生し、通常のビジネストラフィックに影響を与える可能性があります。

  • 可能であれば、メンテナンス操作は非ピーク時間帯に実行してください。

現在の速度制限構成の照会

任意のブローカーまたはトピックにおける速度制限パラメーターを照会するには、kafka-configs.sh を使用します。

ブローカーを照会する場合:

kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name <broker-id> --describe

トピックを照会する場合:

kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name <topic-name> --describe

パーティション再割り当て時のトラフィック速度制限

kafka-reassign-partitions.sh スクリプトでは、--throttle フラグを直接指定することで速度制限を適用できます。再割り当てが完了した後は、--verify を実行して、対象となるトピックおよびブローカーから速度制限パラメーターを削除してください。

重要
  • 速度制限は、再割り当てトラフィックにのみ適用され、通常のレプリカフェッチ操作には適用されません。

  • 再割り当て実行中に速度制限値を変更する場合は、新しい --throttle 値を指定して --execute を再実行してください。

  • 再割り当てがすでに開始されており、かつ --throttle を指定しなかった場合は、kafka-configs.sh を使用して、leader.replication.throttled.replicasfollower.replication.throttled.replicasleader.replication.throttled.rate、および follower.replication.throttled.rate を直接設定してください。

操作手順:

  1. SSH 経由で Kafka クラスターのマスターノードにログインします。詳細については、「クラスターへのログイン」をご参照ください。

  2. テスト用トピックを作成します。

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

    トピックの詳細を確認します:

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  3. バックグラウンドトラフィックを生成するために、データ書き込みをシミュレートします。

    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092
  4. パーティション再割り当て計画を作成します。reassign.json という名前のファイルを以下の内容で作成します:

    {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}
  5. 速度制限を指定して再割り当てを実行します。本例では、シミュレートされた書き込み速度は 10 Mbit/s であり、速度制限は 30 Mbit/s に設定されています。

    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute

    再割り当て実行中に速度制限を変更する場合は、新しい値を指定してコマンドを再実行します:

    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle <new-rate-in-bytes> --execute
  6. (任意)速度制限パラメーターが正しく適用されていることを確認します。ブローカー 2 を確認します:

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe

    トピックを確認します:

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe
  7. 再割り当ての完了を確認し、速度制限を解除します。

    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify

    ジョブが完了したら、上記コマンドを再度実行して、速度制限機能に関連するパラメーターを削除します。

ディレクトリ間でのレプリカ移動時のトラフィック速度制限

レプリカディレクトリの移行時にブローカー内 I/O を制限するには、kafka-reassign-partitions.sh--replica-alter-log-dirs-throttle フラグを指定します。この速度制限は、ブローカーレベルのパラメーター replica.alter.log.dirs.io.max.bytes.per.second に反映されます。

操作手順:

  1. SSH 経由で Kafka クラスターのマスターノードにログインします。詳細については、「クラスターへのログイン」をご参照ください。

  2. テスト用トピックを作成します。

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

    トピックの詳細を確認します:

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  3. データ書き込みをシミュレートします。

    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092
  4. 移行先ディレクトリを含む再割り当て計画を作成します。reassign.json という名前のファイルを以下の内容で作成します:

    {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}
  5. 速度制限を指定してレプリカ移動を実行します。

    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
  6. ブローカー上で速度制限パラメーターが正しく適用されていることを確認します。

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0
  7. 移動が完了したことを確認し、スロットルを解除します。

    説明 --verify をジョブ完了後に再度実行して、速度制限パラメーターを削除してください。
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify

ブローカー回復時のトラフィック速度制限

ブローカーを再起動すると、リーダーレプリカからレプリカデータの再同期が行われます。ブローカー移行やディスク修復などのシナリオでは、この同期トラフィックが非常に大きくなる可能性があります。ビジネストラフィックへの影響を防ぐため、ブローカーを停止する前に速度制限を設定してください。

重要
  • 速度制限は、メンテナンス用のレプリケーションにのみ適用され、通常のレプリカフェッチ操作には適用されません。

  • ブローカーが回復し、そのレプリカが in-sync replica (ISR) リストに復帰した後は、kafka-configs.sh を使用して速度制限パラメーターを削除してください。この手順を省略すると、速度制限がすべてのレプリケーショントラフィックに継続して適用されます。

操作手順:

  1. SSH 経由で Kafka クラスターのマスターノードにログインします。詳細については、「クラスターへのログイン」をご参照ください。

  2. テスト用トピックを作成します。

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

    トピックの詳細を確認します:

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  3. テストデータを書き込みます。

    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092
  4. Broker 1 を停止する前に、トピックおよび各ブローカーに対して速度制限を設定します。トピック内のすべてのレプリカに速度制限を適用します:

    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"

    各ブローカーに速度制限レートを適用します。クラスター内のすべてのブローカーについて同様のコマンドを繰り返します:

    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2
    ......
  5. EMR コンソールから Broker 1 を停止します。

  6. データ損失をシミュレートするために、Broker 1 上のレプリカデータを削除します。

    rm -rf /mnt/disk2/kafka/log/test-throttled-0/
  7. EMR コンソールから Broker 1 を起動し、速度制限パラメーターが有効であることを確認します。現在の速度制限構成の照会 で紹介したコマンドを使用して確認してください。

  8. Broker 1 のレプリカが回復し、ISR リストに表示された後、速度制限パラメーターを削除します。トピックから削除する場合:

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled

    各ブローカーから削除する場合:

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0
    ......