
Batch Compute:Concurrent tasks

Last Updated:Apr 12, 2018

One job can contain multiple tasks, and one task program can run concurrently on multiple instances.

How do I run a task concurrently?

See the following job description example in JSON format:

    {
      "DAG": {
        ...
        "Tasks": {
          ...
          "count": {
            "InstanceCount": 3,  // Specifies the number of instances: three VMs
            "LogMapping": {},
            "AutoCluster": {
              "ResourceType": "OnDemand",
              "ImageId": "img-ubuntu",
              "InstanceType": "ecs.sn1.medium"
            },
            "Parameters": {
              "Command": {
                "EnvVars": {},
                "CommandLine": "python count.py",
                "PackagePath": "oss://your-bucket/log-count/worker.tar.gz"
              },
              "InputMappingConfig": {
                "Lock": true
              },
              "StdoutRedirectPath": "oss://your-bucket/log-count/logs/",
              "StderrRedirectPath": "oss://your-bucket/log-count/logs/"
            },
            "OutputMapping": {},
            "MaxRetryCount": 0,
            "Timeout": 21600,
            "InputMapping": {}
          }
        }
      },
      "Description": "batchcompute job",
      "Priority": 0,
      "JobFailOnInstanceFail": true,
      "Type": "DAG",
      "Name": "log-count"
    }

InstanceCount for the task is set to 3, meaning three instances are started: the task program runs on three virtual machines (VMs) in parallel.
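To illustrate how one program fans out across several instances, the following is a minimal sketch (not Batch Compute API code; the `shard_for_instance` helper is hypothetical) of how N instances can each pick a disjoint shard of the input by index:

```python
def shard_for_instance(items, instance_id, instance_count):
    """Return the slice of items this instance should process.

    Items are dealt out round-robin by index, so the shards are
    disjoint and differ in size by at most one element.
    """
    return items[instance_id::instance_count]

# With InstanceCount = 3, each VM sees its own subset of the parts:
parts = ['part_0.txt', 'part_1.txt', 'part_2.txt', 'part_3.txt', 'part_4.txt']
for instance_id in range(3):
    print(instance_id, shard_for_instance(parts, instance_id, 3))
```

Each instance computes its shard from its own ID alone, so no coordination between the VMs is needed.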

Process data of different segments concurrently

You can run the same task program on several VMs to process different data. In the task program, read the environment variable BATCH_COMPUTE_DAG_INSTANCE_ID (the instance ID) and use it to decide which data segment this instance processes. The following is a count.py code snippet:

    ...
    # instance_id starts from 0
    instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']
    ...
    filename = 'part_%s.txt' % instance_id
    ...
    # 1. download a part
    oss_tool.download_file('%s/%s/%s.txt' % (pre, split_results, instance_id), filename)
    ...
    # 3. upload result to oss
    upload_to = '%s/count_results/%s.json' % (pre, instance_id)
    print('upload to %s' % upload_to)
    oss_tool.put_data(json.dumps(m), upload_to)
    ...

For more information, see Quick start example.