このトピックでは、Kafkaクラスターのディスク容量が不足している場合のO&M操作の実行方法について説明します。このトピックでは、E-MapReduce (EMR) Kafka 2.4.1を使用しています。
ビジネスシナリオ
Kafkaはログデータをディスクに保存します。ディスク容量が不足すると、ディスク上のKafkaログディレクトリがオフラインになります。この場合、ディスク上のパーティションレプリカの読み取りまたは書き込みができなくなります。これにより、パーティションの可用性とフォールトトレランスが低下します。リーダーパーティションレプリカが他のブローカーに移行されるため、他のブローカーの負荷が増加します。したがって、ディスク容量が不足している場合は、できるだけ早く問題を解決する必要があります。
概要
このトピックでは、Kafkaクラスターのディスク容量が不足している場合に使用できるO&Mポリシーについて、監視とディスクフルの復旧という2つの観点から説明します。
ディスクフルの監視
Kafkaサービス: CloudMonitorコンソールでEMR KafkaクラスターのOfflineLogDirectoryCountメトリックのアラートルールを設定することで、オフラインのログディレクトリをリアルタイムで検出できます。
ディスクフルの復旧
ディスク上のKafkaログディレクトリがオフラインになった場合は、まずディスク容量が不足しているかどうかを確認する必要があります。
- ディスクのサイズ変更: ディスクのサイズを変更してディスク容量を増やします。このポリシーは、ディスクがブローカーに接続されているシナリオに適用できます。詳細については、このトピックのディスクのサイズ変更セクションをご参照ください。
- ブローカー内でのパーティションの移行: ブローカーのディスクフルから、このブローカーの他のディスクにパーティションを移行します。このポリシーは、ブローカーのディスク使用率に偏りがあるシナリオに適用できます。詳細については、このトピックのブローカー内でのパーティションの移行セクションをご参照ください。
- ログのクリア: 容量が不足しているディスクに書き込まれたログデータをクリアします。このポリシーは、古いログデータを削除できるシナリオに適用できます。詳細については、このトピックのログのクリアセクションをご参照ください。
ディスクのサイズ変更
説明
ブローカーのディスク容量が不足している場合は、このポリシーを使用して、関連する要件を満たすようにディスク容量を増やします。このポリシーの利点は、操作が簡単でリスクが低く、ディスク容量不足の問題を迅速に解決できることです。
シナリオ
このポリシーは、ディスクがブローカーに接続されているシナリオに適用できます。
手順
EMRコンソールで、ブローカーのディスクサイズを変更します。詳細については、「ディスクの拡張」をご参照ください。
ブローカー内でのパーティションの移行
説明
ブローカーのディスク容量が不足すると、ディスク上のKafkaログディレクトリがオフラインになります。その結果、kafka-reassign-partitions.sh ツールを使用してパーティションを移行することはできません。この場合、ブローカーがデプロイされているElastic Compute Service (ECS)インスタンスで操作を実行して、パーティションレプリカデータをブローカーの他のディスクに移動し、対応するKafkaデータディレクトリのメタデータを変更できます。これは、ディスク容量不足の問題の解決に役立ちます。
シナリオ
このポリシーは、ディスクフルと使用率が比較的低いディスクが存在するために、ブローカーのディスク使用率に偏りがあるシナリオに適用できます。
注意事項
- このポリシーでは、ブローカー内でのみディスク間でパーティションを移行できます。
- パーティションの移行により、ディスクのI/Oホットスポットの問題が発生する可能性があります。これは、クラスターのパフォーマンスに影響を与える可能性があります。各移行のサイズと期間がビジネスに与える影響を評価する必要があります。
- このポリシーには、非標準の操作が含まれています。本番クラスターで操作を実行する前に、対応するKafkaクラスターバージョンでポリシーをテストすることをお勧めします。
手順
ディスク容量が不足している場合、ディスク上のKafkaログディレクトリはオフラインになります。この場合、kafka-reassign-partitions.sh ツールを使用してパーティションを移行することはできません。このセクションでは、ファイルを直接移動し、Kafka関連のメタデータを変更することにより、非標準の操作を実行してパーティションを移行する方法について説明します。
- テストトピックを作成します。
- SSHモードでKafkaクラスターのマスターノードにログオンします。詳細については、「クラスターへのログオン」をご参照ください。
- 次のコマンドを実行して、test-topicという名前のテストトピックを作成します。パーティションレプリカは、Broker 0とBroker 1に分散されます。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-topic --replica-assignment 0:1 --create次のコマンドを実行して、トピックの詳細をクエリします。kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-topic --describe次の出力が返されます。この場合、Broker 0は同期レプリカ (ISR)リストにあります。Topic: test-topic PartitionCount: 1 ReplicationFactor: 2 Configs: Topic: test-topic Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
- 次のコマンドを実行して、データ書き込みをシミュレートします。
kafka-producer-perf-test.sh --topic test-topic --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props linger.ms=0 bootstrap.servers=core-1-1:9092 - Broker 0のパーティションのログディレクトリの権限を変更します。
- 次のコマンドを実行して、マスターノードのemr-userアカウントに切り替えます。
su emr-user - 次のコマンドを実行して、管理するコアノードにパスワードなしモードでログオンします。
ssh core-1-1 - 次のsudoコマンドを実行して、root権限を取得します。
sudo su - root - 次のコマンドを実行して、test-topic-0パーティションが存在するディスクを見つけます。
sudo find / -name test-topic-0次の出力が返された場合、パーティションは/mnt/disk4/kafka/log ディレクトリにあります。/mnt/disk4/kafka/log/test-topic-0 - 次のコマンドを実行して、Broker 0のパーティションのログディレクトリの権限を 000 に設定します。
sudo chmod 000 /mnt/disk4/kafka/log - 次のコマンドを実行して、test-topicのステータスをクエリします。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-topic --describe次の出力が返されます。Broker 0はISRリストにありません。Topic: test-topic PartitionCount: 1 ReplicationFactor: 2 Configs: Topic: test-topic Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
- 次のコマンドを実行して、マスターノードのemr-userアカウントに切り替えます。
- Broker 0を停止します。
EMRコンソールでBroker 0のKafkaサービスを停止します。
- 次のコマンドを実行して、Broker 0のtest-topicのパーティションをBroker 0の別のディスクに移動します。
mv /mnt/disk4/kafka/log/test-topic-0 /mnt/disk1/kafka/log/ - ファイルを変更します。
/mnt/disk4/kafka/log ソースディレクトリと/mnt/disk1/kafka/log 宛先ディレクトリのメタデータファイルに基づいて、変更するファイルには、replication-offset-checkpoint とrecovery-point-offset-checkpoint が含まれます。
- replication-offset-checkpoint ファイルの変更: ソースログディレクトリのtest-topic関連のエントリを宛先ログディレクトリのreplication-offset-checkpoint ファイルに移動し、ファイルのエントリ数を変更します。 レプリケーションオフセットチェックポイント

- recovery-point-offset-checkpoint ファイルの変更: ソースログディレクトリのtest-topic関連のエントリを宛先ログディレクトリのrecovery-point-offset-checkpoint ファイルに移動し、ファイルのエントリ数を変更します。 リカバリポイントオフセットチェックポイント

- replication-offset-checkpoint ファイルの変更: ソースログディレクトリのtest-topic関連のエントリを宛先ログディレクトリのreplication-offset-checkpoint ファイルに移動し、ファイルのエントリ数を変更します。 レプリケーションオフセットチェックポイント
- 次のコマンドを実行して、Broker 0のパーティションのログディレクトリの権限を変更します。
sudo chmod 755 /mnt/disk4/kafka/log - Broker 0を起動します。
EMRコンソールでBroker 0のKafkaサービスを起動します。
- 次のコマンドを実行して、クラスターのステータスが正常かどうかを確認します。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-topic --describe
ログのクリア
説明
ブローカーのディスク容量が不足している場合は、業務ログデータを時系列で最も古いデータから最新のデータまで、十分なディスク容量が解放されるまで削除します。Kafkaクラスターの内部トピックのデータは削除できません。
シナリオ
このポリシーは、古い業務ログデータをディスクフルから削除できるシナリオに適用できます。
データの保存期間が変更されない場合、ディスクはすぐにいっぱいになる可能性があります。したがって、このポリシーは、特別な状況によりデータが急増するシナリオに一般的に適用されます。
注意事項
アンダースコア (_)で始まる名前のトピックからデータを削除することはできません。
手順
- ディスクがいっぱいになっているマシンにログオンします。
- ディスクフルを見つけ、不要な業務ログデータを削除します。次の点に注意してください。
- Kafkaクラスターのデータディレクトリを削除しないでください。これは、不要なデータ損失を防ぎます。
- 大きな容量を占有しているトピック、または不要になったトピックを見つけます。トピックのパーティションをいくつか選択して、最も古い保存済みログデータからデータを削除します。トピックの一部のパーティションから、履歴ログセグメントと、セグメントのindexおよびtimeindexファイルを削除します。 __consumer_offsetsや_schemaなどの内部トピックのデータは削除しないでください。
- ディスクフルが存在するブローカーを再起動して、ログディレクトリをオンラインにします。