パーティション再割り当て、ブローカー内レプリカ移行、およびブローカー回復は、いずれも通常のプロデューサーおよびコンシューマーのトラフィックを圧迫する可能性のあるレプリケーション I/O を発生させます。このメンテナンス I/O を速度制限することで、ビジネストラフィックへの影響を防止してください。
本ドキュメントは、E-MapReduce (EMR) Kafka 2.4.1 に適用されます。
速度制限パラメーター
レプリケーションの速度制限を制御するパラメーターは 4 つあります。そのうち 2 つはトピックレベルで設定され、対象とするレプリカを選択します。残りの 2 つはブローカーレベルで設定され、スループットを上限値で制限します。
| パラメーター | スコープ | 説明 |
|---|---|---|
leader.replication.throttled.replicas | トピック | メンテナンストラフィックの速度制限を適用するリーダーレプリカです。フォーマット: [パーティションID]:[ブローカーID],[パーティションID]:[ブローカーID],... または、トピック内のすべてのリーダーレプリカに適用する場合は * を指定します。 |
follower.replication.throttled.replicas | トピック | メンテナンストラフィックの速度制限を適用するフォロワーレプリカです。上記と同じフォーマットです。 |
leader.replication.throttled.rate | ブローカー | ブローカー上のリーダーレプリカに対する読み取りスループットの上限値(バイト/秒)です。 |
follower.replication.throttled.rate | ブローカー | ブローカー上のフォロワーレプリカに対する書き込みスループットの上限値(バイト/秒)です。 |
注意事項
Kafka のトラフィック構成、ビジネスシナリオ、および O&M シナリオに基づき、O&M トラフィックの速度制限を実施するかどうかを判断してください。
速度制限のしきい値は、実際のビジネストラフィック量、許容されるレイテンシー、Kafka サービスの一時的な中断可否、およびディスクおよびネットワークの I/O 帯域幅を踏まえて設定してください。
しきい値が低すぎると、メンテナンス操作全体が停止する可能性があります。一方、しきい値が高すぎると、I/O 競合や帯域幅の飽和などの問題が発生し、通常のビジネストラフィックに影響を与える可能性があります。
可能であれば、メンテナンス操作は非ピーク時間帯に実行してください。
現在の速度制限構成の照会
任意のブローカーまたはトピックにおける速度制限パラメーターを照会するには、kafka-configs.sh を使用します。
ブローカーを照会する場合:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name <broker-id> --describeトピックを照会する場合:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name <topic-name> --describeパーティション再割り当て時のトラフィック速度制限
kafka-reassign-partitions.sh スクリプトでは、--throttle フラグを直接指定することで速度制限を適用できます。再割り当てが完了した後は、--verify を実行して、対象となるトピックおよびブローカーから速度制限パラメーターを削除してください。
速度制限は、再割り当てトラフィックにのみ適用され、通常のレプリカフェッチ操作には適用されません。
再割り当て実行中に速度制限値を変更する場合は、新しい
--throttle値を指定して--executeを再実行してください。再割り当てがすでに開始されており、かつ
--throttleを指定しなかった場合は、kafka-configs.shを使用して、leader.replication.throttled.replicas、follower.replication.throttled.replicas、leader.replication.throttled.rate、およびfollower.replication.throttled.rateを直接設定してください。
操作手順:
SSH 経由で Kafka クラスターのマスターノードにログインします。詳細については、「クラスターへのログイン」をご参照ください。
テスト用トピックを作成します。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --createトピックの詳細を確認します:
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describeバックグラウンドトラフィックを生成するために、データ書き込みをシミュレートします。
kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092パーティション再割り当て計画を作成します。
reassign.jsonという名前のファイルを以下の内容で作成します:{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}速度制限を指定して再割り当てを実行します。本例では、シミュレートされた書き込み速度は 10 Mbit/s であり、速度制限は 30 Mbit/s に設定されています。
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute再割り当て実行中に速度制限を変更する場合は、新しい値を指定してコマンドを再実行します:
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle <new-rate-in-bytes> --execute(任意)速度制限パラメーターが正しく適用されていることを確認します。ブローカー 2 を確認します:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describeトピックを確認します:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe再割り当ての完了を確認し、速度制限を解除します。
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verifyジョブが完了したら、上記コマンドを再度実行して、速度制限機能に関連するパラメーターを削除します。
ディレクトリ間でのレプリカ移動時のトラフィック速度制限
レプリカディレクトリの移行時にブローカー内 I/O を制限するには、kafka-reassign-partitions.sh に --replica-alter-log-dirs-throttle フラグを指定します。この速度制限は、ブローカーレベルのパラメーター replica.alter.log.dirs.io.max.bytes.per.second に反映されます。
操作手順:
SSH 経由で Kafka クラスターのマスターノードにログインします。詳細については、「クラスターへのログイン」をご参照ください。
テスト用トピックを作成します。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --createトピックの詳細を確認します:
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describeデータ書き込みをシミュレートします。
kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092移行先ディレクトリを含む再割り当て計画を作成します。
reassign.jsonという名前のファイルを以下の内容で作成します:{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}速度制限を指定してレプリカ移動を実行します。
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --executeブローカー上で速度制限パラメーターが正しく適用されていることを確認します。
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0移動が完了したことを確認し、スロットルを解除します。
説明--verifyをジョブ完了後に再度実行して、速度制限パラメーターを削除してください。kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
ブローカー回復時のトラフィック速度制限
ブローカーを再起動すると、リーダーレプリカからレプリカデータの再同期が行われます。ブローカー移行やディスク修復などのシナリオでは、この同期トラフィックが非常に大きくなる可能性があります。ビジネストラフィックへの影響を防ぐため、ブローカーを停止する前に速度制限を設定してください。
速度制限は、メンテナンス用のレプリケーションにのみ適用され、通常のレプリカフェッチ操作には適用されません。
ブローカーが回復し、そのレプリカが in-sync replica (ISR) リストに復帰した後は、
kafka-configs.shを使用して速度制限パラメーターを削除してください。この手順を省略すると、速度制限がすべてのレプリケーショントラフィックに継続して適用されます。
操作手順:
SSH 経由で Kafka クラスターのマスターノードにログインします。詳細については、「クラスターへのログイン」をご参照ください。
テスト用トピックを作成します。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --createトピックの詳細を確認します:
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describeテストデータを書き込みます。
kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092Broker 1 を停止する前に、トピックおよび各ブローカーに対して速度制限を設定します。トピック内のすべてのレプリカに速度制限を適用します:
kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"各ブローカーに速度制限レートを適用します。クラスター内のすべてのブローカーについて同様のコマンドを繰り返します:
kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0 kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1 kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2 ......EMR コンソールから Broker 1 を停止します。
データ損失をシミュレートするために、Broker 1 上のレプリカデータを削除します。
rm -rf /mnt/disk2/kafka/log/test-throttled-0/EMR コンソールから Broker 1 を起動し、速度制限パラメーターが有効であることを確認します。現在の速度制限構成の照会 で紹介したコマンドを使用して確認してください。
Broker 1 のレプリカが回復し、ISR リストに表示された後、速度制限パラメーターを削除します。トピックから削除する場合:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled各ブローカーから削除する場合:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0 ......