このトピックでは、メッセージ消費エラーを減らすのに役立つ ApsaraMQ for Kafka コンシューマーのベストプラクティスについて説明します。
メッセージ消費のプロセス
ApsaraMQ for Kafka コンシューマーは、データをポーリングし、消費ロジックを実行し、再度データをポーリングするというプロセスに従ってメッセージを消費します。次の図は、このプロセスを示しています。
負荷分散
グループ ApsaraMQ for Kafka 内の各 は、複数のコンシューマーインスタンスで構成されています。複数のコンシューマーを起動し、コンシューマーの group.id パラメーターを同じ値に設定できます。同じ グループ 内のコンシューマーは、グループがサブスクライブしているトピックから負荷分散モードでメッセージを消費します。
たとえば、グループ A がトピック A をサブスクライブし、コンシューマー C1、C2、C3 がグループ内で起動されているとします。この場合、トピック A が受信する各メッセージは、C1、C2、C3 のいずれか 1 つにのみ配信されます。デフォルトでは、ApsaraMQ for Kafka は、消費負荷を分散するためにメッセージをコンシューマーに均等に配信します。
消費における負荷分散を実装するために、ApsaraMQ for Kafka は、サブスクライブされたトピックのパーティションを各コンシューマーに均等に分散します。コンシューマーの数は、パーティションの数を超えることはできません。超えると、特定のコンシューマーにパーティションが割り当てられず、アイドル状態になる可能性があります。負荷分散は、コンシューマーが最初に起動、再起動、追加、または削除されたときにトリガーされます。
コンシューマークライアントでの頻繁なリバランス
クライアントのハートビートがタイムアウトした場合、コンシューマークライアントでリバランスが発生します。リバランスを防ぐには、関連パラメーターを変更するか、消費率を上げます。詳細については、「コンシューマークライアントで頻繁にリバランスが発生するのはなぜですか?」をご参照ください。
パーティション
パーティションの数は、同時コンシューマーの数に影響します。
1 つのパーティション内のメッセージは、同じグループ内の 1 つのコンシューマーのみが消費できます。コンシューマーの数は、パーティションの数を超えることはできません。超えると、特定のコンシューマーにパーティションが割り当てられず、アイドル状態になる可能性があります。
デフォルトでは、ApsaraMQ for Kafka コンソールでパーティションの数は 12 に設定されています。これは、ほとんどのシナリオのビジネス要件を満たすことができます。ビジネス要件に基づいて値を増やすことができます。パーティションの数は 12 ~ 100 の範囲内の値に設定することをお勧めします。12 未満の値は、メッセージの生成と消費のパフォーマンスに影響を与える可能性があります。100 を超える値は、コンシューマークライアントでリバランスをトリガーする可能性があります。
パーティションの数を増やした後、減らすことはできません。パーティションの数を少し増やすことをお勧めします。
サブスクリプションモード
ApsaraMQ for Kafka は、次のサブスクリプションモードをサポートしています。
1 つのグループが複数のトピックをサブスクライブする
1 つのグループは複数のトピックをサブスクライブできます。このサブスクリプションモードでは、複数のトピックからのメッセージは、グループ内のコンシューマーによって均等に消費されます。たとえば、グループ A がトピック A、トピック B、トピック C をサブスクライブしているとします。3 つのトピックからのメッセージは、グループ A 内のコンシューマーによって均等に消費されます。
サンプルコード:
String topicStr = kafkaProperties.getProperty("topic"); String[] topics = topicStr.split(","); for (String topic: topics) { // 各トピックをトリミングして追加 subscribedTopics.add(topic.trim()); } consumer.subscribe(subscribedTopics);複数のグループが 1 つのトピックをサブスクライブする
複数のグループが同じトピックをサブスクライブできます。このサブスクリプションモードでは、各グループはトピックからのすべてのメッセージを個別に消費します。たとえば、グループ A とグループ B がトピック A をサブスクライブしているとします。トピック A が受信する各メッセージは、グループ A とグループ B 内のコンシューマーに配信されます。配信プロセスは相互に独立しており、相互に影響を与えることはありません。
1 つのアプリケーションにつき 1 つのグループ
1 つのアプリケーションにつき 1 つのグループを設定することをお勧めします。これは、各アプリケーションがメッセージを消費するために使用するコードが異なることを意味します。同じアプリケーションで異なるコードを記述する場合は、kafka1.properties や kafka2.properties など、異なる kafka.properties ファイルを準備する必要があります。
コンシューマーオフセット
ApsaraMQ for Kafka では、各トピックには複数のパーティションがあります。各パーティションはメッセージの総数を計算し、これは最大オフセットと呼ばれます。
ApsaraMQ for Kafka コンシューマーは、パーティション内のメッセージを順番に消費し、消費されたメッセージの数を記録します。これはコンシューマーオフセットと呼ばれます。
未消費のメッセージの数は、最大オフセットからコンシューマーオフセットを引くことによって計算されます。この数は、累積メッセージの数を示します。
コンシューマーオフセットのコミット
ApsaraMQ for Kafka は、コンシューマーがコンシューマーオフセットをコミットするための次のパラメーターを提供します。
enable.auto.commit: 自動コミットを有効にするかどうかを指定します。デフォルト値: true。
auto.commit.interval.ms: コンシューマーオフセットが自動的にコミットされる間隔。デフォルト値: 1000。単位: ミリ秒。
上記の パラメーターを設定した後、クライアントは各ポーリングの前にコンシューマーオフセットが最後にコミットされた時間を確認します。オフセットが最後にコミットされた時間と現在の時間の間隔が auto.commit.interval.ms パラメーターで指定された間隔を超える場合、クライアントはコンシューマーオフセットをコミットします。
enable.auto.commit パラメーターを true に設定する場合は、各ポーリングの前に、最後にポーリングされたすべてのデータが消費されていることを確認する必要があります。そうでない場合、未消費のメッセージがスキップされる可能性があります。
コンシューマーオフセットを手動でコミットするには、enable.auto.commit パラメーターを false に設定し、commit(offsets) 関数を呼び出します。
コンシューマーオフセットのリセット
コンシューマーオフセットは、次のシナリオでリセットされます。
ブローカーにオフセットがコミットされていない。たとえば、コンシューマーが最初にブローカーに接続された場合。
無効なオフセットからメッセージがプルされた。たとえば、パーティションの最大オフセットが 10 であるのに、コンシューマーがオフセット 11 から消費を開始した場合。
Java クライアントでは、auto.offset.reset パラメーターを次のいずれかの値に設定して、コンシューマーオフセットのリセット方法を指定できます。
latest: コンシューマーオフセットを最大オフセットにリセットします。
earliest: コンシューマーオフセットを最小オフセットにリセットします。
none: コンシューマーオフセットをリセットしません。
このパラメーターは、earliest ではなく latest に設定することをお勧めします。これにより、無効なオフセットが原因でコンシューマーが最初からメッセージを消費することを防ぎます。
commit(offsets) 関数を呼び出してオフセットをコミットする場合は、このパラメーターを none に設定できます。
大きなメッセージのプル
メッセージ消費中、クライアントはブローカーからメッセージを積極的にプルします。大きなメッセージをプルする場合は、次のパラメーターを設定することでプルレートを制御できます。
max.poll.records: poll メソッドの 1 回の呼び出しで返されるメッセージの最大数。各メッセージのサイズが 1 MB を超える場合は、このパラメーターを 1 に設定することをお勧めします。
fetch.max.bytes: フェッチリクエストで返されるデータの最大量。このパラメーターは、単一メッセージのサイズよりわずかに大きい値に設定します。
max.partition.fetch.bytes: フェッチリクエストでパーティションごとに返されるデータの最大量。このパラメーターは、単一メッセージのサイズよりわずかに大きい値に設定します。
クライアントは大きなメッセージを 1 つずつプルします。
メッセージの重複と消費の冪等性
ApsaraMQ for Kafka の配信セマンティクスは、少なくとも 1 回です。これは、メッセージが失われないように、メッセージが少なくとも 1 回配信されることを意味します。ただし、これはメッセージが重複しないことを保証するものではありません。ネットワークエラーが発生した場合、またはクライアントが再起動した場合、少数のメッセージが繰り返し配信される可能性があります。オンライン取引シナリオなど、コンシューマーがメッセージの重複に敏感な場合は、消費の冪等性を実装する必要があります。
アプリケーションがデータベースアプリケーションの場合は、次の操作を実行して冪等性チェックを実装できます。
メッセージを送信するときに、キーを一意のトランザクション ID として指定します。
メッセージを消費するときに、キーが消費されているかどうかを確認します。キーが消費されている場合は、メッセージをスキップします。キーが消費されていない場合は、メッセージを 1 回消費します。
アプリケーションが少数のメッセージの重複に敏感でない場合は、冪等性チェックは必要ありません。
消費の失敗
ApsaraMQ for Kafka のメッセージは、パーティション内で 1 つずつ消費されます。コンシューマーがメッセージを受信した後に消費ロジックを実行できない場合は、次の方法を使用してトラブルシューティングできます。このような失敗の例として、アプリケーションサーバーのダーティデータが原因でメッセージの処理に失敗することがあります。
失敗時に消費ロジックの実行を継続して試みます。この方法では、消費スレッドが現在のメッセージでブロックされ、メッセージが蓄積される可能性があります。
ApsaraMQ for Kafka は、失敗したメッセージを処理するロジックを指定していません。失敗したメッセージをエクスポートするか、サービスにメッセージを保存できます。たとえば、失敗したメッセージを格納するための専用のトピックを作成できます。その後、失敗したメッセージを定期的にチェックし、原因を分析し、適切な対策を講じることができます。
消費レイテンシ
ApsaraMQ for Kafka では、クライアントはブローカーからメッセージを積極的にプルします。クライアントがデータを迅速に消費できる場合、レイテンシは低くなります。レイテンシが高い場合は、メッセージが蓄積されているかどうかを確認し、ビジネス要件に基づいて消費率を上げます。
消費のブロッキングとメッセージの蓄積
メッセージの蓄積は、コンシューマークライアントで最も一般的な問題です。この問題は、次の原因によって発生する可能性があります。
消費率が生産率よりも低い。この場合は、消費率を上げる必要があります。詳細については、「消費率の向上」をご参照ください。
コンシューマースレッドがブロックされている。
コンシューマーがメッセージを受信した後、コンシューマーは消費ロジックを実行するためにリモートコールを開始します。このプロセス中にコンシューマーがコール結果を待機する場合、コンシューマーは待機し続ける可能性があります。これにより、消費プロセスが中断されます。
コンシューマーは、消費スレッドがブロックされないようにする必要があります。コンシューマーがコール結果を待機する必要がある場合は、待機タイムアウト期間を指定することをお勧めします。このようにして、タイムアウト期間が経過しても結果が返されない場合、消費は失敗したと見なされ、後続のメッセージの消費を続行できます。
消費率の向上
次のいずれかの方法を使用して、消費率を向上させることができます。
コンシューマーの追加
プロセスにコンシューマーを追加し、各コンシューマーが 1 つのスレッドに対応するようにすることができます。または、複数のコンシューマープロセスをデプロイすることもできます。コンシューマーの数がパーティションの数を超えると、消費率を上げることはできず、特定のコンシューマーはアイドル状態になります。
消費スレッドの追加
コンシューマーを追加することは、消費率を上げるために消費スレッドを追加することと同じです。したがって、パフォーマンスを向上させるための重要な方法は、消費スレッドを追加することです。次の手順を実行して、消費スレッドを追加できます。
スレッドプールを定義します。
データをポーリングします。
同時処理のためにスレッドプールにデータを送信します。
同時処理結果が返されたら、再度データをポーリングします。
メッセージのフィルタリング
ApsaraMQ for Kafka は、メッセージフィルタリングのセマンティクスを提供していません。次のいずれかの方法を使用して、メッセージをフィルタリングできます。
少数のタイプのメッセージのみをフィルタリングする場合は、複数のトピックを使用できます。
多数のタイプのメッセージをフィルタリングする場合は、クライアントでビジネスによってメッセージをフィルタリングすることをお勧めします。
ビジネス要件に基づいて、いずれかの方法を使用するか、両方の方法を統合できます。
メッセージのブロードキャスト
ApsaraMQ for Kafka は、メッセージブロードキャストのセマンティクスを提供していません。異なる グループ を作成することで、メッセージブロードキャストをシミュレートできます。
サブスクリプション
トラブルシューティングを容易にするために、同じ グループ 内のコンシューマーは同じトピックをサブスクライブすることをお勧めします。