Call Elastic Algorithm Service (EAS) inference services from Python using the eas-prediction package. The SDK supports TensorFlow, PyTorch, Blade, string-based models, and queue-based streaming services.
Installation
pip install -U eas-prediction --user
Quick start
All examples follow the same three-step pattern: create a client, configure it, then send prediction requests.
from eas_prediction import PredictClient
from eas_prediction import StringRequest
client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
client.init()
request = StringRequest('[{"fea1": 1, "fea2": 2}]')
resp = client.predict(request)
print(resp)
Choose your request type
| If your service uses... | Use these classes |
|---|---|
| Custom processors, PMML models | StringRequest / StringResponse |
| TensorFlow models | TFRequest / TFResponse |
| PyTorch models | TorchRequest / TorchResponse |
| Blade accelerated models | BladeRequest / BladeResponse |
| Queue-based streaming | QueueClient / Watcher |
API reference
Common parameters
endpoint: the server address for your EAS service.
Default gateway: use the service-specific endpoint. Example:
182848887922***.cn-shanghai.pai-eas.aliyuncs.com
VPC direct connection: use the regional common endpoint. Example for China (Shanghai):
pai-eas-vpc.cn-shanghai.aliyuncs.com
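The two endpoint forms above can be built with a small helper. This is a minimal sketch, not part of the SDK: `gateway_endpoint` and `vpc_direct_endpoint` are hypothetical names, and the `http://` prefix is assumed from the examples elsewhere in this document.

```python
def gateway_endpoint(uid: str, region: str) -> str:
    """Service-specific endpoint, used with the default gateway."""
    return f"http://{uid}.{region}.pai-eas.aliyuncs.com"


def vpc_direct_endpoint(region: str) -> str:
    """Regional common endpoint, used with VPC direct connection."""
    return f"http://pai-eas-vpc.{region}.aliyuncs.com"


# The UID below is the masked placeholder used throughout this document.
print(gateway_endpoint("182848887922****", "cn-shanghai"))
print(vpc_direct_endpoint("cn-shanghai"))
```

Either string can then be passed as the endpoint argument of PredictClient.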
PredictClient
PredictClient is the main client for sending prediction requests to EAS services.
| Method | Description |
|---|---|
| PredictClient(endpoint, service_name, custom_url) | Creates a client. custom_url is optional; use it only for services with non-standard endpoints (not in <uid>.<region>.pai-eas.aliyuncs.com format), such as web UI services. Example: PredictClient(custom_url='<url>'). |
| set_endpoint(endpoint) | Sets the server endpoint. See Common parameters. |
| set_service_name(service_name) | Sets the service name. |
| set_endpoint_type(endpoint_type) | Sets the connection mode. Use ENDPOINT_TYPE_DIRECT for high-concurrency scenarios with VPC direct connection; defaults to ENDPOINT_TYPE_GATEWAY (default gateway). |
| set_token(token) | Sets the service access token. |
| set_retry_count(max_retry_count) | Sets the maximum number of retries after a request failure. Default: 5. Do not set this to `0`: the client must be able to resend requests when server errors occur or gateway connections close. |
| set_max_connection_count(max_connection_count) | Sets the maximum number of persistent connections in the connection pool. The client reuses pooled connections for better performance. Default: 100. |
| set_timeout(timeout) | Sets the request timeout in milliseconds. Default: 5000. |
| init() | Initializes the client. Call this after all configuration methods; configuration does not take effect until init() is called. |
| predict(request) | Sends a prediction request. Accepts StringRequest, TFRequest, TorchRequest, and other request objects. Returns the prediction response. |
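Because configuration only takes effect once init() runs, it can help to keep the setter calls and init() together in one place. The following is a sketch under assumptions: `configure_client` is a hypothetical helper, and `RecordingClient` is a stand-in that only records method names so the call order can be shown without a live service. The default values mirror those listed in the table above.

```python
def configure_client(client, token, retry_count=5, timeout_ms=5000, max_connections=100):
    """Apply settings to a not-yet-initialized client, then initialize it.

    `client` is expected to expose the PredictClient methods listed above.
    """
    client.set_token(token)
    client.set_retry_count(retry_count)
    client.set_timeout(timeout_ms)
    client.set_max_connection_count(max_connections)
    client.init()  # always last: settings take effect only after init()
    return client


class RecordingClient:
    """Stand-in (not part of the SDK) that records which methods were called."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Only reached for attributes that do not exist, i.e. the SDK-style methods.
        def record(*args):
            self.calls.append(name)
        return record


demo = configure_client(RecordingClient(), "<token>")
print(demo.calls)
```

With the real SDK, the same helper would presumably be called as configure_client(PredictClient(endpoint, service_name), token).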
StringRequest
| Method | Description |
|---|---|
| StringRequest(request_data) | Creates a request with a string body. request_data is the string to send. |
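When the service expects a JSON body, as the PMML example in this document appears to, the request string can be generated rather than written by hand. This sketch assumes JSON input; other custom processors may expect a different string format.

```python
import json

# Feature names fea1 and fea2 are taken from the quick start example.
features = [{"fea1": 1, "fea2": 2}]
request_data = json.dumps(features)
print(request_data)
```

The resulting `request_data` string is what would be passed to StringRequest(request_data).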
StringResponse
| Method | Description |
|---|---|
| to_string() | Returns the response body as a string. |
TFRequest
| Method | Description |
|---|---|
| TFRequest(signature_name) | Creates a TensorFlow request. signature_name is the model signature. |
| add_feed(input_name, shape, data_type, content) | Specifies an input tensor. content is a flat one-dimensional array; use the shape parameter to specify the tensor dimensions separately. Supported data_type values: DT_FLOAT, DT_DOUBLE, DT_INT8, DT_INT16, DT_INT32, DT_INT64, DT_STRING, TF_BOOL. |
| add_fetch(output_name) | Specifies which output tensor to return. Optional for SavedModel format; if omitted, all output tensors are returned. Required for frozen models. |
| to_string() | Serializes the request protocol buffer to a string for transmission. |
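Since add_feed takes the data as a flat array plus a separate shape, nested input has to be flattened first. The helper below is a minimal pure-Python sketch; `flatten_with_shape` is a hypothetical name, and it assumes a rectangular (non-ragged) nested list.

```python
def flatten_with_shape(nested):
    """Return (shape, flat) for a rectangular nested list of numbers."""
    # Walk down the first element of each level to read off the dimensions.
    shape = []
    level = nested
    while isinstance(level, list):
        shape.append(len(level))
        level = level[0] if level else None

    # Flatten one nesting level per dimension after the first.
    flat = nested
    for _ in shape[1:]:
        flat = [item for sub in flat for item in sub]
    return shape, flat


shape, flat = flatten_with_shape([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
print(shape, flat)
```

The two results correspond to the shape and content arguments of add_feed.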
TFResponse
| Method | Description |
|---|---|
| get_tensor_shape(output_name) | Returns the shape of the specified output tensor. |
| get_values(output_name) | Returns the data of the specified output tensor as a one-dimensional array. Use with get_tensor_shape() to reshape it to the required dimensions. The array type matches the tensor's data type. |
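Reshaping the flat output back to its tensor dimensions can be sketched in plain Python as follows. `reshape` is a hypothetical helper that assumes every dimension is positive; if NumPy is available, its reshape function does the same job.

```python
def reshape(flat, shape):
    """Rebuild a nested list with the given shape from a flat sequence."""
    total = 1
    for dim in shape:
        total *= dim
    if total != len(flat):
        raise ValueError(f"shape {shape} needs {total} values, got {len(flat)}")

    result = list(flat)
    # Group from the innermost dimension outward; the outermost dimension
    # is implied by whatever remains after grouping.
    for dim in reversed(shape[1:]):
        result = [result[i:i + dim] for i in range(0, len(result), dim)]
    return result


print(reshape([0, 1, 2, 3, 4, 5], [2, 3]))
```

In practice the two arguments would come from get_values(output_name) and get_tensor_shape(output_name) for the same output tensor.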
TorchRequest
TorchRequest uses a numeric index to identify tensors instead of names.
| Method | Description |
|---|---|
| TorchRequest() | Creates a PyTorch request. |
| add_feed(index, shape, data_type, content) | Specifies an input tensor by index. content is a flat one-dimensional array. Supported data_type values: same as TFRequest. |
| add_fetch(output_index) | Specifies which output tensor to return by index. Optional; if not called, all output tensors are returned. |
| to_string() | Serializes the request protocol buffer to a string for transmission. |
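A mismatch between shape and the length of content is easy to introduce when writing add_feed calls by hand. The check below is a sketch, assuming the flat array must contain exactly as many values as the product of the shape dimensions; `check_feed` is a hypothetical helper and requires Python 3.8 or later for math.prod.

```python
import math


def check_feed(shape, content):
    """Raise if content does not hold exactly prod(shape) values; return that count."""
    expected = math.prod(shape)
    if len(content) != expected:
        raise ValueError(
            f"shape {shape} requires {expected} values, got {len(content)}"
        )
    return expected


# A single 224x224 RGB image: 1 * 3 * 224 * 224 values.
print(check_feed([1, 3, 224, 224], [1] * 150528))
```

Running such a check before add_feed catches the error on the client side instead of at the service.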
TorchResponse
| Method | Description |
|---|---|
| get_tensor_shape(output_index) | Returns the shape of the specified output tensor. |
| get_values(output_index) | Returns the data of the specified output tensor as a one-dimensional array. Use with get_tensor_shape() to reshape it. |
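As an illustration of working with the flat output, the sketch below picks the highest-scoring class for each sample. It assumes a classification model whose output tensor has shape [batch, classes]; that layout is an assumption for the example, not something the SDK guarantees, and `top_class_per_sample` is a hypothetical helper.

```python
def top_class_per_sample(values, shape):
    """Return the index of the largest score in each row of a [batch, classes] tensor."""
    batch, classes = shape
    rows = [values[i * classes:(i + 1) * classes] for i in range(batch)]
    return [max(range(classes), key=row.__getitem__) for row in rows]


# Two samples, three classes each, laid out row by row in a flat list.
scores = [0.1, 0.7, 0.2,
          0.6, 0.3, 0.1]
print(top_class_per_sample(scores, [2, 3]))
```

With a live service, `scores` and the shape would come from get_values(0) and get_tensor_shape(0).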
BladeRequest
BladeRequest is for Blade accelerated models. Unlike TFRequest and TorchRequest, add_feed takes an explicit batch_size parameter before shape.
| Method | Description |
|---|---|
| BladeRequest() | Creates a Blade request. |
| add_feed(input_name, batch_size, shape, data_type, content) | Specifies an input tensor. batch_size is the number of samples in the batch. content is a flat one-dimensional array. |
| add_fetch(output_name, data_type) | Specifies an output tensor to return by name and data type. |
QueueClient
QueueClient supports queue-based data streaming, where one process pushes data and another subscribes to results.
| Method | Description |
|---|---|
| QueueClient(endpoint, queue_name) | Creates a queue client. |
| set_token(token) | Sets the authentication token for the queue. |
| init(uid=None, gid='eas') | Initializes the client. uid is a unique identifier for this client instance: each instance must have a unique ID, registered once, and data is distributed evenly across IDs. gid groups clients together; clients with the same group ID belong to the same group. If different groups exist, a copy of each data record is pushed to all groups. |
| set_logger(logger=None) | Configures logging. Defaults to warnings only. Pass None to disable logging. |
| truncate(index) | Removes all records before index, keeping only data at index and later. |
| put(data, tags={}) | Writes a record to the queue. Returns (index, requestId). The index can be used for queries, and the auto-generated requestId can be used as a tag filter. |
| get(request_id=None, index=0, length=1, timeout='5s', auto_delete=True, tags={}) | Reads records from the queue. Returns up to length records starting from index. If request_id is specified, queries length records starting from index and returns the matching record if found, otherwise null. Waits up to timeout if fewer records are available. If auto_delete=False, records remain in the queue and can be read again; use delete() to remove them manually. Use tags to filter records. Returns a DataFrame. |
| attributes() | Returns queue attributes (total length and current data length) as a dict. |
| delete(indexes) | Deletes records at the specified indexes. indexes can be a single string or a list. |
| search(index) | Returns queue status for the record at index as a JSON object with the following fields: ConsumerId (the instance processing the record), IsPending (True if being processed, False if waiting), and WaitCount (records ahead in queue, valid only when IsPending is False). If the returned log displays search error:Code 404, Message: b'no data in stream' and {} is returned, the record is not in the queue: it has either been processed and returned, or the index is incorrect. |
| watch(index, window, index_only=False, auto_commit=False) | Subscribes to records starting from index. window controls the maximum number of records pushed to this client at once. The server pauses pushing until committed records free up capacity, which limits client-side concurrency. Set auto_commit=False (recommended) to commit manually after processing; uncommitted records are pushed to other instances if an exception occurs before processing completes. Returns a Watcher. |
| commit(index) | Marks a record as processed. Once committed, the record is no longer pushed to other instances. index can be a single string or a list. |
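The fields returned by search() can be turned into a readable status line. This is a sketch that assumes the result behaves like a Python dict with the ConsumerId, IsPending, and WaitCount keys described above, and is empty when the record is not found; `describe_status` is a hypothetical helper.

```python
def describe_status(status):
    """Summarize a search() result using the fields described in the table above."""
    if not status:
        # Empty result: the record was already processed, or the index is wrong.
        return "not in queue (already processed, or incorrect index)"
    if status["IsPending"]:
        return f"being processed by {status['ConsumerId']}"
    # WaitCount is only meaningful while the record is still waiting.
    return f"waiting, {status['WaitCount']} record(s) ahead"


# Illustrative values only; real results come from queue.search(index).
print(describe_status({"ConsumerId": "", "IsPending": False, "WaitCount": 3}))
print(describe_status({}))
```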
Watcher
| Method | Description |
|---|---|
| run() | Establishes a WebSocket connection and returns server-pushed records in real time as a DataFrame. |
| close() | Closes the WebSocket connection and terminates backend connections. Only one watcher can run per client, so close the current watcher before starting another. |
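The recommended manual-commit pattern (auto_commit=False) can be sketched as a small consumer loop. `consume` and `handle` are hypothetical names; the function relies only on the watch(), commit(), run(), and close() methods listed above and on each pushed record exposing an index attribute, as in the queue example in this document.

```python
def consume(queue, handle, start_index=0, window=5, limit=None):
    """Process pushed records one at a time, committing each only after success."""
    watcher = queue.watch(start_index, window, auto_commit=False)
    processed = 0
    try:
        for record in watcher.run():
            handle(record)              # may raise; the record then stays uncommitted
            queue.commit(record.index)  # acknowledge only after handling succeeded
            processed += 1
            if limit is not None and processed >= limit:
                break
    finally:
        # Only one watcher can run per client, so always release it.
        watcher.close()
    return processed
```

If `handle` raises, the record is left uncommitted, so the server can push it to another instance, and the watcher is still closed before the exception propagates.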
Examples
String input and output
For services using custom processors (such as PMML models), send and receive data as strings:
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import StringRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
    client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
    client.init()

    request = StringRequest('[{"fea1": 1, "fea2": 2}]')
    for x in range(0, 1000000):
        resp = client.predict(request)
        print(resp)
TensorFlow tensor input and output
For TensorFlow services, use TFRequest to specify input tensors by name and TFResponse to read output tensors:
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import TFRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
    client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
    client.init()

    req = TFRequest('predict_images')
    # add_feed takes a flat 1D array; shape [1, 784] describes the tensor dimensions
    req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
    for x in range(0, 1000000):
        resp = client.predict(req)
        print(resp)
VPC direct connection
VPC direct connection is available only for services in EAS dedicated resource groups where the resource group and vSwitch are in the same Virtual Private Cloud (VPC). For setup, see Work with dedicated resource groups and Network access configuration.
The only differences from the standard setup are the set_endpoint_type(ENDPOINT_TYPE_DIRECT) call and the regional VPC endpoint:
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import TFRequest
from eas_prediction import ENDPOINT_TYPE_DIRECT

if __name__ == '__main__':
    # Use the regional VPC endpoint instead of the service-specific endpoint
    client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
    client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
    client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
    client.init()

    request = TFRequest('predict_images')
    request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
    for x in range(0, 1000000):
        resp = client.predict(request)
        print(resp)
PyTorch tensor input and output
For PyTorch services, use TorchRequest with numeric indexes instead of tensor names:
#!/usr/bin/env python
import time

from eas_prediction import PredictClient
from eas_prediction import TorchRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
    client.init()

    req = TorchRequest()
    # add_feed takes a flat 1D array; shape [1, 3, 224, 224] describes a single 224x224 RGB image
    req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
    # req.add_fetch(0)  # Uncomment to return only tensor at index 0

    timer = 0
    for x in range(0, 10):
        st = time.time()
        resp = client.predict(req)
        # Time only the predict call, not the print below
        timer += (time.time() - st)
        print(resp.get_tensor_shape(0))
    print("average response time: %s s" % (timer / 10))
Blade processor-based model
For Blade accelerated services, use BladeRequest. Unlike TFRequest and TorchRequest, add_feed takes an explicit batch_size parameter before shape:
#!/usr/bin/env python
import time

from eas_prediction import PredictClient
from eas_prediction import BladeRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
    client.init()

    req = BladeRequest()
    # content length matches the shape: 1 * 360 * 128 = 46080 values
    req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * 46080)
    req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
    req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
    req.add_fetch('output', BladeRequest.DT_FLOAT)

    timer = 0
    for x in range(0, 10):
        st = time.time()
        resp = client.predict(req)
        # Time only the predict call, not the print below
        timer += (time.time() - st)
        print(resp.get_tensor_shape('output'))
    print("average response time: %s s" % (timer / 10))
Blade with TensorFlow compatibility
For EAS Blade models that expose a TensorFlow-compatible interface, import TFRequest from eas_prediction.blade_tf_request instead of the standard path:
#!/usr/bin/env python
import time

from eas_prediction import PredictClient
from eas_prediction.blade_tf_request import TFRequest  # Blade-compatible TFRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
    client.init()

    req = TFRequest(signature_name='predict_words')
    # content length matches the shape: 1 * 360 * 128 = 46080 values
    req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * 46080)
    req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
    req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
    req.add_fetch('output')

    timer = 0
    for x in range(0, 10):
        st = time.time()
        resp = client.predict(req)
        # Time only the predict call, not the print below
        timer += (time.time() - st)
        print(resp.get_tensor_shape('output'))
    print("average response time: %s s" % (timer / 10))
Queue service for data streaming
Send records to a queue while a separate thread subscribes to results using a Watcher. This pattern is common for asynchronous batch processing pipelines.
#!/usr/bin/env python
import threading

from eas_prediction import QueueClient

if __name__ == '__main__':
    endpoint = '182848887922****.cn-shanghai.pai-eas.aliyuncs.com'
    queue_name = 'test_group.qservice/sink'
    token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****'

    queue = QueueClient(endpoint, queue_name)
    queue.set_token(token)
    queue.init()
    queue.set_timeout(30000)

    # Clear the queue before starting
    attributes = queue.attributes()
    if 'stream.lastEntry' in attributes:
        queue.truncate(int(attributes['stream.lastEntry']) + 1)

    count = 100

    def send_thread():
        for i in range(count):
            index, request_id = queue.put('[{}]')
            print('send: ', i, index, request_id)

    def watch_thread():
        # window=5 limits the server to pushing at most 5 unacknowledged records at a time
        watcher = queue.watch(0, 5, auto_commit=True)
        i = 0
        for x in watcher.run():
            print('recv: ', i, x.index, x.tags['requestId'])
            i += 1
            if i == count:
                break
        watcher.close()

    thread1 = threading.Thread(target=watch_thread)
    thread2 = threading.Thread(target=send_thread)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()