サブスクリプションとプッシュを使用すると、準備完了状態のデータをクライアントに即座にプッシュできます。これにより、Get API によるポーリングで発生するレイテンシと、キューサービスによるクエリで発生するキューサービスの負荷を軽減できます。ただし、サブスクリプションとプッシュには、多くの追加概念が関係するため、ある程度の複雑さがあります。このトピックでは、キューサービスのサブスクリプションとプッシュ機能の使用方法について説明します。
コンシューマー
コンシューマーとは、キューサービスからデータのサブスクライブを行うクライアントプログラムのことです。クライアントが Watch API を使用してデータを呼び出すと、キューサービスにコンシューマーオブジェクトが作成されます。API のパラメーター(ウィンドウサイズやタグなど)は、コンシューマー属性として使用されます。Attribute API を使用して、コンシューマーの状態を表示できます。例:
[OK] Attributes:
consumers.list.[0] : Id: default_group.u1, Index: 0, Pending: 0, Status: Complete, Idle: 2.091s, Window: 0, Slots: 0, AutoCommit: true
consumers.list.[1] : Id: default_group.u2, Index: 0, Pending: 0, Status: Complete, Idle: 1.124s, Window: 0, Slots: 0, AutoCommit: true
consumers.stats.total : 2consumers.stats.total は、コンシューマーの総数を示します。 consumers.list は、コンシューマーのリストです。次の表に、各列の説明を示します。
パラメーター | 説明 |
Id | コンシューマーの ID。 |
Index | コンシューマーが消費しているデータのインデックス。 |
Pending | 現在のコンシューマーによって処理中であるが、コミットされていないデータの量。 |
Status | コンシューマーのステータス。有効な値:
|
Window | コンシューマーウィンドウのサイズ。プッシュされるデータの最大量です。 |
Slots | アイドル状態のウィンドウの数。値 0 は、ウィンドウが使用中であることを示します。 |
AutoCommit | データの送信後に自動的にコミットするかどうかを示します。 |
Tags | コンシューマーのフィルター条件。 説明 タグ付きで Watch API を使用する場合は、同じコンシューマーグループ内のコンシューマーが使用するタグがすべて同じであることを確認してください。 |
Watch API を使用する場合、データタグを実行すると、Tags 列が追加されます。例:
consumers.list.[0] : Id: ..., Pending: 0, ..., Window: ..., Tags: tags[foo=bar]この列は、コンシューマーがサブスクライブするデータタグを示します。データが条件を満たす場合にのみ、コンシューマーに配信されます。
コンシューマーグループ
コンシューマーグループとは、同じフィルター条件でキューサービスにサブスクライブするコンシューマーの集合です。同じグループ内のコンシューマーは同じ名前を持つことはできませんが、異なるグループのコンシューマーは同じ名前を持つことができます。
同じコンシューマーグループ内では、データは各コンシューマーに均等に配信されます。異なるグループ間では、データは各グループのコンシューマーに並行してプッシュされます。例:
同じグループのコンシューマーは異なるデータを受信します。
異なるグループのコンシューマーは同じデータを受信します。
コンシューマーが API を介してデータを削除すると、データはすぐに削除され、他のグループのコンシューマーはデータを受信できなくなります。
Attribute API を使用して、キューサービス内のコンシューマーの状態を表示できます。次に例を示します。
groups.list.[0] : Id: default_group, Index: 0, Pending: 0, Delivered: 0, Consumers: 1
groups.list.[1] : Id: group, Index: 0, Pending: 0, Delivered: 1, Consumers: 0groups.list は、コンシューマーのリストです。次の表に、各列の説明を示します。
パラメーター | 説明 |
Id | コンシューマーグループの ID。 |
Index | 現在のコンシューマーグループによって消費されているインデックス。コンシューマーグループ内のコンシューマーの最大インデックスです。 |
Pending | 現在のコンシューマーグループによって処理中であるが、コミットされていないデータの量。 |
Delivered | プッシュされたメッセージの数。 |
Consumers | コンシューマーグループ内の消費済みデータの量。 |
コンシューマーグループの数に制限はありませんが、自動的にクリーンアップされることはありません。作成後は、状態が保持されます。
コンシューマーとコンシューマーグループの使用
コンシューマーとコンシューマーグループは、Watch API の HTTP ヘッダー、または SDK でクライアントを初期化するときに宣言できます。また、Attributes API を使用して HTTP ヘッダーのキーを取得することもできます。
meta.header.group : X-EAS-QueueService-Gid
meta.header.user : X-EAS-QueueService-Uid参加するコンシューマー ID とコンシューマーグループ ID を、それぞれ X-EAS-QueueService-Uid と X-EAS-QueueService-Gid で宣言します。
コミットとネガティブ
キューサービスは、コミットとネガティブの 2 つの消費方法をサポートしています。どちらもデータインデックスに対して動作しますが、セマンティクスが異なります。
コミットは、コンシューマーがデータを受信して処理を完了し、次のバッチをプッシュできることを示します。
ネガティブは、コンシューマーがデータを受信したが処理できないことを示し、キューサービスはエラーコードに基づいて次のバッチをプッシュするかどうかを決定します。ネガティブモードでは、原因とエラーコードをテキストで宣言できるため、データは他のコンシューマーにプッシュされます。次の表に、キューサービスが処理できる特別なエラーコードを示します。
コード
説明
Shutdown
コンシューマーが終了しており、キューサービスはデータのプッシュを続行しないことを示します。
データリバランス
次のシナリオでは、データをコミットできない場合があります。
予測サービスのローリングアップデート中に、一部のコンシューマーが終了し、コミットできないデータが処理される。
何らかの内部エラーが発生し、コンシューマーがクラッシュする。
コンシューマーが受信したデータを処理できず、ネガティブコミットを実行する。
処理できないこのデータは、キューサービスによって他のコンシューマーに再配信されます。このメカニズムは、データリバランスと呼ばれます。データリバランスは、次の場合に実行されます。
いずれかのコンシューマーが Exit 状態になる。
ウィンドウに空き領域がある場合に、コンシューマーが新しいデータプッシュを受信しない。
キューサービスは、各データの配信カウンターを保持します。データがリバランスされて配信されるたびに、カウンターが増加します。リバランスプロセス中に、データの配信カウンターが最大配信数を超えていることが判明した場合、データはデッドレターとして扱われます。キューサービスは、構成されたデッドレターポリシーを実行します。デフォルトでは、データはテールキューに配信されます。
テールキュー
テールキューは、コンシューマーにプッシュされないデータ(デッドレターやカスタム制御データなど)を格納するために使用される補助キューです。これは、キューサービス内のキューインスタンスであり、同じ API を持ちます。各入力キューと出力キューには、テールキューがあります。
テールキューとスタンダードキューは、キューの最大長を共有します。最大長が 10 で、スタンダードキューが 6 を占有している場合、テールキューは最大 4 になります。テールキューがいっぱいの場合、書き込みを行うとキュー長のエラーが返されます。したがって、テールキューを定期的に監視し、クリーンアップする必要があります。
API を呼び出すときは、次の HTTP ヘッダーを追加することで、テールキューへのアクセスを宣言できます。
X-EAS-QueueService-Access-Rear: true