
Batch Compute: Quick start for Python SDK

Last Updated: Feb 20, 2024

This section describes how to use the Python SDK to submit a job. The job aims to count the number of times INFO, WARN, ERROR, and DEBUG appear in a log file.

Note: Make sure that you have signed up for the Batch Compute service in advance.

Contents:

  • Prepare a job

    • Upload data file to OSS

    • Upload task program to OSS

  • Use SDK to submit job

  • Check job status

  • Check job execution result


This job contains the following tasks:

  • The split task divides the log file into three parts.

  • The count task counts the number of times INFO, WARN, ERROR, and DEBUG appear in each part of the log file. For the count task, InstanceCount must be set to 3, so that three count instances run concurrently (see the sketch below).

  • The merge task merges the results of all the count instances.

Figure: DAG of the job. The split task fans out to three concurrent count instances, whose results feed the merge task.
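The counting itself is plain keyword tallying. The following is a minimal, self-contained sketch of that logic; the input file name is illustrative, and the real per-instance input selection is implemented in the log-count.tar.gz package downloaded below.

#encoding=utf-8
# Minimal sketch of the per-instance counting logic of the count task.
# The input path is illustrative; the real count.py in log-count.tar.gz
# determines which part of the log each instance processes.
from collections import Counter

KEYWORDS = ('INFO', 'WARN', 'ERROR', 'DEBUG')

def count_keywords(path):
    counts = Counter({kw: 0 for kw in KEYWORDS})
    with open(path) as f:
        for line in f:
            for kw in KEYWORDS:
                counts[kw] += line.count(kw)
    return dict(counts)

if __name__ == '__main__':
    print(count_keywords('log-count-data.txt'))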

1. Prepare a job

1.1. Upload data file to OSS

Download the data file used in this example: log-count-data.txt

Upload the log-count-data.txt file to oss://your-bucket/log-count/log-count-data.txt.

  • your-bucket indicates a bucket that you have created. In this example, it is assumed that the bucket resides in the cn-shenzhen region.

  • For how to upload the file to OSS, see Upload files to OSS. A programmatic alternative is sketched after this list.
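If you prefer to upload from code instead of the console, the following is a minimal sketch using the oss2 Python SDK (pip install oss2); the credentials, endpoint, and bucket name are placeholders that you must replace with your own values.

#encoding=utf-8
# Minimal upload sketch using the oss2 SDK; all values below are placeholders.
import oss2

auth = oss2.Auth('<your-AccessKeyID>', '<your-AccessKeySecret>')
bucket = oss2.Bucket(auth, 'http://oss-cn-shenzhen.aliyuncs.com', 'your-bucket')

# Upload the local file to oss://your-bucket/log-count/log-count-data.txt.
bucket.put_object_from_file('log-count/log-count-data.txt', 'log-count-data.txt')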

1.2. Upload task program to OSS

The job program used in this example is written in Python. Download the program: log-count.tar.gz.

In this example, you do not need to modify the sample code. You can directly upload log-count.tar.gz to OSS, for example, to oss://your-bucket/log-count/log-count.tar.gz.

The upload method has been described earlier.

  • Batch Compute supports only compressed packages in the tar.gz format. Make sure that you package the files with gzip as described below; otherwise, the package cannot be parsed.

  • If you need to modify the code, decompress the file, modify the code, and then follow these steps to pack it again:

    The command is as follows:

    $ cd log-count  # Switch to the directory.
    $ tar -czf log-count.tar.gz *  # Pack all files in this directory into log-count.tar.gz.

    You can run the following command to check the content of the compressed package:

    $ tar -tvf log-count.tar.gz

    The following list is displayed:

    conf.py
    count.py
    merge.py
    split.py
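
    If the tar command is not available (for example, on Windows), Python's standard tarfile module can build an equivalent gzip-compressed package. A minimal sketch, run from inside the log-count directory:

    #encoding=utf-8
    # Minimal sketch: build log-count.tar.gz with the standard tarfile module.
    # Run inside the log-count directory; the file names match the listing above.
    import tarfile

    with tarfile.open('log-count.tar.gz', 'w:gz') as tar:
        for name in ('conf.py', 'count.py', 'merge.py', 'split.py'):
            tar.add(name)  # Each file sits at the top level of the archive.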

2. Use SDK to submit job

For more information about how to download and install the Python SDK, click here.

If the SDK version is v20151111, you must specify a cluster ID or use the AutoCluster parameters when submitting a job. In this example, AutoCluster is used. You must configure the following parameters for the AutoCluster:

  • Available image ID. You can use an image provided by the system or customize your own image. For more information about how to customize an image, see Use an image.

  • InstanceType. For more information about the instance type, see Currently supported instance types.

In OSS, create a path for storing program outputs (StdoutRedirectPath) and error logs (StderrRedirectPath). In this example, the created path is oss://your-bucket/log-count/logs/.

  • To run the program in this example, modify the variables marked with comments in the program based on the variables and OSS paths described above.

The following is a job submission template for the Python SDK. For the specific meanings of the parameters in the program, click here.


#encoding=utf-8
import sys

from batchcompute import Client, ClientError
from batchcompute import CN_SHENZHEN as REGION
from batchcompute.resources import (
    JobDescription, TaskDescription, DAG, AutoCluster
)

ACCESS_KEY_ID='' # Enter your AccessKeyID
ACCESS_KEY_SECRET='' # Enter your AccessKeySecret

IMAGE_ID = 'img-ubuntu' # Enter your image ID
INSTANCE_TYPE = 'ecs.sn1.medium' # Enter the instance type based on the region
WORKER_PATH = '' # 'oss://your-bucket/log-count/log-count.tar.gz' Enter the OSS storage path of the uploaded log-count.tar.gz
LOG_PATH = '' # 'oss://your-bucket/log-count/logs/' Enter the OSS storage path of the error feedback and task outputs
OSS_MOUNT = '' # 'oss://your-bucket/log-count/' Mounted to '/home/input' and '/home/output'

client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)

def main():
    try:
        job_desc = JobDescription()

        # Create auto cluster.
        cluster = AutoCluster()
        cluster.InstanceType = INSTANCE_TYPE
        cluster.ResourceType = "OnDemand"
        cluster.ImageId = IMAGE_ID

        # Create split task.
        split_task = TaskDescription()
        split_task.Parameters.Command.CommandLine = "python split.py"
        split_task.Parameters.Command.PackagePath = WORKER_PATH
        split_task.Parameters.StdoutRedirectPath = LOG_PATH
        split_task.Parameters.StderrRedirectPath = LOG_PATH
        split_task.InstanceCount = 1
        split_task.AutoCluster = cluster
        split_task.InputMapping[OSS_MOUNT]='/home/input'
        split_task.OutputMapping['/home/output'] = OSS_MOUNT


        # Create count task.
        count_task = TaskDescription(split_task)
        count_task.Parameters.Command.CommandLine = "python count.py"
        count_task.InstanceCount = 3
        count_task.InputMapping[OSS_MOUNT] = '/home/input'
        count_task.OutputMapping['/home/output'] = OSS_MOUNT

        # Create merge task
        merge_task = TaskDescription(split_task)
        merge_task.Parameters.Command.CommandLine = "python merge.py"
        merge_task.InstanceCount = 1
        merge_task.InputMapping[OSS_MOUNT] = '/home/input'
        merge_task.OutputMapping['/home/output'] = OSS_MOUNT

        # Create task dag.
        task_dag = DAG()
        task_dag.add_task(task_name="split", task=split_task)
        task_dag.add_task(task_name="count", task=count_task)
        task_dag.add_task(task_name="merge", task=merge_task)
        task_dag.Dependencies = {
            'split': ['count'],
            'count': ['merge']
        }

        # Create job description.
        job_desc.DAG = task_dag
        job_desc.Priority = 99 # 0-1000
        job_desc.Name = "log-count"
        job_desc.Description = "PythonSDKDemo"
        job_desc.JobFailOnInstanceFail = True

        job_id = client.create_job(job_desc).Id
        print('job created: %s' % job_id)

    except ClientError as e:
        print(e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
        return 1

if __name__ == '__main__':
    sys.exit(main())

3. Check job status

You can view the job status by referring to Obtain the job information.

jobInfo = client.get_job(job_id)
print(jobInfo.State)
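
To wait until the job completes, you can poll the state with the same client. A minimal sketch, reusing the client and job_id from the submission step:

import time

# Poll the job state until it reaches a terminal state.
while True:
    state = client.get_job(job_id).State
    print('job state: %s' % state)
    if state in ('Finished', 'Failed', 'Stopped'):
        break
    time.sleep(10)  # Check every 10 seconds.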

A job may be in one of the following states: Waiting, Running, Finished, Failed, and Stopped.

4. Check job execution result

You can log on to the OSS console and check the following file under your bucket: /log-count/merge_result.json.

The expected result is as follows:

{"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}

Alternatively, you can obtain the results from the Overview page of the console.