All Products
Search
Document Center

Platform For AI:Access queue service

Last Updated:Apr 22, 2024

This topic describes how to access the queue service by calling API operations, or by using SDK or the EASCMD client.

Access the queue service by calling API operations

After you deploy an asynchronous inference service, two types of addresses are automatically generated for the input queue and output queue (sink queue). The following table provides sample HTTP endpoints.

Endpoint type

Endpoint format

Example

Input queue endpoint

{domain}/api/predict/{service_name}

xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/{service_name}

Output queue endpoint

{domain}/api/predict/{service_name}/sink

xxx.cn-shanghai.pai-eas.aliyuncs.com/api/predict/{service_name}/sink

To obtain the addresses and service tokens of the input and output queues, go to the EAS-Online Model Services page, find the asynchronous inference service that you want to view and click Invocation Method in the Service Type column.

image

image

Send data to the queue service

Use the curl command to send a synchronous request or an asynchronous inference request to the input queue service. Sample code:

$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'

The following output is returned:

> POST /api/predict/qservice HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4e034bnvb-e783-4272-9333-68x6a1v8dc6x
<
1033

In the preceding response:

  • The value 4e034bnvb-e783-4272-9333-68x6a1v8dc6x of X-Eas-Queueservice-Request-Id in the response header is the request ID. You can use the request ID to query data.

  • The value 1033 of Index in the response body is the index of the request in the queue. You can use the index to query data in the queue.

Send priority data

Data is pushed in the queue service in the first-in-first-out (FIFO) order. However, specific data must be pushed and processed first in many scenarios. The queue service supports pushing data based on data priority. You can add the _priority_=1 parameter to push priority data to the queue service.

$ curl -v http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_priority_=1 -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' -d '[{}]'

The following output is returned:

> POST /api/predict/qservice?_priority_=1 HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 19
< X-Eas-Queueservice-Request-Id: 4033eb55-e783-4922-9777-68d6a1383c76
<
1034

After the priority data is written to the queue, the data is pushed to the subscriber as a priority.

View the details of a queue service

If you add the _attrs_=true parameter when you send a request to the queue service, the details of the queue are returned in the response. Sample code:

$ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_attrs_=true

The following output is returned:

> GET /api/predict/qservice?_attrs_=true HTTP/1.1
> Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
> Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
>
< HTTP/1.1 200 OK
< Content-Length: 320
<
{"consumers.stats.total":"0","consumers.status.total":"0","meta.header.group":"X-EAS-QueueService-Gid","meta.header.priority":"X-EAS-QueueService-Priority","meta.header.user":"X-EAS-QueueService-Uid","stream.maxPayloadBytes":"524288","meta.name":"pmml_test","meta.state":"Normal","stream.approxMaxLength":"4095","stream.firstEntry":"0","stream.lastEntry":"0","stream.length":"1"}

The returned content is in the JSON format. The following table describes the fields.

Field

Description

stream.maxPayloadBytes

The maximum size of each data entry in the queue. Unit: bytes.

stream.approxMaxLength

The maximum number of data entries that can be stored in the queue.

stream.firstEntry

The index of the first data entry in the queue.

stream.lastEntry

The index of the last data entry in the queue.

stream.length

The number of data entries stored in the queue.

meta.state

The status of the queue.

You can also query the details of the queue service in the Platform for AI (PAI) console. Go to the EAS-Online Model Services page and click the name of the asynchronous inference service to go to the Service Details page. On the Service Details page, you can view details such as the number of data entries stored in the queue, the maximum size of each data entry, the maximum number of data entries that can be stored in the queue, and the number of subscribed instances.image

Query data

  • Query data based on conditions

    If you use only one queue service, you can use the index or the request ID to obtain data from the input queue. Sample code:

    # Use the index to query data. 
    $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022
    # Use the request ID to query data. 
    $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?requestId=87633037-39a4-40bf-8405-14f8e0c31896

    The following output is returned:

    > GET /api/predict/qservice?_index_=1022&_auto_delete_=false HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 4
    < Content-Type: text/plain; charset=utf-8
    <
    [{}]

    You can configure the required parameters to query the inference result. The following table describes the parameters.

    Parameter

    Type

    Description

    _index_

    INT

    The start index of the data that you want to query. The default value is 0, which specifies that the query starts from the first data entry. The closer an index is to the queried data, the higher the efficiency of the query.

    _length_

    INT

    The number of data entries that you want to query. The default value is 1, which specifies that only one data entry is queried.

    _auto_delete_

    BOOL

    Specifies whether to delete the queried data entries from the queue. Default value: TRUE. This value specifies that the queried data entries are automatically deleted from the queue after the query is completed.

    _timeout_

    STRING

    The timeout period. The default value is 0, which specifies that the 204 status code is immediately returned if no data meets the query conditions. Otherwise, the request waits for a specified time. If data that meets the query conditions exists in the queue within the timeout period, the data is returned. Example: 1s (1 second), 1m (1 minute).

    requestId

    STRING

    The requestId is a built-in tag that can be used as a query condition.

    Note

    When you asynchronously call an inference service, the input queue returns the request. The Elastic Algorithm Service (EAS) service framework obtains, processes, and writes data to the output queue. The service framework uses the requestId tag to associate the input data with the output data. You can use the request ID in the input data to obtain the inference result from the output queue.

  • Query asynchronous inference results

    When you use the queue service together with an inference service, the inference service automatically retrieves request data from the input queue, performs inference, and then writes the result to the output queue (sink queue). The following sample code provides an example on how to query data from the output queue service by using the request ID 0337f7a1-a6f6-49a6-8ad7-ff2fd12b****.

    $ curl -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12bbe2d

    The following output is returned:

    > GET /api/predict/qservice/sink?requestId=0337f7a1-a6f6-49a6-8ad7-ff2fd12b**** HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 53
    < Content-Type: text/plain; charset=utf-8
    <
    [{"p_0":0.5224580736905329,"p_1":0.4775419263094671}]

Clean up data

If you no longer require specific data in your queue, you can clean up the data by calling API operations. You can clear up data by deleting a single data entry or perform data truncation to delete multiple data entries.

  • Delete a single data entry

    # Use index to delete data. 
    $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1022

    The following output is returned:

    > GET /api/predict/qservice?_index_=1022 HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 4
    < Content-Type: text/plain; charset=utf-8
    <
    OK

    You can configure the following parameters to query the inference result.

    Parameter

    Type

    Description

    _index_

    INT

    The index of the data that you want to delete.

  • Delete multiple data entries

    # Use index to delete data. 
    $ curl -XDELETE -v -H 'Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==' http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice?_index_=1023&_trunc_=true

    The following output is returned:

    > GET /api/predict/qservice?_index_=1023&_trunc_=true HTTP/1.1
    > Host: 182848887922****.cn-shanghai.pai-eas.aliyuncs.com
    > Authorization: YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==
    >
    < HTTP/1.1 200 OK
    < Content-Length: 4
    < Content-Type: text/plain; charset=utf-8
    <
    OK

    You can configure the following parameters to query the inference result.

    Parameter

    Type

    Description

    _index_

    INT

    The end index of the data that you want to delete. Data whose index is lower than the end index is deleted.

    _trunc_

    BOOL

    If you want to delete multiple entries at the same time, set this parameter to true. Otherwise, only a single data entry is deleted.

Subscribe to the queue service

In addition to queries, you can subscribe to the queue service to obtain asynchronous inference results. The queue service provides watch as the subscription interface. The client can use the watch interface to obtain inference results. The queue service controls the subscription window size based on the upper concurrency limit (worker_threads) of the inference service instances. When new data is written to the queue, the queue service automatically pushes the data to the client.

This feature is implemented by using QueueClient that is encapsulated in the WebSocket-based SDK. The feature pushes data over persistent connections. In the following example, a video and audio stream processing service is used to describe how to use QueueClient in EAS SDK for Python to subscribe to data in the queue.

Note

The inference service is optional. You can use the SDK to subscribe to the input queue of a custom service and write the output data to a third-party message queue or a data store. For example, you can output images to Object Storage Service (OSS).

  1. Install EAS SDK for Python.

    pip install eas_prediction --user
  2. You can use the put() function of QueueClient to send data to the input queue, and use the watch() function to subscribe to data from the output queue. Data transmission and subscription can occur in different threads. In this example, both are completed in the same thread by using the put and watch functions.

    #!/usr/bin/env python
    from eas_prediction import QueueClient
    # Create input queue objects to receive input data. 
    input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice')
    # To create a custom user and group, specify the user by using uid and the group by using gid. Example:
    # input_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice', uid='your_user_id', gid='your_group_id')
    input_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
    input_queue.init()
    
    # Create output queue objects to subscribe to the processing results in the output queues. 
    sink_queue = QueueClient('182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'qservice/sink')
    sink_queue.set_token('YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==')
    sink_queue.init()
    
    # Push 10 data entries to each input queue. 
    for x in range(10):
        index, request_id = input_queue.put('[{}]')
        print(index, request_id)
    
        # View details of the input queues. 
        attrs = input_queue.attributes()
        print(attrs)
    
    # Subscribe to the data in the output queues. The window size is 5. 
    i = 0
    watcher = sink_queue.watch(0, 5, auto_commit=False)
    for x in watcher.run():
        print(x.data.decode('utf-8'))
    
        # Manually commit after each request is processed. 
        sink_queue.commit(x.index)
        i += 1
        if i == 10:
            break
    # Disable the watcher object. Each client can use only one watcher object. If you do not disable the watcher object, an error occurs when you rerun the client. 
    watcher.close()
    

Use the EASCMD client to access the queue service

The EASCMD client encapsulates the complete queue service APIs. You can run the eascmd stream subcommand to manage and debug the queue service.

Download the EASCMD client

Make sure that the EASCMD version is later than 2.6.0. For information about how to download, update, and configure the EASCMD client, see Download the EASCMD client and complete user authentication.

Configure EASCMD to access the queue service

Run the following easmd stream config command to configure access to the queue service:

eascmd stream config --url=http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com/api/predict/qservice --token=YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MT****==

After you complete the configuration, EASCMD uses default_group as the default group_id and default_user as the default user_id. For information about group and users, see Subscription and pushing of queue service. You can use the -- group and the -- user parameters to create custom group_id and user_id. All parameters in the easmd stream config command can be overwritten by other read and write commands.

Query queue details

Run the info command to view the queue details. Sample code:

eascmd stream info

The following output is returned:

[OK] Attributes: 
consumers.list.[0] : Id: imageasync.imageasync-35d72370-5f576f7c8d-2mdb4, Index: 0, Pending: 0, Status: Running, Idle: 19.997s, Window: 5, Slots: 5, AutoCommit: false
consumers.stats.total : 1
consumers.status.total : 1
groups.list.[0] : Id: imageasync, Index: 0, Pending: 0, Delivered: 1, Consumers: 1
meta.header.group : X-EAS-QueueService-Gid
meta.header.priority : X-EAS-QueueService-Priority
meta.header.user : X-EAS-QueueService-Uid
meta.maxPayloadBytes : 8192
meta.name : imageasync-queue-38895e88
meta.state : Normal
stream.approxMaxLength : 230399
stream.firstEntry : 0
stream.lastEntry : 0
stream.length : 0

For information about the parameters in the response, see the Access the queue service by calling API operations section of this topic. You can use the info command to obtain the attributes of the queue and test connectivity to the queue service.

Send data to a queue

Run the put command to send data to the queue. Sample code:

eascmd stream put -d "10s"

The following output is returned:

[OK] 1
[INFO] Put data done.
Total time cost: 401.892141ms
Total size: 3.00 B
Total: 1, success: 1, failed: 0

You can also send all data in the file to the queue by using the -f parameter. Sample code:

eascmdm stream put -f test.data

The following output is returned:

[INFO] Opening data file: test.data
[OK] 2
[OK] 3
[OK] 4
[OK] 5
[OK] 6
[OK] 7
[OK] 8
[OK] 9
[OK] 10
[OK] 11
[OK] 12
[OK] 13
....

In this case, you can run the info command to view the queue status.

Query data in a queue

Run the get command to query data from the queue. Sample code:

eascmd stream get -l10 --timeout=3s

The following output is returned:

[OK] [0 - 1] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e47b76e2-2648-40fe-9197-a268015cbd1f ts@source=1685802680575] data1
[OK] [1 - 2] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=51d13952-6ba3-4d52-b548-e58837675c7a ts@source=1685807531686] data2
[OK] [2 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK
[OK] [3 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@source=1685807531715] data4
[OK] [4 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@source=1685807531730] data5
...

In the first data entry received:

  • The [0 - 1] column indicates that the index of data 0 is 1.

  • The tags[Header:Content-Type=text/plain; charset= ...] column indicates that the data contains tags. In the preceding response:

    • Header indicates the header of the HTTP request when you input data.

    • requestId indicates the built-in request ID that is automatically generated.

    • ts@source indicates the UNIX timestamp when the input queue receives the request. ts@sink indicates the timestamp when the output queue receives the data.

  • The last column indicates the data you entered.

Important

If the queue is matched with an inference service instance, the input data may be consumed by the instance after the data is sent to the queue. In this case, add the -k parameter to the command to query data in the output queue.

You can also use the -- tags parameter to add query conditions. For example, run the following command to query data by using requestId:

eascmd stream get --tags requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f

The following output is returned:

[OK] [0 - 3] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=ef6940e3-159c-45f3-a96d-bc0acd71275f ts@source=1685807531701] OK

Delete data from a queue

You can run the delete command to delete a single data entry or the trunc command to delete multiple data entries. The following section provides examples.

Delete a single data entry:

 eascmd stream delete 3

After the data entry is deleted, the following output is returned:

Deleting index(es):
3 [y/N]y
[OK] deleted

Delete multiple data entries:

eascmd stream trunc 4

After the data entries are deleted, the following output is returned:

trunc stream from index: 4 [y/N]y
[OK] truncated

Subscribe to a queue

Run the watch command to subscribe to the queue service. Sample code:

 eascmd stream watch

The following output is returned:

[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: false, window: 10
I0604 09:20:45.211243   66197 queue.go:532] watch via websocket
[OK] [0 - 4] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=2a5645b6-9ee5-4026-bdee-fab31e435934 ts@sink=1685807531718 ts@source=1685807531715] data4
commit: 4 ? [Y/n]

Enter Y to commit and obtain new data:

[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]

Enter n to specify whether to perform a negative commit:

[OK] [1 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
commit: 5 ? [Y/n]n
negative: 5 ? [Y/n]y

For information about commit and negative commit, see the "Commit and Negative" section in the Subscription and pushing of queue service topic.

If you use the -- auto-commit option, data is automatically committed on the server:

 eascmd stream  watch --auto-commit

The following output is returned:

[INFO] Start to watch: index: 0, indexOnly: false, autoCommit: true, window: 10
I0604 09:30:08.554542   66408 queue.go:532] watch via websocket
[OK] [0 - 5] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=64ba6aaf-49b0-45c7-8d79-6cf6dc1065d0 ts@sink=1685807531733 ts@source=1685807531730] data5
[OK] [1 - 6] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=5825dd3e-a5e2-4754-a946-96e068d643c8 ts@sink=1685807531771 ts@source=1685807531768] data6
[OK] [2 - 7] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=e7edf9b8-de78-41a0-8d9c-0a4aaf7dcaaf ts@sink=1685807531786 ts@source=1685807531783] data7
[OK] [3 - 8] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=3ddc3481-934a-4408-8d08-11c2c2248ef6 ts@sink=1685807531801 ts@source=1685807531798] data8
[OK] [4 - 9] tags[Header:Content-Type=text/plain; charset=utf-8 requestId=561da95d-b99a-4710-bb82-9402baa21f36 ts@sink=1685807531816 ts@source=1685807531812] data9
....

Other options and commands

The preceding section describes the most commonly used commands and options of eascmd stream. For information about the extended functions of eascmd stream, run the eascmd stream help command.