Platform For AI: Access a queue service

Last Updated: Apr 01, 2026

PAI EAS exposes three interfaces for interacting with an asynchronous inference queue: an HTTP API, a Python SDK, and the eascmd CLI. This document covers all three.

How it works

When you deploy an asynchronous inference service, EAS automatically creates two queues:

  • Input queue — accepts inference requests from clients.

  • Output queue (sink) — stores inference results written by the inference service.

The data flow is:

  1. Submit an inference request to the input queue. EAS returns a request ID and a queue index immediately.

  2. The inference service reads data from the input queue, runs inference, and writes results to the output queue.

  3. Retrieve the result from the output queue using the request ID or index.
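The three steps above can be sketched with a small in-memory model. This is only an illustration of the data flow, not how EAS is implemented: the class `ToyAsyncService` and its method names are hypothetical, and a plain Python function stands in for the inference service.

```python
import uuid
from collections import deque


class ToyAsyncService:
    """In-memory stand-in for the input queue, the worker, and the sink."""

    def __init__(self):
        self.input_queue = deque()  # holds (index, request_id, payload)
        self.sink = {}              # maps request_id -> result
        self._next_index = 1

    def submit(self, payload):
        # Step 1: enqueue the request and return identifiers immediately.
        request_id = str(uuid.uuid4())
        index = self._next_index
        self._next_index += 1
        self.input_queue.append((index, request_id, payload))
        return index, request_id

    def run_worker_once(self, infer):
        # Step 2: read one item, run inference, write the result to the sink.
        if not self.input_queue:
            return False
        _, request_id, payload = self.input_queue.popleft()
        self.sink[request_id] = infer(payload)
        return True

    def result(self, request_id):
        # Step 3: look up the result by request ID (None if not ready yet).
        return self.sink.get(request_id)


service = ToyAsyncService()
index, request_id = service.submit("hello")
pending = service.result(request_id)  # nothing has been processed yet
service.run_worker_once(lambda text: text.upper())
done = service.result(request_id)
print(index, pending, done)
```

The point of the sketch is that `submit` returns before any inference happens, so the caller has to come back later with the request ID.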

Prerequisites

Before you begin, make sure you have:

  • A deployed asynchronous inference service in PAI EAS

  • The input queue endpoint, output queue endpoint, and token for your service

To find these values, go to the Inference Service tab, click the service name to open the Overview page, and then click View Endpoint Information in the Basic Information section.

The endpoint formats are:

| Endpoint | Format | Example |
| --- | --- | --- |
| Input queue | {domain}/api/predict/{service_name} | xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/{service_name} |
| Output queue (sink) | {domain}/api/predict/{service_name}/sink | xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/{service_name}/sink |
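As a small illustration of the two formats, the helper below derives both endpoints from a domain and a service name. The function name `queue_endpoints` is hypothetical, and the `http` scheme is an assumption taken from the curl examples in this document.

```python
def queue_endpoints(domain, service_name, scheme="http"):
    """Return the input and output (sink) queue URLs for a service."""
    base = f"{scheme}://{domain}/api/predict/{service_name}"
    return {"input": base, "sink": base + "/sink"}


endpoints = queue_endpoints("xxx.cn-shanghai.pai-eas.aliyuncs.com", "qservice")
print(endpoints["input"])
print(endpoints["sink"])
```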

Access a queue service by API

All HTTP requests require an Authorization header containing your service token.

Quick reference:

| Operation | Method | Path | Key parameters |
| --- | --- | --- | --- |
| Send data | POST | /api/predict/{service} | _priority_=1 (optional) |
| View queue details | GET | /api/predict/{service}?_attrs_=true | |
| Query data by index | GET | /api/predict/{service}?_index_=N | _length_, _auto_delete_, _timeout_ |
| Query data by request ID | GET | /api/predict/{service}?requestId=... | _timeout_ |
| Query inference results | GET | /api/predict/{service}/sink?requestId=... | _timeout_ |
| Delete a single item | DELETE | /api/predict/{service}?_index_=N | |
| Truncate the queue | DELETE | /api/predict/{service}?_index_=N&_trunc_=true | |

Send data to a queue

Submit a request to the input queue using curl:

curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice \
  -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' \
  -d '[{}]'

The verbose output shows the request that was sent and the response:

> POST /api/predict/qservice HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4e034bnvb-e783-4272-9333-68x6a1v8dc6x
<
1033

Two identifiers are returned that you can use to query data later:

  • `X-Eas-Queueservice-Request-Id` header — the request ID (4e034bnvb-e783-4272-9333-68x6a1v8dc6x).

  • Response body — the queue index (1033).

Send priority data

The queue processes data in First-In, First-Out (FIFO) order by default. To prioritize specific requests, add _priority_=1 to the query string:

curl -v "http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_priority_=1" \
  -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' \
  -d '[{}]'

Priority data is pushed to subscribers before standard-priority data.
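The same send and priority-send requests can be sketched in Python with the standard library. This is a minimal sketch, not an official client: the helper names `build_put_request` and `parse_put_response` are hypothetical, the host `example.invalid` and the token are placeholders, and the request is built but not sent so the example runs offline.

```python
import urllib.parse
import urllib.request


def build_put_request(url, token, payload, priority=False):
    """Build (but do not send) a POST request for the input queue."""
    if priority:
        url = url + "?" + urllib.parse.urlencode({"_priority_": 1})
    return urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Authorization": token},
        method="POST",
    )


def parse_put_response(headers, body):
    """Return (queue_index, request_id) from a successful response."""
    request_id = headers["X-Eas-Queueservice-Request-Id"]
    return int(body.strip()), request_id


req = build_put_request(
    "http://example.invalid/api/predict/qservice", "<token>", "[{}]", priority=True
)
print(req.get_method(), req.full_url)

# To send for real (requires network access and a valid token):
#   with urllib.request.urlopen(req) as resp:
#       index, request_id = parse_put_response(resp.headers, resp.read())
index, request_id = parse_put_response(
    {"X-Eas-Queueservice-Request-Id": "4e034bnvb-e783-4272-9333-68x6a1v8dc6x"},
    b"1033",
)
print(index, request_id)
```

Keeping both identifiers is useful because the request ID also locates the result in the output queue, while the index is only meaningful within the queue that returned it.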

View queue details

Add _attrs_=true to a GET request to retrieve queue metadata:

curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' \
  "http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_attrs_=true"

The response is a JSON object:

{"consumers.stats.total":"0","consumers.status.total":"0","meta.header.group":"X-EAS-QueueService-Gid","meta.header.priority":"X-EAS-QueueService-Priority","meta.header.user":"X-EAS-QueueService-Uid","stream.maxPayloadBytes":"524288","meta.name":"pmml_test","meta.state":"Normal","stream.approxMaxLength":"4095","stream.firstEntry":"0","stream.lastEntry":"0","stream.length":"1"}

Key fields:

| Field | Description |
| --- | --- |
| stream.maxPayloadBytes | Maximum size in bytes of a single data item |
| stream.approxMaxLength | Maximum number of data items the queue can hold |
| stream.firstEntry | Index of the first item in the queue |
| stream.lastEntry | Index of the last item in the queue |
| stream.length | Current number of items in the queue |
| meta.state | Current queue state |
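Note that every value in the attributes response is a JSON string, so numeric fields need converting before use. The sketch below parses the sample response shown above and derives an approximate headroom figure. The function name `queue_headroom` and the idea of "remaining capacity" are illustrative assumptions, not part of the API.

```python
import json

# Sample attributes response copied from this document.
SAMPLE = (
    '{"consumers.stats.total":"0","consumers.status.total":"0",'
    '"meta.header.group":"X-EAS-QueueService-Gid",'
    '"meta.header.priority":"X-EAS-QueueService-Priority",'
    '"meta.header.user":"X-EAS-QueueService-Uid",'
    '"stream.maxPayloadBytes":"524288","meta.name":"pmml_test",'
    '"meta.state":"Normal","stream.approxMaxLength":"4095",'
    '"stream.firstEntry":"0","stream.lastEntry":"0","stream.length":"1"}'
)


def queue_headroom(attrs_json):
    """Summarize queue state from the _attrs_=true response body."""
    attrs = json.loads(attrs_json)
    length = int(attrs["stream.length"])
    max_length = int(attrs["stream.approxMaxLength"])
    return {
        "state": attrs["meta.state"],
        "length": length,
        "remaining": max_length - length,  # approximate, as the field name suggests
        "max_payload_bytes": int(attrs["stream.maxPayloadBytes"]),
    }


summary = queue_headroom(SAMPLE)
print(summary)
```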

Alternatively, go to the Elastic Algorithm Service (EAS) page, click the service name, and switch to the Asynchronous Queue tab.

Query data

Choose a retrieval method based on your use case:

| Method | How it works | When to use |
| --- | --- | --- |
| Polling | Query data by index or request ID on demand | Retrieving a specific item or checking queue state occasionally |
| Subscription | Subscribe via WebSocket to receive data as it arrives | Continuous stream processing or high-throughput scenarios |

Poll data from the input queue

Query data by index or request ID:

# Query by index
curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' \
  "http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022"

# Query by request ID
curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' \
  "http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?requestId=87633037-39a4-40bf-8405-14f8e0c31896"

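The response (this trace was captured with _auto_delete_=false appended to the index query, so the item stays in the queue):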

> GET /api/predict/qservice?_index_=1022&_auto_delete_=false HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 4
< Content-Type: text/plain; charset=utf-8
<
[{}]

Response status codes:

| Status code | Meaning |
| --- | --- |
| 200 OK | Data found and returned |
| 204 No Content | No matching data exists (returned immediately when _timeout_=0) |

Query parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| _index_ | INT | 0 | Starting index. Set this close to the target item's index for better query efficiency. |
| _length_ | INT | 1 | Number of items to retrieve. |
| _auto_delete_ | BOOL | true | Delete queried items from the queue after retrieval. |
| _timeout_ | STRING | 0 | How long to wait if no matching data exists. 0 returns HTTP 204 immediately. Example values: 1s, 1m. |
| requestId | STRING | | Built-in tag used to locate a specific item. The EAS framework tags each input item with requestId and carries it through to the output queue, so you can use the same request ID to query the inference result from the output queue. |
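To show how these parameters combine, here is a minimal sketch that builds an index query URL and interprets the two documented status codes. The helper names `poll_url` and `has_data` are hypothetical. The helper deliberately defaults `auto_delete` to False so that a test query does not remove the item, which differs from the server default of true.

```python
import urllib.parse


def poll_url(base_url, index, length=1, auto_delete=False, timeout="0"):
    """Build a polling URL for the input queue using the documented parameters."""
    query = {
        "_index_": index,
        "_length_": length,
        "_auto_delete_": "true" if auto_delete else "false",
        "_timeout_": timeout,
    }
    return base_url + "?" + urllib.parse.urlencode(query)


def has_data(status_code):
    """Map the documented status codes to a boolean; reject anything else."""
    if status_code == 200:
        return True
    if status_code == 204:
        return False
    raise ValueError(f"unexpected status code: {status_code}")


url = poll_url("http://example.invalid/api/predict/qservice", 1022, timeout="5s")
print(url)
print(has_data(200), has_data(204))
```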

Poll inference results from the output queue

Query results from the output queue using the request ID from the original submission:

curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' \
  "http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12bbe2d"

The response:

> GET /api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b**** HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 53
< Content-Type: text/plain; charset=utf-8
<
[{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]
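Because inference is asynchronous, the result may not be in the output queue yet when you first ask for it. One possible client-side pattern is to retry on 204 until a 200 arrives. The sketch below assumes a caller-supplied `fetch` function (hypothetical) that performs the GET against the sink and returns the status code and body; a fake fetcher is used here so the example runs without a network.

```python
import json
import time


def wait_for_result(fetch, request_id, attempts=5, delay=0.0):
    """Poll the sink until a result appears or the attempts run out.

    fetch(request_id) must return a (status_code, body_bytes) tuple.
    Returns the decoded JSON result, or None if nothing arrived in time.
    """
    for _ in range(attempts):
        status, body = fetch(request_id)
        if status == 200:
            return json.loads(body)
        if status != 204:
            raise RuntimeError(f"unexpected status {status}")
        time.sleep(delay)
    return None


# Fake fetcher standing in for the HTTP call: empty twice, then a result.
responses = iter([
    (204, b""),
    (204, b""),
    (200, b'[{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]'),
])
result = wait_for_result(
    lambda rid: next(responses), "0337f7a1-a6f6-49a6-8ad7-ff2fd12bbe2d"
)
print(result)
```

Setting _timeout_ on the request itself lets the server hold the request open instead, which can reduce the number of round trips compared with a tight client-side loop.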

Delete data

You can remove data from the queue in two ways: delete a single item, or truncate all items whose index is lower than a given value.

Delete a single item

curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' \
  "http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022"

Response:

> DELETE /api/predict/qservice?_index_=1022 HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 4
< Content-Type: text/plain; charset=utf-8
<
OK

Parameter:

| Parameter | Type | Description |
| --- | --- | --- |
| _index_ | INT | Index of the item to delete |

Truncate the queue

Delete all items with an index lower than the specified value:

curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' \
  "http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1023&_trunc_=true"

Response:

> DELETE /api/predict/qservice?_index_=1023&_trunc_=true HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 4
< Content-Type: text/plain; charset=utf-8
<
OK

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| _index_ | INT | Cutoff index. All items with an index lower than this value are deleted. |
| _trunc_ | BOOL | Must be true to perform a truncation. If omitted or false, the request performs a single-item deletion instead. |
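Since the two delete operations differ only in the _trunc_ flag, it is easy to truncate when you meant to delete one item, or the reverse. A small sketch that makes the choice explicit is shown below. The helper name `build_delete_request` is hypothetical, the host and token are placeholders, and the request is built but not sent.

```python
import urllib.parse
import urllib.request


def build_delete_request(base_url, token, index, truncate=False):
    """Build a DELETE request: single-item by default, truncation on request."""
    query = {"_index_": index}
    if truncate:
        # With _trunc_=true, every item with an index lower than `index` is deleted.
        query["_trunc_"] = "true"
    url = base_url + "?" + urllib.parse.urlencode(query)
    return urllib.request.Request(
        url, headers={"Authorization": token}, method="DELETE"
    )


base = "http://example.invalid/api/predict/qservice"
single = build_delete_request(base, "<token>", 1022)
trunc = build_delete_request(base, "<token>", 1023, truncate=True)
print(single.get_method(), single.full_url)
print(trunc.get_method(), trunc.full_url)
```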

Subscribe to a queue (Python SDK)

For continuous stream processing, subscribe to the output queue using the Python SDK. The queue service uses the WebSocket protocol to maintain a persistent connection and push data to subscribers as it arrives. The subscription window size is controlled by the worker_threads setting on the inference service instance.

An inference service is not required to consume the queue. You can instead use the SDK in a custom service to subscribe to the input queue directly and write results to a third-party message queue or to storage such as Object Storage Service (OSS).

Install the SDK:

pip install eas_prediction --user

Subscribe to the output queue:

The following example uses QueueClient to send 10 items to the input queue and subscribe to results from the output queue. In production, use separate threads for sending and subscribing.

#!/usr/bin/env python
from eas_prediction import QueueClient

# Create a client for the input queue.
input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice')
# To set a custom user or group:
# input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice', uid='your_user_id', gid='your_group_id')
input_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
input_queue.init()

# Create a client for the output queue.
sink_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice/sink')
sink_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
sink_queue.init()

# Send 10 items to the input queue.
for x in range(10):
    index, request_id = input_queue.put('[{}]')
    print(index, request_id)

    # Print queue attributes.
    attrs = input_queue.attributes()
    print(attrs)

# Subscribe to the output queue with a window size of 5.
i = 0
watcher = sink_queue.watch(0, 5, auto_commit=False)
for x in watcher.run():
    print(x.data.decode('utf-8'))

    # Commit each item after processing.
    sink_queue.commit(x.index)
    i += 1
    if i == 10:
        break

# Close the watcher. Each QueueClient instance supports only one active watcher.
# Failing to close the watcher causes an error on the next watch() call.
watcher.close()
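One way to split sending and subscribing across threads, as recommended above for production, is sketched below. The functions `send_items` and `collect_results` are hypothetical and use only the client calls shown in the example above (put, watch, commit, run, close). To keep the sketch runnable without a deployed service, small in-memory fakes stand in for the two QueueClient instances; with the real SDK you would pass the initialized `input_queue` and `sink_queue` clients instead.

```python
import queue
import threading
from types import SimpleNamespace


def send_items(input_queue, payloads, sent):
    """Producer: put each payload and record the returned request ID."""
    for payload in payloads:
        index, request_id = input_queue.put(payload)
        sent.append(request_id)


def collect_results(sink_queue, expected, results):
    """Consumer: watch the sink, commit each item, stop after `expected` items."""
    watcher = sink_queue.watch(0, 5, auto_commit=False)
    try:
        for item in watcher.run():
            results.append(item.data.decode("utf-8"))
            sink_queue.commit(item.index)
            if len(results) >= expected:
                break
    finally:
        watcher.close()  # always release the single watcher slot


# In-memory fakes (illustration only) that echo each payload to the sink.
class FakeInput:
    def __init__(self, pipe):
        self.pipe, self.count = pipe, 0

    def put(self, payload):
        self.count += 1
        item = SimpleNamespace(index=self.count, data=payload.encode("utf-8"))
        self.pipe.put(item)
        return self.count, f"req-{self.count}"


class FakeWatcher:
    def __init__(self, pipe):
        self.pipe = pipe

    def run(self):
        while True:
            yield self.pipe.get(timeout=5)

    def close(self):
        pass


class FakeSink:
    def __init__(self, pipe):
        self.pipe, self.committed = pipe, []

    def watch(self, index, window, auto_commit=False):
        return FakeWatcher(self.pipe)

    def commit(self, index):
        self.committed.append(index)


pipe = queue.Queue()
sent, results = [], []
sink = FakeSink(pipe)
consumer = threading.Thread(target=collect_results, args=(sink, 3, results))
producer = threading.Thread(
    target=send_items, args=(FakeInput(pipe), ["a", "b", "c"], sent)
)
consumer.start()
producer.start()
producer.join()
consumer.join()
print(results, sink.committed)
```

Each thread owns its own client object here, mirroring the separate input and output clients in the example above, and the try/finally ensures the watcher is closed even if processing raises.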

Access a queue service by eascmd

eascmd encapsulates the full queue service API. Use the eascmd stream subcommand to operate and debug a queue service from the command line.

This requires an eascmd version later than 2.6.0. For download and setup instructions, see Download and authenticate the client.

Quick reference:

| Command | Description |
| --- | --- |
| eascmd stream config --url=... --token=... | Configure queue service access |
| eascmd stream info | View queue details |
| eascmd stream put -d "..." | Send data inline |
| eascmd stream put -f test.data | Send all lines from a file |
| eascmd stream get -l10 --timeout=3s | Query up to 10 items with a timeout |
| eascmd stream get --tags requestId=... | Filter by request ID |
| eascmd stream delete 3 | Delete a single item by index |
| eascmd stream trunc 4 | Truncate all items up to index 4 |
| eascmd stream watch | Subscribe (manual commit) |
| eascmd stream watch --auto-commit | Subscribe (auto-commit) |

Configure access

Run eascmd stream config to point the CLI at your queue service:

eascmd stream config \
  --url=http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice \
  --token=YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==

After configuration, eascmd uses default_group and default_user as the default group_id and user_id. Override them with --group and --user in any subsequent command.

View queue details

eascmd stream info

Example output:

[OK] Attributes:
consumers.list.[0] : Id: imageasync.imageasync-35d72370-5f576f7c8d-2mdb4, Index: 0, Pending: 0, Status: Running, Idle: 19.997s, Window: 5, Slots: 5, AutoCommit: false
consumers.stats.total : 1
consumers.status.total : 1
groups.list.[0] : Id: imageasync, Index: 0, Pending: 0, Delivered: 1, Consumers: 1
meta.header.group : X-EAS-QueueService-Gid
meta.header.priority : X-EAS-QueueService-Priority
meta.header.user : X-EAS-QueueService-Uid
meta.maxPayloadBytes : 8192
meta.name : imageasync-queue-38895e88
meta.state : Normal
stream.approxMaxLength : 230399
stream.firstEntry : 0
stream.lastEntry : 0
stream.length : 0

For field descriptions, see View queue details. Use info to check queue state and verify connectivity.

Send data

Send data inline with -d:

eascmd stream put -d "10s"

Output:

[OK] 1
[INFO] Put data done.
Total time cost: 401.892141ms
Total size: 3.00 B
Total: 1, success: 1, failed: 0

Send all lines from a file with -f:

eascmd stream put -f test.data

Output:

[INFO] Opening data file: test.data
[OK] 2
[OK] 3
[OK] 4
...

Query data

Fetch up to 10 items with a 3-second timeout:

eascmd stream get -l10 --timeout=3s

Output:

[OK] [0 - 1] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e47b76e2-2648-40fe-9197-a268015cbd1f ts@source=1685802680575] data1
[OK] [1 - 2] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=51d13952-6ba3-4d52-b548-e58837675c7a ts@source=1685807531686] data2
[OK] [2 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK
...

Each row contains three columns:

  • [received_index - queue_index] — for example, [0 - 1] means the first item received has queue index 1.

  • tags[...] — metadata attached to the item:

    • Header:* — HTTP headers from the original request.

    • requestId — automatically generated request ID.

    • ts@source — Unix timestamp when the input queue received the request.

    • ts@sink — Unix timestamp when the output queue received the data.

  • Data content.
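If you need to post-process this output in a script, the row layout described above can be parsed with a regular expression. The sketch below is based only on the sample rows in this document, so treat the pattern as an assumption: the output format is not a documented contract and may change between eascmd versions. The function name `parse_row` is hypothetical.

```python
import re

# [OK] [received - queue_index] tags[...] data
ROW = re.compile(
    r"^\[OK\] \[(?P<received>\d+) - (?P<index>\d+)\] "
    r"tags\[(?P<tags>.*?)\] (?P<data>.*)$"
)


def parse_row(line):
    """Split one output row into indexes, selected tags, and data."""
    match = ROW.match(line)
    if match is None:
        return None
    tags = match.group("tags")
    request_id = re.search(r"requestId=(\S+)", tags)
    timestamps = {name: int(value) for name, value in re.findall(r"(ts@\w+)=(\d+)", tags)}
    return {
        "received_index": int(match.group("received")),
        "queue_index": int(match.group("index")),
        "request_id": request_id.group(1) if request_id else None,
        "timestamps": timestamps,
        "data": match.group("data"),
    }


sample = (
    "[OK] [0 - 1] tags[Header:Content-Type=text/plain; charset=utf-8 "
    "requestId=e47b76e2-2648-40fe-9197-a268015cbd1f ts@source=1685802680575] data1"
)
row = parse_row(sample)
print(row)
```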

Important

If an inference service instance is running alongside, it may consume data from the input queue before you can query it. Add -k to query from the output queue instead.

To filter by request ID:

eascmd stream get --tags requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f

Output:

[OK] [0 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK

Delete data

Delete a single item by index:

eascmd stream delete 3

Confirm when prompted:

Deleting index(es):
3 [y/N]y
[OK] deleted

Truncate all items up to a given index:

eascmd stream trunc 4

Confirm when prompted:

trunc stream from index: 4 [y/N]y
[OK] truncated

Subscribe to the queue

Use watch to subscribe and receive data as it is pushed. By default, the client commits each item manually:

eascmd stream watch

Output:

[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: false, window: 10
I0604 09:20:45.211243   66197 queue.go:532] watch via websocket
[OK] [0 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@sink=1685807531718 ts@source=1685807531715] data4
commit: 4 ? [Y/n]

  • Enter Y to commit the item and receive the next one.

  • Enter n to perform a negative commit (reject the item):

commit: 5 ? [Y/n]n
negative: 5 ? [Y/n]y

For details on commit and negative commit semantics, see Commit and Negative.

To have the server commit items automatically:

eascmd stream watch --auto-commit

Output:

[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: true, window: 10
I0604 09:30:08.554542   66408 queue.go:532] watch via websocket
[OK] [0 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
[OK] [1 - 6] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=5825dd3e-a5e2-4754-a946-96e068d643c8 ts@sink=1685807531771 ts@source=1685807531768] data6
...

For all available eascmd stream options and commands, run eascmd stream help.

What's next