
Platform For AI: SDK for Python

Last Updated: Apr 01, 2026

Call Elastic Algorithm Service (EAS) inference services from Python using the eas-prediction package. The SDK supports TensorFlow, PyTorch, Blade, string-based models, and queue-based streaming services.

Installation

pip install -U eas-prediction --user

Quick start

All examples follow the same three-step pattern: create a client, configure it (for example, set the token) and call init(), then send prediction requests.

from eas_prediction import PredictClient
from eas_prediction import StringRequest

client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
client.init()

request = StringRequest('[{"fea1": 1, "fea2": 2}]')
resp = client.predict(request)
print(resp)

Choose your request type

If your service uses | Use these classes
Custom processors or PMML models | StringRequest / StringResponse
TensorFlow models | TFRequest / TFResponse
PyTorch models | TorchRequest / TorchResponse
Blade accelerated models | BladeRequest / BladeResponse
Queue-based streaming | QueueClient / Watcher

API reference

Common parameters

endpoint: the server address for your EAS service.

  • Default gateway — use the service-specific endpoint. Example: 182848887922***.cn-shanghai.pai-eas.aliyuncs.com

  • VPC direct connection — use the regional common endpoint. Example for China (Shanghai): pai-eas-vpc.cn-shanghai.aliyuncs.com
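The two endpoint forms differ in where the region appears. The sketch below builds both strings. It assumes the patterns generalize from the cn-shanghai and cn-hangzhou examples on this page to other regions. gateway_endpoint and vpc_direct_endpoint are hypothetical helpers, not part of eas-prediction.

```python
# Hypothetical helpers, not part of eas-prediction. The patterns below are
# generalized from the cn-shanghai and cn-hangzhou examples on this page.


def gateway_endpoint(uid: str, region: str) -> str:
    """Service-specific endpoint for the default gateway: <uid>.<region>.pai-eas.aliyuncs.com."""
    return f"{uid}.{region}.pai-eas.aliyuncs.com"


def vpc_direct_endpoint(region: str) -> str:
    """Regional common endpoint for VPC direct connection: pai-eas-vpc.<region>.aliyuncs.com."""
    return f"pai-eas-vpc.{region}.aliyuncs.com"


if __name__ == "__main__":
    # The examples on this page prepend http:// before passing the endpoint to PredictClient.
    print("http://" + gateway_endpoint("<uid>", "cn-shanghai"))
    print("http://" + vpc_direct_endpoint("cn-shanghai"))
```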

PredictClient

PredictClient is the main client for sending prediction requests to EAS services.

Method | Description
PredictClient(endpoint, service_name, custom_url) | Creates a client. custom_url is optional; use it only for services whose endpoint does not follow the <uid>.<region>.pai-eas.aliyuncs.com format, such as web UI services. Example: PredictClient(custom_url='<url>').
set_endpoint(endpoint) | Sets the server endpoint. See Common parameters.
set_service_name(service_name) | Sets the service name.
set_endpoint_type(endpoint_type) | Sets the connection mode. Defaults to ENDPOINT_TYPE_GATEWAY (default gateway). Use ENDPOINT_TYPE_DIRECT for high-concurrency scenarios with VPC direct connection.
set_token(token) | Sets the service access token.
set_retry_count(max_retry_count) | Sets the maximum number of retries after a request fails. Default: 5. Do not set this to 0: the client must be able to resend requests when server errors occur or gateway connections are closed.
set_max_connection_count(max_connection_count) | Sets the maximum number of persistent connections in the connection pool. The client reuses pooled connections for better performance. Default: 100.
set_timeout(timeout) | Sets the request timeout in milliseconds. Default: 5000.
init() | Initializes the client. Call it after all configuration methods; configuration takes effect only when init() is called.
predict(request) | Sends a prediction request and returns the prediction response. Accepts StringRequest, TFRequest, TorchRequest, and other request objects.

StringRequest

Method | Description
StringRequest(request_data) | Creates a request with a string body. request_data is the string to send.

StringResponse

Method | Description
to_string() | Returns the response body as a string.

TFRequest

Method | Description
TFRequest(signature_name) | Creates a TensorFlow request. signature_name is the model signature name.
add_feed(input_name, shape, data_type, content) | Specifies an input tensor. content is a flat one-dimensional array; pass the tensor dimensions separately in shape. Supported data_type values: DT_FLOAT, DT_DOUBLE, DT_INT8, DT_INT16, DT_INT32, DT_INT64, DT_STRING, and DT_BOOL.
add_fetch(output_name) | Specifies an output tensor to return. Optional for models in SavedModel format: if omitted, all output tensors are returned. Required for frozen models.
to_string() | Serializes the request protocol buffer into a string for transmission.
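Because add_feed expects a flat array plus a separate shape, nested Python data must be flattened first. The sketch below assumes row-major (C-order) flattening, which is how TensorFlow lays out the values of a dense tensor. flatten_with_shape is a hypothetical helper, not an SDK function.

```python
from typing import Any, List, Tuple


def flatten_with_shape(nested: Any) -> Tuple[List[int], List[Any]]:
    """Split a rectangular nested list into (shape, flat row-major content)."""
    # Read the dimensions by following the first element at each nesting level.
    shape: List[int] = []
    level = nested
    while isinstance(level, list):
        shape.append(len(level))
        level = level[0] if level else None

    flat: List[Any] = []

    def walk(node: Any, depth: int) -> None:
        if depth == len(shape):
            if isinstance(node, list):
                raise ValueError("input is not rectangular")
            flat.append(node)
            return
        if not isinstance(node, list) or len(node) != shape[depth]:
            raise ValueError("input is not rectangular")
        for child in node:
            walk(child, depth + 1)

    walk(nested, 0)
    return shape, flat


if __name__ == "__main__":
    image = [[0.0, 0.5], [1.0, 0.25]]             # a tiny 2 x 2 single-channel image
    shape, content = flatten_with_shape([image])  # wrap it to add a batch dimension
    print(shape, content)                         # [1, 2, 2] [0.0, 0.5, 1.0, 0.25]
    # With a real client these would be passed as, for example:
    # req.add_feed('images', shape, TFRequest.DT_FLOAT, content)
```

Rejecting ragged input early avoids sending a content array whose length does not match the declared shape.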

TFResponse

Method | Description
get_tensor_shape(output_name) | Returns the shape of the specified output tensor.
get_values(output_name) | Returns the data of the specified output tensor as a one-dimensional array whose element type matches the tensor's data type. Use it with get_tensor_shape() to reshape the data to the required dimensions.
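Restoring the original dimensions from get_values takes a reshape. The pure-Python sketch below assumes that get_tensor_shape returns a sequence of integers and that the values are in row-major order. reshape is a hypothetical helper. If NumPy is available, numpy.array(values).reshape(shape) gives the equivalent array. The same approach applies to TorchResponse.

```python
from typing import Any, List, Sequence


def reshape(values: Sequence[Any], shape: Sequence[int]) -> Any:
    """Rebuild a nested list from flat row-major values and a tensor shape."""
    dims = [int(d) for d in shape]
    total = 1
    for d in dims:
        total *= d
    if total != len(values):
        raise ValueError(f"shape {dims} needs {total} values, got {len(values)}")

    def build(offset: int, rest: List[int]) -> Any:
        if not rest:
            return values[offset]
        stride = 1  # number of flat values covered by one step along rest[0]
        for d in rest[1:]:
            stride *= d
        return [build(offset + i * stride, rest[1:]) for i in range(rest[0])]

    return build(0, dims)


if __name__ == "__main__":
    # Stand-ins for resp.get_tensor_shape(name) and resp.get_values(name).
    shape = [2, 3]
    values = [0.1, 0.2, 0.7, 0.6, 0.3, 0.1]
    print(reshape(values, shape))  # [[0.1, 0.2, 0.7], [0.6, 0.3, 0.1]]
```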

TorchRequest

TorchRequest uses a numeric index to identify tensors instead of names.

Method | Description
TorchRequest() | Creates a PyTorch request.
add_feed(index, shape, data_type, content) | Specifies an input tensor by index. content is a flat one-dimensional array. Supported data_type values are the same as for TFRequest.
add_fetch(output_index) | Specifies an output tensor to return by index. Optional: if not called, all output tensors are returned.
to_string() | Serializes the request protocol buffer into a string for transmission.
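The PyTorch example on this page feeds shape [1, 3, 224, 224], that is, batch, channel, height, width (NCHW). Image data is often stored as height x width x channel instead, so the flat content has to be reordered, not just flattened. The sketch below assumes an H x W x C nested list as input. hwc_to_nchw_flat is a hypothetical helper, and whether your model expects NCHW depends on how it was exported.

```python
from typing import List, Sequence


def hwc_to_nchw_flat(image: Sequence[Sequence[Sequence[float]]]) -> List[float]:
    """Flatten an H x W x C nested image into the row-major order of shape [1, C, H, W]."""
    height = len(image)
    width = len(image[0])
    channels = len(image[0][0])
    flat: List[float] = []
    # Channel is the outermost loop because it comes first after the batch dimension.
    for c in range(channels):
        for h in range(height):
            for w in range(width):
                flat.append(image[h][w][c])
    return flat


if __name__ == "__main__":
    # A 2 x 2 RGB image stored pixel by pixel as [R, G, B].
    image = [[[1, 2, 3], [4, 5, 6]],
             [[7, 8, 9], [10, 11, 12]]]
    content = hwc_to_nchw_flat(image)
    shape = [1, 3, len(image), len(image[0])]
    print(content)  # [1, 4, 7, 10, 2, 5, 8, 11, 3, 6, 9, 12]
    print(shape)    # [1, 3, 2, 2]
    # req.add_feed(0, shape, TorchRequest.DT_FLOAT, content)
```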

TorchResponse

Method | Description
get_tensor_shape(output_index) | Returns the shape of the specified output tensor.
get_values(output_index) | Returns the data of the specified output tensor as a one-dimensional array. Use it with get_tensor_shape() to reshape the data.

BladeRequest

BladeRequest is for Blade accelerated models. Unlike TFRequest and TorchRequest, add_feed takes an explicit batch_size parameter before shape.

Method | Description
BladeRequest() | Creates a Blade request.
add_feed(input_name, batch_size, shape, data_type, content) | Specifies an input tensor. batch_size is the number of samples in the batch. content is a flat one-dimensional array.
add_fetch(output_name, data_type) | Specifies an output tensor to return, by name and data type.
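A mismatch between content and shape is easy to introduce when building large inputs such as [1, 360, 128]. The sketch below checks the length before add_feed is called. It assumes that content must hold exactly as many values as the product of the shape's dimensions, as for a dense tensor. check_feed is hypothetical, treats the shape list exactly as passed to add_feed, and does not model how batch_size interacts with shape. math.prod requires Python 3.8 or later.

```python
from math import prod
from typing import Sequence


def check_feed(shape: Sequence[int], content: Sequence[object]) -> int:
    """Return the element count implied by `shape`; raise if `content` has a different length."""
    expected = prod(shape)
    if len(content) != expected:
        raise ValueError(
            f"shape {list(shape)} holds {expected} values, but content has {len(content)}"
        )
    return expected


if __name__ == "__main__":
    print(check_feed([1, 360, 128], [0.8] * (360 * 128)))  # 46080
    print(check_feed([1], [187]))                          # 1
```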

QueueClient

QueueClient supports queue-based data streaming, where one process pushes data and another subscribes to results.

Method | Description
QueueClient(endpoint, queue_name) | Creates a queue client.
set_token(token) | Sets the authentication token for the queue.
init(uid=None, gid='eas') | Initializes the client. uid is the ID of this client instance: each instance must use a unique ID, and each ID can be registered only once. The server distributes data evenly across the registered IDs. gid is the group ID; clients with the same gid belong to the same group. If multiple groups exist, each group receives a copy of every record.
set_logger(logger=None) | Configures logging. By default, only warnings are logged. Pass None to disable logging.
truncate(index) | Removes all records before index, keeping only the record at index and later ones.
put(data, tags={}) | Writes a record to the queue and returns (index, requestId). Use index for queries; the auto-generated requestId can be used as a tag filter.
get(request_id=None, index=0, length=1, timeout='5s', auto_delete=True, tags={}) | Reads records from the queue and returns them as a DataFrame. Returns up to length records starting from index. If request_id is specified, searches the length records starting from index and returns the record with that request ID if found, otherwise null. If fewer records are available, waits up to timeout. With auto_delete=False, records remain in the queue and can be read again; remove them manually with delete(). Use tags to filter records.
attributes() | Returns the queue attributes (total length and current data length) as a dict.
delete(indexes) | Deletes the records at the specified indexes. indexes can be a single string or a list.
search(index) | Returns the queue status of the record at index as a JSON object with these fields: ConsumerId (the instance processing the record), IsPending (True if the record is being processed, False if it is waiting), and WaitCount (the number of records ahead of it in the queue; valid only when IsPending is False). If the log shows search error:Code 404, Message: b'no data in stream' and {} is returned, the record is not in the queue: either it has already been processed and returned, or the index is incorrect.
watch(index, window, index_only=False, auto_commit=False) | Subscribes to records starting from index and returns a Watcher. window is the maximum number of uncommitted records pushed to this client; once that limit is reached, the server stops pushing until commits free up capacity, which caps client-side concurrency. With auto_commit=False (recommended), commit each record manually after processing it; if an exception occurs before a record is committed, it is pushed to other instances.
commit(index) | Marks a record as processed so that it is no longer pushed to other instances. index can be a single string or a list.
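The uid and gid rules of init() can be modeled as a toy: each group receives its own copy of every record, and within a group the records are spread across client IDs. The sketch uses round-robin as an assumption to illustrate "distributed evenly". This page does not describe the server's actual balancing algorithm. distribute and the IDs worker-1, worker-2, and auditor-1 are hypothetical.

```python
from itertools import cycle
from typing import Dict, List


def distribute(records: List[str], groups: Dict[str, List[str]]) -> Dict[str, Dict[str, List[str]]]:
    """Toy model: every group gets every record; members of a group share them round-robin."""
    result: Dict[str, Dict[str, List[str]]] = {}
    for gid, uids in groups.items():
        if not uids:
            raise ValueError(f"group {gid!r} has no clients")
        per_uid: Dict[str, List[str]] = {uid: [] for uid in uids}
        turn = cycle(uids)
        for record in records:  # each group sees the full stream
            per_uid[next(turn)].append(record)
        result[gid] = per_uid
    return result


if __name__ == "__main__":
    # Hypothetical client IDs; 'eas' is the default gid of QueueClient.init().
    groups = {"eas": ["worker-1", "worker-2"], "audit": ["auditor-1"]}
    print(distribute(["r0", "r1", "r2", "r3"], groups))
```

With the sample input, worker-1 receives r0 and r2, worker-2 receives r1 and r3, and auditor-1, alone in its group, receives all four records.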

Watcher

Method | Description
run() | Establishes a WebSocket connection and yields records in real time as the server pushes them; each record is returned as a DataFrame.
close() | Closes the WebSocket connection and terminates the backend connection. Only one watcher can run per client at a time; close the current watcher before starting another.
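The interaction between window in watch() and commit() works like a credit system. The server stops pushing once window records are uncommitted, and each commit frees a slot. The class below is a single-process toy model of that behavior, not the SDK's implementation. WindowedPusher and its fail method are hypothetical. fail approximates "pushed to other instances" by returning uncommitted records to the pending pool.

```python
from collections import deque
from typing import Deque, List, Set


class WindowedPusher:
    """Toy model of watch(index, window): at most `window` uncommitted records per client."""

    def __init__(self, count: int, window: int) -> None:
        self.pending: Deque[int] = deque(range(count))  # indexes not yet delivered
        self.window = window
        self.in_flight: Set[int] = set()                # delivered but not committed

    def push(self) -> List[int]:
        """Deliver records until the window is full; return the delivered indexes."""
        pushed: List[int] = []
        while self.pending and len(self.in_flight) < self.window:
            index = self.pending.popleft()
            self.in_flight.add(index)
            pushed.append(index)
        return pushed

    def commit(self, index: int) -> None:
        """Mark a record as processed, which frees one slot in the window."""
        self.in_flight.discard(index)

    def fail(self) -> None:
        """Client failed: put its uncommitted records back in front for redelivery."""
        for index in sorted(self.in_flight, reverse=True):
            self.pending.appendleft(index)
        self.in_flight.clear()


if __name__ == "__main__":
    pusher = WindowedPusher(count=5, window=2)
    print(pusher.push())  # [0, 1]
    print(pusher.push())  # [] (window full: nothing committed yet)
    pusher.commit(0)
    print(pusher.push())  # [2]
    pusher.fail()         # records 1 and 2 were never committed
    print(pusher.push())  # [1, 2]
```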

Examples

String input and output

For services using custom processors (such as PMML models), send and receive data as strings:

#!/usr/bin/env python

from eas_prediction import PredictClient
from eas_prediction import StringRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
    client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
    client.init()

    request = StringRequest('[{"fea1": 1, "fea2": 2}]')
    for x in range(0, 1000000):
        resp = client.predict(request)
        print(resp)
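Instead of writing the request body by hand, you can serialize Python data with the standard json module. This assumes, as the example request suggests, that the service accepts a JSON array of feature objects. build_string_body is a hypothetical helper.

```python
import json
from typing import Any, Dict, List


def build_string_body(rows: List[Dict[str, Any]]) -> str:
    """Serialize feature rows into a JSON array string."""
    return json.dumps(rows)


if __name__ == "__main__":
    body = build_string_body([{"fea1": 1, "fea2": 2}])
    print(body)  # [{"fea1": 1, "fea2": 2}]
    # request = StringRequest(body)
```

Generating the body this way avoids quoting mistakes when feature values come from variables.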

TensorFlow tensor input and output

For TensorFlow services, use TFRequest to specify input tensors by name and TFResponse to read output tensors:

#!/usr/bin/env python

from eas_prediction import PredictClient
from eas_prediction import TFRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
    client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
    client.init()

    req = TFRequest('predict_images')
    # add_feed takes a flat 1D array; shape [1, 784] describes the tensor dimensions
    req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
    for x in range(0, 1000000):
        resp = client.predict(req)
        print(resp)

VPC direct connection

VPC direct connection is available only for services in EAS dedicated resource groups where the resource group and vSwitch are in the same Virtual Private Cloud (VPC). For setup, see Work with dedicated resource groups and Network access configuration.

The only differences from the standard setup are the regional VPC endpoint and the call to set_endpoint_type(ENDPOINT_TYPE_DIRECT):

#!/usr/bin/env python

from eas_prediction import PredictClient
from eas_prediction import TFRequest
from eas_prediction import ENDPOINT_TYPE_DIRECT

if __name__ == '__main__':
    # Use the regional VPC endpoint instead of the service-specific endpoint
    client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
    client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
    client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
    client.init()

    request = TFRequest('predict_images')
    request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
    for x in range(0, 1000000):
        resp = client.predict(request)
        print(resp)

PyTorch tensor input and output

For PyTorch services, use TorchRequest with numeric indexes instead of tensor names:

#!/usr/bin/env python

from eas_prediction import PredictClient
from eas_prediction import TorchRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
    client.init()

    req = TorchRequest()
    # add_feed takes a flat 1D array; shape [1, 3, 224, 224] describes a single 224x224 RGB image
    req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
    # req.add_fetch(0)  # Uncomment to return only tensor at index 0

    import time
    timer = 0.0
    for x in range(0, 10):
        st = time.time()
        resp = client.predict(req)
        timer += time.time() - st  # time only the predict() call, not the print below
        print(resp.get_tensor_shape(0))
    print("average response time: %s s" % (timer / 10))
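The timing loop above can be factored into a reusable helper that times only the call itself and reports more than an average. The sketch uses time.perf_counter, a monotonic high-resolution clock. measure_latency is hypothetical, and the lambda stands in for client.predict(req) so the code runs without a service. The first request may include connection setup, so consider discarding it as a warm-up.

```python
import statistics
import time
from typing import Callable, List


def measure_latency(call: Callable[[], object], runs: int = 10) -> List[float]:
    """Time `runs` invocations of `call`, measuring only the call itself (seconds)."""
    samples: List[float] = []
    for _ in range(runs):
        start = time.perf_counter()
        call()
        samples.append(time.perf_counter() - start)
    return samples


if __name__ == "__main__":
    # Stand-in for `lambda: client.predict(req)` so the sketch runs offline.
    samples = measure_latency(lambda: time.sleep(0.01), runs=5)
    print("mean: %.4f s, median: %.4f s, max: %.4f s" % (
        statistics.mean(samples), statistics.median(samples), max(samples)))
```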

Blade processor-based model

For Blade accelerated services, use BladeRequest. Unlike TFRequest and TorchRequest, add_feed takes an explicit batch_size parameter before shape:

#!/usr/bin/env python

from eas_prediction import PredictClient
from eas_prediction import BladeRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
    client.init()

    req = BladeRequest()
    # content holds one value per tensor element: 1 * 360 * 128 = 46080
    req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * (360 * 128))
    req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
    req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
    req.add_fetch('output', BladeRequest.DT_FLOAT)

    import time
    timer = 0.0
    for x in range(0, 10):
        st = time.time()
        resp = client.predict(req)
        timer += time.time() - st  # time only the predict() call, not the print below
        print(resp.get_tensor_shape('output'))
    print("average response time: %s s" % (timer / 10))

Blade with TensorFlow compatibility

For EAS Blade models that expose a TensorFlow-compatible interface, import TFRequest from eas_prediction.blade_tf_request instead of the standard path:

#!/usr/bin/env python

from eas_prediction import PredictClient
from eas_prediction.blade_tf_request import TFRequest  # Blade-compatible TFRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
    client.init()

    req = TFRequest(signature_name='predict_words')
    # content holds one value per tensor element: 1 * 360 * 128 = 46080
    req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * (360 * 128))
    req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
    req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
    req.add_fetch('output')

    import time
    timer = 0.0
    for x in range(0, 10):
        st = time.time()
        resp = client.predict(req)
        timer += time.time() - st  # time only the predict() call, not the print below
        print(resp.get_tensor_shape('output'))
    print("average response time: %s s" % (timer / 10))

Queue service for data streaming

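Send records to a queue from one thread while a separate thread subscribes to them through a Watcher. In this example, the sender writes 100 records with put(), and the watcher thread reads them as the server pushes them, stopping after it has received all 100. This producer/consumer pattern is typical of asynchronous batch processing pipelines.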

#!/usr/bin/env python

from eas_prediction import QueueClient
import threading

if __name__ == '__main__':
    endpoint = '182848887922****.cn-shanghai.pai-eas.aliyuncs.com'
    queue_name = 'test_group.qservice/sink'
    token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****'

    queue = QueueClient(endpoint, queue_name)
    queue.set_token(token)
    queue.init()
    queue.set_timeout(30000)

    # Clear the queue before starting
    attributes = queue.attributes()
    if 'stream.lastEntry' in attributes:
        queue.truncate(int(attributes['stream.lastEntry']) + 1)

    count = 100

    def send_thread():
        for i in range(count):
            index, request_id = queue.put('[{}]')
            print('send: ', i, index, request_id)

    def watch_thread():
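        # window=5 limits the server to pushing at most 5 uncommitted records at a time.
        # auto_commit=True keeps this example short; in production, auto_commit=False with a
        # queue.commit(...) call after each record is processed is the recommended setting.
        watcher = queue.watch(0, 5, auto_commit=True)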
        i = 0
        for x in watcher.run():
            print('recv: ', i, x.index, x.tags['requestId'])
            i += 1
            if i == count:
                break
        watcher.close()

    thread1 = threading.Thread(target=watch_thread)
    thread2 = threading.Thread(target=send_thread)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()