このトピックでは、HTTP API、SDK、または eascmd を使用してキューサービスにアクセスする方法について説明します。
API によるキューサービスへのアクセス
非同期推論サービスをデプロイすると、システムは入力キューと出力キュー (シンクキューとも呼ばれます) のエンドポイントを自動的に生成します。次の表は、これらの HTTP エンドポイントの形式について説明します。
エンドポイントの種類 | エンドポイントのフォーマット | 例 |
入力キュー エンドポイント |
|
|
出力キュー エンドポイント |
|
|
Inference Service タブで、対象サービスの名前をクリックして Overview ページを開きます。Basic Information セクションで View Endpoint Information をクリックして、入力キューエンドポイント、出力キューエンドポイント、およびトークンを確認します。

キューへのデータ送信
curl コマンドを使用して、同期または非同期の推論リクエストを入力キューに送信します。
$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'次の例は、応答を示しています。
> POST /api/predict/qservice HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4e034bnvb-e783-4272-9333-68x6a1v8dc6x
<
1033応答には、次の情報が含まれています。
X-Eas-Queueservice-Request-Idレスポンスヘッダーには、リクエスト ID4e034bnvb-e783-4272-9333-68x6a1v8dc6xが含まれており、これを使用してデータをクエリできます。応答本文には、キュー内のリクエストのインデックス
1033が含まれており、これを使用してデータをクエリできます。
優先度の高いデータの送信
キューサービスは通常、先入れ先出し (FIFO) 順でデータを処理します。ただし、特定のデータをより高い優先度で処理するには、リクエストに _priority_=1 クエリパラメーターを追加します。
$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_priority_=1 -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'次の例は、応答を示しています。
> POST /api/predict/qservice?_priority_=1 HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4033eb55-e783-4922-9777-68d6a1383c76
<
1034優先度データがキューに書き込まれると、サービスは標準優先度データよりも先に、処理のためにサブスクライバーにプッシュします。
キューサービスの詳細
リクエストに _attrs_=true パラメーターを追加すると、応答にキューに関する詳細情報が含まれます。
$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_attrs_=true次の例は、応答を示しています。
> GET /api/predict/qservice?_attrs_=true HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 320
<
{"consumers.stats.total":"0","consumers.status.total":"0","meta.header.group":"X-EAS-QueueService-Gid","meta.header.priority":"X-EAS-QueueService-Priority","meta.header.user":"X-EAS-QueueService-Uid","stream.maxPayloadBytes":"524288","meta.name":"pmml_test","meta.state":"Normal","stream.approxMaxLength":"4095","stream.firstEntry":"0","stream.lastEntry":"0","stream.length":"1"}応答は JSON 形式で詳細を返します。次の表は、主要フィールドについて説明します。
パラメーター | 説明 |
stream.maxPayloadBytes | キューが受け入れるデータ項目の最大サイズ (バイト単位)。 |
stream.approxMaxLength | キューに格納できるデータ項目の最大数。 |
stream.firstEntry | キュー内の最初のデータ項目のインデックス。 |
stream.lastEntry | キュー内の最後のデータ項目のインデックス。 |
stream.length | キュー内の現在のデータ項目の数。 |
meta.state | キューの現在の状態。 |
Elastic Algorithm Service (EAS) ページに移動し、非同期推論サービスの名前をクリックして Overview ページを開き、次に Asynchronous Queue タブに切り替えてキュー情報を表示することもできます。
データクエリ
条件による結果のクエリ
単一キューサービスを使用する場合、インデックスまたはリクエスト ID で入力キューからデータをクエリできます。
# Query data by index. $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022 # Query data by request ID. $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?requestId=87633037-39a4-40bf-8405-14f8e0c31896次の例は、応答を示しています。
> GET /api/predict/qservice?_index_=1022&_auto_delete_=false HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 4 < Content-Type: text/plain; charset=utf-8 < [{}]推論結果をクエリするために、次のパラメーターを構成できます。次の表は、パラメーターについて説明します。
パラメーター
タイプ
説明
_index_
INT
クエリするデータの開始インデックス。デフォルト値は 0 で、キュー内の最初のデータ項目からクエリが開始されることを示します。このインデックスがクエリするデータに近いほど、クエリはより効率的になります。
_length_
INT
クエリするデータ項目の数。デフォルト値は 1 で、1 つのデータ項目のみがクエリされることを示します。
_auto_delete_
BOOL
クエリされたデータをキューから削除するかどうかを指定します。デフォルト値は TRUE で、クエリが完了した後、サービスがクエリされたデータ項目をキューから自動的に削除することを示します。
_timeout_
STRING
タイムアウト期間。デフォルト値は 0 で、条件を満たすデータがキューに存在しない場合、システムはすぐに 204 状態コードを返すことを示します。それ以外の場合、システムは指定された期間待機します。タイムアウト期間内に条件を満たすデータがキューに現れると、システムはデータを返します。例: 1s (1 秒)、1m (1 分)。
requestId
STRING
requestIdは、データをクエリするために使用される組み込みタグです。説明非同期推論サービスを使用する場合、EAS フレームワークは入力キューからリクエストデータを読み取り、それを処理し、結果を出力キューに書き込みます。フレームワークは
requestIdタグを使用して、入力データとそれに対応する出力データを関連付けます。したがって、最初のリクエストからのrequestIdを使用して、出力キューから推論結果をクエリできます。非同期推論結果のクエリ
キューサービスを推論サービスとペアリングすると、推論サービスは入力キューからリクエストデータを自動的に読み取り、推論計算を実行し、結果を出力キュー (シンク) に書き込みます。
0337f7a1-a6f6-49a6-8ad7-ff2fd12b****などのリクエスト ID を使用して出力キューからデータをクエリするには、次のコードを使用します。$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12bbe2d次の例は、応答を示しています。
> GET /api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b**** HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 53 < Content-Type: text/plain; charset=utf-8 < [{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]
データクリーンアップ
キュー内の特定のデータが不要になった場合、API を使用してそれを削除できます。データを削除する方法は、単一データ項目を削除する方法 (delete) と、指定されたポイントまでのデータを削除する方法 (truncate) の 2 つがあります。
単一データ項目の削除
# Delete data by index. $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022次の例は、応答を示しています。
> DELETE /api/predict/qservice?_index_=1022 HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 4 < Content-Type: text/plain; charset=utf-8 < OK推論結果を削除するために、次のパラメーターを構成できます。次の表は、パラメーターについて説明します。
パラメーター
タイプ
説明
_index_
INT
削除するデータ項目のインデックス。
データの一括削除
# Delete data by index. $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1023&_trunc_=true次の例は、応答を示しています。
> DELETE /api/predict/qservice?_index_=1023&_trunc_=true HTTP/1.1 > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****== > < HTTP/1.1 200 OK < Content-Length: 4 < Content-Type: text/plain; charset=utf-8 < OK推論結果を削除するために、次のパラメーターを構成できます。次の表は、パラメーターについて説明します。
パラメーター
タイプ
説明
_index_
INT
データ削除のカットオフインデックス。サービスは、このインデックスより低いインデックスを持つデータ項目を削除します。
_trunc_
BOOL
このパラメーターは、一括削除の場合に
trueに設定する必要があります。そうしないと、サービスは代わりに単一項目削除を実行します。
キューサービスのサブスクリプションとプッシュ
非同期推論シナリオでは、ブロッキングクエリを使用するだけでなく、サービスをサブスクライブして推論結果を受信できます。キューサービスは、クライアントが推論結果を取得するためのサブスクリプション (監視) インターフェイスを提供します。キューサービスは、現在の推論サービスインスタンスに構成されている同時実行数 (worker_threads) に基づいて、サブスクリプションウィンドウサイズを制御します。新しいデータがキューに書き込まれると、キューサービスはデータをサブスクライブしているクライアントに自動的にプッシュします。
この機能は、WebSocket プロトコルを使用してプッシュ通知用の持続的接続を確立することにより、SDK の QueueClient 実装をカプセル化します。次の例は、一般的なビデオおよびオーディオストリーム処理シナリオで、Python SDK の QueueClient を使用してキュー内のデータをサブスクライブする方法を示しています。
推論サービスは不要です。SDK を使用して、カスタムサービスでキューサービスの入力キューをサブスクライブすることもできます。出力結果をサードパーティのメッセージキューまたはその他のターゲットストレージ (OSS への画像の書き込みなど) に書き込むことができます。
EAS Python SDK をインストールします。
pip install eas_prediction --userQueueClientのput()メソッドを使用して入力キューにデータを送信し、watch()メソッドを使用して出力キューからデータをサブスクライブします。本番シナリオでは、通常、データの送信およびサブスクライブは異なるスレッドで処理されます。この例では、両方の操作に単一のスレッドを使用します。つまり、まずput()を使用してデータを送信し、次にwatch()を使用して結果を受信します。#!/usr/bin/env python from eas_prediction import QueueClient # Create an input queue object to write input data. input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice') # To customize the user and group, you can specify them by using uid and gid. Example: # input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice', uid='your_user_id', gid='your_group_id') input_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==') input_queue.init() # Create an output queue object to subscribe to and read output data. sink_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice/sink') sink_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==') sink_queue.init() # Push 10 data items to the input queue. for x in range(10): index, request_id = input_queue.put('[{}]') print(index, request_id) # View the details of the input queue. attrs = input_queue.attributes() print(attrs) # Watch data from the output queue with a window of 5. i = 0 watcher = sink_queue.watch(0, 5, auto_commit=False) for x in watcher.run(): print(x.data.decode('utf-8')) # Manually commit after a data item is received and processed. sink_queue.commit(x.index) i += 1 if i == 10: break # Close the watcher object. Each client instance can have only one watcher object. If the watcher object is not closed, an error occurs on the next run. watcher.close()
eascmd によるキューサービスへのアクセス
eascmd は、完全なキューサービス API をカプセル化します。eascmd stream サブコマンドを使用して、キューサービスを迅速に操作およびデバッグできます。
eascmd のダウンロード
eascmd のバージョンが 2.6.0 以降であることを確認してください。eascmd コマンドラインクライアントのダウンロード、更新、および構成の詳細については、「クライアントのダウンロードと認証」をご参照ください。
eascmd アクセスの構成
eascmd stream config コマンドを実行して、アクセスするキューサービスを構成します。例:
eascmd stream config --url=http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice --token=YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==構成が完了すると、eascmd は default_group と default_user をそれぞれデフォルトの group_id と user_id として使用します。グループとユーザーの詳細については、「キューサービスのサブスクリプションとプッシュ」をご参照ください。異なる group_id または user_id を使用するには、--group および --user パラメーターを使用してそれらを指定します。stream config 内のすべてのパラメーターは、後続の読み取りおよび書き込みコマンドでオーバーライドできます。
キューの詳細
info コマンドを実行して、キュー情報を表示します。例:
eascmd stream info次の例は、応答を示しています。
[OK] Attributes:
consumers.list.[0] : Id: imageasync.imageasync-35d72370-5f576f7c8d-2mdb4, Index: 0, Pending: 0, Status: Running, Idle: 19.997s, Window: 5, Slots: 5, AutoCommit: false
consumers.stats.total : 1
consumers.status.total : 1
groups.list.[0] : Id: imageasync, Index: 0, Pending: 0, Delivered: 1, Consumers: 1
meta.header.group : X-EAS-QueueService-Gid
meta.header.priority : X-EAS-QueueService-Priority
meta.header.user : X-EAS-QueueService-Uid
meta.maxPayloadBytes : 8192
meta.name : imageasync-queue-38895e88
meta.state : Normal
stream.approxMaxLength : 230399
stream.firstEntry : 0
stream.lastEntry : 0
stream.length : 0応答内のパラメーターの詳細については、「API を使用したキューサービスへのアクセス」をご参照ください。info コマンドを使用して、キュープロパティを監視し、キューサービスへの接続性をテストできます。
キューへのデータ送信
put コマンドを実行して、キューにデータを送信します。例:
eascmd stream put -d "10s"次の例は、応答を示しています。
[OK] 1
[INFO] Put data done.
Total time cost: 401.892141ms
Total size: 3.00 B
Total: 1, success: 1, failed: 0-f パラメーターを使用して、ファイルからすべてのデータをキューに送信することもできます。例:
eascmd stream put -f test.data次の例は、応答を示しています。
[INFO] Opening data file: test.data
[OK] 2
[OK] 3
[OK] 4
[OK] 5
[OK] 6
[OK] 7
[OK] 8
[OK] 9
[OK] 10
[OK] 11
[OK] 12
[OK] 13
....その後、info コマンドを実行してキューの状態を監視できます。
データクエリ
get コマンドを実行して、入力キューからデータをクエリします。例:
eascmd stream get -l10 --timeout=3s次の例は、応答を示しています。
[OK] [0 - 1] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e47b76e2-2648-40fe-9197-a268015cbd1f ts@source=1685802680575] data1
[OK] [1 - 2] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=51d13952-6ba3-4d52-b548-e58837675c7a ts@source=1685807531686] data2
[OK] [2 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK
[OK] [3 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@source=1685807531715] data4
[OK] [4 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@source=1685807531730] data5
...次のリストは、最初のデータ項目を例として、出力形式について説明します。
最初の列
[0 - 1]は、0 番目に受信したデータ項目のインデックスが 1 であることを示します。2 番目の列
tags[Header:Content-Type=text/plain; charset=...]は、データに関連付けられたタグを示します。Headerで始まるタグは、入力リクエストの HTTP ヘッダーに対応します。requestIdは、組み込みの自動生成されたリクエスト ID です。ts@sourceは、入力キューがリクエストを受信した Unix タイムスタンプを示します。ts@sinkは、出力キューがデータを受信したタイムスタンプを示します。
最後の列には、入力したデータが表示されます。
ペアになった推論サービスインスタンスがある場合、インスタンスはキューに送信したデータを消費する可能性があります。この場合、-k パラメーターをコマンドに追加して、出力キューからデータをクエリする必要があります。
--tags パラメーターを使用してクエリ条件を追加することもできます。たとえば、requestId でデータをクエリするには、次のコマンドを実行します。
eascmd stream get --tags requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f次の例は、応答を示しています。
[OK] [0 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OKデータ削除
delete および trunc コマンドを実行して、単一データ項目またはデータの一括削除を実行できます。
単一データ項目の削除:
eascmd stream delete 3削除を確認すると、次の応答が返されます。
Deleting index(es):
3 [y/N]y
[OK] deletedデータの一括削除:
eascmd stream trunc 4削除を確認すると、次の応答が返されます。
trunc stream from index: 4 [y/N]y
[OK] truncatedキューサブスクリプション
watch コマンドを実行して、キューサービスをサブスクライブします。
eascmd stream watch次の例は、応答を示しています。
[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: false, window: 10
I0604 09:20:45.211243 66197 queue.go:532] watch via websocket
[OK] [0 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@sink=1685807531718 ts@source=1685807531715] data4
commit: 4 ? [Y/n]Y を入力すると、クライアントはデータをコミットし、新しいデータを受信します。
[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]n を入力すると、ネガティブコミットを実行するように求められます。
[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]n
negative: 5 ? [Y/n]ycommit および negative commit の詳細については、「コミットとネガティブ」をご参照ください。
--auto-commit オプションを使用すると、サーバーはデータを自動的にコミットします。
eascmd stream watch --auto-commit次の例は、応答を示しています。
[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: true, window: 10
I0604 09:30:08.554542 66408 queue.go:532] watch via websocket
[OK] [0 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
[OK] [1 - 6] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=5825dd3e-a5e2-4754-a946-96e068d643c8 ts@sink=1685807531771 ts@source=1685807531768] data6
[OK] [2 - 7] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e7edf9b8-de78-41a0-8d9c-0a4aaf7dcaaf ts@sink=1685807531786 ts@source=1685807531783] data7
[OK] [3 - 8] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=3ddc3481-934a-4408-8d08-11c2c2248ef6 ts@sink=1685807531801 ts@source=1685807531798] data8
[OK] [4 - 9] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=561da95d-b99a-4710-bb82-9402baa21f36 ts@sink=1685807531816 ts@source=1685807531812] data9
....その他のオプションとコマンド
上記のセクションでは、eascmd stream の主要なコマンドとオプションについて説明しました。その他の機能の詳細については、eascmd stream help コマンドを実行してください。