コンシューマーがプロデューサーに追いつかなくなると、メッセージが蓄積され、処理レイテンシーが急増します。オフセットの誤設定により、メッセージが意図せずスキップされたり、再処理されたりする可能性があります。このガイドでは、ApsaraMQ for Kafka 上で信頼性の高い高スループットのコンシューマーアプリケーションを構築するための実績のあるパターンについて説明します。これには、コンシューマーグループ、オフセット管理、障害処理、パフォーマンスチューニングが含まれます。
メッセージ消費の仕組み
ApsaraMQ for Kafka のコンシューマーは、次の3つのステップサイクルを繰り返します。
ポーリング -- ブローカーからメッセージのバッチをフェッチします。
処理 -- フェッチされたメッセージに対してビジネスロジックを実行します。
再度ポーリング -- 処理が完了した後、次のバッチをフェッチします。
安定したポーリング間隔を維持するために、処理時間を短くしてください。処理遅延が長くなると、リバランスがトリガーされたり、メッセージ蓄積が発生したりします。
コンシューマーグループと負荷分散
コンシューマーグループは、同じ group.id を共有するコンシューマーインスタンスのセットです。ApsaraMQ for Kafka は、サブスクライブされたトピックのパーティションをグループ内のコンシューマーに均等に分散するため、各メッセージは正確に1つのコンシューマーに配信されます。
例: グループAはTopic Aをサブスクライブしています。C1、C2、C3の3つのコンシューマーがグループ内にあります。受信メッセージはそれぞれC1、C2、またはC3のいずれかに送られ、3つすべてで消費負荷が分散されます。
ApsaraMQ for Kafka は、コンシューマーが次の状態のときにリバランスをトリガーします。
初めて起動されたとき
再起動済み
グループへの追加または削除
頻繁なリバランスの防止
リバランスは、パーティションが再割り当てされている間、消費を一時停止します。頻繁なリバランスはスループットを妨げ、処理レイテンシーを増加させます。これらはコンシューマーハートビートがタイムアウトしたときに発生します。
頻繁なリバランスを防ぐには、関連するパラメーターを調整するか、消費レートを向上させてください。詳細なトラブルシューティング手順については、「コンシューマークライアントでリバランスが頻繁に発生するのはなぜですか?」をご参照ください。
パーティションの構成
パーティション数は、いくつのコンシューマーがメッセージを並行処理できるかを制御します。各パーティションはグループ内の正確に1つのコンシューマーに割り当てられます。パーティションよりも多くのコンシューマーがある場合、余分なコンシューマーはアイドル状態になります。
推奨パーティション数
ApsaraMQ for Kafka コンソールでのデフォルトのパーティション数は 12 であり、ほとんどのワークロードで機能します。スケーリング時には、次のガイドラインに従ってください。
| パーティション数 | 影響 |
|---|---|
| 12未満 | メッセージ生成および消費パフォーマンスが低下する可能性があります |
| 12 -- 100 | ほとんどのワークロードで推奨される範囲 |
| 100超 | 頻繁なコンシューマーリバランスをトリガーする可能性があります |
パーティション数を増やした後で減らすことはできません。パーティションは、大幅に増やすのではなく、段階的に増やしてください。
サブスクリプションパターン
ApsaraMQ for Kafka は2つのサブスクリプションパターンをサポートしています。
1つのグループが複数のトピックをサブスクライブ
単一のコンシューマーグループは複数のトピックをサブスクライブできます。サブスクライブされたすべてのトピックからのメッセージは、グループ内のコンシューマーに均等に分散されます。
String topicStr = kafkaProperties.getProperty("topic");
String[] topics = topicStr.split(",");
for (String topic: topics) {
subscribedTopics.add(topic.trim());
}
consumer.subscribe(subscribedTopics);複数のグループが1つのトピックをサブスクライブ
複数のコンシューマーグループが同じトピックを独立してサブスクライブできます。各グループはすべてのメッセージを受信し、グループは互いに影響を与えることなく独立して動作します。
このパターンは、異なるアプリケーションが同じデータストリームを独立して処理する必要がある場合に使用します。たとえば、リアルタイム分析用のグループと、データアーカイブ用の別のグループなどです。
アプリケーションごとに1つのグループを保持
各アプリケーションには専用のコンシューマーグループを使用してください。単一のアプリケーションが異なる消費ロジックを実行する必要がある場合は、異なる group.id 値を持つ個別の構成ファイル (例: kafka1.properties および kafka2.properties) を作成します。
グループ内のサブスクリプションの一致
同じグループ内のすべてのコンシューマーは、同じトピックセットをサブスクライブする必要があります。サブスクリプションの不一致はトラブルシューティングを複雑にし、予期しないリバランス動作につながる可能性があります。
コンシューマオフセットの管理
各パーティションは、受信したメッセージの総数である 最大オフセット を追跡します。各コンシューマーは、そのパーティションで処理したメッセージの数である コンシューマオフセット を追跡します。この2つの差が メッセージ蓄積 (未消費のバックログ) です。
自動コミットと手動コミット
ApsaraMQ for Kafka は、コンシューマオフセットをコミットするための2つのパラメーターを提供します。
| パラメーター | デフォルト | 説明 |
|---|---|---|
enable.auto.commit | true | 自動オフセットコミットを有効にします |
auto.commit.interval.ms | 1000 (ミリ秒) | 自動コミット間の間隔 |
自動コミットが有効になっている場合、クライアントは各ポーリングの前に最後のコミットからの経過時間を確認します。経過時間が auto.commit.interval.ms を超えると、現在のオフセットをコミットします。
自動コミットを使用する場合、次のポーリングの前に、前のポーリングからのすべてのメッセージが完全に処理されていることを確認してください。コンシューマーが外部データストアに書き込み、その書き込みが失敗した場合でも、自動コミットはオフセットを進める可能性があり、その結果、それらのメッセージが再試行されるのではなく永続的にスキップされることがあります。デフォルトの動作に依存する前に、このトレードオフを理解してください。
オフセットの手動コミット
オフセットを手動でコミットするには、enable.auto.commit を false に設定し、各バッチの処理後に commit(offsets) 関数を呼び出します。
オフセットリセット動作
コンシューマオフセットは、次の2つの状況でリセットされます。
コミットされたオフセットが存在しない場合 -- たとえば、コンシューマーが初めてブローカーに接続するとき。
コミットされたオフセットが無効な場合 -- たとえば、パーティションの最大オフセットが10であるにもかかわらず、コンシューマーがオフセット11から読み取ろうとするとき。
Java クライアントでのリセット動作は、auto.offset.reset で制御します。
| 値 | 動作 |
|---|---|
latest | 最新のメッセージから開始します。これをデフォルトとして使用してください |
earliest | 最も古い利用可能なメッセージから開始します |
none | リセットせずに例外をスローします |
無効なオフセットが発生したときにトピックの履歴全体を再処理するのを避けるため、earliest よりも latest を優先してください。commit(offsets) 関数を使用してオフセットを手動でコミットする場合、オフセットは常に有効であるはずなので、none は安全な選択肢です。
大量メッセージの処理
個々のメッセージが一般的なサイズを超える場合、フェッチ動作を制御するためにこれらのパラメーターを調整します。
| パラメーター | ガイダンス |
|---|---|
max.poll.records | ポーリングあたりの最大レコード数。1 MBを超えるメッセージの場合は 1 に設定します。 |
fetch.max.bytes | フェッチリクエストあたりの最大データ量。予想されるメッセージサイズよりわずかに大きく設定します。 |
max.partition.fetch.bytes | フェッチあたりのパーティションごとの最大データ量。予想されるメッセージサイズよりわずかに大きく設定します。 |
これらの設定により、コンシューマーは大量のメッセージを一度に1つずつフェッチし、過大なバッチによるメモリ負荷を防ぎます。
メッセージ重複の処理
ApsaraMQ for Kafka は 少なくとも1回の配信 セマンティクスを使用します。これは、データ損失を防ぐためにすべてのメッセージが少なくとも1回配信されることを意味しますが、ネットワークエラーやクライアントの再起動中に重複が発生する可能性があります。
べき等な消費の実装
ご利用のアプリケーションが重複に敏感な場合 (たとえば、金融トランザクションや注文処理など) は、アプリケーションレベルで重複排除を実装してください。
生成時に各メッセージに一意キーを付与します (たとえば、トランザクションID)。
消費されたメッセージを処理する前に、そのキーが既に処理されているかどうかを確認します。
既に処理されたキーを持つメッセージはスキップします。
ご利用のアプリケーションが時折の重複を許容する場合 (たとえば、メトリック集約やログ取り込みなど) は、このステップをスキップしてください。
消費失敗の処理
パーティション内のメッセージは順次消費されます。処理が失敗した場合 (たとえば、不正な形式のデータやダウンストリームサービス停止が原因で) は、次のいずれかの戦略を選択してください。
| 戦略 | トレードオフ |
|---|---|
| その場で再試行 | 失敗したメッセージが成功するまで再試行します。実装は簡単ですが、コンシューマースレッドをブロックし、そのパーティションでメッセージ蓄積を引き起こします。 |
| デッドレタートピック | 失敗したメッセージを専用トピックに送信し、後で調査します。消費はブロックせずに続行されますが、障害を調査および再処理するための個別のプロセスが必要です。 |
ほとんどの本番ワークロードでは、デッドレターアプローチによりコンシューマーパイプラインの停止を回避できます。
消費レイテンシーの削減
ApsaraMQ for Kafka はプルベースモデルを使用します。コンシューマーは各ポーリングサイクルでブローカーからメッセージをフェッチします。コンシューマーがプロデューサーに追いついている場合、レイテンシーは低いままです。レイテンシーの急増は通常、メッセージ蓄積を示します。
蓄積の一般的な原因
| 原因 | ソリューション |
|---|---|
| 消費レートが生成レートより遅い | コンシューマーを追加するか、処理スループットを向上させます |
| コンシューマースレッドが低速なリモート呼び出しによってブロックされる | 外部呼び出しにタイムアウトを設定し、有界な待機後にスレッドを解放します |
消費スループットの向上
コンシューマーを追加します。 追加のコンシューマーインスタンス (コンシューマーあたり1スレッド) を起動するか、より多くのコンシューマープロセスをデプロイします。アクティブなコンシューマーの数はパーティション数を超えることはできません。超過分はアイドル状態になります。
処理にスレッドプールを使用します。 ポーリングと処理を分離して作業を並列化します。
固定数のワーカースレッドを持つスレッドプールを定義します。
メッセージのバッチをポーリングします。
各メッセージをスレッドプールに送信して並行処理します。
バッチ内のすべてのタスクが完了した後、次のバッチをポーリングします。
このパターンにより、ポーリングループの応答性を維持しつつ、複数のスレッドに処理作業を分散させることができます。
メッセージのフィルタリング
ApsaraMQ for Kafka は組み込みのメッセージフィルタリングを提供しません。次のいずれかのアプローチを使用して、アプリケーションレベルでフィルタリングを実装してください。
| アプローチ | 使用するタイミング |
|---|---|
| トピックの分離 | 少数の異なるメッセージカテゴリ |
| クライアント側フィルタリング | 多数のカテゴリ、または頻繁に変更されるフィルタリングロジック |
ご利用のユースケースで必要とされる場合は、両方のアプローチを組み合わせてください。広範なカテゴリを異なるトピックにルーティングし、その後コンシューマー側で詳細なフィルターを適用します。
メッセージのブロードキャスト
ApsaraMQ for Kafka はメッセージブロードキャストをネイティブにサポートしていません。すべてのメッセージを複数の独立したコンシューマーに配信するには、すべてのメッセージを受信する必要がある各コンシューマーに対して個別のコンシューマーグループを作成します。各グループは、トピックからメッセージストリーム全体を独立して消費します。