メッセージの滞留は、コンシューマーラグとも呼ばれ、Kafka を使用する際の一般的なモニタリングメトリックです。メッセージの滞留を理解し、適切に対処することは、システムの安定性、リアルタイム性能、データ整合性を確保する上で重要です。
Kafka のメッセージ滞留とは
Kafka のメッセージ滞留は、コンシューマーがプロデューサーによって書き込まれたメッセージに追いつけない場合に発生します。これにより、未消費のメッセージがパーティションに蓄積されます。
メッセージの総滞留量 = 最新のオフセット (全パーティション) - コンシューマオフセット (全パーティション)
メッセージの総滞留量が多いほど、問題は深刻です。
メッセージの総滞留量がゼロに近い場合、コンシューマーは生成レートに追いついていることを意味します。
Topic: test (Partition 0)
+----+----+----+----+----+----+----+
| M1 | M2 | M3 | M4 | M5 | M6 | M7 | ← 7 件のメッセージが書き込み済み
+----+----+----+----+----+----+----+
↑ ↑
コンシューマオフセット M3 最新のオフセット (M7)
Topic: test (Partition 1)
+----+----+----+----+----+----+
| M1 | M2 | M3 | M4 | M5 | M6 | ← 6 件のメッセージが書き込み済み
+----+----+----+----+----+----+
↑ ↑
コンシューマオフセット M3 最新のオフセット (M6)
現在のメッセージ総滞留量 = 7 - 3 + 6 - 3 = 7
未消費メッセージ 7 件 → 滞留メッセージ 7 件特定のアラートを解決するために、ApsaraMQ for Kafka を使用して、トピックパーティションのコンシューマオフセットを 0 にリセットできます。コンシューマオフセットが 0 の場合、滞留量は 0 になります。
コンシューマオフセットが存在しない場合 (コンシューマーがオフセットをコミットしていない、またはオフセットが期限切れでクリアされたため)、かつグループ内のコンシューマースレッドがオンラインの場合、
メッセージの総滞留量 = 最新のオフセット (全パーティション) - 最も古いオフセット (全パーティション)となります。グループ内のすべてのコンシューマースレッドがオフラインの場合、滞留量は 0 になります。
メッセージ滞留の根本原因
コンシューマーの処理能力不足:複雑な処理ロジック、低速な I/O、または CPU/メモリのボトルネック。
生成レートの急増:トラフィックのピークやバッチインポート。
コンシューマーの頻繁な故障や再起動:クラッシュ、長時間のガベージコレクション (GC) による一時停止、またはデプロイメントの更新。
頻繁なリバランス:コンシューマーが頻繁にグループに参加または離脱する、ハートビートのタイムアウト、またはセッションのタイムアウト。
コンシューマーコードの問題:無限ループ、未捕捉の例外、または
poll()呼び出し間の長い間隔。コンシューマーのレート制限:消費レートがインスタンスの予約済みまたは弾力的な上限に達している。
オフセットコミットの遅延または失敗:これにより、繰り返しのプルや誤った滞留が発生します。
メッセージ滞留の確認方法
滞留メトリックの詳細については、ご利用のインスタンスタイプのドキュメントをご参照ください:
サブスクリプション/従量課金:Prometheus モニタリング
Serverless:ダッシュボード
メッセージ滞留の影響
リアルタイム性能の低下:データ処理の遅延が増加し、ビジネス上の意思決定に影響を与えます。
システム応答の遅延:コンシューマースレッドがブロックされると、タイムアウトが発生し、サーキットブレーカーがトリガーされる可能性があります。
リバランスのリスク増加:コンシューマーの処理遅延はハートビートのタイムアウトを引き起こし、パーティションのリバランスをトリガーします。頻繁なリバランスは、消費が一時停止する期間を延長し、繰り返しのプルの可能性を高め、消費遅延を悪化させます。これにより、負のフィードバックループが生まれます。
Out-of-Memory (OOM) エラーのリスク:コンシューマーが
poll()を呼び出した後、メッセージを迅速に処理しない場合、多くのメッセージがクライアントのメモリバッファーに蓄積されます。これにより、ヒープメモリのオーバーフローが発生する可能性があります。
メッセージ滞留の解決と最適化
コンシューマーのスループット能力の向上:
コンシューマーインスタンスの追加:同じグループにコンシューマーを追加します。パーティション数はコンシューマー数以上である必要があります (
パーティション数 ≥ コンシューマー数)。パーティション数の増加:これにより、並列度が高まります。
非同期処理の使用:時間のかかる操作を非同期にすることで、poll ループを高速化します。
バッチ処理の使用:一度に複数のメッセージを処理します。
コンシューマーパラメーターの調整:
パラメーター
推奨値
説明
max.poll.records1~500
各 poll でプルするメッセージ数を減らし、ネットワークオーバーヘッドを削減します。
fetch.min.bytes1 KB~1 MB
スループットを向上させ、空のポーリングを減らします。
fetch.max.wait.ms500 ms
より多くのデータがまとめて返されるのを待ちます。
session.timeout.ms30 s
コンシューマーがダウンしていると誤ってフラグ付けされるのを防ぎます。
heartbeat.interval.ms≤ session.timeout / 3
正常なハートビートを維持します。
enable.auto.committrue
自動コミットを推奨します。
一時的な緊急措置の実施:
滞留量が多すぎて迅速に処理できない場合は、コンシューマオフセットを最新のオフセットにリセットできます。