全部产品
Search
文档中心

批量计算:创建作业(DAG 类型)

更新时间:Sep 22, 2022

create_job

参数说明:

说明

所有类型的参数将被转换为包含属性信息的字典对象。

参数

类型

描述

job_desc

JobDescription object, str, dict

作业的简单描述和作业对象中各个任务的描述信息,以及各个任务之间的DAG依赖关系

返回值说明:

说明

create_job 方法将返回一个CreateResponse对象, 以下是 CreateResponse 对象的属性。可以通过 response.Id 的方式获取新任务的 ID。

属性

类型

描述

Id

str

新任务的任务标识符

e.g.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from batchcompute import Client, ClientError
from batchcompute import CN_ZHANGJIAKOU as REGION
from batchcompute.resources import (
    ClusterDescription, GroupDescription, Configs, Networks, VPC,
    JobDescription, TaskDescription, DAG,Mounts,
    AutoCluster,Disks,Notification,
)

access_key_id = "" # your access key id
access_key_secret = "" # your access key secret
image_id = "m-8vbd8lo9xxxx" # the id of a image created before,镜像需要确保已经注册给批量计算
instance_type = "ecs.sn1.medium" # instance type
inputOssPath = "oss://xxx/input/" # your input oss path
outputOssPath = "oss://xxx/output/" #your output oss path
stdoutOssPath = "oss://xxx/log/stdout/" #your stdout oss path
stderrOssPath = "oss://xxx/log/stderr/" #your stderr oss path

def getAutoClusterDesc():
    auto_desc = AutoCluster()

    auto_desc.ECSImageId = image_id

    #任务失败保留环境,程序调试阶段设置。环境保留费用会继续产生请注意及时手动清除环境任务失败保留环境,
    # 程序调试阶段设置。环境保留费用会继续产生请注意及时手动清除环境
    auto_desc.ReserveOnFail = False

    # 实例规格
    auto_desc.InstanceType = instance_type

    #case1 设置上限价格的竞价实例; 
    # auto_desc.ResourceType = "Spot"
    # auto_desc.SpotStrategy = "SpotWithPriceLimit"
    # auto_desc.SpotPriceLimit = 0.5

    #case2 系统自动出价,最高按量付费价格
    # auto_desc.ResourceType = "Spot"
    # auto_desc.SpotStrategy = "SpotAsPriceGo"

    #case3 按量
    auto_desc.ResourceType = "OnDemand"

    #Configs
    configs = Configs()
    #Configs.Networks
    networks  = Networks()
    vpc = VPC()

    #case1 只给CidrBlock
    vpc.CidrBlock = '192.168.0.0/16'

    #case2 CidrBlock和VpcId 都传入,必须保证VpcId的CidrBlock 和传入的CidrBlock保持一致
    # vpc.CidrBlock = '172.26.0.0/16'
    # vpc.VpcId = "vpc-8vbfxdyhxxxx"

    networks.VPC = vpc
    configs.Networks = networks

    # 设置系统盘type(cloud_efficiency/cloud_ssd)以及size(单位GB)
    configs.add_system_disk(size=40, type_='cloud_efficiency')

    #设置数据盘type(必须和系统盘type保持一致)size(单位GB)挂载点
    # case1 linux环境
    # configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='/path/to/mount/')

    # case2 windows环境
    # configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='E:')

    # 设置节点个数
    configs.InstanceCount = 1
    auto_desc.Configs = configs
    return auto_desc

def getDagJobDesc(clusterId = None):
    job_desc = JobDescription()
    dag_desc = DAG()
    mounts_desc = Mounts()

    job_desc.Name = "testBatchSdkJob"
    job_desc.Description = "test job"
    job_desc.Priority = 1

    # 订阅job完成或者失败事件
    noti_desc = Notification()
    noti_desc.Topic['Name'] = "test-topic"
    noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/"
    noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"]
    # job_desc.Notification = noti_desc

    job_desc.JobFailOnInstanceFail = False

    # 作业运行成功后户自动会被立即释放掉
    job_desc.AutoRelease = False
    job_desc.Type = "DAG"

    echo_task = TaskDescription()

    # 程序的输入路径映射,程序直接访问/home/test/input/来访问oss://xxx/input/中的文件
    # 支持文件挂载,在程序中直接访问文件
    # echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/",
    #                          "oss://xxx/test/file": "/home/test/test/file"}
    echo_task.InputMapping = {inputOssPath: "/home/test/input/"}

    # 程序的输出路径映射,可执行程序将结果输出到/home/test/output/,
    # 程序执行完毕后批量计算将/home/test/output/中的结果上传到oss://xxx/output/中
    # 输入和输出oss路径不要有交叉,如输入为oss://xxx/input/,输出为oss://xxx/input/output/;
    # 这样会导致未定义行为程序执行性能不能保证
    echo_task.OutputMapping = {"/home/test/output/":outputOssPath}

    #触发程序运行的命令行
    #case1 执行linux命令行
    echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'"

    #case2 执行Windows CMD.exe
    # echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'"

    #case3 输入可执行文件
    # PackagePath存放commandLine中的可执行文件或者二进制包
    # echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh"
    # echo_task.Parameters.Command.CommandLine = "sh test.sh"

    # 设置程序运行过程中相关环境变量信息
    echo_task.Parameters.Command.EnvVars["key1"] = "value1"
    echo_task.Parameters.Command.EnvVars["key2"] = "value2"

    # 设置docker参数
    #case1 docker镜像在oss registry上
    # echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_IMAGE"] = "localhost:5000/yuorBucket/dockers:0.1"
    # echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH"] = "oss://your-bucket/dockers"

    #case2 docker镜像在容器仓库
    # echo_task.Parameters.Command.Docker.Image = "registry.cn-beijing.aliyuncs.com/demotest/test:0.1"

    # 设置程序的标准输出地址,程序中的print打印会实时上传到指定的oss地址
    echo_task.Parameters.StdoutRedirectPath = stdoutOssPath

    # 设置程序的标准错误输出地址,程序抛出的异常错误会实时上传到指定的oss地址
    echo_task.Parameters.StderrRedirectPath = stderrOssPath

    # 设置任务的超时时间
    echo_task.Timeout = 600

    # 设置任务所需实例个数
    # 环境变量BATCH_COMPUTE_INSTANCE_ID为0到InstanceCount-1
    # 在执行程序中访问BATCH_COMPUTE_INSTANCE_ID,实现数据访问的切片实现单任务并发执行
    echo_task.InstanceCount = 1

    # 设置任务失败后重试次数
    echo_task.MaxRetryCount = 0

    # NAS数据挂载
    #采用NAS时必须保证网络和NAS在同一个VPC内
    nasMountEntry = {
        "Source": "nas://xxxx.nas.aliyuncs.com:/",
        "Destination": "/home/mnt/",
        "WriteSupport":True,
    }
    mounts_desc.add_entry(nasMountEntry)
    mounts_desc.Locale = "utf-8"
    mounts_desc.Lock = False
    # echo_task.Mounts = mounts_desc

    # 采用固定集群提交作业
    # echo_task.ClusterId = clusterId

    #采用auto集群提交作业
    echo_task.AutoCluster = getAutoClusterDesc()

    # 添加任务
    dag_desc.add_task('echoTask', echo_task)

    # 可以设置多个task,每个task可以根据需求进行设置各项参数
    # dag_desc.add_task('echoTask2', echo_task)

    # Dependencies设置多个task之间的依赖关系,echoTask2依赖echoTask;echoTask3依赖echoTask2
    # dag_desc.Dependencies = {"echoTask":["echoTask2"], "echoTask2":["echoTask3"]}
    # 
    job_desc.DAG = dag_desc
    return job_desc

if __name__ == "__main__":
    client = Client(REGION, access_key_id, access_key_secret)
    try:
        job_desc = getDagJobDesc()
        job_id = client.create_job(job_desc).Id
        print('job created: %s' % job_id)
    except ClientError,e:
        print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
说明

Notice: 关于Mounts的注意事项 Job 级别的 Mounts 参数会覆盖 Cluster 级别的配置信息; Modify Cluster 后,需要调用 RecreateInstance 接口才能使新指定的 Mounts 配置生效; 挂载 NAS 需要以 nas:// 做为 source 的前缀,否则会出错; 每个类的具体成员信息参见以下表格

(1)JobDescription 类

参数说明:

参数

类型

描述

properties

dict, str, JobDescription object

包含作业描述信息的对象

属性说明:

序号

属性

类型

描述

1.

Name

str

作业名称

2.

Description

str

作业的简短描述信息

3.

Priority

int

优先级用一个[0,1000]范围内的整数指定,数值越高表示作业调度时的优先级越高

4.

Notification

dict

消息通知配置,可以配置 MNS 服务的 Topic 和 Job 相关事件

5.

JobFailOnInstanceFail

bool

Instance 失败是否直接使 Job 失败

6.

AutoRelease

boolean

表示 Job 运行成功自动会被立即释放(删除)掉,默认为 False

7.

Type

str

目前仅支持有向无环图(directed acycline graph,DAG)形式描述任务

8.

DAG

dict, DAG object

DAG 描述

(2)DAG 类

参数说明:

参数

类型

描述

properties

dict, str, DAG object

所有任务的映射以及任务间依赖关系的描述信息

属性说明:

序号

属性

类型

描述

1.

Tasks

dict

所有任务名与任务描述的映射关系

2.

Dependencies

dict

所有任务间的相互依赖关系

方法说明 :

序号

方法

描述

1.

add_task(task_name, task)

增加一个任务

2.

get_task(task_name)

通过任务名获取任务信息

3.

delete_task(task_name)

删除某个任务

(3) TaskDescription 类

参数说明:

参数

类型

描述

properties

dict, str, TaskDescription object

单个任务的描述信息

属性说明:

序号

属性

类型

描述

1.

Parameters

dict, Parameters object

任务参数详情

2.

InputMapping

dict

OSS 到本地路径的映射

3.

OutputMapping

dict

本地路径到 OSS 的映射

4.

LogMapping

dict

本地日志路径对 OSS 映射

5.

Timeout

int

任务超时时间

6.

InstanceCount

int

任务中实例的个数,正数

7.

MaxRetryCount

int

最大重试次数,默认为0

8.

ClusterId

str

集群标识符

9.

Mounts

dict, Mounts object

实例的网络挂载配置信息,由 Mounts 描述,目前支持 NAS 和 OSS 挂载。

10.

AutoCluster

dict, AutoCluster object

匿名集群,和集群标示符最多只能指定一个

(4) Parameters 类

参数说明:

参数

类型

描述

properties

dict, str, Parameters object

任务参数的描述信息

属性说明:

序号

属性

类型

描述

1.

Command

dict, Command object

用户程序相关命令行参数

2.

InputMappingConfig

dict, InputMappingConfig object

NFS 挂载服务配置项

3.

StdoutRedirectPath

str

标准输出的 OSS 路径

4.

StderrRedirectPath

str

标准错误的 OSS 路径

(5) AutoCluster 类

参数说明:

参数

类型

描述

properties

dict, str, AutoCluster object

匿名集群信息

属性说明:

序号

属性

类型

描述

1.

ECSImageId

str

ECS 镜像 ID,可以使用系统提供的镜像

2.

InstanceType

str

实例规格,实例类型

3.

ResourceType

str

资源类型,目前支持:“OnDemand” 和 “Spot”,默认为“OnDemand”

4.

UserData

dict

一个 KeyValue 映射,用户自定义的信息,使用 ECS 的 metaserver 获取

5.

Configs

Configs object

集群的配置信息, 详见4.13 节中 ClusterDescription 的介绍

6.

SpotStrategy

str

实例的竞价策略,只有在 ResourceType 为 Spot 的情况下有效。取值范围:SpotWithPriceLimit:设置上限价格的竞价实例; SpotAsPriceGo:系统自动出价,最高按量付费价格。

7.

SpotPriceLimit

float

实例的每小时最高价格(每个实例规格的价格而非每核小时的价格)。支持最大 3 位小数,SpotStrategy 为 SpotWithPriceLimit 生效。

8.

ReserveOnFail

bool

任务失败时不释放相关的虚拟机,会继续收取这些资源的费用直到用户删除作业,默认为 False,仅用于调查问题。

9.

DependencyIsvService

string

执行程序依赖的阿里云提供的ISV服务,目前提供的ISV服务有:“GTX”,默认为””,不依赖任何ISV服务。

(6) Command 类

参数说明:

参数

类型

描述

properties

dict, str, Command object

用户程序相关命令行参数

属性说明:

序号

属性

类型

描述

1.

CommandLine

str

执行用户程序的命令

2.

PackagePath

str

用户程序所在 OSS 路径

3.

EnvVars

dict

用户程序执行时的环境变量

(7) InputMappingConfig 类

参数说明:

参数

类型

描述

properties

dict, str, InputMappingConfig object

NFS 挂载服务配置项

属性说明:

序号

属性

类型

描述

1.

Locale

str

OSS object 挂载到本地时使用的字符集。可选范围包括 GBK、GB2312-80、BIG5、ANSI、EUC-JP、EUC-TW、EUC-KR、SHIFT-JIS、KSC5601 等

2.

Lock

bool

NFS 挂载服务是否支持网络文件锁

(8) Notification 类

参数说明:

参数

类型

描述

properties

dict, str, Command object

用户程序相关命令行参数

属性说明:

序号

属性

类型

描述

1.

Topic

Topic Object

消息 Topic

(9) Topic 类

参数说明:

参数

类型

描述

properties

dict, str, Command object

用户程序相关命令行参数

属性说明:

序号

属性

类型

描述

1.

Endpoint

str

MNS 区域 endpoint,格式如:http://${your_user_id}.mns.${region}-internal.aliyuncs.com/ ,请尽量使用内网 Endpoint。

2.

Name

str

Topic 名称。

3.

Events

list

事件列表,请填写 cluster 相关的事件名。

(10) Mounts 类

参数说明:

参数

类型

描述

properties

dict, str, Command object

创建集群时的网络磁盘挂载配置信息。

属性说明:

序号

属性

类型

描述

1.

Entries

array

网络磁盘挂载点信息列表, 由 MountPoint 描述。

2.

Locale

str

挂载 OSS,NAS 存储时语言选项。

3.

Lock

bool

挂载 OSS,NAS 存储时文件锁支持选项。

4.

NAS

dict

NAS 配置信息。

5.

OSS

dict

OSS 配置信息。

(11) MountPoint 类

参数说明:

参数

类型

描述

properties

dict, str, Command object

网络挂载点。

属性说明:

序号

属性名称

类型

描述

1.

Source

str

网络磁盘挂载来源路径,可以是 nas://oss:// 开头的字符串。

2.

Destination

str

网络磁盘本地挂载点路径。

3.

WriteSupport

bool

挂载点是否可写。

(12) NAS 类

参数说明:

参数

类型

描述

properties

dict, str, Command object

NAS 配置信息。

属性说明:

序号

属性名称

类型

描述

1.

AccessGroup

list

需要将集群实例加入到的 NAS 访问组。

2.

FileSystem

list

需要访问的文件系统。

(13) OSS 类

参数说明:

参数

类型

描述

properties

dict, str, Command object

OSS 配置信息。

属性说明:

序号

属性名称

类型

描述

1.

AccessKeyId

str

OSS挂载使用的 Access ID。

2.

AccessKeySecret

str

OSS挂载使用的 Access Secret。

3.

SecurityToken

str

OSS挂载使用的 Security Token。