EAS SDK for Java lets you call Elastic Algorithm Service (EAS) model services from Java applications. This reference covers the SDK's classes and methods, with runnable examples for common scenarios: string input/output, TensorFlow tensor input/output, the queue service, and request compression.
Prerequisites
Before you begin, ensure that you have:
- Maven installed
- An EAS model service deployed on PAI, with its token and endpoint ready
Add dependencies
Add the eas-sdk dependency to your pom.xml. Check the Maven repository for the latest version.
```xml
<dependency>
    <groupId>com.aliyun.openservices.eas</groupId>
    <artifactId>eas-sdk</artifactId>
    <version>2.0.20</version>
</dependency>
```
If you use the queue service (available since version 2.0.5), add these two additional dependencies:
```xml
<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.1</version>
</dependency>
```
Quick start
The following example sends a string request to an EAS model service and prints the response. All examples in this document follow the same client setup pattern.
```java
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;

public class QuickStart {
    public static void main(String[] args) throws Exception {
        // Initialize a PredictClient. Share one instance across requests;
        // do not create a new client per request.
        PredictClient client = new PredictClient(new HttpConfig());
        client.setToken("<your-service-token>");
        client.setEndpoint("<your-endpoint>"); // Format: host:port
        client.setModelName("<your-service-name>");

        String request = "[{\"money_credit\": 3000000}]";
        try {
            String response = client.predict(request);
            System.out.println(response);
        } catch (Exception e) {
            System.err.printf("Prediction failed. Error code: %d, message: %s%n",
                client.getErrorCode(), client.getErrorMessage());
            e.printStackTrace();
        } finally {
            // Release the client's resources once it is no longer needed.
            client.shutdown();
        }
    }
}
```
Replace the placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| `<your-service-token>` | Authentication token for the model service | YWFlMDYy... |
| `<your-endpoint>` | Service endpoint in host:port format | 182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com |
| `<your-service-name>` | Name of the deployed model service | scorecard_pmml_example |
API reference
PredictClient class
PredictClient is the main client class. Use it to configure service connection parameters, send prediction requests, and handle responses.
| Method | Description |
|---|---|
| PredictClient(HttpConfig httpConfig) | Constructs a PredictClient instance using the given HttpConfig. |
| void setToken(String token) | Sets the authentication token for HTTP requests. |
| void setModelName(String modelName) | Sets the name of the model service for online prediction. |
| void setEndpoint(String endpoint) | Sets the service endpoint in host:port format. |
| void setDirectEndpoint(String endpoint) | Sets the endpoint for VPC direct connection, which bypasses the public gateway. Example: pai-eas-vpc.cn-shanghai.aliyuncs.com. |
| void setRequestPath(String requestPath) | Sets the request path defined in the server code. Example: client.setRequestPath("/custom_path"). |
| void setRetryCount(int retryCount) | Sets the maximum number of retries when a request fails. |
| void setRetryConditions(EnumSet<RetryCondition> retryConditions) | Sets the conditions that trigger a retry. Use together with setRetryCount. By default, all failures trigger a retry. |
| void setContentType(String contentType) | Sets the content type for HTTP requests. Default: application/octet-stream. |
| void setUrl(String url) | Sets a custom request URL. |
| void setCompressor(Compressor compressor) | Sets the request compression method. Supported values: Compressor.Gzip, Compressor.Zlib. |
| void addExtraHeaders(Map<String, String> extraHeaders) | Adds custom HTTP headers. |
| PredictClient createChildClient(String token, String endpoint, String modelName) | Creates and returns a child client that shares the parent's thread pool. Use this for multi-threaded predictions. |
| TFResponse predict(TFRequest runRequest) | Sends a TensorFlow prediction request and returns the response as a TFResponse. |
| String predict(String requestContent) | Sends a prediction request with a string body and returns the response body as a string. |
| byte[] predict(byte[] requestContent) | Sends a prediction request with a byte array body and returns the response body as a byte array. |
| int getErrorCode() | Returns the HTTP status code of the last failed call. |
| String getErrorMessage() | Returns the error message of the last failed call. |
| void shutdown() | Shuts down the client and releases its resources. Call it once the client is no longer needed. |
Retry conditions
setRetryConditions accepts an EnumSet with one or more of the following values:
| Value | Description |
|---|---|
| RetryCondition.CONNECTION_FAILED | The connection attempt failed. |
| RetryCondition.CONNECTION_TIMEOUT | The connection timed out. |
| RetryCondition.READ_TIMEOUT | Reading the response timed out. |
| RetryCondition.RESPONSE_5XX | The server returned a 5xx status code. |
| RetryCondition.RESPONSE_4XX | The server returned a 4xx status code. |
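To make the response-based conditions concrete, here is a JDK-only sketch that decides whether a given HTTP status code falls under an enabled set of conditions. The `Condition` enum and `shouldRetry` helper are hypothetical stand-ins that only mirror the `RetryCondition` names in the table. They are not part of the SDK, and the SDK's actual retry logic may differ.

```java
import java.util.EnumSet;

// Hypothetical local enum that mirrors the RetryCondition values listed above.
// It is NOT the SDK's RetryCondition type.
enum Condition {
    CONNECTION_FAILED, CONNECTION_TIMEOUT, READ_TIMEOUT, RESPONSE_5XX, RESPONSE_4XX
}

class RetryPolicySketch {
    // Returns true if a response with this status code matches one of the
    // enabled response-based conditions.
    static boolean shouldRetry(int statusCode, EnumSet<Condition> enabled) {
        if (statusCode >= 500 && statusCode < 600) {
            return enabled.contains(Condition.RESPONSE_5XX);
        }
        if (statusCode >= 400 && statusCode < 500) {
            return enabled.contains(Condition.RESPONSE_4XX);
        }
        return false; // 1xx/2xx/3xx responses are not treated as failures here
    }

    public static void main(String[] args) {
        EnumSet<Condition> enabled = EnumSet.of(Condition.READ_TIMEOUT, Condition.RESPONSE_5XX);
        System.out.println(shouldRetry(503, enabled)); // true
        System.out.println(shouldRetry(404, enabled)); // false: RESPONSE_4XX not enabled
        System.out.println(shouldRetry(404, EnumSet.allOf(Condition.class))); // true
    }
}
```

Restricting the set this way keeps client errors such as a malformed request (4xx) from being retried pointlessly, while still retrying transient server errors.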
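Example: retry up to 3 times, and only on read timeouts and 5xx errors:
```java
// Requires java.util.EnumSet and the SDK's RetryCondition enum.
client.setRetryCount(3);
client.setRetryConditions(
    EnumSet.of(
        RetryCondition.READ_TIMEOUT,
        RetryCondition.RESPONSE_5XX
    )
);
```
VPC direct connection vs. public gateway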
| Connection type | Method | Endpoint format |
|---|---|---|
| Public gateway | setEndpoint | <user-id>.vpc.<region>.pai-eas.aliyuncs.com |
| VPC direct connection | setDirectEndpoint | pai-eas-vpc.<region>.aliyuncs.com |
VPC direct connection bypasses the public gateway, improving stability and reducing latency. To use it, enable VPC direct connection and configure a vSwitch in the PAI console.
HttpConfig class
HttpConfig configures the underlying HTTP connection: timeouts, thread counts, and the connection pool.
| Method | Description | Default |
|---|---|---|
| void setIoThreadNum(int ioThreadNum) | Sets the number of I/O threads for sending HTTP requests. | 2 |
| void setReadTimeout(int readTimeout) | Sets the response read timeout, in milliseconds. | 5000 |
| void setConnectTimeout(int connectTimeout) | Sets the connection timeout, in milliseconds. | 5000 |
| void setMaxConnectionCount(int maxConnectionCount) | Sets the maximum total number of connections in the connection pool. | 1000 |
| void setMaxConnectionPerRoute(int maxConnectionPerRoute) | Sets the maximum number of connections per route. | 1000 |
| void setKeepAlive(boolean keepAlive) | Enables or disables HTTP keep-alive. | true |
TFRequest class
TFRequest builds input data for TensorFlow models saved in SavedModel format.
| Method | Description |
|---|---|
| void setSignatureName(String value) | Sets the SignatureDef name of the TensorFlow model. |
| void addFeed(String inputName, TFDataType dataType, long[] shape, ?[] content) | Adds an input tensor. content must be a one-dimensional array whose element type matches dataType (see the table below). |
| void addFetch(String value) | Registers an output tensor alias to retrieve after inference. |
TFResponse class
TFResponse holds the result returned by predict(TFRequest). Use it to read output tensors by name.
| Method | Description |
|---|---|
| List<Long> getTensorShape(String outputName) | Returns the shape of the specified output tensor. |
| List<Float> getFloatVals(String outputName) | Returns float output values. Use for DT_FLOAT, DT_COMPLEX64, DT_BFLOAT16, DT_HALF. |
| List<Double> getDoubleVals(String outputName) | Returns double output values. Use for DT_DOUBLE, DT_COMPLEX128. |
| List<Integer> getIntVals(String outputName) | Returns integer output values. Use for DT_INT32, DT_UINT8, DT_INT16, DT_INT8, DT_QINT8, DT_QUINT8, DT_QINT32, DT_QINT16, DT_QUINT16, DT_UINT16. |
| List<String> getStringVals(String outputName) | Returns string output values. Use for DT_STRING. |
| List<Long> getInt64Vals(String outputName) | Returns long output values. Use for DT_INT64. |
| List<Boolean> getBoolVals(String outputName) | Returns boolean output values. Use for DT_BOOL. |
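The value getters return a flat list, so you need the shape from getTensorShape to interpret multi-dimensional output. The following JDK-only sketch assumes a 2-D output of shape [rows, cols] whose values are in TensorFlow's row-major order, such as a batch of class scores. It picks the highest-scoring column in each row. The `argmaxPerRow` helper is hypothetical and takes plain `List<Long>` and `List<Float>` values like those the getters return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OutputReshapeSketch {
    // For a 2-D output of shape [rows, cols] stored row-major in a flat list,
    // returns the column index of the largest value in each row.
    static List<Integer> argmaxPerRow(List<Long> shape, List<Float> values) {
        if (shape.size() != 2) {
            throw new IllegalArgumentException("expected a 2-D shape, got " + shape);
        }
        int rows = shape.get(0).intValue();
        int cols = shape.get(1).intValue();
        if ((long) rows * cols != values.size()) {
            throw new IllegalArgumentException("shape does not match value count");
        }
        List<Integer> result = new ArrayList<>(rows);
        for (int r = 0; r < rows; r++) {
            int best = 0;
            for (int c = 1; c < cols; c++) {
                // Element (r, c) is stored at offset r * cols + c in row-major order.
                if (values.get(r * cols + c) > values.get(r * cols + best)) {
                    best = c;
                }
            }
            result.add(best);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> shape = Arrays.asList(2L, 3L);
        List<Float> scores = Arrays.asList(0.1f, 0.7f, 0.2f, 0.6f, 0.3f, 0.1f);
        System.out.println(argmaxPerRow(shape, scores)); // [1, 0]
    }
}
```

With a real response, you would pass `response.getTensorShape("scores")` and `response.getFloatVals("scores")` as the two arguments.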
Content array type by TFDataType
| TFDataType | Array element type | Notes |
|---|---|---|
| DT_FLOAT, DT_COMPLEX64, DT_BFLOAT16, DT_HALF | float | For DT_COMPLEX64, each pair of adjacent elements represents the real and imaginary parts of a complex number. |
| DT_DOUBLE, DT_COMPLEX128 | double | For DT_COMPLEX128, each pair of adjacent elements represents the real and imaginary parts of a complex number. |
| DT_INT32, DT_UINT8, DT_INT16, DT_INT8, DT_QINT8, DT_QUINT8, DT_QINT32, DT_QINT16, DT_QUINT16, DT_UINT16 | int | |
| DT_INT64 | long | |
| DT_STRING | String | |
| DT_BOOL | boolean | |
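Because content must be one-dimensional, multi-dimensional input has to be flattened before you pass it to addFeed. Its length must equal the product of the shape dimensions, for example 784 for shape {1, 784}. The following JDK-only sketch flattens a 2-D array in row-major (C) order, which is the layout TensorFlow uses. It also interleaves real and imaginary parts as the table describes for DT_COMPLEX64. The helper names are hypothetical, and you pass the resulting arrays to addFeed yourself.

```java
import java.util.Arrays;

class FeedContentSketch {
    // Number of elements implied by a tensor shape, e.g. {1, 784} -> 784.
    static long elementCount(long[] shape) {
        long count = 1;
        for (long dim : shape) {
            count *= dim;
        }
        return count;
    }

    // Flattens a rectangular 2-D matrix into a 1-D array in row-major order.
    static float[] flattenRowMajor(float[][] matrix) {
        int rows = matrix.length;
        int cols = rows == 0 ? 0 : matrix[0].length;
        float[] flat = new float[rows * cols];
        for (int r = 0; r < rows; r++) {
            if (matrix[r].length != cols) {
                throw new IllegalArgumentException("ragged row " + r);
            }
            System.arraycopy(matrix[r], 0, flat, r * cols, cols);
        }
        return flat;
    }

    // Interleaves real and imaginary parts as [re0, im0, re1, im1, ...],
    // the adjacent-pair layout described for DT_COMPLEX64.
    static float[] interleaveComplex(float[] real, float[] imag) {
        if (real.length != imag.length) {
            throw new IllegalArgumentException("real/imag length mismatch");
        }
        float[] out = new float[real.length * 2];
        for (int i = 0; i < real.length; i++) {
            out[2 * i] = real[i];
            out[2 * i + 1] = imag[i];
        }
        return out;
    }

    public static void main(String[] args) {
        long[] shape = {2, 3};
        float[] flat = flattenRowMajor(new float[][]{{1f, 2f, 3f}, {4f, 5f, 6f}});
        // The flattened length must match the element count implied by the shape.
        System.out.println(flat.length == elementCount(shape)); // true
        System.out.println(Arrays.toString(flat)); // [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
        float[] complex = interleaveComplex(new float[]{1f, 2f}, new float[]{0.5f, -1f});
        System.out.println(Arrays.toString(complex)); // [1.0, 0.5, 2.0, -1.0]
    }
}
```

Under this pairing, a complex tensor with n elements needs a content array of 2n floats.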
QueueClient class
QueueClient interacts with the EAS queue service to produce, consume, and manage data in asynchronous inference workflows.
| Method | Description |
|---|---|
| QueueClient(String endpoint, String queueName, String token, HttpConfig httpConfig, QueueUser user) | Constructs a QueueClient. user holds the UserId (default: random) and GroupName (default: eas). |
| JSONObject attributes() | Returns queue metadata: meta.maxPayloadBytes, meta.name, stream.approxMaxLength, stream.firstEntry, stream.lastEntry, stream.length. |
| Pair<Long, String> put(byte[] data, long priority, Map<String, String> tags) | Writes a record to the queue and returns the record index and request ID. priority: 0 = low, 1 = high. |
| DataFrame[] get(long index, long length, long timeout, boolean autoDelete, Map<String, String> tags) | Reads records starting from index. Pass -1 to start from the latest record. If autoDelete is true, records are removed after retrieval. |
| void truncate(Long index) | Deletes all records whose index is smaller than index. |
| String delete(Long index) | Deletes the record at index. Returns OK on success. |
| JSONObject search(long index) | Returns the queue status of the record at index: ConsumerId, IsPending, and WaitCount. |
| WebSocketWatcher watch(long index, long window, boolean indexOnly, boolean autoCommit, Map<String, String> tags) | Subscribes to the output queue via WebSocket and returns a WebSocketWatcher for reading results. If autoCommit is true, the window parameter is ignored. |
| String commit(Long index) or String commit(Long[] index) | Confirms consumption of the record(s) and removes them from the queue. Returns OK on success. |
| void end(boolean force) | Stops the queue service. |
Using search with the correct group ID
When calling search, set the group ID to the service name. Otherwise, IsPending always returns false.
```java
// Set the group ID (the second QueueUser argument) to the service name.
// UUID is java.util.UUID.
QueueUser u = new QueueUser(UUID.randomUUID().toString(), "<service_name>");
QueueClient inputQueue = new QueueClient(queueEndpoint, inputQueueName, queueToken, new HttpConfig(), u);
// Query the queue position of a specific record.
System.out.println(inputQueue.search(index));
```
Sample search responses:
- `{'ConsumerId': 'eas.****', 'IsPending': False, 'WaitCount': 2}`: the record is queued and waiting.
- `{}` with the log message `no data in stream`: the record was not found. It may have already been processed and returned to the client, or the `index` value is incorrect.
DataFrame class
DataFrame wraps a single data record from the queue service.
| Method | Description |
|---|---|
| byte[] getData() | Returns the record's raw data as a byte array. |
| long getIndex() | Returns the record's index. |
| Map<String, String> getTags() | Returns the record's tags. Use df.getTags().get("requestId") to get the request ID. |
Examples
Send a string request
Use this pattern when your model service accepts and returns plain strings, for example with PMML models deployed using a custom processor.
```java
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;

public class TestString {
    public static void main(String[] args) throws Exception {
        // Initialize the client. One PredictClient instance handles all requests to a service.
        // Do not create a new instance per request.
        PredictClient client = new PredictClient(new HttpConfig());
        client.setToken("YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****");
        // For VPC direct connection, use setDirectEndpoint instead:
        // client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
        client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
        client.setModelName("scorecard_pmml_example");

        String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
        System.out.println(request);
        try {
            String response = client.predict(request);
            System.out.println(response);
        } catch (Exception e) {
            System.err.printf("Prediction failed. Error code: %d, message: %s%n",
                client.getErrorCode(), client.getErrorMessage());
            e.printStackTrace();
        } finally {
            client.shutdown();
        }
    }
}
```
Send a TensorFlow request
For TensorFlow models, use TFRequest to build the input and TFResponse to read the output.
```java
import java.util.List;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;

public class TestTF {
    // Build a TFRequest for the "predict_images" signature.
    // Input: 784 float values (1x784 tensor), all initialized to 0.0.
    // Output: the "scores" tensor.
    public static TFRequest buildPredictRequest() {
        TFRequest request = new TFRequest();
        request.setSignatureName("predict_images");
        float[] content = new float[784];
        request.addFeed("images", TFDataType.DT_FLOAT, new long[]{1, 784}, content);
        request.addFetch("scores");
        return request;
    }

    public static void main(String[] args) throws Exception {
        PredictClient client = new PredictClient(new HttpConfig());
        // For VPC direct connection, use setDirectEndpoint instead:
        // client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
        client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
        client.setModelName("mnist_saved_model_example");
        client.setToken("YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****");

        int count = 1000;
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            try {
                TFResponse response = client.predict(buildPredictRequest());
                List<Float> result = response.getFloatVals("scores");
                System.out.print("Predict result: [");
                for (int j = 0; j < result.size(); j++) {
                    System.out.print(result.get(j).floatValue());
                    if (j != result.size() - 1) {
                        System.out.print(", ");
                    }
                }
                System.out.print("]\n");
            } catch (Exception e) {
                System.err.printf("Prediction failed. Error code: %d, message: %s%n",
                    client.getErrorCode(), client.getErrorMessage());
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Total time: " + (endTime - startTime) + " ms");
        client.shutdown();
    }
}
```
Use the queue service
The queue service manages asynchronous inference through an input queue and an output queue. Your application writes requests to the input queue; the inference service reads them, processes them, and writes results to the output queue.
```java
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.http.QueueClient;
import com.aliyun.openservices.eas.predict.queue_client.QueueUser;
import com.aliyun.openservices.eas.predict.queue_client.WebSocketWatcher;

public class DemoWatch {
    public static void main(String[] args) throws Exception {
        String queueEndpoint = "18*******.cn-hangzhou.pai-eas.aliyuncs.com";
        String inputQueueName = "test_queue_service";
        String sinkQueueName = "test_queue_service/sink";
        String queueToken = "test-token";

        // Create clients for the input and output (sink) queues.
        // The inference service reads from inputQueue and writes results to sinkQueue.
        QueueClient inputQueue =
            new QueueClient(queueEndpoint, inputQueueName, queueToken, new HttpConfig(), new QueueUser());
        QueueClient sinkQueue =
            new QueueClient(queueEndpoint, sinkQueueName, queueToken, new HttpConfig(), new QueueUser());

        // Delete all existing data in both queues. Use with caution.
        inputQueue.clear();
        sinkQueue.clear();

        // Write 10 records to the input queue with the default priority.
        // To set a priority explicitly, use: inputQueue.put(data.getBytes(), 0L, null)
        // Priority 0 = low (default), 1 = high.
        int count = 10;
        for (int i = 0; i < count; ++i) {
            String data = Integer.toString(i);
            inputQueue.put(data.getBytes(), null);
        }

        // Subscribe to the output queue starting from index 0.
        // Arguments: index, window, indexOnly, autoCommit, tags.
        // autoCommit is true here, so records are committed automatically and the
        // window (5) is ignored. With autoCommit set to false, the queue service pauses
        // sending once the window of uncommitted records is full, and you must call
        // commit() with the indexes of the records you have processed.
        // To configure retries: new WatchConfig(retryCount, retryIntervalSeconds)
        // To retry indefinitely: new WatchConfig(true, retryIntervalSeconds)
        WebSocketWatcher watcher = sinkQueue.watch(0L, 5L, false, true, null);

        // Read results. getDataFrame() blocks until a record is available.
        for (int i = 0; i < count; ++i) {
            try {
                byte[] data = watcher.getDataFrame().getData();
                System.out.println("[watch] data = " + new String(data));
            } catch (RuntimeException ex) {
                System.out.println("[watch] error = " + ex.getMessage());
                break;
            }
        }

        // Close the watcher before creating another one.
        // Each client supports only one active watcher at a time.
        watcher.close();

        // Pause briefly, then print the output queue's metadata.
        Thread.sleep(2000);
        JSONObject attrs = sinkQueue.attributes();
        System.out.println(attrs.toString());

        inputQueue.shutdown();
        sinkQueue.shutdown();
    }
}
```
This example sends data and reads results in the same thread for simplicity. In production, run them in separate threads.
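Here is a JDK-only sketch of that split into separate threads, which also models the watch window with autoCommit set to false. It rests on two assumptions for illustration only, neither of which is part of the SDK. An in-memory LinkedBlockingQueue stands in for the output queue. A Semaphore with `window` permits models the rule that the service pauses sending while the window of uncommitted records is full. The delivering thread plays the service's role, and the consuming thread plays your code's role of reading getDataFrame() and calling commit().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

class SeparateThreadsSketch {
    // Runs delivery and consumption on separate threads. At most `window` records
    // can be delivered but not yet committed at any moment.
    static List<String> run(int count, int window) {
        BlockingQueue<String> delivered = new LinkedBlockingQueue<>(); // stand-in for the queue
        Semaphore windowPermits = new Semaphore(window);               // stand-in for the window
        List<String> received = new ArrayList<>(); // touched only by the consumer thread

        Thread sender = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    windowPermits.acquire();      // pause while the window is full
                    delivered.put("result-" + i); // deliver one record
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    String record = delivered.take(); // blocks until a record arrives
                    received.add(record);
                    windowPermits.release();          // "commit": frees one window slot
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        sender.start();
        consumer.start();
        try {
            sender.join();
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 5));
    }
}
```

With the real SDK, the writing thread would call inputQueue.put. The reading thread would loop over watcher.getDataFrame() and commit each record's getIndex() when autoCommit is false.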
Compress request data
For large payloads, compress requests before sending them to reduce network overhead. The SDK supports Zlib and Gzip compression.
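The SDK compresses the request body for you once setCompressor is called, so the sketch here is only for intuition about the two formats. It compresses a repetitive JSON payload using only java.util.zip. A Deflater with default settings emits zlib-framed data, and GZIPOutputStream emits gzip-framed data. Treating these as the formats behind Compressor.Zlib and Compressor.Gzip is an assumption based on the names. The unzlib helper only approximates what a zlib-configured server does. Very small payloads can grow after compression because of the added header and checksum, which is why compression mainly pays off for large requests.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;
import java.util.zip.Inflater;

class CompressionSketch {
    // zlib framing: Deflater with default settings writes a zlib header and Adler-32 checksum.
    static byte[] zlib(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // gzip framing: GZIPOutputStream writes a gzip header and CRC-32 trailer.
    static byte[] gzip(byte[] input) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Decompresses zlib data (roughly what a zlib-configured server would do).
    static byte[] unzlib(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported zlib data");
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("invalid zlib data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // A large, repetitive payload compresses well.
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < 1000; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append("{\"money_credit\": 3000000}");
        }
        sb.append("]");
        byte[] raw = sb.toString().getBytes(StandardCharsets.UTF_8);
        byte[] z = zlib(raw);
        byte[] g = gzip(raw);
        System.out.println("raw=" + raw.length + " zlib=" + z.length + " gzip=" + g.length);
        System.out.println("zlib round trip ok: " + Arrays.equals(unzlib(z), raw));
    }
}
```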
First, enable decompression on the server side by setting rpc.decompressor in the service deployment configuration:
```json
{
  "metadata": {
    "rpc": {
      "decompressor": "zlib"
    }
  }
}
```
Then set the matching compressor on the client:
```java
import com.aliyun.openservices.eas.predict.http.Compressor;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;

public class TestCompress {
    public static void main(String[] args) throws Exception {
        PredictClient client = new PredictClient(new HttpConfig());
        client.setEndpoint("18*******.cn-hangzhou.pai-eas.aliyuncs.com");
        client.setModelName("echo_compress");
        client.setToken("YzZjZjQwN2E4NGRkMDMxNDk5NzhhZDcwZDBjOTZjOGYwZDYxZGM2****");
        // Must match rpc.decompressor on the server. Use Compressor.Gzip for gzip.
        client.setCompressor(Compressor.Zlib);

        String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
        System.out.println(request);
        String response = client.predict(request);
        System.out.println(response);
        client.shutdown();
    }
}
```
What's next
- Status codes: interpret HTTP status codes returned by EAS.
- Service invocation methods: explore other ways to call a model service.
- Service access methods