このトピックでは、KafkaクラスタのO&M操作によって生成されるトラフィックを調整する方法について説明します。これにより、通常のビジネストラフィックがO&Mトラフィックの影響を受けるのを防ぎます。この例では、E-MapReduce(EMR)クラスタの作成中にKafka 2.4.1が選択されています。
背景情報
- パーティションが再割り当てされるシナリオ
- ノード内のレプリカが異なるディレクトリに移動されるシナリオ
- クラスタ内のブローカーが復旧されたときにレプリカ内のデータが同期されるシナリオ
注意事項
- Kafkaのトラフィック構成、ビジネスシナリオ、およびO&Mシナリオに基づいて、O&Mトラフィックを調整するかどうかを決定する必要があります。
- O&Mトラフィックの調整しきい値は、特定のビジネスシナリオに基づいて指定する必要があります。ほとんどの場合、調整しきい値が小さいと、O&M操作を正常に実行できません。調整しきい値が大きいと、I/O競合や帯域幅のフルロードなどの問題が発生する可能性があります。これは通常のビジネストラフィックに影響を与える可能性があります。クラスタ内のリソースを評価し、適切な調整しきい値を指定する必要があります。
- 調整しきい値は、トピックのビジネストラフィック量、ビジネスが耐えられるレイテンシ、ビジネスシナリオでKafkaサービスを中断できるかどうか、KafkaクラスタのディスクとネットワークのI/O帯域幅などの要因に基づいて指定する必要があります。
- ほとんどの場合、オフピーク時にO&M操作を実行することをお勧めします。
KafkaのO&Mトラフィックの調整
調整機能のパラメータ
| パラメータ | 説明 |
| leader.replication.throttled.replicas | トピック内でO&Mトラフィックを調整する必要があるパーティションのリーダーレプリカ。
|
| follower.replication.throttled.replicas | トピック内でO&Mトラフィックを調整する必要があるパーティションのフォロワーレプリカ。
|
| leader.replication.throttled.rate | ブローカー上のリーダーレプリカの読み取りトラフィック。 |
| follower.replication.throttled.rate | ブローカー上のフォロワーレプリカの書き込みトラフィック。 |
調整機能のパラメータのクエリ
kafka-configs.sh コマンドを実行して、調整機能のパラメータをクエリできます。
- 指定されたブローカーのパラメータをクエリするには、次のコマンドを実行します。
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name <your broker id> --describe - 指定されたトピックのパラメータをクエリするには、次のコマンドを実行します。
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name <your topic name> --describe
パーティションが再割り当てされるシナリオでのO&Mトラフィックの調整
- 調整しきい値は小さすぎてはいけません。そうしないと、パーティションの再割り当てがトリガーされない可能性があります。
- 調整機能は、レプリカに対する通常のフェッチ操作のトラフィックを調整しません。
- ジョブが完了したら、verifyパラメータを使用して、トピックとブローカーから調整機能のパラメータを削除する必要があります。
- 調整機能のパラメータをすでに設定している場合は、
executeコマンドを実行してパラメータを変更できます。 - 調整機能のパラメータを設定していない場合は、
kafka-configs.shコマンドを実行して、トピックの leader.replication.throttled.replicas パラメータと follower.replication.throttled.replicas パラメータを変更し、ブローカーの leader.replication.throttled.rate パラメータと follower.replication.throttled.rate パラメータを変更できます。
ほとんどの場合、kafka-reassign-partitions.sh ツールを使用してパーティションを再割り当てし、調整機能のパラメータを使用して調整しきい値を指定します。次のセクションでは、このシナリオの例を示します。
- テストトピックを作成します。
- KafkaクラスタのマスターノードにSSHモードでログオンします。詳細については、「クラスタへのログオン」をご参照ください。
- 次のコマンドを実行して、トピックを作成します。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create次のコマンドを実行して、トピックの詳細をクエリします。kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
- 次のコマンドを実行して、データ書き込みをシミュレートします。
kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092 - 調整機能のパラメータを設定し、パーティションを再割り当てします。
- reassign.json という名前のファイルをパーティションの再割り当て用に作成し、次のコンテンツをファイルに追加します。
{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]} - 次のコマンドを実行して、パーティションを再割り当てします。この例では、シミュレートされた書き込み速度は 10 Mbit/s です。パーティションの再割り当ての調整しきい値を 30 Mbit/s に設定します。
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute
- reassign.json という名前のファイルをパーティションの再割り当て用に作成し、次のコンテンツをファイルに追加します。
- 調整機能のパラメータをクエリします。
- 指定されたブローカーのパラメータをクエリするには、次のコマンドを実行します。
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe - 指定されたトピックのパラメータをクエリするには、次のコマンドを実行します。
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe
- 指定されたブローカーのパラメータをクエリするには、次のコマンドを実行します。
- ジョブの結果を表示します。
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify説明 ジョブが完了したら、上記のコマンドを再度実行して、調整機能のパラメータを削除します。
ノード内のレプリカが異なるディレクトリに移動されるシナリオでのO&Mトラフィックの調整
kafka-reassign-partitions.sh ツールを使用して、ブローカー内のレプリカを移行できます。 replica-alter-log-dirs-throttle パラメータを使用して、ブローカー内の移行 I/O を制限できます。次のセクションでは、このシナリオの例を示します。
- テストトピックを作成します。
- KafkaクラスタのマスターノードにSSHモードでログオンします。詳細については、「クラスタへのログオン」をご参照ください。
- 次のコマンドを実行して、トピックを作成します。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create次のコマンドを実行して、トピックの詳細をクエリします。kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
- 次のコマンドを実行して、データ書き込みをシミュレートします。
kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092 - replica-alter-log-dirs-throttle パラメータを設定し、レプリカを移動します。
- reassign.json という名前のファイルを作成し、宛先ディレクトリを reassign.json ファイルに書き込みます。次のコンテンツをファイルに追加します。
{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]} - 次のコマンドを実行して、レプリカを移動します。
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
- reassign.json という名前のファイルを作成し、宛先ディレクトリを reassign.json ファイルに書き込みます。次のコンテンツをファイルに追加します。
- 調整機能のパラメータをクエリします。ブローカー内のディレクトリ間でレプリカを移動する場合、Brokerreplica.alter.log.dirs.io.max.bytes.per.second パラメータを使用して、ブローカーの調整しきい値を指定します。次のコマンドを実行して、指定されたブローカーのパラメータをクエリします。
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0 - ジョブの結果を表示します。
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify説明 ジョブが完了したら、上記のコマンドを再度実行して、調整機能のパラメータを削除します。
クラスタ内のブローカーが復旧されたときにレプリカ内のデータが同期されるシナリオでのO&Mトラフィックの調整
- 調整しきい値は小さすぎてはいけません。そうしないと、パーティションの再割り当てがトリガーされない可能性があります。
- 調整機能は、レプリカに対する通常のフェッチ操作のトラフィックを調整しません。
- データが復旧されたら、
kafka-configs.shコマンドを実行して、調整機能のパラメータを削除する必要があります。
ブローカーが再起動されると、リーダーレプリカからレプリカデータが同期されます。ブローカーの移行や破損したディスクの修復などのシナリオでは、ブローカーはデータリカバリのために失われたレプリカデータを同期する必要があります。これにより、大量の同期トラフィックが発生します。このようなシナリオでは、データリカバリによって発生するトラフィックの急増によって通常のトラフィックが影響を受けるのを防ぐために、同期トラフィックを調整する必要があります。次のセクションでは、このシナリオの例を示します。
- テストトピックを作成します。
- KafkaクラスタのマスターノードにSSHモードでログオンします。詳細については、「クラスタへのログオン」をご参照ください。
- 次のコマンドを実行して、トピックを作成します。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create次のコマンドを実行して、トピックの詳細をクエリします。kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
- 次のコマンドを実行して、テストデータを書き込みます。
kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092 kafka-configs.shコマンドを実行して、調整機能のパラメータを設定します。// テストトピックの調整機能のパラメータを設定します。 kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*" // ブローカーの調整機能のパラメータを設定します。 kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0 kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1 kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2 ......- EMRコンソールでBroker 1 を停止します。
- Broker 1 からレプリカデータを削除して、データ損失のシナリオをシミュレートします。
rm -rf /mnt/disk2/kafka/log/test-throttled-0/ - EMRコンソールでBroker 1 を起動し、調整機能のパラメータが有効になっているかどうかを確認します。
- Broker 1 でレプリカデータが復旧され、レプリカが同期レプリカ(ISR)リストに表示されたら、
kafka-configs.shコマンドを実行して、調整機能のパラメータを削除します。// テストトピックから調整機能のパラメータを削除します。 kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled // ブローカーから調整機能のパラメータを削除します。 kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0 ......