
Quick start for Python SDK

Last Updated: Aug 06, 2019

This section describes how to use the Python SDK to submit a job. The job aims to count the number of times INFO, WARN, ERROR, and DEBUG appear in a log file.

Note: Make sure that you have signed up for the Batch Compute service in advance.

Contents:

  • Prepare a job
    • Upload data file to OSS
    • Upload task program to OSS
  • Use SDK to submit job
  • Check job status
  • Check job execution result

1. Prepare a job

The job aims to count the number of times INFO, WARN, ERROR, and DEBUG appear in a log file.

This job contains the following tasks:

  • The split task is used to divide the log file into three parts.
  • The count task is used to count the number of times INFO, WARN, ERROR, and DEBUG appear in each part of the log file. In the count task, InstanceCount must be set to 3, indicating that three instances of the count task are started concurrently.
  • The merge task is used to merge all the count results.

(Figure: DAG of the job: split -> count -> merge)
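To make the data flow concrete, here is a minimal, hypothetical sketch of what a count step like count.py might do. The actual code ships in log-count.tar.gz (downloaded in the next section), and the part file names and paths here are assumptions based on the input and output mappings used in the submission template:

import json

KEYWORDS = ('INFO', 'WARN', 'ERROR', 'DEBUG')

def count_keywords(path):
    # Tally how often each keyword appears in one split of the log file.
    counts = dict.fromkeys(KEYWORDS, 0)
    with open(path) as f:
        for line in f:
            for kw in KEYWORDS:
                counts[kw] += line.count(kw)
    return counts

if __name__ == '__main__':
    # Assumed layout: the split task wrote its parts under /home/input,
    # and /home/output is mapped back to OSS for the merge task to read.
    result = count_keywords('/home/input/part_0.txt')
    with open('/home/output/count_0.json', 'w') as out:
        json.dump(result, out)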

1.1. Upload data file to OSS

Download the data file used in this example: log-count-data.txt

Upload the log-count-data.txt file to oss://your-bucket/log-count/log-count-data.txt.

  • your-bucket indicates a bucket that you have created. In this example, the region is assumed to be cn-shenzhen.
  • To upload the file to OSS, see Upload files to OSS. A scripted alternative is sketched after this list.
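If you prefer to upload with the OSS Python SDK (oss2) instead of the console, a minimal sketch follows. The credentials, endpoint, and bucket name are assumptions for this example:

import oss2

# Credentials, endpoint, and bucket name are assumptions; use your own values.
auth = oss2.Auth('<your-AccessKeyId>', '<your-AccessKeySecret>')
bucket = oss2.Bucket(auth, 'http://oss-cn-shenzhen.aliyuncs.com', 'your-bucket')

# Upload the local data file to oss://your-bucket/log-count/log-count-data.txt.
bucket.put_object_from_file('log-count/log-count-data.txt', 'log-count-data.txt')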

1.2. Upload task program to OSS

The job program used in this example is written in Python. Download the program: log-count.tar.gz.

In this example, you do not need to modify the sample code. You can directly upload log-count.tar.gz to OSS, for example, to oss://your-bucket/log-count/log-count.tar.gz.

The upload method has been described earlier.

  • Batch Compute supports only compressed packages in the tar.gz format. Make sure that you package the program with gzip as described below; otherwise, the package cannot be parsed.
  • If you need to modify the code, decompress the file, modify the code, and then repackage it as follows:

    The commands are as follows:

    cd log-count                 # Switch to the directory.
    tar -czf log-count.tar.gz *  # Pack all files in this directory into log-count.tar.gz.

    You can run the following command to check the content of the compressed package:

    tar -tvf log-count.tar.gz

    The following files are listed:

    conf.py
    count.py
    merge.py
    split.py

2. Use SDK to submit job

For more information about how to download and install the Python SDK, click here.

If the SDK version is v20151111, you must specify a cluster ID or use the AutoCluster parameters when submitting a job. In this example, AutoCluster is used. You must configure the following parameters for AutoCluster:

  • An available image ID. You can use an image provided by the system or a custom image. For more information about how to create a custom image, see Use an image.
  • InstanceType. For more information about instance types, see Currently supported instance types.

In OSS, create a path for storing program outputs (StdoutRedirectPath) and error logs (StderrRedirectPath). In this example, the created path is oss://your-bucket/log-count/logs/.

  • To run the program in this example, set the commented variables in the program to your own values and OSS paths as described above.

The following is a job submission template for the Python SDK. For the specific meanings of the parameters in the program, click here.

#encoding=utf-8
import sys

from batchcompute import Client, ClientError
from batchcompute import CN_SHENZHEN as REGION
from batchcompute.resources import (
    JobDescription, TaskDescription, DAG, AutoCluster
)

ACCESS_KEY_ID = ''      # Enter your AccessKeyId.
ACCESS_KEY_SECRET = ''  # Enter your AccessKeySecret.

IMAGE_ID = 'img-ubuntu'           # Enter your image ID.
INSTANCE_TYPE = 'ecs.sn1.medium'  # Enter an instance type available in your region.
WORKER_PATH = ''   # 'oss://your-bucket/log-count/log-count.tar.gz'  OSS path of the uploaded log-count.tar.gz.
LOG_PATH = ''      # 'oss://your-bucket/log-count/logs/'  OSS path for task outputs and error logs.
OSS_MOUNT = ''     # 'oss://your-bucket/log-count/'  Mounted to /home/input and /home/output.

client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)

def main():
    try:
        job_desc = JobDescription()

        # Create the auto cluster.
        cluster = AutoCluster()
        cluster.InstanceType = INSTANCE_TYPE
        cluster.ResourceType = "OnDemand"
        cluster.ImageId = IMAGE_ID

        # Create the split task.
        split_task = TaskDescription()
        split_task.Parameters.Command.CommandLine = "python split.py"
        split_task.Parameters.Command.PackagePath = WORKER_PATH
        split_task.Parameters.StdoutRedirectPath = LOG_PATH
        split_task.Parameters.StderrRedirectPath = LOG_PATH
        split_task.InstanceCount = 1
        split_task.AutoCluster = cluster
        split_task.InputMapping[OSS_MOUNT] = '/home/input'
        split_task.OutputMapping['/home/output'] = OSS_MOUNT

        # Create the count task (three concurrent instances).
        count_task = TaskDescription(split_task)
        count_task.Parameters.Command.CommandLine = "python count.py"
        count_task.InstanceCount = 3
        count_task.InputMapping[OSS_MOUNT] = '/home/input'
        count_task.OutputMapping['/home/output'] = OSS_MOUNT

        # Create the merge task.
        merge_task = TaskDescription(split_task)
        merge_task.Parameters.Command.CommandLine = "python merge.py"
        merge_task.InstanceCount = 1
        merge_task.InputMapping[OSS_MOUNT] = '/home/input'
        merge_task.OutputMapping['/home/output'] = OSS_MOUNT

        # Create the task DAG: split -> count -> merge.
        task_dag = DAG()
        task_dag.add_task(task_name="split", task=split_task)
        task_dag.add_task(task_name="count", task=count_task)
        task_dag.add_task(task_name="merge", task=merge_task)
        task_dag.Dependencies = {
            'split': ['count'],
            'count': ['merge']
        }

        # Create the job description.
        job_desc.DAG = task_dag
        job_desc.Priority = 99  # 0-1000
        job_desc.Name = "log-count"
        job_desc.Description = "PythonSDKDemo"
        job_desc.JobFailOnInstanceFail = True

        job_id = client.create_job(job_desc).Id
        print('job created: %s' % job_id)
    except ClientError as e:
        print(e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())

if __name__ == '__main__':
    sys.exit(main())

3. Check job status

You can view the job status by referring to Obtain the job information.

jobInfo = client.get_job(job_id)
print(jobInfo.State)

A job may be in one of the following states: Waiting, Running, Finished, Failed, and Stopped.
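If you want to wait for the job to complete, a minimal polling sketch (reusing the client and job_id from the submission script above) looks like this; the 10-second interval is an arbitrary choice:

import time

# Poll until the job reaches a terminal state.
state = client.get_job(job_id).State
while state not in ('Finished', 'Failed', 'Stopped'):
    time.sleep(10)  # Check every 10 seconds.
    state = client.get_job(job_id).State
print('job ended in state: %s' % state)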

4. Check job execution result

You can log on to the OSS console and check the following file under your bucket: /log-count/merge_result.json.

The expected result is as follows:

  1. {"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}
  • Alternatively, you can use the OSS SDK to fetch the result, as sketched below.

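For a scripted check, a minimal sketch using the OSS Python SDK (oss2) follows; the credentials, endpoint, and bucket name are assumptions for this example:

import json
import oss2

# Credentials, endpoint, and bucket name are assumptions; use your own values.
auth = oss2.Auth('<your-AccessKeyId>', '<your-AccessKeySecret>')
bucket = oss2.Bucket(auth, 'http://oss-cn-shenzhen.aliyuncs.com', 'your-bucket')

# Download and parse the merged result written by the merge task.
merge_result = json.loads(bucket.get_object('log-count/merge_result.json').read())
print(merge_result)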