This topic describes how to use Argo Workflows to progressively merge objects in OSS and how to batch process data. This method is typically used in high-precision map processing and animation rendering. Using Argo Workflows to orchestrate tasks can greatly improve the efficiency and concurrency of data processing.
Prerequisites
The required components and the Alibaba Cloud Argo CLI are installed. For more information, see Enable batch task orchestration.
An OSS volume is created. For more information, see Use volumes.
Step 1: Prepare data
Create a file named prepare-data.yaml based on the following content.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: aggregation-prepare-data-
  namespace: argo
spec:
  entrypoint: main
  volumes:
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  templates:
    - name: main
      dag:
        tasks:
          - name: prepare-data
            template: create-file
    - name: create-file
      script:
        image: mirrors-ssl.aliyuncs.com/python:alpine
        command:
          - python
        source: |
          import os
          import sys
          import random

          # Create the level-1 directory and write 32 files, each holding one random uppercase letter.
          os.makedirs("/mnt/vol/aggregation-demo/l1/", exist_ok=True)
          for i in range(32):
              with open('/mnt/vol/aggregation-demo/l1/' + str(i) + '.txt', 'w') as combine_file:
                  combined_content = random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ')
                  combine_file.write(combined_content)
        volumeMounts:
          - name: workdir
            mountPath: /mnt/vol
Run the following command to submit the workflow:
argo submit prepare-data.yaml
After the workflow runs, a directory named aggregation-demo/l1 is created in the OSS volume and 32 files are generated, each of which contains a random uppercase letter.
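To sanity-check the prepared data, a script along the following lines could be run wherever the contents of the OSS volume are reachable as a local directory. This is a minimal sketch and not part of the original procedure: the helper name check_prepared_files and the idea of running it against a local path are assumptions, while the file naming (0.txt to 31.txt, one uppercase letter each) follows the workflow above.

```python
import string
import tempfile
from pathlib import Path


def check_prepared_files(level_dir, expected_count=32):
    """Return a list of problems found in level_dir (an empty list means OK).

    Hypothetical helper: expects files 0.txt .. (expected_count - 1).txt,
    each holding exactly one uppercase ASCII letter.
    """
    problems = []
    for i in range(expected_count):
        path = Path(level_dir) / f"{i}.txt"
        if not path.is_file():
            problems.append(f"missing: {path.name}")
            continue
        content = path.read_text()
        if len(content) != 1 or content not in string.ascii_uppercase:
            problems.append(f"unexpected content in {path.name}: {content!r}")
    return problems


if __name__ == "__main__":
    # Demo against a temporary directory that mimics aggregation-demo/l1.
    with tempfile.TemporaryDirectory() as tmp:
        for i in range(32):
            (Path(tmp) / f"{i}.txt").write_text("A")
        print(check_prepared_files(tmp))
```

An empty list means that all 32 files exist and each one holds a single uppercase letter.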
Step 2: Batch process data
Create a YAML file named process-data.yaml to deploy a data processing workflow.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: process-data- # Data processing workflow.
  namespace: argo
spec:
  entrypoint: main
  volumes: # Mount an OSS volume.
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  arguments:
    parameters:
      - name: numbers
        value: "16"
  templates:
    - name: main
      steps:
        - - name: process-data-l1 # Step 1. Launch 16 pods to merge 32 files.
            template: process-data
            arguments:
              parameters:
                - name: file_number
                  value: "{{item}}"
                - name: level
                  value: "1"
            withSequence:
              count: "{{workflow.parameters.numbers}}"
        - - name: process-data-l2 # Step 2. Launch 8 pods to merge 16 files. This step is performed after Step 1 is completed.
            template: process-data
            arguments:
              parameters:
                - name: file_number
                  value: "{{item}}"
                - name: level
                  value: "2"
            withSequence:
              count: "{{=asInt(workflow.parameters.numbers)/2}}"
        - - name: merge-data # Step 3. Launch a pod to merge 8 files. This step is performed after Step 2 is completed.
            template: merge-data
            arguments:
              parameters:
                - name: number
                  value: "{{=asInt(workflow.parameters.numbers)/2}}"
    - name: process-data # Data processing.
      inputs:
        parameters:
          - name: file_number
          - name: level
      container:
        image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd # Enable VPC Internet access to pull images.
        imagePullPolicy: Always
        command: [python3] # command
        args: ["process.py", "{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"] # Receive input parameters that specify the files processed by the pods.
        volumeMounts:
          - name: workdir
            mountPath: /mnt/vol
    - name: merge-data # Data merging.
      inputs:
        parameters:
          - name: number
      container:
        image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd
        imagePullPolicy: Always
        command: [python3]
        args: ["merge.py", "{{inputs.parameters.number}}"] # Receive input parameters that specify the number of files to be processed.
        volumeMounts:
          - name: workdir
            mountPath: /mnt/vol
Run the following command to submit the workflow:
argo submit process-data.yaml -n argo
Run the following command to view the workflow status:
argo get @latest -n argo
Expected results:
Name:                process-data-8sn2q
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              Succeeded
Conditions:
 PodRunning          False
 Completed           True
Created:             Thu Dec 12 13:15:39 +0800 (4 minutes ago)
Started:             Thu Dec 12 13:15:39 +0800 (4 minutes ago)
Finished:            Thu Dec 12 13:16:40 +0800 (3 minutes ago)
Duration:            1 minute 1 seconds
Progress:            25/25
ResourcesDuration:   4s*(1 cpu),2m51s*(100Mi memory)
Parameters:
  numbers:           16

STEP                          TEMPLATE      PODNAME                                      DURATION  MESSAGE
✔ process-data-8sn2q          main
├─┬─✔ process-data-l1(0:0)    process-data  process-data-8sn2q-process-data-3064646785   17s
│ ├─✔ process-data-l1(1:1)    process-data  process-data-8sn2q-process-data-140728989    20s
│ ├─✔ process-data-l1(2:2)    process-data  process-data-8sn2q-process-data-499182361    18s
│ ├─✔ process-data-l1(3:3)    process-data  process-data-8sn2q-process-data-3152865965   20s
│ ├─✔ process-data-l1(4:4)    process-data  process-data-8sn2q-process-data-1363784105   16s
│ ├─✔ process-data-l1(5:5)    process-data  process-data-8sn2q-process-data-3270437485   20s
│ ├─✔ process-data-l1(6:6)    process-data  process-data-8sn2q-process-data-1788045361   16s
│ ├─✔ process-data-l1(7:7)    process-data  process-data-8sn2q-process-data-913839549    20s
│ ├─✔ process-data-l1(8:8)    process-data  process-data-8sn2q-process-data-1562179905   16s
│ ├─✔ process-data-l1(9:9)    process-data  process-data-8sn2q-process-data-573517021    20s
│ ├─✔ process-data-l1(10:10)  process-data  process-data-8sn2q-process-data-3769586203   16s
│ ├─✔ process-data-l1(11:11)  process-data  process-data-8sn2q-process-data-3700909073   20s
│ ├─✔ process-data-l1(12:12)  process-data  process-data-8sn2q-process-data-2818003295   19s
│ ├─✔ process-data-l1(13:13)  process-data  process-data-8sn2q-process-data-278901825    20s
│ ├─✔ process-data-l1(14:14)  process-data  process-data-8sn2q-process-data-3986961347   16s
│ └─✔ process-data-l1(15:15)  process-data  process-data-8sn2q-process-data-2905592609   19s
├─┬─✔ process-data-l2(0:0)    process-data  process-data-8sn2q-process-data-2056515729   9s
│ ├─✔ process-data-l2(1:1)    process-data  process-data-8sn2q-process-data-2141620461   4s
│ ├─✔ process-data-l2(2:2)    process-data  process-data-8sn2q-process-data-352538601    9s
│ ├─✔ process-data-l2(3:3)    process-data  process-data-8sn2q-process-data-2144734909   6s
│ ├─✔ process-data-l2(4:4)    process-data  process-data-8sn2q-process-data-2290907961   8s
│ ├─✔ process-data-l2(5:5)    process-data  process-data-8sn2q-process-data-4197561341   5s
│ ├─✔ process-data-l2(6:6)    process-data  process-data-8sn2q-process-data-1620613249   9s
│ └─✔ process-data-l2(7:7)    process-data  process-data-8sn2q-process-data-3126908173   10s
└───✔ merge-data              merge-data    process-data-8sn2q-merge-data-1171626309     7s
View the result in the console.

The result indicates that the workflow succeeded. View the OSS bucket to confirm that the result.txt file is generated.
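The process.py and merge.py scripts ship inside the example image and are not shown in this topic. To make the three-step fan-in concrete, the following is a hypothetical local simulation rather than the actual scripts. It assumes that pod i at level n concatenates files 2i and 2i+1 from directory l<n> into i.txt under l<n+1>, and that the final step concatenates the remaining files into result.txt. The l2 and l3 directory names, the extra level argument of merge, and all function names are assumptions made for illustration.

```python
import random
import string
import tempfile
from pathlib import Path


def process(base, file_number, level):
    """Assumed behavior of one process-data pod: merge two files of one level."""
    src = Path(base) / f"l{level}"
    dst = Path(base) / f"l{level + 1}"
    dst.mkdir(parents=True, exist_ok=True)
    left = (src / f"{2 * file_number}.txt").read_text()
    right = (src / f"{2 * file_number + 1}.txt").read_text()
    (dst / f"{file_number}.txt").write_text(left + right)


def merge(base, number, level):
    """Assumed behavior of the merge-data pod: join `number` files into result.txt."""
    src = Path(base) / f"l{level}"
    parts = [(src / f"{i}.txt").read_text() for i in range(number)]
    (Path(base) / "result.txt").write_text("".join(parts))


def run_pipeline(base, numbers=16):
    """Run the three steps in order and return how many tasks were executed."""
    tasks = 0
    for i in range(numbers):       # Step 1: `numbers` tasks, 32 files -> 16 files
        process(base, i, 1)
        tasks += 1
    for i in range(numbers // 2):  # Step 2: `numbers / 2` tasks, 16 files -> 8 files
        process(base, i, 2)
        tasks += 1
    merge(base, numbers // 2, 3)   # Step 3: one task, 8 files -> result.txt
    return tasks + 1


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as base:
        # Recreate the Step 1 input: 32 files with one random uppercase letter each.
        l1 = Path(base) / "l1"
        l1.mkdir()
        for i in range(32):
            (l1 / f"{i}.txt").write_text(random.choice(string.ascii_uppercase))
        print(run_pipeline(base), (Path(base) / "result.txt").read_text())
```

With numbers set to 16, the simulation runs 16 + 8 + 1 = 25 tasks, which is consistent with the Progress: 25/25 line in the expected results. In the workflow, the tasks within a step run as parallel pods rather than in a sequential loop, and each step starts only after the previous step is completed.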

References
To use SDK for Python to create and submit workflows, see Use SDK for Python to create large numbers of Argo workflows.
Contact us
If you have suggestions or questions about this product, join the DingTalk group 35688562 to contact us.