
Container Compute Service:Use Argo Workflows to batch process data

Last Updated: Dec 27, 2024

This topic describes how to use Argo Workflows to batch process data by progressively merging objects in Object Storage Service (OSS). This approach is commonly used in scenarios such as high-precision map processing and animation rendering. Orchestrating tasks with Argo Workflows can greatly improve the efficiency and concurrency of data processing.

Prerequisites

The workflows in this topic assume that Argo Workflows is deployed in the cluster, that the Argo CLI is installed, and that an OSS volume is mounted through a persistent volume claim (PVC) named pvc-oss in the argo namespace.

Step 1: Prepare data

  1. Create a file named prepare-data.yaml based on the following content.

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: aggregation-prepare-data-
      namespace: argo
    spec:
      entrypoint: main
      volumes:
        - name: workdir
          persistentVolumeClaim:
            claimName: pvc-oss
      templates:
        - name: main
          dag:
            tasks:
              - name: prepare-data
                template: create-file
    
        - name: create-file
          script:
            image: mirrors-ssl.aliyuncs.com/python:alpine
            command:
              - python
            source: |
              import os
              import random
              os.makedirs("/mnt/vol/aggregation-demo/l1/", exist_ok=True)
              # Generate 32 files, each containing one random uppercase letter.
              for i in range(32):
                with open('/mnt/vol/aggregation-demo/l1/' + str(i) + '.txt', 'w') as combined_file:
                  combined_content = random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ')
                  combined_file.write(combined_content)
            volumeMounts:
            - name: workdir
              mountPath: /mnt/vol
  2. Run the following command to submit the workflow:

    argo submit prepare-data.yaml -n argo

    After the workflow runs, a directory named aggregation-demo/l1 is created in the OSS volume and 32 files are generated, each of which contains a single random uppercase letter.

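If you want to try the create-file logic before submitting the workflow, the same steps can be pointed at any local directory instead of the OSS mount. The helper below is a hypothetical local mirror of the workflow script above, not part of the example itself:

```python
import os
import random


def prepare_data(base_dir: str, count: int = 32) -> None:
    """Write `count` files under <base_dir>/aggregation-demo/l1/,
    each containing one random uppercase letter."""
    target = os.path.join(base_dir, "aggregation-demo", "l1")
    os.makedirs(target, exist_ok=True)
    for i in range(count):
        with open(os.path.join(target, f"{i}.txt"), "w") as f:
            f.write(random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZ"))
```

Running `prepare_data("/tmp/demo")` produces the same 32-file layout that the workflow writes to the OSS volume.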

Step 2: Batch process data

  1. Create a YAML file named process-data.yaml to deploy a data processing workflow.

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: process-data- # Data processing workflow. 
      namespace: argo
    spec:
      entrypoint: main
      volumes: # Mount an OSS volume. 
        - name: workdir
          persistentVolumeClaim:
            claimName: pvc-oss
      arguments:
        parameters:
          - name: numbers
            value: "16"
      templates:
        - name: main
          steps:
            - - name: process-data-l1 # Step 1. Launch 16 pods to merge 32 files. 
                template: process-data
                arguments:
                  parameters:
                    - name: file_number
                      value: "{{item}}"
                    - name: level
                      value: "1"
                withSequence:
                  count: "{{workflow.parameters.numbers}}"
            - - name: process-data-l2 # Step 2. Launch 8 pods to merge 16 files. This step is performed after Step 1 is completed. 
                template: process-data
                arguments:
                  parameters:
                    - name: file_number
                      value: "{{item}}"
                    - name: level
                      value: "2"
                withSequence:
                  count: "{{=asInt(workflow.parameters.numbers)/2}}"
            - - name: merge-data # Step 3. Launch a pod to merge 8 files. This step is performed after Step 2 is completed. 
                template: merge-data
                arguments:
                  parameters:
                    - name: number
                      value: "{{=asInt(workflow.parameters.numbers)/2}}"
    
        - name: process-data # Data processing. 
          inputs:
            parameters:
              - name: file_number
              - name: level
          container:
            image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd # Enable Internet access for the VPC so that the image can be pulled.
            imagePullPolicy: Always
            command: [python3]
            args: ["process.py", "{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"] # Pass the input parameters that specify the files processed by the pod.
            volumeMounts:
            - name: workdir
              mountPath: /mnt/vol
    
        - name: merge-data # Data merging. 
          inputs:
            parameters:
              - name: number
          container:
            image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd
            imagePullPolicy: Always
            command: [python3]
            args: ["merge.py", "{{inputs.parameters.number}}"] # Receive input parameters that specify the number of files to be processed. 
            volumeMounts:
            - name: workdir
              mountPath: /mnt/vol
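The example image bundles process.py and merge.py, whose source is not shown in this topic. The following is a plausible reconstruction, assuming that the pod with file_number i at level n merges l{n}/2i.txt and l{n}/(2i+1).txt into l{n+1}/i.txt, and that merge.py concatenates the remaining files into result.txt. The paths and argument handling are assumptions; the actual scripts may differ. In the workflow, each script would read its parameters from sys.argv.

```python
import os

# Mount path of the OSS volume, as configured in the workflow above.
BASE = "/mnt/vol/aggregation-demo"


def process(file_number: int, level: int, base: str = BASE) -> None:
    """Merge one pair of files from level `level` into one file at level + 1."""
    src = f"{base}/l{level}"
    dst = f"{base}/l{level + 1}"
    os.makedirs(dst, exist_ok=True)
    parts = []
    for i in (2 * file_number, 2 * file_number + 1):
        with open(f"{src}/{i}.txt") as f:
            parts.append(f.read())
    with open(f"{dst}/{file_number}.txt", "w") as f:
        f.write("".join(parts))


def merge(number: int, level: int = 3, base: str = BASE) -> None:
    """Concatenate the `number` files left after the pairwise levels."""
    parts = []
    for i in range(number):
        with open(f"{base}/l{level}/{i}.txt") as f:
            parts.append(f.read())
    with open(f"{base}/result.txt", "w") as f:
        f.write("".join(parts))
```

With numbers=16, the workflow runs process(i, 1) for i in 0..15, process(i, 2) for i in 0..7, and finally merge(8), which yields a single result.txt.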
  2. Run the following command to submit the workflow:

    argo submit process-data.yaml -n argo
  3. Run the following command to view the mount result:

    argo get @latest -n argo

    Expected results:

    argo get @latest -n argo
    Name:                process-data-8sn2q
    Namespace:           argo
    ServiceAccount:      unset (will run with the default ServiceAccount)
    Status:              Succeeded
    Conditions:          
     PodRunning          False
     Completed           True
    Created:             Thu Dec 12 13:15:39 +0800 (4 minutes ago)
    Started:             Thu Dec 12 13:15:39 +0800 (4 minutes ago)
    Finished:            Thu Dec 12 13:16:40 +0800 (3 minutes ago)
    Duration:            1 minute 1 seconds
    Progress:            25/25
    ResourcesDuration:   4s*(1 cpu),2m51s*(100Mi memory)
    Parameters:          
      numbers:           16
    
    STEP                           TEMPLATE      PODNAME                                     DURATION  MESSAGE
     ✔ process-data-8sn2q          main                                                                  
     ├─┬─✔ process-data-l1(0:0)    process-data  process-data-8sn2q-process-data-3064646785  17s         
     │ ├─✔ process-data-l1(1:1)    process-data  process-data-8sn2q-process-data-140728989   20s         
     │ ├─✔ process-data-l1(2:2)    process-data  process-data-8sn2q-process-data-499182361   18s         
     │ ├─✔ process-data-l1(3:3)    process-data  process-data-8sn2q-process-data-3152865965  20s         
     │ ├─✔ process-data-l1(4:4)    process-data  process-data-8sn2q-process-data-1363784105  16s         
     │ ├─✔ process-data-l1(5:5)    process-data  process-data-8sn2q-process-data-3270437485  20s         
     │ ├─✔ process-data-l1(6:6)    process-data  process-data-8sn2q-process-data-1788045361  16s         
     │ ├─✔ process-data-l1(7:7)    process-data  process-data-8sn2q-process-data-913839549   20s         
     │ ├─✔ process-data-l1(8:8)    process-data  process-data-8sn2q-process-data-1562179905  16s         
     │ ├─✔ process-data-l1(9:9)    process-data  process-data-8sn2q-process-data-573517021   20s         
     │ ├─✔ process-data-l1(10:10)  process-data  process-data-8sn2q-process-data-3769586203  16s         
     │ ├─✔ process-data-l1(11:11)  process-data  process-data-8sn2q-process-data-3700909073  20s         
     │ ├─✔ process-data-l1(12:12)  process-data  process-data-8sn2q-process-data-2818003295  19s         
     │ ├─✔ process-data-l1(13:13)  process-data  process-data-8sn2q-process-data-278901825   20s         
     │ ├─✔ process-data-l1(14:14)  process-data  process-data-8sn2q-process-data-3986961347  16s         
     │ └─✔ process-data-l1(15:15)  process-data  process-data-8sn2q-process-data-2905592609  19s         
     ├─┬─✔ process-data-l2(0:0)    process-data  process-data-8sn2q-process-data-2056515729  9s          
     │ ├─✔ process-data-l2(1:1)    process-data  process-data-8sn2q-process-data-2141620461  4s          
     │ ├─✔ process-data-l2(2:2)    process-data  process-data-8sn2q-process-data-352538601   9s          
     │ ├─✔ process-data-l2(3:3)    process-data  process-data-8sn2q-process-data-2144734909  6s          
     │ ├─✔ process-data-l2(4:4)    process-data  process-data-8sn2q-process-data-2290907961  8s          
     │ ├─✔ process-data-l2(5:5)    process-data  process-data-8sn2q-process-data-4197561341  5s          
     │ ├─✔ process-data-l2(6:6)    process-data  process-data-8sn2q-process-data-1620613249  9s          
     │ └─✔ process-data-l2(7:7)    process-data  process-data-8sn2q-process-data-3126908173  10s         
     └───✔ merge-data              merge-data    process-data-8sn2q-merge-data-1171626309    7s 
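    The step tree above shows 16 process-data-l1 pods, 8 process-data-l2 pods, and a single merge-data pod. This fan-out can be cross-checked with a quick sketch (illustrative only, not part of the example): each pairwise level halves the file count, and one final pod merges whatever remains.

```python
def pod_counts(num_files: int, pairwise_levels: int = 2) -> list:
    """Pods needed per level when each pod merges a pair of files,
    followed by one final pod that merges the remainder."""
    counts = []
    for _ in range(pairwise_levels):
        num_files //= 2          # each pod consumes two files, produces one
        counts.append(num_files)
    counts.append(1)             # the single merge-data pod
    return counts


# 32 source files -> 16 l1 pods -> 8 l2 pods -> 1 merge-data pod
print(pod_counts(32))  # [16, 8, 1]
```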
  4. View the result in the console.

    The output indicates that the workflow succeeded. If you view the OSS bucket, you can see that the result.txt file is generated.

References

To create and submit workflows by using the SDK for Python, see Use SDK for Python to create large numbers of Argo workflows.

Contact us

If you have suggestions or questions about this product, join the DingTalk group 35688562 to contact us.