This topic describes how to develop custom processors by using Python.

The SDK for Python provided by Elastic Algorithm Service (EAS) supports Python-based open-source machine learning frameworks such as TensorFlow, PyTorch, and Scikit-Learn, as well as data analysis and processing frameworks such as Pandas. You can use the SDK for Python to convert local prediction logic into online services. The SDK for Python has built in the high-performance remote procedure call (RPC) framework that EAS customized for AI inference scenarios, together with the internal operations required to interact with EAS clusters. You can call these operations to deploy models to EAS clusters and use EAS features such as model monitoring, blue-green release, auto scaling, and the VPC direct connection channel.

The following example shows how to develop a custom processor by using Python.

Step 1: Build a development environment

You can use a Python package management tool, such as Pyenv or Conda, to build a development environment. The EASCMD client provided by EAS encapsulates the build process so that you can build the SDK for Python development environment with a single command. If you need to customize the environment, you can build it manually.

You can build a development environment by using one of the following methods:
  • EASCMD (for Linux only)
    EASCMD is a client provided by EAS that encapsulates the logic for initializing the SDK for Python. After you download EASCMD, a single command initializes the SDK for Python environment and generates the corresponding file templates.
    # Install and initialize EASCMD. In this example, the EASCMD client for Linux is installed.
    $ wget http://eas-data.oss-cn-shanghai.aliyuncs.com/tools/eascmd64
    # After you download EASCMD, make it executable and configure your AccessKey pair.
    $ chmod +x eascmd64
    $ ./eascmd64 config -i <access_id> -k <access_key>
    
    # Initialize the environment.
    $ ./eascmd64 pysdk init ./pysdk_demo
    When prompted, enter the Python version. The default version is 3.6. The ENV directory for the Python environment, the prediction service template app.py, and the service deployment template app.json are then created automatically.
  • Manual mode
    If EASCMD cannot meet your requirements or you encounter problems during initialization, you can initialize the development environment manually. We recommend that you use Conda to create the environment.
    mkdir demo
    cd demo
    # Use Conda to create a Python environment. The directory must be ENV.
    conda create -p ENV python=3.6
    # Install the SDK for Python of EAS.
    ENV/bin/pip install http://eas-data.oss-cn-shanghai.aliyuncs.com/sdk/allspark-0.9-py2.py3-none-any.whl
    # Install other dependencies, such as TensorFlow 1.14.
    ENV/bin/pip install tensorflow==1.14
    If Conda is not installed locally, run the following commands to install it:
    $ wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
    $ sh Miniconda3-latest-Linux-x86_64.sh
  • Pre-built development image (recommended)
    EAS provides pre-built development images in which Conda is pre-installed and a Python ENV environment has been generated. The following three pre-built images are available:
    # The base image, in which only Conda is installed.
    registry.cn-shanghai.aliyuncs.com/eas/eas-python-base-image:latest
    # The image in which Conda, Python 2.7, and EAS Allspark 0.8 are installed.
    registry.cn-shanghai.aliyuncs.com/eas/eas-python-base-image:py2.7-allspark-0.8
    # The image in which Conda, Python 3.6, and EAS Allspark 0.8 are installed.
    registry.cn-shanghai.aliyuncs.com/eas/eas-python-base-image:py3.6-allspark-0.8
    Run the docker run command on an image to obtain the Python development environment:
    $ sudo docker run -ti registry.cn-shanghai.aliyuncs.com/eas/eas-python-base-image:py3.6-allspark-0.8
    (/data/eas/ENV) [root@487a04df4b21 eas]#
    (/data/eas/ENV) [root@487a04df4b21 eas]# ENV/bin/python app.py
    [INFO] initialize new lua plugin
    [INFO] loading builtin config script
    [INFO] current meritc id:0
    [INFO] loading builtin lua scripts
    [INFO] Success load all lua scripts.
    [INFO] create service
    [INFO] rpc binds to predefined port 8080
    [INFO] updating rpc port to 8080
    [INFO] install builtin handler call to /api/builtin/call
    [INFO] install builtin handler eastool to /api/builtin/eastool
    [INFO] install builtin handler monitor to /api/builtin/monitor
    [INFO] install builtin handler ping to /api/builtin/ping
    [INFO] install builtin handler prop to /api/builtin/prop
    [INFO] install builtin handler realtime_metrics to /api/builtin/realtime_metrics
    [INFO] install builtin handler tell to /api/builtin/tell
    [INFO] install builtin handler term to /api/builtin/term
    [INFO] Service start successfully
    [INFO] shutting down context ... press Ctrl+C again to force quit
    You can install your dependency libraries, such as TensorFlow 1.12, in the ENV environment of the base image, and then commit the modified container as a data image.
    ENV/bin/pip install tensorflow==1.12
    You can also build an ENV environment outside Docker and then copy the deployment package to the /data/eas/ directory of a Docker image. If you build a development environment by using an image, you do not need to upload the ENV environment package upon each deployment, which speeds up deployment.

Step 2: Define prediction logic

Create the main file of the prediction service, app.py, at the same level as the ENV directory. The pre-built EAS development images already contain this file, and EASCMD generates it automatically when it initializes the environment. The file is based on the following SDK encapsulation code provided by EAS:
# -*- coding: utf-8 -*-
import allspark


class MyProcessor(allspark.BaseProcessor):
    """ MyProcessor is an example.
        You can send a message like this to predict:
        curl -v http://127.0.0.1:8080/api/predict/service_name -d '2 105'
    """

    def initialize(self):
        """ Load the model. Executed once at the start of the service.
            Do service initialization and load models in this function.
        """
        self.module = {'w0': 100, 'w1': 2}

    def pre_process(self, data):
        """ Parse the request body, for example b'2 105', into two integers.
        """
        x, y = data.split(b' ')
        return int(x), int(y)

    def post_process(self, data):
        """ Encode the result string as bytes for the response.
        """
        return bytes(data, encoding='utf8')

    def process(self, data):
        """ Process the request data and return (response_data, status_code).
        """
        x, y = self.pre_process(data)
        w0 = self.module['w0']
        w1 = self.module['w1']
        y1 = w1 * x + w0
        if y1 >= y:
            return self.post_process("True"), 200
        else:
            return self.post_process("False"), 400


if __name__ == '__main__':
    # The worker_threads parameter indicates the concurrency of processing.
    runner = MyProcessor(worker_threads=10)
    runner.run()
The preceding code is a simple example of how to use the SDK for Python. Your processor must inherit from the base class BaseProcessor provided by EAS and implement the initialize() and process() functions. The input and output of the process() function are of the BYTES type. The function returns two values: response_data and status_code. For a normal request, status_code is 0 or 200.
  • __init__(worker_threads=5, worker_processes=1, endpoint=None)
    Function description: The function for building a processor.
    Parameter description:
    • worker_threads: the number of worker threads. Default value: 5.
    • worker_processes: the number of processes. Default value: 1. If the value of worker_processes is 1, the single-process multi-thread mode is used. If the value of worker_processes is greater than 1, worker_threads only reads data. Requests are concurrently processed by multiple processes, and each process executes the initialize() function.
    • endpoint: the endpoint that the service listens to. This parameter specifies the IP address and port number that the service listens to, for example, endpoint='0.0.0.0:8079'.
  • initialize()
    Function description: The function for initializing the processor, which performs initialization such as loading a model during service startup.
    Parameter description: No parameter is involved.
  • process(data)
    Function description: The function for processing requests. Each request transfers the request body as a parameter to the process() function, which then sends the return value to the client.
    Parameter description: data: the request body, which is of the BYTES type. The return value is also of the BYTES type.
  • run()
    Function description: The function for starting the service.
    Parameter description: No parameter is involved.
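
The contract described above (BYTES in, a BYTES response plus a status code out) can be checked without starting the service. The following is a minimal sketch, not part of the allspark SDK: it assumes that the arithmetic of the sample process() function is moved into a plain function, here given the hypothetical name predict(), so that it can be exercised with ordinary assertions before it is wrapped in a processor.

```python
# Hypothetical stand-in for MyProcessor.process(); it does not import allspark.
MODEL = {'w0': 100, 'w1': 2}


def predict(data, model):
    """Return (response_data, status_code) for a request body such as b'2 105'."""
    x_raw, y_raw = data.split(b' ')
    x, y = int(x_raw), int(y_raw)
    y1 = model['w1'] * x + model['w0']
    if y1 >= y:
        return b'True', 200
    return b'False', 400


if __name__ == '__main__':
    # 2 * 2 + 100 = 104, which is less than 105.
    print(predict(b'2 105', MODEL))
    # 2 * 10 + 100 = 120, which is greater than 20.
    print(predict(b'10 20', MODEL))
```

With this split, the process() function of a processor could simply return predict(data, self.module).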

Step 3: Test the service locally

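# Start the service by using the Python interpreter in the ENV directory.
./ENV/bin/python app.py
# In another terminal, send a test request to the local service.
curl http://127.0.0.1:8080/test -d '10 20'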
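
The curl test can also be scripted. The following sketch uses only the Python standard library; build_request() and call_service() are hypothetical helper names, not part of the SDK for Python. Because the sample processor returns status code 400 for a "False" result, the sketch assumes that such a response reaches the client as an HTTP error and catches it. The optional token parameter is likewise an assumption that mirrors the Authorization header used against a deployed service in Step 4.

```python
import urllib.error
import urllib.request


def build_request(url, body, token=None):
    """Build a POST request that carries the raw request body."""
    headers = {}
    if token:
        # Assumption: a deployed service expects the token in this header.
        headers['Authorization'] = token
    return urllib.request.Request(url, data=body, headers=headers, method='POST')


def call_service(url, body, token=None, timeout=5.0):
    """Send the request and return (status_code, response_body)."""
    request = build_request(url, body, token)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status, response.read()
    except urllib.error.HTTPError as error:
        # Non-2xx responses are raised by urllib; report them the same way.
        return error.code, error.read()
```

While app.py is running, call_service('http://127.0.0.1:8080/test', b'10 20') sends the same request as the curl command above.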

Step 4: Release the service online

  1. Compress the code.
    You can compress the code in one of the following ways:
    • Compress the complete environment and code
      You can compress the complete environment file manually or by using EASCMD.
      • Use EASCMD to compress the environment file
        $ ./eascmd64 pysdk pack ./demo
        [PYSDK] Creating package: /home/xingke.lwp/code/test/demo.tar.gz
      • Manually compress the environment file into a .zip or .tar.gz package
        The root directory of the package must be /ENV. The complete .tar.gz package for this sample has been uploaded to Object Storage Service (OSS) (download the package). You can use the following service configuration file to deploy the service:
        {
          "name": "pysdk_demo",
          "processor_entry": "./app.py",
          "processor_type": "python",
          "processor_path": "oss://eas-model-beijing/1955570263925790/pack.tar.gz",
          "metadata": {
            "instance": 1,
            "memory": 2000,
            "cpu": 1
          }
        }
    • Upload the environment package by using an image (recommended)
      Generally, the Python ENV environment generated by Conda is large. Compressing and uploading it for each development and deployment cycle wastes time and storage resources. EAS therefore supports deployment from a data image. You can build an ENV environment based on a pre-built image, install the required Python dependencies, commit the container as your own data image, and push it to the image repository. The following code shows an example:
      sudo docker commit 487a04df4b21 registry.cn-shanghai.aliyuncs.com/eas-service/develop:latest
      sudo docker push registry.cn-shanghai.aliyuncs.com/eas-service/develop:latest
      You then only need to compress and upload your code files, such as app.py, to OSS. You can use the following service description file to deploy the service:
      {
        "name": "pysdk_demo",
        "processor_entry": "./service.py",
        "processor_type": "python",
        "processor_path": "http://eas-data.oss-cn-shanghai.aliyuncs.com/demo/service.py",
        "data_image": "registry.cn-shanghai.aliyuncs.com/eas-service/develop:latest",
        "metadata": {
          "instance": 1,
          "memory": 2000,
          "cpu": 1
        }
      }
  2. Deploy the service.
    $ ./eascmd64 create app.json
    [RequestId]: 1202D427-8187-4BCB-8D32-D7096E95B5CA
    +-------------------+-------------------------------------------------------------------+
    | Intranet Endpoint | http://1828488879222746.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo |
    |             Token | ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0Zg==          |
    +-------------------+-------------------------------------------------------------------+
    [OK] Waiting task server to be ready
    [OK] Fetching processor from [oss://eas-model-beijing/1955570263925790/pack.tar.gz]
    [OK] Building image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
    [OK] Pushing image [registry-vpc.cn-beijing.aliyuncs.com/eas/pysdk_demo_cn-beijing:v0.0.1-20190806082810]
    [OK] Waiting [Total: 1, Pending: 1, Running: 0]
    [OK] Service is running
    # Test the service.
    $ curl http://1828488879222746.vpc.cn-beijing.pai-eas.aliyuncs.com/api/predict/pysdk_demo -H 'Authorization: ZTBhZTY3ZjgwMmMyMTQ5OTgyMTQ5YmM0NjdiMmNiNmJkY2M5ODI0Zg==' -d '10 20'
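
For the manual packaging option in step 1, the archive can also be produced with the tarfile module of the Python standard library. The following is a sketch under stated assumptions rather than a replacement for eascmd64 pysdk pack: it assumes that the package should contain the ENV directory and app.py side by side at the archive root, as the "processor_entry": "./app.py" setting suggests. pack_project() and list_top_level() are hypothetical helper names.

```python
import os
import tarfile
import tempfile


def pack_project(project_dir, output_path, extra_files=("app.py",)):
    """Pack ENV/ and the listed files so that they sit at the archive root."""
    env_dir = os.path.join(project_dir, "ENV")
    if not os.path.isdir(env_dir):
        raise FileNotFoundError("ENV directory not found in " + project_dir)
    with tarfile.open(output_path, "w:gz") as tar:
        # arcname sets the path stored in the archive, independent of project_dir.
        tar.add(env_dir, arcname="ENV")
        for name in extra_files:
            tar.add(os.path.join(project_dir, name), arcname=name)
    return output_path


def list_top_level(archive_path):
    """Return the sorted top-level entries of a .tar.gz archive."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted({member.name.split("/")[0] for member in tar.getmembers()})


if __name__ == "__main__":
    # Demo on a throwaway directory tree rather than a real Conda environment.
    with tempfile.TemporaryDirectory() as tmp:
        project = os.path.join(tmp, "demo")
        os.makedirs(os.path.join(project, "ENV", "bin"))
        with open(os.path.join(project, "app.py"), "w") as handle:
            handle.write("# placeholder\n")
        archive = pack_project(project, os.path.join(tmp, "demo.tar.gz"))
        print(list_top_level(archive))  # prints ['ENV', 'app.py']
```

Listing the top-level entries before uploading is a quick way to confirm that ENV was not nested under an extra parent directory.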