すべてのプロダクト
Search
ドキュメントセンター

DataHub:共同消費

最終更新日:Jan 12, 2025

共同消費

用語

オフセットベースのデータ消費

オフセットベースのデータ消費機能を使用すると、消費オフセットをサーバーに保存できます。消費オフセットは、レコードのシーケンス番号と、レコードが DataHub に書き込まれたときのタイムスタンプで構成されます。

トピックのサブスクリプションを作成し、アプリケーションが特定のデータを消費した後、消費オフセットはサーバーに送信されます。アプリケーションが次回起動すると、アプリケーションはサーバーから消費オフセットを取得し、次のレコードからデータを使用できます。シャードが再割り当てされた後、アプリケーションが保存された消費オフセットからデータを使用できるように、消費オフセットをサーバーに保存する必要があります。これは、共同消費の前提条件です。

コンシューマーグループで消費オフセットを処理する必要はありません。 config 句で、消費オフセットを送信する間隔を指定します。アプリケーションが消費オフセットに基づいてレコードを読み取ると、アプリケーションは消費オフセットより前のレコードが処理されたと見なします。指定された間隔内に消費オフセットが送信されない場合、アプリケーションは消費オフセットを送信します。消費オフセットの送信に失敗し、アプリケーションが中断された場合、消費オフセットが時間内に送信されない可能性があります。この場合、アプリケーションは特定のデータを繰り返し使用することがあります。

共同消費

共同消費機能は、複数のコンシューマーが同時にトピックを使用する場合、シャードを自動的に割り当てます。この機能により、コンシューマークライアントでのデータ処理が簡素化されます。コンシューマーは異なるマシンで動作している可能性があり、独自の調整によってシャードを割り当てるのが難しい場合があります。同じトピックをサブスクライブするコンシューマーが同じコンシューマーグループに属している場合、シャードはコンシューマーグループ内の1つのコンシューマーにのみ割り当てられます。

シナリオ

A、B、C は3つのコンシューマーインスタンスであり、トピックには 10 個のシャードがあります。

  1. コンシューマーインスタンス A が最初に起動されると、10個のシャードが割り当てられます。

  2. 他の2つのコンシューマーインスタンスが起動されると、シャードは次のように再割り当てされます。A に 4 つ、B に 3 つ、C に 3 つ。

  3. コンシューマーインスタンス A によって消費されたシャードの1つが2つに分割され、2つのシャードが消費後に解放されると、シャードは次のように再割り当てされます。A に 4 つ、B に 4 つ、C に 3 つ。

  4. コンシューマーインスタンス C が停止すると、シャードは次のように再割り当てされます。A に 6 つ、B に 5 つ。

ハートビート

共同消費を実装するには、必要な情報をサーバーに通知するためにハートビートメカニズムが必要です。情報には、コンシューマーインスタンスのステータス、割り当てられたシャード、および解放する必要のあるシャードが含まれます。指定された間隔内にコンシューマーインスタンスからハートビートが受信されない場合、コンシューマーインスタンスは停止したと見なされます。コンシューマーインスタンスのステータスが変更されると、サーバーはシャードを再割り当てします。サーバーは、ハートビート要求で新しい割り当て計画を返します。したがって、クライアントはシャードの再割り当てを検出するのに時間がかかります。

バージョン

Maven の依存関係と JDK:

maven pom

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.18.0-public</version>
</dependency>
<dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>datahub-client-library</artifactId>
      <version>1.1.12-public</version>
</dependency>

jdk

jdk: >= 1.8

サンプルコード

データの読み書き方法の詳細については、「データの読み書き」をご参照ください。