Container Service for Kubernetes: Use Argo Workflows for batch data processing

Last Updated: Mar 25, 2026

This tutorial shows you how to build a hierarchical file-merging pipeline with Argo Workflows and Object Storage Service (OSS). The same pattern applies to workloads such as high-precision map processing and animation rendering.

The pipeline runs in three sequential levels, each completing before the next begins:

Level  Step             Pods  Files in  Files out
1      process-data-l1  16    32        16
2      process-data-l2  8     16        8
Final  merge-data       1     8         result.txt

Within each level, all pods run in parallel.
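
The level sizes in the table follow from repeatedly halving the file count. A minimal Python sketch of that arithmetic, assuming each pod in a numbered level merges exactly two files and the final pod merges whatever remains (the function name plan_levels is hypothetical and not part of the tutorial):

```python
def plan_levels(initial_files, pairwise_levels):
    """Return (level, pods, files_in, files_out) tuples for the pipeline.

    Assumes every pod in a pairwise level merges two files into one,
    and one final pod merges all remaining files into a single result.
    """
    plan = []
    files = initial_files
    for level in range(1, pairwise_levels + 1):
        pods = files // 2          # one pod per pair of input files
        plan.append((str(level), pods, files, pods))
        files = pods               # outputs become the next level's inputs
    plan.append(("Final", 1, files, 1))
    return plan


if __name__ == "__main__":
    for row in plan_levels(initial_files=32, pairwise_levels=2):
        print(row)
```

The total number of pods across the rows (16 + 8 + 1) is the number of steps the workflow is expected to run.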

Prerequisites

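Before you begin, ensure that you have the resources that the workflows in this tutorial reference: Argo Workflows with the argo namespace, the Argo CLI, and a persistent volume claim (PVC) named pvc-oss in that namespace that is backed by an OSS bucket.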

Step 1: Prepare the data

Create a file named prepare-data.yaml with the following content:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: aggregation-prepare-data-
  namespace: argo
spec:
  entrypoint: main
  volumes:
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  templates:
    - name: main
      dag:
        tasks:
          - name: prepare-data
            template: create-file

    - name: create-file
      script:
        image: mirrors-ssl.aliyuncs.com/python:alpine
        command:
          - python
        source: |
          import os
          import random
          # Create the level-1 input directory on the mounted OSS volume.
          os.makedirs("/mnt/vol/aggregation-demo/l1/", exist_ok=True)
          # Write 32 files (0.txt through 31.txt), each holding one random uppercase letter.
          for i in range(32):
              with open('/mnt/vol/aggregation-demo/l1/' + str(i) + '.txt', 'w') as data_file:
                  data_file.write(random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ'))
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol

Submit the workflow:

argo submit prepare-data.yaml -n argo

After the workflow completes, OSS contains a directory named aggregation-demo/l1 with 32 files (0.txt through 31.txt), each holding a single random letter.
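
To confirm the prepared data, you could run a check like the following wherever the bucket contents are reachable as a local directory. This is a sketch: the function name check_prepared_files and its directory argument are illustrative assumptions, and the demo at the bottom runs against a temporary directory rather than OSS.

```python
import os
import string
import tempfile


def check_prepared_files(directory, expected_count=32):
    """Return a list of problems found; an empty list means the data looks right."""
    problems = []
    for i in range(expected_count):
        path = os.path.join(directory, f"{i}.txt")
        if not os.path.isfile(path):
            problems.append(f"missing: {path}")
            continue
        with open(path) as f:
            content = f.read()
        # Each file should hold exactly one uppercase ASCII letter.
        if len(content) != 1 or content not in string.ascii_uppercase:
            problems.append(f"unexpected content in {path}: {content!r}")
    return problems


if __name__ == "__main__":
    # Demo against a throwaway directory that mimics aggregation-demo/l1.
    with tempfile.TemporaryDirectory() as tmp:
        for i in range(32):
            with open(os.path.join(tmp, f"{i}.txt"), "w") as f:
                f.write("A")
        print(check_prepared_files(tmp))
```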

Step 2: Process the batch data

Fan-out strategies in Argo Workflows

The process-data.yaml workflow uses withSequence to fan out a step across multiple parallel pods. Argo Workflows supports three fan-out strategies:

Strategy      When to use
withSequence  The number of parallel instances is fixed and numeric (iterates over 0..N-1)
withItems     The input is a predefined list of values
withParam     The list is generated dynamically at runtime by a previous step

This tutorial uses withSequence because the number of files at each level is known in advance.
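
As a rough mental model, each strategy produces a list of values, and Argo starts one pod per value with {{item}} bound to it. The Python sketch below imitates that expansion. It illustrates the semantics described in the table and is not Argo's implementation; the function names are hypothetical.

```python
import json


def expand_with_sequence(count):
    """withSequence with `count`: the items are 0, 1, ..., count-1 as strings."""
    return [str(i) for i in range(int(count))]


def expand_with_items(items):
    """withItems: the items are exactly the list written in the manifest."""
    return list(items)


def expand_with_param(json_text):
    """withParam: the items come from a JSON array produced at runtime."""
    items = json.loads(json_text)
    if not isinstance(items, list):
        raise ValueError("withParam expects a JSON array")
    return items


if __name__ == "__main__":
    print(expand_with_sequence("16"))
    print(expand_with_items(["a.txt", "b.txt"]))
    print(expand_with_param('["x", "y", "z"]'))
```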

Create and submit the workflow

Create a file named process-data.yaml with the following content:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: process-data-
  namespace: argo
spec:
  entrypoint: main
  volumes:
    - name: workdir                    # Mount the OSS volume to share data across steps.
      persistentVolumeClaim:
        claimName: pvc-oss
  arguments:
    parameters:
      - name: numbers
        value: "16"
  templates:
    - name: main
      steps:
        - - name: process-data-l1      # Level 1: 16 pods merge 32 files into 16 files.
            template: process-data     # Double dash (- -) = new sequential stage.
            arguments:
              parameters:
                - name: file_number
                  value: "{{item}}"
                - name: level
                  value: "1"
            withSequence:              # withSequence iterates over a numeric range (0..N-1).
              count: "{{workflow.parameters.numbers}}"
        - - name: process-data-l2      # Level 2: 8 pods merge 16 files into 8 files.
            template: process-data     # Runs after Level 1 completes.
            arguments:
              parameters:
                - name: file_number
                  value: "{{item}}"
                - name: level
                  value: "2"
            withSequence:
              count: "{{=asInt(workflow.parameters.numbers)/2}}"
        - - name: merge-data           # Final level: 1 pod merges 8 files into result.txt.
            template: merge-data
            arguments:
              parameters:
                - name: number
                  value: "{{=asInt(workflow.parameters.numbers)/2}}"

    - name: process-data               # Template: merges two input files into one output file.
      inputs:
        parameters:
          - name: file_number
          - name: level
      container:
        # Make sure VPC internet access is enabled to pull this image.
        image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd
        imagePullPolicy: Always
        command: [python3]
        args: ["process.py", "{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"]
        volumeMounts:
          - name: workdir
            mountPath: /mnt/vol

    - name: merge-data                 # Template: merges all remaining files into result.txt.
      inputs:
        parameters:
          - name: number
      container:
        image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd
        imagePullPolicy: Always
        command: [python3]
        args: ["merge.py", "{{inputs.parameters.number}}"]
        volumeMounts:
          - name: workdir
            mountPath: /mnt/vol
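
The scripts process.py and merge.py ship inside the container image and are not listed in this tutorial. The following Python sketch shows one plausible shape for them, purely as an assumption: it supposes that pod i at a given level concatenates 2i.txt and 2i+1.txt from directory l<level> into i.txt in directory l<level+1>, and that the final step concatenates the remaining files into result.txt. Apart from aggregation-demo/l1, the directory layout, the location of result.txt, and both function signatures are hypothetical.

```python
import os
import tempfile


def process(base_dir, file_number, level):
    """Hypothetical merge of two level-N files into one level-(N+1) file."""
    src = os.path.join(base_dir, f"l{level}")
    dst = os.path.join(base_dir, f"l{level + 1}")
    os.makedirs(dst, exist_ok=True)
    merged = ""
    for index in (2 * file_number, 2 * file_number + 1):
        with open(os.path.join(src, f"{index}.txt")) as f:
            merged += f.read()
    with open(os.path.join(dst, f"{file_number}.txt"), "w") as f:
        f.write(merged)


def merge(base_dir, number, level):
    """Hypothetical final step: concatenate `number` files into result.txt."""
    src = os.path.join(base_dir, f"l{level}")
    parts = []
    for index in range(number):
        with open(os.path.join(src, f"{index}.txt")) as f:
            parts.append(f.read())
    result_path = os.path.join(base_dir, "result.txt")
    with open(result_path, "w") as f:
        f.write("".join(parts))
    return result_path


def run_demo(base_dir):
    """Run both levels and the final merge sequentially on local files."""
    os.makedirs(os.path.join(base_dir, "l1"), exist_ok=True)
    for i in range(32):
        with open(os.path.join(base_dir, "l1", f"{i}.txt"), "w") as f:
            f.write(chr(ord("A") + i % 26))
    for i in range(16):
        process(base_dir, i, 1)
    for i in range(8):
        process(base_dir, i, 2)
    with open(merge(base_dir, 8, 3)) as f:
        return f.read()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(run_demo(tmp))
```

The demo runs the tasks one after another on local files; in the workflow, the calls within each level would instead run as separate pods.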

Steps template execution model: In a steps template, each double-dash entry (- -) starts a new sequential stage. Entries added to the same stage with a single dash (-) run in parallel, as do the items that withSequence expands from a single entry. A stage starts only after every pod in the previous stage has finished, which is why each level waits for the previous level to complete.
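
The execution model can be imitated in plain Python: an outer loop walks the stages in order, and a thread pool runs every task of the current stage before the loop advances. This is only an analogy for the ordering described above, with hypothetical names, and it does not interact with Argo.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def run_stages(stages):
    """Run stages one after another; tasks inside a stage run concurrently.

    `stages` is a list of (stage_name, task_count) pairs. Returns the
    completion log as a list of (stage_name, task_index) tuples.
    """
    log = []
    lock = threading.Lock()

    def task(stage_name, index):
        with lock:
            log.append((stage_name, index))

    for stage_name, task_count in stages:
        with ThreadPoolExecutor(max_workers=task_count) as pool:
            futures = [pool.submit(task, stage_name, i) for i in range(task_count)]
            for future in futures:
                future.result()   # re-raise task errors; the stage ends when all tasks end
    return log


if __name__ == "__main__":
    log = run_stages([("process-data-l1", 16), ("process-data-l2", 8), ("merge-data", 1)])
    print(len(log), "tasks; last stage:", log[-1][0])
```

The order of entries within a stage may vary between runs, but no entry from a later stage can appear before the earlier stage has finished.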

Submit the workflow:

argo submit process-data.yaml -n argo

Verify the results

Check the workflow status. @latest is a shortcut that refers to the most recently submitted workflow:

argo get @latest -n argo

A successful run produces output similar to the following:

Name:                process-data-8sn2q
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              Succeeded
Conditions:
 PodRunning          False
 Completed           True
Created:             Thu Dec 12 13:15:39 +0800 (4 minutes ago)
Started:             Thu Dec 12 13:15:39 +0800 (4 minutes ago)
Finished:            Thu Dec 12 13:16:40 +0800 (3 minutes ago)
Duration:            1 minute 1 seconds
Progress:            25/25
ResourcesDuration:   4s*(1 cpu),2m51s*(100Mi memory)
Parameters:
  numbers:           16

STEP                           TEMPLATE      PODNAME                                     DURATION  MESSAGE
 ✔ process-data-8sn2q          main
 ├─┬─✔ process-data-l1(0:0)    process-data  process-data-8sn2q-process-data-3064646785  17s
 │ ├─✔ process-data-l1(1:1)    process-data  process-data-8sn2q-process-data-140728989   20s
 │ ├─✔ process-data-l1(2:2)    process-data  process-data-8sn2q-process-data-499182361   18s
 │ ├─✔ process-data-l1(3:3)    process-data  process-data-8sn2q-process-data-3152865965  20s
 │ ├─✔ process-data-l1(4:4)    process-data  process-data-8sn2q-process-data-1363784105  16s
 │ ├─✔ process-data-l1(5:5)    process-data  process-data-8sn2q-process-data-3270437485  20s
 │ ├─✔ process-data-l1(6:6)    process-data  process-data-8sn2q-process-data-1788045361  16s
 │ ├─✔ process-data-l1(7:7)    process-data  process-data-8sn2q-process-data-913839549   20s
 │ ├─✔ process-data-l1(8:8)    process-data  process-data-8sn2q-process-data-1562179905  16s
 │ ├─✔ process-data-l1(9:9)    process-data  process-data-8sn2q-process-data-573517021   20s
 │ ├─✔ process-data-l1(10:10)  process-data  process-data-8sn2q-process-data-3769586203  16s
 │ ├─✔ process-data-l1(11:11)  process-data  process-data-8sn2q-process-data-3700909073  20s
 │ ├─✔ process-data-l1(12:12)  process-data  process-data-8sn2q-process-data-2818003295  19s
 │ ├─✔ process-data-l1(13:13)  process-data  process-data-8sn2q-process-data-278901825   20s
 │ ├─✔ process-data-l1(14:14)  process-data  process-data-8sn2q-process-data-3986961347  16s
 │ └─✔ process-data-l1(15:15)  process-data  process-data-8sn2q-process-data-2905592609  19s
 ├─┬─✔ process-data-l2(0:0)    process-data  process-data-8sn2q-process-data-2056515729  9s
 │ ├─✔ process-data-l2(1:1)    process-data  process-data-8sn2q-process-data-2141620461  4s
 │ ├─✔ process-data-l2(2:2)    process-data  process-data-8sn2q-process-data-352538601   9s
 │ ├─✔ process-data-l2(3:3)    process-data  process-data-8sn2q-process-data-2144734909  6s
 │ ├─✔ process-data-l2(4:4)    process-data  process-data-8sn2q-process-data-2290907961  8s
 │ ├─✔ process-data-l2(5:5)    process-data  process-data-8sn2q-process-data-4197561341  5s
 │ ├─✔ process-data-l2(6:6)    process-data  process-data-8sn2q-process-data-1620613249  9s
 │ └─✔ process-data-l2(7:7)    process-data  process-data-8sn2q-process-data-3126908173  10s
 └───✔ merge-data              merge-data    process-data-8sn2q-merge-data-1171626309    7s
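
If you want to check the result from a script instead of reading the table, argo get can also emit JSON with -o json. The sketch below reads the phase and progress from that JSON. It assumes the status.phase and status.progress fields of the Workflow resource; the function name summarize is hypothetical, and the sample document in the demo is hand-written for illustration, not captured output.

```python
import json


def summarize(workflow_json):
    """Return (phase, done, total) from the JSON form of a Workflow."""
    status = json.loads(workflow_json).get("status", {})
    phase = status.get("phase", "Unknown")
    done, _, total = status.get("progress", "0/0").partition("/")
    return phase, int(done), int(total)


if __name__ == "__main__":
    # Hand-written sample; in practice the text would come from something like
    #   argo get @latest -n argo -o json
    sample = '{"status": {"phase": "Succeeded", "progress": "25/25"}}'
    phase, done, total = summarize(sample)
    print(phase, done, total)
```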

You can also view the results in the console, where all tasks are shown as completed.

The final output file, result.txt, is saved to OSS.

What's next

To build larger-scale pipelines or generate workflow parameters dynamically, see Build large-scale Argo Workflows with the Python SDK.

Contact us

If you have questions or feedback, join DingTalk group 35688562.