All Products
Search
Document Center

Platform For AI:SDK for Python

Last Updated:Nov 10, 2023

Official Elastic Algorithm Service (EAS) SDKs are provided to call services deployed based on their models. EAS SDKs reduce the amount of time required for defining the call logic and improve call stability. This topic describes EAS SDK for Python. Demos are provided to show how to use EAS SDK for Python to call services. In these demos, inputs and outputs are of commonly used types.

Install the SDK

pip install -U eas-prediction --user

Methods

Class

Method

Description

PredictClient

PredictClient(endpoint, service_name)

  • Description: Creates a client object of the PredictClient class.

  • Parameters:

    • endpoint: the endpoint of the server.

      To call a service in regular mode, set this parameter to the endpoint of the default gateway. Example: 182848887922***.cn-shanghai.pai-eas.aliyuncs.com.

      If you want to use a Virtual Private Cloud (VPC) direct connection, set this parameter to the common endpoint of the current region. For example, if the current region is China (Shanghai), set this parameter to pai-eas-vpc.cn-shanghai.aliyuncs.com.

    • service_name: the name of the service.

set_endpoint(endpoint)

  • Description: Specifies the endpoint of the server.

  • Parameter: endpoint: the endpoint of the server.

    To call a service in regular mode, set this parameter to the endpoint of the default gateway. Example: 182848887922***.cn-shanghai.pai-eas.aliyuncs.com.

    If you want to use a Virtual Private Cloud (VPC) direct connection, set this parameter to the common endpoint of the current region. For example, if the current region is China (Shanghai), set this parameter to pai-eas-vpc.cn-shanghai.aliyuncs.com.

set_service_name(service_name)

  • Description: Specifies the name of the service.

  • Parameter: service_name: the name of the service.

set_endpoint_type(endpoint_type)

  • Description: Specifies the gateway type of the server.

  • Parameter: endpoint_type: the gateway type to be used. The following gateway types are supported:

    • ENDPOINT_TYPE_GATEWAY: the default gateway.

    • ENDPOINT_TYPE_DIRECT: VPC direct connection channels. If you do not set this parameter, the default gateway is used to access the service.

set_token(token)

  • Description: Specifies the service access token.

  • Parameter: token: the token for service access.

set_retry_count(max_retry_count)

  • Description: Sets the maximum number of retries allowed after a request failure.

  • Parameter: max_retry_count: the maximum number of retries allowed after a request failure. Default value: 5.

    Important

    The client must resend requests if process errors occur on the server, server errors occur, or persistent connections to gateways are closed. Therefore, we recommend that you do not set this parameter to 0.

set_max_connection_count(max_connection_count)

  • Description: Sets the maximum number of persistent connections allowed in the connection pool of the client. To achieve better performance, the client establishes persistent connections to the server and stores the persistent connections in the connection pool. Each time you initiate a request, the client uses an idle connection in the connection pool to access the required service.

  • Parameter: max_connection_count: the maximum number of persistent connections allowed in the connection pool. Default value: 100.

set_timeout(timeout)

  • Description: Sets the timeout period of a request.

  • Parameter: timeout: the timeout period of a request. Unit: milliseconds. Default value: 5000.

init()

Description: Initializes a client object. After all the preceding methods that are used to set parameters are called, the parameters take effect only after you call the Init() method.

predict(request)

  • Description: Sends a prediction request to the online prediction service.

  • Parameter: request: an abstract class, which can be a request of various types, such as a request by using a string or a TensorFlow request.

  • Return value: the response to the prediction request.

StringRequest

StringRequest(request_data)

  • Description: Creates an object of the StringRequest class.

  • Parameter: request_data: the request string to be sent.

StringResponse

to_string()

  • Description: converts the response of the StringResponse class to a string.

  • Return value: the response body of the request.

TFRequest

TFRequest(signature_name)

  • Description: Creates an object of the TFRequest class.

  • Parameter: signature_name: the signature name of the model of the service to be called.

add_feed(self, input_name, shape, data_type, content)

  • Description: Specifies the input tensor of the TensorFlow model of the online prediction service to be called.

  • Parameters:

    • input_name: the alias of the input tensor.

    • shape: the shape of the input tensor.

    • data_type: the data type of the input tensor. The following data types are supported:

      • TFRequest.DT_FLOAT

      • TFRequest.DT_DOUBLE

      • TFRequest.DT_INT8

      • TFRequest.DT_INT16

      • TFRequest.DT_INT32

      • TFRequest.DT_INT64

      • TFRequest.DT_STRING

      • TFRequest.TF_BOOL

    • content: the data of the input tensor. Specify the value in the form of a one-dimensional array.

add_fetch(self, output_name)

  • Description: Specifies the alias of the output tensor to be exported of the TensorFlow model.

  • Parameter: output_name: the alias of the output tensor to be exported.

    If the TensorFlow model is in the SavedModel format, this parameter is optional. If this parameter is not specified, all output tensors are exported.

    If the TensorFlow model is a frozen model, this parameter is required.

to_string()

  • Description: Serializes the protocol buffer (PB) object into a string. The PB object is created by using the TFRequest class and is used to transmit requests.

  • Return value: the string that is obtained after the TFRequest-based serialization is complete.

TFResponse

get_tensor_shape(output_name)

  • Description: Queries the shape of the output tensor identified by the specified alias.

  • Parameter: output_name: the alias of the output tensor whose shape you want to query.

  • Return value: the shape of the output tensor.

get_values(output_name)

  • Description: Queries the data of the specified output tensor.

  • Parameter: output_name: the alias of the output tensor whose data you want to query.

  • Return value: a one-dimensional array. You can call this method together with the get_tensor_shape() method to query the shape of the output tensor. The return value is a multi-dimensional array. The data type of the output tensor determines the data type of the one-dimensional array that is returned.

TorchRequest

TorchRequest()

Description: Creates an object of the TorchRequest class.

add_feed(self, index, shape, data_type, content)

  • Description: Specifies the input tensor of the PyTorch model of the online prediction service to be called.

  • Parameters:

    • index: the index of the input tensor.

    • shape: the shape of the input tensor.

    • data_type: the data type of the input tensor. The following data types are supported:

      • TFRequest.DT_FLOAT

      • TFRequest.DT_DOUBLE

      • TFRequest.DT_INT8

      • TFRequest.DT_INT16

      • TFRequest.DT_INT32

      • TFRequest.DT_INT64

      • TFRequest.DT_STRING

      • TFRequest.TF_BOOL

    • content: the data of the input tensor. Specify the value in the form of a one-dimensional array.

add_fetch(self, output_index)

  • Description: Specifies the index of the output tensor to be exported of the PyTorch model. This method is optional. If you do not call this method to set the index of the output tensor, all output tensors are exported.

  • Parameter: output_index: the index of the output tensor to be exported.

to_string()

  • Description: Serializes the PB object into a string. The PB object is created by using the TorchRequest class and is used to transmit requests.

  • Return value: the string that is obtained after the TorchRequest-based serialization is complete.

TorchResponse

get_tensor_shape(output_index)

  • Description: Queries the shape of the output tensor identified by the specified index.

  • Parameter: output_index: the index of the output tensor whose shape you want to query.

  • Return value: the shape of the output tensor identified by the specified index.

get_values(output_index)

  • Description: Queries the data of the specified output tensor. The return value is a one-dimensional array. You can call this method together with the get_tensor_shape() method to query the shape of the output tensor. The return value is a multi-dimensional array. The data type of the output tensor determines the data type of the one-dimensional array that is returned.

  • Parameter: output_index: the index of the output tensor whose data you want to query.

  • Return value: a one-dimensional array.

QueueClient

QueueClient(endpoint, queue_name)

  • Description: Creates a client object of the QueueClient class.

  • Parameters:

    • endpoint: the endpoint of the server.

    • queueName: the name of the queue to be created.

  • Return value: the created client object.

set_token(token)

  • Description: Creates a token for a client object of the QueueClient class for authentication.

  • Parameter: token: the token of the queue to be created.

init(uid=None,gid='eas')

  • Description: Initializes a client object of the QueueClient class.

  • Parameters:

    • uid: the user ID of the client. A client is used to register on the server. Each client instance must have a unique user ID, and each user ID can be registered only once. Data pushed by the server is evenly distributed among user IDs.

    • gid: the group ID of the client. A client is used to register on the server. By default, clients with the same group ID belong to the same group. If different groups exist, one data record is pushed to all groups.

set_logger(logger=None)

  • Description: Sets a logger for a queue. By default, the logger displays the warning information as regular outputs. To disable logging, set longer to None.

  • Parameter: logger: the logger that you want to set.

truncate(index)

  • Description: Truncates the data before a specific index value and retains only data after the index value.

  • Parameter: index: the index value that is used to truncate data.

put(data,tags:dict={})

  • Description: Writes a data record to a queue.

  • Parameters:

    • data: the data record that you want to write to the queue.

    • tags: optional. The tags of the data record that you want to write to the queue.

  • Return values:

    • index: the index value of the written data record. The value can be used to query data in the queue.

    • requestId: the request ID automatically generated for the written data record in the queue. requestId can be used as a special tag to query data in the queue.

get(request_id=None, index=0, length=1, timeout='5s', auto_delete=True, tags={})

  • Description: Queries data in a queue based on specified conditions.

  • Parameters:

    • request_id: the request ID of the data record that you want to query. If this parameter is specified, the system queries the maximum number of data records starting from index. If the data records contain the record that matches the request ID, the record is returned. Otherwise, null is returned.

    • index: the start index to query. Default value: 0, which indicates that the query starts from the first data record.

    • length: the number of data records to query. If this parameter is specified, the maximum number of data records starting from index is returned. The data record that matches the index value is also returned.

    • timeout: the timeout period of the query. During the timeout period, if the queue contains data, the number of data records that meet the specified conditions is returned. Otherwise, the query stops after the timeout period is over.

    • auto_delete: specifies whether to automatically delete the obtained data records from the queue. If you set auto_delete to False, data records can be repeatedly queried. In this case, you can use the Del() method to manually delete data.

    • tags: the tags used to query data records. The data type must be DICT. If this parameter is specified, the data records starting from index that are added with specified tags are returned.

  • Return value: the obtained data records in the DataFrame format.

attributes()

  • Description: Queries the attributes of a queue. The attributes include the total number of data records in the queue and the number of data records in the current queue.

  • Return value: attrs: the attributes of the queue. The data type must be DICT.

delete(indexes)

  • Description: Deletes data records that match specified index values from a queue.

  • Parameter: indexes: the specified index values used to delete data records. You can specify a single index value as a string or specify multiple index values as a list.

watch(index, window, index_only=False, auto_commit=False)

  • Description: Subscribes to data records in a queue. Then, the queuing service pushes data to clients based on specified conditions.

  • Parameters:

    • index: the start index of the data records that are subscribed.

    • window: the maximum number of data records allowed to be pushed to a single client by the queuing service.

      Note

      If the data is not committed, the server does not push other data records to the client. Then, if N data records are committed, N data records are pushed to the server. This ensures that the number of data records handled by the client does not exceed the value specified for window. This way, the computing concurrency on the client side is controlled.

    • index_only: specifies whether to push only the index value.

    • auto_commit: specifies whether to automatically commit a data record after the record is pushed. We recommend that you set auto_commit to False. In this case, you must manually commit a data record after the record is received and computed. If an exception occurs on the instance before the computation is complete, uncommitted data records are pushed to other instances by the queuing service.

  • Return value: a watcher that is used to read pushed data.

commit(index)

  • Description: Commits specified data records.

    Note

    If a data record is processed and does not need to be pushed to other instances, it is committed. Then, the data record can be deleted from the queue.

  • Parameter: index: the specified index values that match committed data records. You can specify a single index value as a string or specify multiple index values as a list.

Watcher

run()

  • Description: Runs a watcher to establish a connection to the server by using WebSockets, receives data pushed from the server, and returns the data to the caller in real time.

  • Return value: real-time data pushed to the caller in the DataFrame format.

close()

Description: Stops a watcher to close backend connections.

Note

Only one watcher can be started for a single client. You must close the watcher before you can start another watcher.

Demos

  • Input and output as strings

    If you use custom processors to deploy models as services, strings are often used to call the services, such as a service deployed based on a Predictive Model Markup Language (PMML) model. For more information, see the following demo:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
        client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
        client.init()
    
        request = StringRequest('[{"fea1": 1, "fea2": 2}]')
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • Input and output as tensors

    If you use TensorFlow to deploy models as services, you must use the TFRequest and TFResponse classes to call the services. For more information, see the following demo:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
        client.init()
    
        #request = StringRequest('[{}]')
        req = TFRequest('predict_images')
        req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(req)
            print(resp)
  • Use a VPC direct connection channel to call a service

    You can use a VPC direct connection channel to access only the services that are deployed in the dedicated resource group for EAS. In addition, to use the channel, the dedicated resource group for EAS and the specified vSwitch must be connected to the VPC. For more information, see Work with dedicated resource groups and Configure network connectivity. Compared with the regular mode, this mode contains an additional line of code: client.set_endpoint_type(ENDPOINT_TYPE_DIRECT). You can use this mode in high-concurrency and heavy-traffic scenarios. For more information, see the following demo:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import StringRequest
    from eas_prediction import TFRequest
    from eas_prediction import ENDPOINT_TYPE_DIRECT
    
    if __name__ == '__main__':
        client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
        client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
        client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
        client.init()
    
        request = TFRequest('predict_images')
        request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
        for x in range(0, 1000000):
            resp = client.predict(request)
            print(resp)
  • Call a PyTorch model

    If you use PyTorch to deploy models as services, you must use the TorchRequest and TorchResponse classes to call the services. For more information, see the following demo:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import TorchRequest
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
        client.init()
    
        req = TorchRequest()
        req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
        # req.add_fetch(0)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            print(resp.get_tensor_shape(0))
            # print(resp)
        print("average response time: %s s" % (timer / 10) )
  • Call a Blade processor-based model

    If you use Blade processors to deploy models as services, you must use the BladeRequest and BladeResponse classes to call the services. For more information, see the following demo:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction import BladeRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = BladeRequest()
    
        req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
        req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
        req.add_fetch('output', BladeRequest.DT_FLOAT)
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • Call a Blade processor-based model that is compatible with default TensorFlow methods

    You can use the TFRequest and TFResponse classes to call a Blade processor-based model that is compatible with default TensorFlow methods supported by EAS. For more information, see the following demo:

    #!/usr/bin/env python
    
    from eas_prediction import PredictClient
    from eas_prediction.blade_tf_request import TFRequest # Need Importing blade TFRequest 
    
    if __name__ == '__main__':
        client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
        client.init()
    
        req = TFRequest(signature_name='predict_words')
    
        req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * 85680)
        req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
        req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
        req.add_fetch('output')
        import time
        st = time.time()
        timer = 0
        for x in range(0, 10):
            resp = client.predict(req)
            timer += (time.time() - st)
            st = time.time()
            # print(resp)
            # print(resp.get_values('output'))
            print(resp.get_tensor_shape('output'))
        print("average response time: %s s" % (timer / 10) )
  • Use the queuing service to send and subscribe to data

    You can send and query data in a queue, query the state of a queue, and subscribe to data pushed by a queue. In the following demo, a thread pushes data to a queue, and another thread uses a watcher to subscribe to the pushed data. For more information, see the following demo:

    #!/usr/bin/env python
    
    from eas_prediction import QueueClient
    import threading
    
    if __name__ == '__main__':
        endpoint = '182848887922****.cn-shanghai.pai-eas.aliyuncs.com'
        queue_name = 'test_group.qservice/sink'
        token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****'
    
        queue = QueueClient(endpoint, queue_name)
        queue.set_token(token)
        queue.init()
        queue.set_timeout(30000)
    
        # truncate all messages in the queue
        attributes = queue.attributes()
        if 'stream.lastEntry' in attributes:
            queue.truncate(int(attributes['stream.lastEntry']) + 1)
    
        count = 100
        # create a thread to send messages to the queue
        def send_thread():
            for i in range(count):
                index, request_id = queue.put('[{}]')
                print('send: ', i, index, request_id)
    
        # create a thread to watch messages from the queue
        def watch_thread():
            watcher = queue.watch(0, 5, auto_commit=True)
            i = 0
            for x in watcher.run():
                print('recv: ', i, x.index, x.tags['requestId'])
                i += 1
                if i == count:
                    break
            watcher.close()
    
        thread1 = threading.Thread(target=watch_thread)
        thread2 = threading.Thread(target=send_thread)
    
        thread1.start()
        thread2.start()
    
        thread1.join()
        thread2.join()