Container Service for Kubernetes: Build large-scale Argo Workflows with the Python SDK

Last Updated: Mar 26, 2026

Defining Argo Workflows in YAML is error-prone for Python developers who are unfamiliar with YAML's strict indentation and the nested structure of Kubernetes resources. The Hera Python SDK lets you define and submit Argo Workflows in pure Python, with no hand-written YAML. This guide walks through two complete examples: a directed acyclic graph (DAG) diamond pattern and a MapReduce pipeline that splits, transforms, and sums numeric data.

How it works

Argo Workflows natively uses YAML to define workflows. Hera bridges the gap for Python developers by compiling Python functions into Argo templates at submission time.

The core building block is the @script decorator. Applying it to any Python function turns that function into an Argo Script template:

@script()
def echo(message: str):
    print(message)

When you submit the workflow, Hera serializes the function body into an Argo Script template, so the Python code you write is exactly what runs inside the container. Beyond template generation, the @script decorator also provides:

  • Python ecosystem integration — each decorated function is a standard Python callable, so it works with any library or framework

  • Direct testability — the function runs normally outside any Hera context, letting you write unit tests with pytest or any other testing framework

  • Parameterization — pass image, inputs, outputs, and resources directly in the decorator to configure the template without touching YAML
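As a minimal sketch of the testability point above, the snippet below calls echo directly and checks what it prints. The @script() decorator is left off so the snippet runs without Hera installed (an assumption made for illustration), and the helper name capture_echo is hypothetical.

```python
import io
from contextlib import redirect_stdout


# Same body as the echo function above. The @script() decorator is omitted
# here so that the sketch runs without Hera installed.
def echo(message: str):
    print(message)


def capture_echo(message: str) -> str:
    """Hypothetical helper: run echo and return what it wrote to stdout."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        echo(message)
    return buffer.getvalue()


def test_echo_prints_message():
    # pytest collects functions named test_*; this one can also be called directly.
    assert capture_echo("hello") == "hello\n"
```

Saved as a test module, this can be run with pytest. No Argo Server or cluster is involved.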

To define task dependencies, Hera uses the >> operator:

A >> [B, C] >> D   # B and C start after A completes; D starts after both B and C complete

The following two forms are equivalent and produce the same DAG:

# Single line
A >> [B, C] >> D

# Explicit chains
A >> B >> D
A >> C >> D
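To see why the two forms describe the same graph, it can help to model the operator in plain Python. The sketch below is not Hera's implementation. It is a hypothetical Task class that overloads >> to record dependency edges, and it shows that both spellings yield identical edges.

```python
class Task:
    """Hypothetical stand-in for a DAG task; records the names it depends on."""

    def __init__(self, name):
        self.name = name
        self.depends = set()

    def __rshift__(self, other):
        # task >> other: other (or every task in a list) depends on this task.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.depends.add(self.name)
        return other  # returning the right-hand side allows chaining

    def __rrshift__(self, other):
        # [t1, t2] >> task: this task depends on every task in the list.
        for upstream in other:
            self.depends.add(upstream.name)
        return self


def edges(tasks):
    """Return {task name: sorted list of upstream task names}."""
    return {task.name: sorted(task.depends) for task in tasks}


def build_single_line():
    A, B, C, D = (Task(name) for name in "ABCD")
    A >> [B, C] >> D
    return edges([A, B, C, D])


def build_explicit_chains():
    A, B, C, D = (Task(name) for name in "ABCD")
    A >> B >> D
    A >> C >> D
    return edges([A, B, C, D])
```

In both cases B and C depend only on A, and D depends on B and C.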

Prerequisites

Before you begin, ensure that you have:

  • The Argo components and console installed, and the access token and Argo Server IP address on hand. See Enable batch task orchestration.

  • Hera installed:

    pip install hera-workflows

Example 1: Simple DAG diamond

A diamond DAG runs two tasks in parallel (B and C) after an initial task (A) completes, then feeds their combined output into a final task (D). This pattern is common for merging data streams or aggregating parallel processing results.
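The execution order of the diamond can be sketched with the standard library alone. The snippet below assumes Python 3.9 or later for graphlib. It does not reproduce how Argo schedules pods; it only groups tasks into "waves" that could run at the same time, given the dependencies described above. The function name execution_waves is hypothetical.

```python
from graphlib import TopologicalSorter

# Each key maps a task to the set of tasks that must finish before it starts.
DIAMOND = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}


def execution_waves(dependencies):
    """Group tasks into waves; tasks within one wave have no unmet dependencies."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


print(execution_waves(DIAMOND))  # [['A'], ['B', 'C'], ['D']]
```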

All steps in this example use the @script decorator to define tasks and the >> operator to declare dependencies.

  1. Create a file named simpleDAG.py with the following content.

    # Import the required packages.
    from hera.workflows import DAG, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    
    # Configure the Argo Server endpoint and access token.
    global_config.host = "https://<argo-server-ip>:2746"
    global_config.token = "<your-access-token>"
    # Disable TLS certificate verification for the Argo Server connection.
    global_config.verify_ssl = False
    
    # The @script decorator turns this function into an Argo Script template.
    # It runs normally outside of Hera—write unit tests for it directly.
    @script()
    def echo(message: str):
        print(message)
    
    # Define the workflow: name prefix, entrypoint template, and namespace.
    with Workflow(
        generate_name="dag-diamond-",
        entrypoint="diamond",
        namespace="argo",
    ) as w:
        with DAG(name="diamond"):
            A = echo(name="A", arguments={"message": "A"})
            B = echo(name="B", arguments={"message": "B"})
            C = echo(name="C", arguments={"message": "C"})
            D = echo(name="D", arguments={"message": "D"})
            A >> [B, C] >> D  # B and C run in parallel after A; D runs after both B and C.
    
    # Submit the workflow to Argo.
    w.create()

    Replace the following placeholders before running:

    Placeholder            Description                                  Example
    <argo-server-ip>       IP address of the Argo Server                192.168.0.1
    <your-access-token>    Access token obtained from the Argo setup    abcdefgxxxxxx

  2. Submit the workflow.

    python simpleDAG.py

  3. View the DAG and results in the Workflow Console (Argo).

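The script in step 1 hardcodes the endpoint and token. One possible alternative, sketched here under stated assumptions, is to read them from environment variables so the token stays out of the source file. The variable names ARGO_HOST, ARGO_TOKEN, and ARGO_VERIFY_SSL and the function load_argo_config are hypothetical; they are not defined by Hera or Argo.

```python
import os


def load_argo_config(env=None):
    """Build connection settings from environment variables.

    ARGO_HOST, ARGO_TOKEN, and ARGO_VERIFY_SSL are hypothetical names
    chosen for this sketch.
    """
    env = os.environ if env is None else env
    missing = [name for name in ("ARGO_HOST", "ARGO_TOKEN") if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {
        "host": env["ARGO_HOST"],
        "token": env["ARGO_TOKEN"],
        # Verification stays on unless ARGO_VERIFY_SSL is explicitly "false".
        "verify_ssl": env.get("ARGO_VERIFY_SSL", "true").lower() != "false",
    }


# Possible usage in simpleDAG.py, replacing the hardcoded values:
#     from hera.shared import global_config
#     settings = load_argo_config()
#     global_config.host = settings["host"]
#     global_config.token = settings["token"]
#     global_config.verify_ssl = settings["verify_ssl"]
```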

Example 2: MapReduce

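This example implements a MapReduce-style pipeline with three stages: split, map, and reduce. The split stage writes numbered JSON parts, the map stage doubles the value in each part, and the reduce stage sums the results. Each stage is a Python function decorated with @script, and artifacts are passed between stages through Object Storage Service (OSS).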

Before you begin, configure artifacts — see Configure artifacts.

All three functions in this example use OSSArtifact to read and write intermediate data, and the with_param argument on the map task fans out one task instance per split part.
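The fan-out can be pictured in plain Python. In the workflow, the split task prints a JSON list to stdout, and with_param creates one map task per list item, substituting each item for {{item}}. The sketch below only models that substitution. The function name expand_with_param and the workflow name used in the example call are hypothetical.

```python
import json


def expand_with_param(result_json, workflow_name):
    """Model the fan-out: one set of map arguments per item in the JSON list."""
    items = json.loads(result_json)
    return [
        {
            "part_id": item,
            "part_key": f"{workflow_name}/parts/{item}.json",
        }
        for item in items
    ]


# Example: a split task that printed two part IDs yields two map tasks.
for arguments in expand_with_param('["0", "1"]', "map-reduce-example"):
    print(arguments)
```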

  1. Create a file named map-reduce.py with the following content.

    from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    
    # Configure the Argo Server endpoint and access token.
    global_config.host = "https://<argo-server-ip>:2746"
    global_config.token = "<your-access-token>"
    # Disable TLS certificate verification for the Argo Server connection.
    global_config.verify_ssl = False
    
    # Stage 1 — Split: divides the input into num_parts chunks and writes each to OSS.
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=Parameter(name="num_parts"),
        outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
    )
    def split(num_parts: int) -> None:
        import json
        import os
        import sys
    
        os.mkdir("/mnt/out")
    
        part_ids = [str(i) for i in range(num_parts)]
        for i, part_id in enumerate(part_ids, start=1):
            with open("/mnt/out/" + part_id + ".json", "w") as f:
                json.dump({"foo": i}, f)
        json.dump(part_ids, sys.stdout)
    
    # Stage 2 — Map: runs once per part; reads "foo" from each part and writes "bar" = foo x 2.
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),],
        outputs=OSSArtifact(
            name="part",
            path="/mnt/out/part.json",
            archive=NoneArchiveStrategy(),
            key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json",
        ),
    )
    def map_() -> None:
        import json
        import os
    
        os.mkdir("/mnt/out")
        with open("/mnt/in/part.json") as f:
            part = json.load(f)
        with open("/mnt/out/part.json", "w") as f:
            json.dump({"bar": part["foo"] * 2}, f)
    
    # Stage 3 — Reduce: reads all map outputs from OSS and sums the "bar" values into a total.
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
        outputs=OSSArtifact(
            name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
        ),
    )
    def reduce() -> None:
        import json
        import os
    
        os.mkdir("/mnt/out")
    
        total = 0
        for name in os.listdir("/mnt/in"):
            # Open each map output with a context manager so the file is closed.
            with open(os.path.join("/mnt/in", name)) as f:
                result = json.load(f)
            total += result["bar"]
        with open("/mnt/out/total.json", "w") as f:
            json.dump({"total": total}, f)
    
    # Define the workflow with num_parts=4 as the default input.
    with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="argo", arguments=Parameter(name="num_parts", value="4")) as w:
        with DAG(name="main"):
            s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}"))
            m = map_(
                with_param=s.result,  # Fan out: one map task per split output.
                arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),],
            )
            s >> m >> reduce()  # Each stage starts after the previous one completes.
    
    # Submit the workflow to Argo.
    w.create()

    Replace the following placeholders before running:

    Placeholder            Description                                  Example
    <argo-server-ip>       IP address of the Argo Server                192.168.0.1
    <your-access-token>    Access token obtained from the Argo setup    abcdefgxxxxxx

  2. Submit the workflow.

    python map-reduce.py

  3. View the DAG and results in the Workflow Console (Argo).

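To check the expected result before submitting, the stage logic can be dry-run locally. The sketch below is a plain-Python re-implementation of the three stage bodies, with a temporary directory standing in for OSS. It does not use Hera or Argo, and the function names (split_local, map_local, reduce_local, run_local) are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def split_local(num_parts, out_dir):
    """Write {"foo": i} for i = 1..num_parts and return the part IDs."""
    part_ids = [str(i) for i in range(num_parts)]
    for i, part_id in enumerate(part_ids, start=1):
        (out_dir / f"{part_id}.json").write_text(json.dumps({"foo": i}))
    return part_ids


def map_local(part_path, result_path):
    """Read "foo" from one part and write "bar" = foo * 2."""
    part = json.loads(part_path.read_text())
    result_path.write_text(json.dumps({"bar": part["foo"] * 2}))


def reduce_local(results_dir):
    """Sum the "bar" values across all map outputs."""
    return sum(json.loads(p.read_text())["bar"] for p in results_dir.iterdir())


def run_local(num_parts):
    with tempfile.TemporaryDirectory() as tmp:
        parts = Path(tmp) / "parts"
        results = Path(tmp) / "results"
        parts.mkdir()
        results.mkdir()
        for part_id in split_local(num_parts, parts):
            map_local(parts / f"{part_id}.json", results / f"{part_id}.json")
        return reduce_local(results)


print(run_local(4))  # 20, because (1 + 2 + 3 + 4) * 2 = 20
```

With the default num_parts of 4, the total written by the reduce stage should therefore be 20.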

Contact us

For product suggestions or questions, join the DingTalk group (ID: 35688562).