すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:コンシューマーのベストプラクティス

最終更新日:Jan 11, 2025

このトピックでは、メッセージ消費エラーを減らすのに役立つ ApsaraMQ for Kafka コンシューマーのベストプラクティスについて説明します。

メッセージ消費のプロセス

ApsaraMQ for Kafka コンシューマーは、データをポーリングし、消費ロジックを実行し、再度データをポーリングするというプロセスに従ってメッセージを消費します。次の図は、このプロセスを示しています。

負荷分散

グループ ApsaraMQ for Kafka 内の各 は、複数のコンシューマーインスタンスで構成されています。複数のコンシューマーを起動し、コンシューマーの group.id パラメーターを同じ値に設定できます。同じ グループ 内のコンシューマーは、グループがサブスクライブしているトピックから負荷分散モードでメッセージを消費します。

たとえば、グループ A がトピック A をサブスクライブし、コンシューマー C1、C2、C3 がグループ内で起動されているとします。この場合、トピック A が受信する各メッセージは、C1、C2、C3 のいずれか 1 つにのみ配信されます。デフォルトでは、ApsaraMQ for Kafka は、消費負荷を分散するためにメッセージをコンシューマーに均等に配信します。

消費における負荷分散を実装するために、ApsaraMQ for Kafka は、サブスクライブされたトピックのパーティションを各コンシューマーに均等に分散します。コンシューマーの数は、パーティションの数を超えることはできません。超えると、特定のコンシューマーにパーティションが割り当てられず、アイドル状態になる可能性があります。負荷分散は、コンシューマーが最初に起動、再起動、追加、または削除されたときにトリガーされます。

コンシューマークライアントでの頻繁なリバランス

クライアントのハートビートがタイムアウトした場合、コンシューマークライアントでリバランスが発生します。リバランスを防ぐには、関連パラメーターを変更するか、消費率を上げます。詳細については、「コンシューマークライアントで頻繁にリバランスが発生するのはなぜですか?」をご参照ください。

パーティション

パーティションの数は、同時コンシューマーの数に影響します。

1 つのパーティション内のメッセージは、同じグループ内の 1 つのコンシューマーのみが消費できます。コンシューマーの数は、パーティションの数を超えることはできません。超えると、特定のコンシューマーにパーティションが割り当てられず、アイドル状態になる可能性があります。

デフォルトでは、ApsaraMQ for Kafka コンソールでパーティションの数は 12 に設定されています。これは、ほとんどのシナリオのビジネス要件を満たすことができます。ビジネス要件に基づいて値を増やすことができます。パーティションの数は 12 ~ 100 の範囲内の値に設定することをお勧めします。12 未満の値は、メッセージの生成と消費のパフォーマンスに影響を与える可能性があります。100 を超える値は、コンシューマークライアントでリバランスをトリガーする可能性があります。

重要

パーティションの数を増やした後、減らすことはできません。パーティションの数を少し増やすことをお勧めします。

サブスクリプションモード

ApsaraMQ for Kafka は、次のサブスクリプションモードをサポートしています。

  • 1 つのグループが複数のトピックをサブスクライブする

    1 つのグループは複数のトピックをサブスクライブできます。このサブスクリプションモードでは、複数のトピックからのメッセージは、グループ内のコンシューマーによって均等に消費されます。たとえば、グループ A がトピック A、トピック B、トピック C をサブスクライブしているとします。3 つのトピックからのメッセージは、グループ A 内のコンシューマーによって均等に消費されます。

    サンプルコード:

    String topicStr = kafkaProperties.getProperty("topic");
    String[] topics = topicStr.split(",");
    for (String topic: topics) {
        // 各トピックをトリミングして追加
        subscribedTopics.add(topic.trim());
    }
    consumer.subscribe(subscribedTopics);
  • 複数のグループが 1 つのトピックをサブスクライブする

    複数のグループが同じトピックをサブスクライブできます。このサブスクリプションモードでは、各グループはトピックからのすべてのメッセージを個別に消費します。たとえば、グループ A とグループ B がトピック A をサブスクライブしているとします。トピック A が受信する各メッセージは、グループ A とグループ B 内のコンシューマーに配信されます。配信プロセスは相互に独立しており、相互に影響を与えることはありません。

1 つのアプリケーションにつき 1 つのグループ

1 つのアプリケーションにつき 1 つのグループを設定することをお勧めします。これは、各アプリケーションがメッセージを消費するために使用するコードが異なることを意味します。同じアプリケーションで異なるコードを記述する場合は、kafka1.properties や kafka2.properties など、異なる kafka.properties ファイルを準備する必要があります。

コンシューマーオフセット

ApsaraMQ for Kafka では、各トピックには複数のパーティションがあります。各パーティションはメッセージの総数を計算し、これは最大オフセットと呼ばれます。

ApsaraMQ for Kafka コンシューマーは、パーティション内のメッセージを順番に消費し、消費されたメッセージの数を記録します。これはコンシューマーオフセットと呼ばれます。

未消費のメッセージの数は、最大オフセットからコンシューマーオフセットを引くことによって計算されます。この数は、累積メッセージの数を示します。

コンシューマーオフセットのコミット

ApsaraMQ for Kafka は、コンシューマーがコンシューマーオフセットをコミットするための次のパラメーターを提供します。

  • enable.auto.commit: 自動コミットを有効にするかどうかを指定します。デフォルト値: true。

  • auto.commit.interval.ms: コンシューマーオフセットが自動的にコミットされる間隔。デフォルト値: 1000。単位: ミリ秒。

上記の パラメーターを設定した後、クライアントは各ポーリングの前にコンシューマーオフセットが最後にコミットされた時間を確認します。オフセットが最後にコミットされた時間と現在の時間の間隔が auto.commit.interval.ms パラメーターで指定された間隔を超える場合、クライアントはコンシューマーオフセットをコミットします。

enable.auto.commit パラメーターを true に設定する場合は、各ポーリングの前に、最後にポーリングされたすべてのデータが消費されていることを確認する必要があります。そうでない場合、未消費のメッセージがスキップされる可能性があります。

コンシューマーオフセットを手動でコミットするには、enable.auto.commit パラメーターを false に設定し、commit(offsets) 関数を呼び出します。

コンシューマーオフセットのリセット

コンシューマーオフセットは、次のシナリオでリセットされます。

  • ブローカーにオフセットがコミットされていない。たとえば、コンシューマーが最初にブローカーに接続された場合。

  • 無効なオフセットからメッセージがプルされた。たとえば、パーティションの最大オフセットが 10 であるのに、コンシューマーがオフセット 11 から消費を開始した場合。

Java クライアントでは、auto.offset.reset パラメーターを次のいずれかの値に設定して、コンシューマーオフセットのリセット方法を指定できます。

  • latest: コンシューマーオフセットを最大オフセットにリセットします。

  • earliest: コンシューマーオフセットを最小オフセットにリセットします。

  • none: コンシューマーオフセットをリセットしません。

説明
  • このパラメーターは、earliest ではなく latest に設定することをお勧めします。これにより、無効なオフセットが原因でコンシューマーが最初からメッセージを消費することを防ぎます。

  • commit(offsets) 関数を呼び出してオフセットをコミットする場合は、このパラメーターを none に設定できます。

大きなメッセージのプル

メッセージ消費中、クライアントはブローカーからメッセージを積極的にプルします。大きなメッセージをプルする場合は、次のパラメーターを設定することでプルレートを制御できます。

  • max.poll.records: poll メソッドの 1 回の呼び出しで返されるメッセージの最大数。各メッセージのサイズが 1 MB を超える場合は、このパラメーターを 1 に設定することをお勧めします。

  • fetch.max.bytes: フェッチリクエストで返されるデータの最大量。このパラメーターは、単一メッセージのサイズよりわずかに大きい値に設定します。

  • max.partition.fetch.bytes: フェッチリクエストでパーティションごとに返されるデータの最大量。このパラメーターは、単一メッセージのサイズよりわずかに大きい値に設定します。

クライアントは大きなメッセージを 1 つずつプルします。

メッセージの重複と消費の冪等性

ApsaraMQ for Kafka の配信セマンティクスは、少なくとも 1 回です。これは、メッセージが失われないように、メッセージが少なくとも 1 回配信されることを意味します。ただし、これはメッセージが重複しないことを保証するものではありません。ネットワークエラーが発生した場合、またはクライアントが再起動した場合、少数のメッセージが繰り返し配信される可能性があります。オンライン取引シナリオなど、コンシューマーがメッセージの重複に敏感な場合は、消費の冪等性を実装する必要があります。

アプリケーションがデータベースアプリケーションの場合は、次の操作を実行して冪等性チェックを実装できます。

  • メッセージを送信するときに、キーを一意のトランザクション ID として指定します。

  • メッセージを消費するときに、キーが消費されているかどうかを確認します。キーが消費されている場合は、メッセージをスキップします。キーが消費されていない場合は、メッセージを 1 回消費します。

アプリケーションが少数のメッセージの重複に敏感でない場合は、冪等性チェックは必要ありません。

消費の失敗

ApsaraMQ for Kafka のメッセージは、パーティション内で 1 つずつ消費されます。コンシューマーがメッセージを受信した後に消費ロジックを実行できない場合は、次の方法を使用してトラブルシューティングできます。このような失敗の例として、アプリケーションサーバーのダーティデータが原因でメッセージの処理に失敗することがあります。

  • 失敗時に消費ロジックの実行を継続して試みます。この方法では、消費スレッドが現在のメッセージでブロックされ、メッセージが蓄積される可能性があります。

  • ApsaraMQ for Kafka は、失敗したメッセージを処理するロジックを指定していません。失敗したメッセージをエクスポートするか、サービスにメッセージを保存できます。たとえば、失敗したメッセージを格納するための専用のトピックを作成できます。その後、失敗したメッセージを定期的にチェックし、原因を分析し、適切な対策を講じることができます。

消費レイテンシ

ApsaraMQ for Kafka では、クライアントはブローカーからメッセージを積極的にプルします。クライアントがデータを迅速に消費できる場合、レイテンシは低くなります。レイテンシが高い場合は、メッセージが蓄積されているかどうかを確認し、ビジネス要件に基づいて消費率を上げます。

消費のブロッキングとメッセージの蓄積

メッセージの蓄積は、コンシューマークライアントで最も一般的な問題です。この問題は、次の原因によって発生する可能性があります。

  • 消費率が生産率よりも低い。この場合は、消費率を上げる必要があります。詳細については、「消費率の向上」をご参照ください。

  • コンシューマースレッドがブロックされている。

コンシューマーがメッセージを受信した後、コンシューマーは消費ロジックを実行するためにリモートコールを開始します。このプロセス中にコンシューマーがコール結果を待機する場合、コンシューマーは待機し続ける可能性があります。これにより、消費プロセスが中断されます。

コンシューマーは、消費スレッドがブロックされないようにする必要があります。コンシューマーがコール結果を待機する必要がある場合は、待機タイムアウト期間を指定することをお勧めします。このようにして、タイムアウト期間が経過しても結果が返されない場合、消費は失敗したと見なされ、後続のメッセージの消費を続行できます。

消費率の向上

次のいずれかの方法を使用して、消費率を向上させることができます。

  • コンシューマーの追加

    プロセスにコンシューマーを追加し、各コンシューマーが 1 つのスレッドに対応するようにすることができます。または、複数のコンシューマープロセスをデプロイすることもできます。コンシューマーの数がパーティションの数を超えると、消費率を上げることはできず、特定のコンシューマーはアイドル状態になります。

  • 消費スレッドの追加

    コンシューマーを追加することは、消費率を上げるために消費スレッドを追加することと同じです。したがって、パフォーマンスを向上させるための重要な方法は、消費スレッドを追加することです。次の手順を実行して、消費スレッドを追加できます。

    1. スレッドプールを定義します。

    2. データをポーリングします。

    3. 同時処理のためにスレッドプールにデータを送信します。

    4. 同時処理結果が返されたら、再度データをポーリングします。

メッセージのフィルタリング

ApsaraMQ for Kafka は、メッセージフィルタリングのセマンティクスを提供していません。次のいずれかの方法を使用して、メッセージをフィルタリングできます。

  • 少数のタイプのメッセージのみをフィルタリングする場合は、複数のトピックを使用できます。

  • 多数のタイプのメッセージをフィルタリングする場合は、クライアントでビジネスによってメッセージをフィルタリングすることをお勧めします。

ビジネス要件に基づいて、いずれかの方法を使用するか、両方の方法を統合できます。

メッセージのブロードキャスト

ApsaraMQ for Kafka は、メッセージブロードキャストのセマンティクスを提供していません。異なる グループ を作成することで、メッセージブロードキャストをシミュレートできます。

サブスクリプション

トラブルシューティングを容易にするために、同じ グループ 内のコンシューマーは同じトピックをサブスクライブすることをお勧めします。