Batch Compute: Concurrent Tasks

Last updated: Sep 22, 2022

A job (Job) can contain multiple tasks (Task), and a task can specify that its program run on multiple instances (Instance).

How to run a concurrent task

See the following job description JSON example:

{
    "DAG": {
        ...
        "Tasks": {
            ...
            "count": {
                "InstanceCount": 3,  //指定需要实例数:3台VM
                "LogMapping": {},
                "AutoCluster": {
                    "ResourceType": "OnDemand",
                    "ImageId": "img-ubuntu",
                    "InstanceType": "ecs.sn1.medium"
                },
                "Parameters": {
                    "Command": {
                        "EnvVars": {},
                        "CommandLine": "python count.py",
                        "PackagePath": "oss://your-bucket/log-count/worker.tar.gz"
                    },
                    "InputMappingConfig": {
                        "Lock": true
                    },
                    "StdoutRedirectPath": "oss://your-bucket/log-count/logs/",
                    "StderrRedirectPath": "oss://your-bucket/log-count/logs/"
                },
                "OutputMapping": {},
                "MaxRetryCount": 0,
                "Timeout": 21600,
                "InputMapping": {}
            }
        }
    },
    "Description": "batchcompute job",
    "Priority": 0,
    "JobFailOnInstanceFail": true,
    "Type": "DAG",
    "Name": "log-count"
}

The task count sets InstanceCount to 3, which means 3 instances are requested; that is, this task's program runs on 3 VMs.
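Conceptually, BatchCompute launches the same CommandLine on every instance of the task and gives each instance a distinct index from 0 to InstanceCount - 1 via the BATCH_COMPUTE_DAG_INSTANCE_ID environment variable (described in the next section). A minimal local sketch of that fan-out, where `run_task` is a hypothetical stand-in for the worker program:

```python
# Simulate one task running on InstanceCount VMs: the same program,
# a different BATCH_COMPUTE_DAG_INSTANCE_ID on each instance.
instance_count = 3  # matches "InstanceCount": 3 in the job description


def run_task(env):
    # Stand-in for "python count.py"; a real worker reads the env var
    # to decide which data segment to process.
    instance_id = int(env["BATCH_COMPUTE_DAG_INSTANCE_ID"])
    return "instance %d processes part_%d.txt" % (instance_id, instance_id)


results = [
    run_task({"BATCH_COMPUTE_DAG_INSTANCE_ID": str(i)})
    for i in range(instance_count)
]
for line in results:
    print(line)
```

Each simulated instance ends up working on its own file, which is exactly how the concurrent task divides work in the example below.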

Processing different data segments concurrently

You can use environment variables to run the same task program on multiple VMs at the same time while having each VM process different data. For example, BATCH_COMPUTE_DAG_INSTANCE_ID (the instance ID) can be used to select a different segment of the data. Below is a snippet from count.py:

...
# instance_id: should start from 0
instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']

...

filename = 'part_%s.txt' % instance_id
...

# 1. download a part
oss_tool.download_file('%s/%s/%s.txt' % (pre, split_results, instance_id), filename)


...
# 3. upload result to oss
upload_to = '%s/count_results/%s.json' % (pre, instance_id)
print('upload to %s' % upload_to)
oss_tool.put_data(json.dumps(m), upload_to)
...
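The snippet above can be exercised end to end without OSS. In the sketch below, an in-memory dict stands in for the object store (`oss_tool` and the real bucket paths are from this document; the fake store, the `pre` prefix value, and the word-count logic filling in the elided step 2 are assumptions for local testing):

```python
import json

# Fake object store standing in for OSS (assumption, for local testing only).
store = {
    "log-count/split_results/0.txt": "a b a",
    "log-count/split_results/1.txt": "b c",
    "log-count/split_results/2.txt": "c c c",
}


def count_words(instance_id, pre="log-count"):
    # 1. "download" this instance's segment, keyed by its instance id
    data = store["%s/split_results/%s.txt" % (pre, instance_id)]
    # 2. count word occurrences in the segment
    m = {}
    for word in data.split():
        m[word] = m.get(word, 0) + 1
    # 3. "upload" the partial result
    upload_to = "%s/count_results/%s.json" % (pre, instance_id)
    store[upload_to] = json.dumps(m)
    return upload_to


# Each of the 3 instances would run this once with its own id taken from
# BATCH_COMPUTE_DAG_INSTANCE_ID; here a loop simulates all of them.
for i in range(3):
    print(count_words(i))
```

Because every instance reads and writes paths derived from its own id, the three instances never touch the same object, so they can run fully in parallel; a downstream task can then merge the per-instance `count_results` files.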

For more details, see Quick Start.