
Platform for AI: SDK for Python

Last Updated: Apr 16, 2025

We recommend that you use the official Elastic Algorithm Service (EAS) SDKs to reduce the time required to define call logic and to improve call stability. This topic describes EAS SDK for Python, including its commonly used inputs and outputs, and provides demos that show how to call services by using the SDK.

Install the SDK

pip install -U eas-prediction --user

Methods

Common parameter description

endpoint: the endpoint of the server.

  • If you want to call a service in regular mode, set this parameter to the endpoint of the default gateway. Example: 182848887922***.cn-shanghai.pai-eas.aliyuncs.com.

  • If you want to call a service over a Virtual Private Cloud (VPC) direct connection, set this parameter to the common endpoint of the region. For example, in the China (Shanghai) region, set this parameter to pai-eas-vpc.cn-shanghai.aliyuncs.com.

PredictClient

PredictClient(endpoint, service_name, custom_url)

  • Description: creates a client object of the PredictClient class.

  • Parameters:

    • endpoint: the server endpoint. For more information, see Common parameter description.

    • service_name: the service name.

    • custom_url: the URL of the service. This parameter is required only for services whose endpoints are not in the <uid>.<region>.pai-eas.aliyuncs.com format, such as web UI services; otherwise, you can omit it. You can configure this parameter to create a client. Example: client = PredictClient(custom_url='<url>').

set_endpoint(endpoint)

  • Description: specifies the server endpoint. For more information, see Common parameter description.

  • Parameter: endpoint: the server endpoint.

set_service_name(service_name)

  • Description: specifies the service name.

  • Parameter: service_name: the service name.

set_endpoint_type(endpoint_type)

  • Description: specifies the gateway type of the server.

  • Parameter: endpoint_type: the gateway type to be used. The following gateway types are supported:

    • ENDPOINT_TYPE_GATEWAY: the default gateway.

    • ENDPOINT_TYPE_DIRECT: the VPC direct connection channel. If you do not call this method, the default gateway is used to access the service.

set_token(token)

  • Description: specifies the token that is used to access the service.

  • Parameter: token: the token for service access.

set_retry_count(max_retry_count)

  • Description: sets the maximum number of retries allowed after a request fails.

  • Parameter: max_retry_count: the maximum number of retries allowed after a request fails. Default value: 5.

    Important

    The client must retry a request if a process error or another server-side error occurs, or if a persistent connection to a gateway is closed. Therefore, we recommend that you do not set this parameter to 0.

set_max_connection_count(max_connection_count)

  • Description: specifies the maximum number of persistent connections allowed in the connection pool of the client. To achieve better performance, the client establishes persistent connections to the server and stores the persistent connections in the connection pool. Each time you initiate a request, the client uses an idle connection in the connection pool to access the required service.

  • Parameter: max_connection_count: the maximum number of persistent connections allowed in the connection pool. Default value: 100.

set_timeout(timeout)

  • Description: specifies the timeout period of a request.

  • Parameter: timeout: the timeout period of a request. Default value: 5000. Unit: milliseconds.

init()

  • Description: initializes the client object. After you call the preceding methods, you must call the init() method for the parameter settings to take effect.

predict(request)

  • Description: sends a prediction request to the online prediction service.

  • Parameter: request: the request to send. Requests of various classes are supported, such as the StringRequest and TFRequest classes.

  • Return value: the response to the prediction request.
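
    The following minimal sketch shows the typical call sequence: construct the client, apply optional settings, call init(), and then call predict(). The endpoint, service name, and token are placeholders:

    #!/usr/bin/env python

    from eas_prediction import PredictClient
    from eas_prediction import StringRequest

    if __name__ == '__main__':
        client = PredictClient('http://<uid>.<region>.pai-eas.aliyuncs.com', '<service_name>')
        client.set_token('<token>')
        client.set_timeout(10000)     # 10-second request timeout
        client.set_retry_count(3)     # retry a failed request up to 3 times
        client.init()                 # required: applies the settings above

        resp = client.predict(StringRequest('[{"fea1": 1}]'))
        print(resp)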

StringRequest

StringRequest(request_data)

  • Description: creates an object of the StringRequest class.

  • Parameter: request_data: the request string to be sent.

StringResponse

to_string()

  • Description: converts the response of the StringResponse class to a string.

  • Return value: the response body of the request.

TFRequest

TFRequest(signature_name)

  • Description: creates an object of the TFRequest class.

  • Parameter: signature_name: the signature name of the model of the service to be called.

add_feed(self, input_name, shape, data_type, content)

  • Description: specifies the input tensor of the TensorFlow online prediction service to be called.

  • Parameters:

    • input_name: the alias of the input tensor.

    • shape: the shape of the input tensor.

    • data_type: the data type of the input tensor. The following data types are supported:

      • TFRequest.DT_FLOAT

      • TFRequest.DT_DOUBLE

      • TFRequest.DT_INT8

      • TFRequest.DT_INT16

      • TFRequest.DT_INT32

      • TFRequest.DT_INT64

      • TFRequest.DT_STRING

      • TFRequest.DT_BOOL

    • content: the content of the input tensor. The content is in the form of a one-dimensional array.

add_fetch(self, output_name)

  • Description: specifies the alias of the output tensor to return when the TensorFlow online prediction service is called.

  • Parameter: output_name: the alias of the output tensor to return.

    If the TensorFlow model is in the SavedModel format, this parameter is optional. If this parameter is not specified, all output tensors are returned.

    If the TensorFlow model is a frozen model, this parameter is required.

to_string()

  • Description: serializes the protocol buffer (PB) object into a string. The PB object is created by using the TFRequest class and is used to transmit requests.

  • Return value: the string obtained from the TFRequest-based serialization.

TFResponse

get_tensor_shape(output_name)

  • Description: queries the shape of the output tensor identified by the specified alias.

  • Parameter: output_name: the alias of the output tensor whose shape you want to query.

  • Return value: the shape of the output tensor.

get_values(output_name)

  • Description: queries the data vectors of the specified output tensor.

  • Parameter: output_name: the alias of the output tensor whose data you want to query.

  • Return value: a one-dimensional array. You can call this method together with the get_tensor_shape() method to query the shape of the output tensor and restore the one-dimensional array to the required multi-dimensional array. The data type of the output tensor determines the data type of the one-dimensional array that is returned.
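
    For example, a minimal sketch of restoring a returned tensor to its original shape, assuming that NumPy is installed, that resp is a TFResponse returned by client.predict(), and that 'output' is a hypothetical output-tensor alias:

    import numpy as np

    # resp is a TFResponse returned by client.predict();
    # 'output' is a hypothetical output-tensor alias.
    shape = resp.get_tensor_shape('output')   # for example, [1, 10]
    values = resp.get_values('output')        # flat one-dimensional array
    tensor = np.array(values).reshape(shape)  # restore the multi-dimensional array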

TorchRequest

TorchRequest()

  • Description: creates an object of the TorchRequest class.

add_feed(self, index, shape, data_type, content)

  • Description: specifies the input tensor of the PyTorch online prediction service to be called.

  • Parameters:

    • index: the index of the input tensor.

    • shape: the shape of the input tensor.

    • data_type: the data type of the input tensor. The following data types are supported:

      • TorchRequest.DT_FLOAT

      • TorchRequest.DT_DOUBLE

      • TorchRequest.DT_INT8

      • TorchRequest.DT_INT16

      • TorchRequest.DT_INT32

      • TorchRequest.DT_INT64

      • TorchRequest.DT_STRING

      • TorchRequest.DT_BOOL

    • content: the content of the input tensor. The content is in the form of a one-dimensional array.

add_fetch(self, output_index)

  • Description: specifies the index of the output tensor to return when the PyTorch online prediction service is called. This method is optional. If you do not call this method to specify the index of the output tensor, all output tensors are returned.

  • Parameter: output_index: the index of the output tensor to return.

to_string()

  • Description: serializes the PB object into a string. The PB object is created by using the TorchRequest class and is used to transmit requests.

  • Return value: the string obtained from the TorchRequest-based serialization.

TorchResponse

get_tensor_shape(output_index)

  • Description: queries the shape of the output tensor identified by the specified index.

  • Parameter: output_index: the index of the output tensor whose shape you want to query.

  • Return value: the shape of the output tensor identified by the specified index.

get_values(output_index)

  • Description: queries the data vector of the specified output tensor. The return value is a one-dimensional array. You can call this method together with the get_tensor_shape() method to query the shape of the output tensor and restore the one-dimensional array to the required multi-dimensional array. The data type of the output tensor determines the data type of the one-dimensional array that is returned.

  • Parameter: output_index: the index of the output tensor that you want to query.

  • Return value: a one-dimensional array.

QueueClient

QueueClient(endpoint, queue_name)

  • Description: creates a client object of the QueueClient class.

  • Parameters:

    • endpoint: the endpoint of the server.

    • queue_name: the name of the queue that you want to access.

  • Return value: the created client object.

set_token(token)

  • Description: specifies the token that is used to authenticate the client object of the QueueClient class.

  • Parameter: token: the token that is used to access the queue.

init(uid=None, gid='eas')

  • Description: initializes an object of the QueueClient class.

  • Parameters:

    • uid: the user ID of the client, which is used to register the client on the server. The user ID of each client instance must be unique, and each user ID can be registered only once. Data pushed by the server is evenly distributed among user IDs.

    • gid: the group ID of the client, which is used to register the client on the server. By default, all clients use the same group ID and therefore belong to the same group. If multiple groups exist, a copy of each data record is pushed to every group.

set_logger(logger=None)

  • Description: configures a logger for the queue. By default, warning information is written to the standard output. To disable logging, set logger to None.

  • Parameter: logger: the logger that you want to configure.

truncate(index)

  • Description: truncates the data in a queue before a specific index and retains only data after the index.

  • Parameter: index: the index that is used to truncate data in the queue.

put(data, tags: dict = {})

  • Description: writes a data record to a queue.

  • Parameters:

    • data: the data record that you want to write to the queue.

    • tags: optional. The tags of the data record that you want to write to the queue.

  • Return values:

    • index: the index of the data record written to the queue. The value can be used to query data in the queue.

    • requestId: the request ID that is automatically generated for the data record written to the queue. You can use requestId as a special tag to query data in the queue.

get(request_id=None, index=0, length=1, timeout='5s', auto_delete=True, tags={})

  • Description: queries data in a queue based on specified conditions.

  • Parameters:

    • request_id: the request ID of the data record that you want to query. If this parameter is specified, the system queries length data records starting from index. If the results contain the record that matches the request ID, the record is returned. Otherwise, null is returned.

    • index: the index from which the query starts. Default value: 0, which indicates that the query starts from the first data record.

    • length: the number of data records to query. If this parameter is specified, the system returns a maximum of length data records starting from the index. The data record specified by the index is also returned.

    • timeout: the timeout period of the query. If the queue contains length data records, they are returned immediately. Otherwise, the query blocks and stops when the specified timeout period elapses.

    • auto_delete: specifies whether to automatically delete queried data records from the queue. If you set auto_delete to False, data records can be queried repeatedly. In this case, you can use the delete() method to remove them manually.

    • tags: the tags used to query data records. The data type of the tags is dict. If this parameter is specified, the system queries length data records starting from the specified index and returns data records with specified tags.

  • Return value: the obtained data records in the DataFrame format.
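
    For example, a minimal sketch of writing a record with a tag and reading it back without auto-deletion. It assumes that queue is an initialized QueueClient; the tag name used here is illustrative:

    # queue is an initialized QueueClient; the tag name 'user' is illustrative.
    index, request_id = queue.put('[{}]', tags={'user': 'demo'})

    # auto_delete=False leaves the record in the queue after the query.
    records = queue.get(index=index, length=1, timeout='5s',
                        auto_delete=False, tags={'user': 'demo'})
    for df in records:
        print(df.index, df.tags)

    # Manually remove the record after it has been processed.
    queue.delete(index)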

attributes()

  • Description: queries the attributes of a queue. The attributes include the total length of the queue and the current data length in the queue.

  • Return value: attrs: the attributes of the queue. The data type of the attributes is dict.

delete(indexes)

  • Description: deletes data records specified by the index values from a queue.

  • Parameter: indexes: the specified index values used to delete data records. You can specify a single index value as a string or specify multiple index values as a list.

search(index)

  • Description: queries the queue information of a data record.

  • Parameter: index: the index of the data record.

  • Return value: the queue information of the data record. The queue information is of the JSONObject type and contains the following fields:

    • ConsumerId: the ID of the instance that processes the data record.

    • IsPending: indicates whether the data record is being processed. Valid values:

      • True: The data record is being processed.

      • False: The data record is waiting in a queue to be processed.

    • WaitCount: the number of data records that are queued before the current data record. This field is valid only when IsPending is False. If IsPending is True, the value of this field is 0.

    Sample responses:

    • If {'ConsumerId': 'eas.****', 'IsPending': False, 'WaitCount': 2} is returned, the data record is waiting in the queue to be processed.

    • If the log displays search error: Code: 404, Message: b'no data in stream' and {} is returned, the data record is not found in the queue. This may be because the record has already been processed and its result returned to the client, or because the index parameter is incorrectly configured.
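
    For example, a minimal sketch that checks the queue position of a record, assuming that queue is an initialized QueueClient and that index comes from a previous put() call:

    # index comes from a previous queue.put() call.
    info = queue.search(index)
    if not info:
        print('record not found; it may have already been processed')
    elif info['IsPending']:
        print('record is being processed by', info['ConsumerId'])
    else:
        print(info['WaitCount'], 'records are queued before this one')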

watch(index, window, index_only=False, auto_commit=False)

  • Description: subscribes to data records in a queue. Then, the queuing service pushes data to clients based on specified conditions.

  • Parameters:

    • index: the index where data record subscription starts.

    • window: the maximum number of data records allowed to be pushed to a single client by the queuing service.

      Note

      If the pushed data records are not committed, the server does not push other data records to the client. After N data records are committed, the server pushes N data records to the client to ensure that the number of data records handled by the client does not exceed the value specified by window. This way, client-side concurrency in data processing is controlled.

    • index_only: specifies whether to push only the index value.

    • auto_commit: specifies whether to automatically commit a data record after the data record is pushed. We recommend that you set auto_commit to False. In this case, you must manually commit a data record after the pushed data record is received and computed. If an exception occurs on an instance before the computation is complete, uncommitted data records on the instance are pushed to other instances by the queuing service.

  • Return value: a watcher that is used to read the pushed data.

commit(index)

  • Description: commits the data record specified by the index.

    Note

    Commit a data record after it is processed and no longer needs to be pushed to other instances. A committed data record can be deleted from the queue.

  • Parameter: index: the index values that are used to specify the data records to be committed. You can specify a single index value as a string or specify multiple index values as a list.

Watcher

run()

  • Description: runs a watcher to establish a WebSocket connection to the server, receives data pushed from the server, and returns the data to the caller in real time.

  • Return value: real-time data pushed to the caller in the DataFrame format.

close()

  • Description: closes the watcher and terminates the backend connection.

    Note

    Only one watcher can be started for a single client. You must close the current watcher before you can start another one.
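
    For example, a minimal sketch of subscribing with manual commits, as recommended for the watch() method. It assumes that queue is an initialized QueueClient; process() and the df.data field name are placeholders:

    # queue is an initialized QueueClient. Subscribe from index 0 and allow at
    # most 5 uncommitted records on this client at a time.
    watcher = queue.watch(0, 5, auto_commit=False)
    try:
        for df in watcher.run():
            process(df.data)        # process() and df.data are placeholders
            queue.commit(df.index)  # commit so the server can push the next record
    finally:
        watcher.close()             # only one watcher per client can be open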

Demos

  • Input and output as strings

    If you use custom processors to deploy services, strings are often used to call the services, for example, services deployed based on Predictive Model Markup Language (PMML) models. Demo code:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
        client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
        client.init()
    
        request = StringRequest('[{"fea1": 1, "fea2": 2}]')
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • Input and output as tensors

    If you use TensorFlow to deploy services, you must use the TFRequest and TFResponse classes to call the services. Demo code:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import TFRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
        client.init()
    
        req = TFRequest('predict_images')
        req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(req)
            print(resp)
  • Use a VPC direct connection channel to call a service

    You can use a VPC direct connection channel to access only services deployed in an EAS dedicated resource group. To use a VPC direct connection channel, the EAS dedicated resource group and the specified vSwitch must reside in the same VPC. For more information about how to purchase EAS dedicated resource groups and configure network connectivity, see Work with dedicated resource groups and Configure network connectivity. Compared with the regular mode, this mode requires one additional line of code: client.set_endpoint_type(ENDPOINT_TYPE_DIRECT). Use this mode in high-concurrency, heavy-traffic scenarios. Demo code:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import TFRequest
    from eas_prediction import ENDPOINT_TYPE_DIRECT
    
    if __name__ == '__main__':
        client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
        client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
        client.init()
    
        request = TFRequest('predict_images')
        request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • Call a PyTorch model

    If you use PyTorch to deploy services, you must use the TorchRequest and TorchResponse classes to call the services. Demo code:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import TorchRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
        client.init()
    
        req = TorchRequest()
        req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
        # req.add_fetch(0)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            print(resp.get_tensor_shape(0))
            # print(resp)
        print("average response time: %s s" % (timer / 10) )
  • Call a Blade processor-based model

    If you use Blade processors to deploy services, you must use the BladeRequest and BladeResponse classes to call the services. Demo code:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import BladeRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = BladeRequest()
    
        req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
        req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
        req.add_fetch('output', BladeRequest.DT_FLOAT)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • Call an EAS Blade processor-based model that is compatible with default TensorFlow methods

    You can use the TFRequest and TFResponse classes to call a Blade processor-based model that is compatible with default TensorFlow methods supported by EAS. Demo code:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction.blade_tf_request import TFRequest  # Import the Blade-compatible TFRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = TFRequest(signature_name='predict_words')
    
        req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
        req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
        req.add_fetch('output')
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • Use the queuing service to send and subscribe to data

    You can send and query data in a queue, query the state of a queue, and subscribe to data pushed by a queue. In the following demo, a thread pushes data to a queue, and another thread uses a watcher to subscribe to the pushed data. Demo code:

    #!/usr/bin/env python
    
    from eas_prediction import QueueClient
    import threading
    
    if __name__ == '__main__':
        endpoint = '182848887922****.cn-shanghai.pai-eas.aliyuncs.com'
        queue_name = 'test_group.qservice/sink'
        token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****'
    
        queue = QueueClient(endpoint, queue_name)
        queue.set_token(token)
        queue.init()
        queue.set_timeout(30000)
    
        # truncate all messages in the queue
        attributes = queue.attributes()
        if 'stream.lastEntry' in attributes:
            queue.truncate(int(attributes['stream.lastEntry']) + 1)
    
        count = 100
        # create a thread to send messages to the queue
        def send_thread():
            for i in range(count):
                index, request_id = queue.put('[{}]')
                print('send: ', i, index, request_id)
    
        # create a thread to watch messages from the queue
        def watch_thread():
            watcher = queue.watch(0, 5, auto_commit=True)
            i = 0
            for x in watcher.run():
                print('recv: ', i, x.index, x.tags['requestId'])
                i += 1
                if i == count:
                    break
            watcher.close()
    
        thread1 = threading.Thread(target=watch_thread)
        thread2 = threading.Thread(target=send_thread)
    
        thread1.start()
        thread2.start()
    
        thread1.join()
        thread2.join()