All Products
Search
Document Center

CloudOps Orchestration Service: QuickStarts

Last Updated: Dec 27, 2023

This topic describes how to use the CloudOps Orchestration Service (OOS) SDK for Python to perform common operations, such as creating a template, executing a template, and checking the execution result.

Create a template

Run the following code to create a template:

# coding: utf-8
from aliyunsdkcore.client import AcsClient
from aliyunsdkoos.request.v20190601.CreateTemplateRequest import CreateTemplateRequest

# Build the SDK client from an AccessKey pair and a region.
# Replace the placeholders with your own values before running.
client = AcsClient(
    ak='<AccessKeyId>',
    secret='<AccessKeySecret>',
    region_id='cn-hangzhou',
)
# Template body as a JSON document: one "Status" parameter and a single task
# that runs the ECS DescribeInstances API through the ACS::ExecuteAPI action.
content = '''
{
  "FormatVersion": "OOS-2019-06-01",
  "Description": "Describe instances of given status",
  "Parameters": {
    "Status": {
      "Type": "String",
      "Description": "(Required) The status of the Ecs instance."
    }
  },
  "Tasks": [
    {
      "Properties": {
        "Parameters": { "Status": "{{ Status }}" },
        "API": "DescribeInstances",
        "Service": "ECS"
      },
      "Name": "describeInstances",
      "Action": "ACS::ExecuteAPI"
    }
  ]
}
'''

def create_template(template_name, content):
    """Create a template.

    Args:
        template_name: Name set on the request as the template name.
        content: Template body set on the request unchanged.

    Returns:
        Whatever ``client.do_action_with_exception`` returns for the
        CreateTemplate request.
    """
    create_request = CreateTemplateRequest()
    create_request.set_TemplateName(template_name)
    create_request.set_Content(content)
    return client.do_action_with_exception(create_request)


# Create the template under the name "MyTemplate" and print the response.
resp = create_template(
    template_name='MyTemplate',
    content=content,
)
print(resp)

Execute a template

Run the following code to execute a template:

# coding: utf-8
import json
from aliyunsdkcore.client import AcsClient
from aliyunsdkoos.request.v20190601.StartExecutionRequest import StartExecutionRequest

# Build the SDK client from an AccessKey pair and a region.
# Replace the placeholders with your own values before running.
client = AcsClient(
    ak='<AccessKeyId>',
    secret='<AccessKeySecret>',
    region_id='cn-hangzhou',
)

def start_execution(template_name, parameters=None):
    """Execute a template.

    Args:
        template_name: Name of the template to execute.
        parameters: Optional JSON-serializable object with the template
            parameters. A falsy value (``None``, empty dict) sets no
            parameters on the request.

    Returns:
        Whatever ``client.do_action_with_exception`` returns for the
        StartExecution request.
    """
    exec_request = StartExecutionRequest()
    exec_request.set_TemplateName(template_name)
    if parameters:
        # Parameters are serialized to a JSON string before being set.
        serialized = json.dumps(parameters)
        exec_request.set_Parameters(serialized)
    return client.do_action_with_exception(exec_request)


# Execute "MyTemplate" with Status=Running and print the response.
resp = start_execution(
    template_name='MyTemplate',
    parameters={"Status": "Running"},
)
print(resp)

Check the execution result

Run the following code to check the execution result:

# coding: utf-8
from aliyunsdkcore.client import AcsClient
from aliyunsdkoos.request.v20190601.ListExecutionsRequest import ListExecutionsRequest

# Build the SDK client from an AccessKey pair and a region.
# Replace the placeholders with your own values before running.
client = AcsClient(
    ak='<AccessKeyId>',
    secret='<AccessKeySecret>',
    region_id='cn-hangzhou',
)

def list_executions(execution_id=None):
    """Check the execution result.

    Args:
        execution_id: Optional ID of the execution to look up. A falsy
            value (``None``, empty string) sets no execution ID on the
            request.

    Returns:
        Whatever ``client.do_action_with_exception`` returns for the
        ListExecutions request.
    """
    request = ListExecutionsRequest()
    if execution_id:
        request.set_ExecutionId(execution_id)
    return client.do_action_with_exception(request)


# Backward-compatible alias for the original, misspelled function name.
list_exexutions = list_executions

# Replace <ExecutionId> with the ID of the execution to check.
resp = list_executions(execution_id='<ExecutionId>')
print(resp)