
Container Service for Kubernetes: Build large-scale Argo Workflows with the Python SDK

Last Updated: Jun 25, 2026

Argo Workflows is a powerful workflow management tool, widely used for scheduled tasks, machine learning, and ETL. However, defining workflows in YAML has a steep learning curve. The Hera Python SDK offers a simpler alternative: with Hera, you define workflows in Python, which makes complex workflows easier to express, simplifies testing, and integrates with the Python ecosystem. This topic describes how to use the Hera Python SDK to build large-scale Argo Workflows.

Background

Argo Workflows is an open-source workflow management tool designed specifically for Kubernetes environments. It focuses on orchestrating complex workflows and allows users to define a series of tasks and flexibly arrange their execution order and dependencies. Argo Workflows helps you efficiently build and manage highly customized, automated workflows.

Argo Workflows has a wide range of use cases, including scheduled tasks, machine learning, simulation computing, scientific computing, ETL, model training, and CI/CD. It primarily uses YAML to define workflows, a design choice intended for clarity and simplicity. However, for users who are new to or unfamiliar with YAML, its strict indentation and hierarchical structure can create a steep learning curve, especially for complex workflows.


Hera is a Python SDK for building and submitting Argo Workflows. Because data scientists typically work in Python, Hera lets them define workflows with familiar tools and avoid the challenges of YAML.

Authoring method comparison

| Criterion | YAML | Hera |
| --- | --- | --- |
| Simplicity | High | High, with fewer lines of code |
| Writing complex workflows | Difficult | Easy; avoids potential YAML syntax errors |
| Python ecosystem integration | Difficult | Easy; gives access to the rich set of Python libraries |
| Testability | Difficult; prone to syntax errors | Easy; you can use testing frameworks to improve code quality and maintainability |

Hera connects the Python ecosystem with Argo Workflows and makes workflow design more intuitive. It lets you orchestrate large numbers of tasks without hand-writing YAML, so data scientists and engineers can stay in their preferred Python environment. This shortens the iteration cycle of machine learning workflows, from idea to deployment. The following examples use Hera.

Step 1: Create a cluster and get a token

  1. Create an Argo workflow cluster, then enable Argo Server and access the workflow console.

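  2. Create a token for accessing Argo Server. The following command creates a token for the default service account in the default namespace. The workflow scripts in Step 2 use this token.

    kubectl create token default -n default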
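Optionally, instead of pasting the token into each workflow script (as the examples in Step 2 do), you can pass the Argo Server address and token in through environment variables. The following is a minimal sketch that uses only the Python standard library; ARGO_HOST, ARGO_TOKEN, and load_argo_settings are hypothetical names chosen for this illustration, not part of Hera or Argo.

```python
import os


def load_argo_settings(env=None):
    """Return the Argo Server address and token read from environment variables.

    ARGO_HOST and ARGO_TOKEN are hypothetical variable names used for this sketch.
    """
    env = os.environ if env is None else env
    host = env.get("ARGO_HOST")
    token = env.get("ARGO_TOKEN")
    if not host or not token:
        # Fail early with a clear message instead of sending an unauthenticated request.
        raise RuntimeError("Set ARGO_HOST and ARGO_TOKEN before running the workflow script.")
    return {"host": host, "token": token}
```

For example, you could run export ARGO_TOKEN=$(kubectl create token default -n default) in your shell, call load_argo_settings() in the workflow script, and assign the returned values to global_config.host and global_config.token. Keeping the token out of source files also avoids committing it to version control.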

Step 2: Submit workflows with Hera

  1. Install Hera.

    pip install hera
  2. Write and submit the workflows.

    Simple DAG diamond

    In Argo Workflows, a DAG (Directed Acyclic Graph) is often used to define complex task dependencies. The diamond is a common pattern where tasks diverge and then converge. This structure is effective for parallel processing where the results are aggregated into a common downstream task. The following example shows how to use Hera to define a workflow with a diamond structure. Task A runs first, followed by two parallel tasks, B and C. A final task, D, runs after both B and C finish, completing the workflow.
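    To make the diamond's dependency semantics concrete without a cluster, the following sketch uses Python's standard graphlib module (Python 3.9 or later), not Hera, to group the tasks into stages: tasks in the same stage do not depend on each other and can run in parallel. The dictionary maps each task to the tasks it depends on; execution_stages is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each key lists the tasks it depends on; this mirrors A >> [B, C] >> D.
diamond = {
    "A": set(),
    "B": {"A"},
    "C": {"A"},
    "D": {"B", "C"},
}


def execution_stages(graph):
    """Group tasks into stages whose members only depend on tasks in earlier stages."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # all tasks whose dependencies are finished
        stages.append(ready)
        sorter.done(*ready)  # mark the whole stage as finished
    return stages


print(execution_stages(diamond))  # [['A'], ['B', 'C'], ['D']]
```

    The three stages match the behavior described above: A runs alone, B and C run in parallel, and D waits for both.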

    1. Create a file named simpleDAG.py with the following content.

      # Import the required packages.
      from hera.workflows import DAG, Workflow, script
      from hera.shared import global_config
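      import urllib3
      # Suppress urllib3 warnings such as InsecureRequestWarning, which is emitted when TLS certificate verification is disabled.
      urllib3.disable_warnings()
      # Configure the Argo Server address and token. Replace {{argo_server_IP}} with the IP address of your Argo Server.
      global_config.host = "https://{{argo_server_IP}}:2746"
      global_config.token = "abcdefgxxxxxx"  # Replace with the token you obtained in Step 1.
      global_config.verify_ssl = False  # Disable TLS certificate verification, for example if Argo Server uses a self-signed certificate.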
      # The @script decorator is a key Hera feature that lets you orchestrate Python functions almost natively.
      # It allows you to call the decorated function within a Hera context manager, such as a Workflow or Steps context.
      # The function still runs normally outside of any Hera context, which means you can write unit tests for it.
      # This example prints the input message.
      @script(image="mirrors-ssl.aliyuncs.com/python:3.10")
      def echo(message: str):
          print(message)
      # A Workflow is the primary resource in Argo and a key class in Hera. It stores templates, sets an entrypoint, and runs them.
      with Workflow(
          generate_name="dag-diamond-",
          entrypoint="diamond",
          namespace="default",
      ) as w:
          with DAG(name="diamond"):
              A = echo(name="A", arguments={"message": "A"})  # Calling echo inside the DAG context creates task A from the echo template.
              B = echo(name="B", arguments={"message": "B"})
              C = echo(name="C", arguments={"message": "C"})
              D = echo(name="D", arguments={"message": "D"})
              A >> [B, C] >> D      # Define the dependencies: Tasks B and C depend on A, and task D depends on B and C.
      # Create the workflow.
      w.create()
    2. Submit the workflow.

      python simpleDAG.py
    3. After the workflow runs, you can view the task DAG and its results in the workflow console.

      The example workflow dag-diamond-g9v45 shows a diamond-shaped DAG topology: the top-level node A completes, then nodes B and C run in parallel, and finally, they converge at node D. All nodes are marked as successfully executed.
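    The A >> [B, C] >> D line in simpleDAG.py works through Python operator overloading. The toy Task class below is not Hera's implementation; it only illustrates how such a chain can record dependencies. Python evaluates the chain left to right: A >> [B, C] calls A.__rshift__, and because a list has no __rshift__ method, [B, C] >> D falls back to D.__rrshift__.

```python
class Task:
    """Toy stand-in for a DAG task; records the names of the tasks it depends on."""

    def __init__(self, name):
        self.name = name
        self.depends_on = set()

    def __rshift__(self, other):
        # task >> other: every task in `other` (a task or a list of tasks) depends on this task.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.depends_on.add(self.name)
        return other  # returning the right-hand side lets the chain continue

    def __rrshift__(self, other):
        # [b, c] >> task: a list has no __rshift__, so Python calls this method on the right operand.
        sources = other if isinstance(other, list) else [other]
        for source in sources:
            self.depends_on.add(source.name)
        return self


A, B, C, D = Task("A"), Task("B"), Task("C"), Task("D")
A >> [B, C] >> D
for t in (A, B, C, D):
    print(t.name, sorted(t.depends_on))
# A []
# B ['A']
# C ['A']
# D ['B', 'C']
```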

    Map-reduce

    In Argo Workflows, you can implement MapReduce-style data processing using DAG templates to simulate the map and reduce phases. The following example demonstrates how to use Hera to build a simple MapReduce workflow. This workflow splits a task into multiple parallel map tasks and then aggregates their results in a final reduce task. Each step is a Python function, which allows for easy integration with the Python ecosystem.
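    Because each step is an ordinary Python function, you can trace the data flow locally before running anything on the cluster. The sketch below mirrors the split, map, and reduce steps of map-reduce.py using only the standard library. It is an assumption-laden stand-in: temporary local directories replace the OSS artifacts, the map steps run one after another instead of in parallel, and the function names (split_parts, map_part, reduce_results, run_locally) are hypothetical.

```python
import json
import os
import tempfile


def split_parts(num_parts, out_dir):
    """Mirror of the split step: write one JSON file per part and return the part IDs."""
    os.makedirs(out_dir)
    part_ids = [str(i) for i in range(num_parts)]
    for i, part_id in enumerate(part_ids, start=1):
        with open(os.path.join(out_dir, part_id + ".json"), "w") as f:
            json.dump({"foo": i}, f)
    return part_ids


def map_part(in_path, out_path):
    """Mirror of the map step: double 'foo' and store it as 'bar'."""
    with open(in_path) as f:
        part = json.load(f)
    with open(out_path, "w") as f:
        json.dump({"bar": part["foo"] * 2}, f)


def reduce_results(results_dir):
    """Mirror of the reduce step: sum 'bar' over all result files."""
    total = 0
    for name in os.listdir(results_dir):
        with open(os.path.join(results_dir, name)) as f:
            total += json.load(f)["bar"]
    return total


def run_locally(num_parts):
    with tempfile.TemporaryDirectory() as root:
        parts_dir = os.path.join(root, "parts")
        results_dir = os.path.join(root, "results")
        part_ids = split_parts(num_parts, parts_dir)
        os.makedirs(results_dir)
        for part_id in part_ids:  # sequential here; the workflow runs these map tasks in parallel
            map_part(os.path.join(parts_dir, part_id + ".json"),
                     os.path.join(results_dir, part_id + ".json"))
        return reduce_results(results_dir)


print(run_locally(4))  # 20
```

    With num_parts=4, split writes foo values 1 to 4, map doubles them to 2, 4, 6, and 8, and reduce returns 20, which is the total you should expect in total.json after the workflow runs with the default parameter.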

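    1. Configure artifact storage. The workflow below reads and writes OSS artifacts by key only, so an OSS artifact repository must be configured for the cluster.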

    2. Create a file named map-reduce.py with the following content.


      from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
      from hera.shared import global_config
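      import urllib3
      # Suppress urllib3 warnings such as InsecureRequestWarning, which is emitted when TLS certificate verification is disabled.
      urllib3.disable_warnings()
      # Configure the Argo Server address and token. Replace {{argo_server_IP}} with the IP address of your Argo Server.
      global_config.host = "https://{{argo_server_IP}}:2746"
      global_config.token = "abcdefgxxxxxx"  # Replace with the token you obtained in Step 1.
      global_config.verify_ssl = False  # Disable TLS certificate verification, for example if Argo Server uses a self-signed certificate.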
      # When using the @script decorator, pass parameters like image, inputs, outputs, and resources to it.
      @script(
          image="mirrors-ssl.aliyuncs.com/python:alpine3.6",
          inputs=Parameter(name="num_parts"),
          outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
      )
      def split(num_parts: int) -> None:
          # Write num_parts JSON files to /mnt/out; the file for part N contains {"foo": N + 1}.
          # Print the list of part IDs as JSON to stdout. Argo captures stdout as the task result (s.result),
          # and the map step fans out over that list with with_param.
          import json
          import os
          import sys
          os.mkdir("/mnt/out")
          part_ids = [str(i) for i in range(num_parts)]
          for i, part_id in enumerate(part_ids, start=1):
              with open("/mnt/out/" + part_id + ".json", "w") as f:
                  json.dump({"foo": i}, f)
          json.dump(part_ids, sys.stdout)
      # Define the image, inputs, and outputs in the @script decorator.
      @script(
          image="mirrors-ssl.aliyuncs.com/python:alpine3.6",
          inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),],
          outputs=OSSArtifact(
              name="part",
              path="/mnt/out/part.json",
              archive=NoneArchiveStrategy(),
              key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json",
          ),
      )
      def map_() -> None:  # This function reads the 'foo' value from an input file, multiplies it by 2, and writes the result to a 'bar' field in a new output file.
          import json
          import os
          os.mkdir("/mnt/out")
          with open("/mnt/in/part.json") as f:
              part = json.load(f)
          with open("/mnt/out/part.json", "w") as f:
              json.dump({"bar": part["foo"] * 2}, f)
      # Define the image, inputs, and outputs in the @script decorator.
      @script(
          image="mirrors-ssl.aliyuncs.com/python:alpine3.6",
          inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
          outputs=OSSArtifact(
              name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
          ),
      )
      def reduce() -> None:  # Sum the 'bar' values produced by all map tasks.
          import json
          import os
          os.mkdir("/mnt/out")
          total = 0
          # Open each result file in a with block so that every file handle is closed after reading.
          for name in os.listdir("/mnt/in"):
              with open(os.path.join("/mnt/in", name)) as f:
                  total += json.load(f)["bar"]
          with open("/mnt/out/total.json", "w") as f:
              json.dump({"total": total}, f)
      # Build the workflow. Define its name, entrypoint, namespace, and global parameters.
      with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="default", arguments=Parameter(name="num_parts", value="4")) as w:
          with DAG(name="main"):
              s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}"))  # Create the split task.
              m = map_(
                  with_param=s.result,
                  arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),],
              )  # Create one map task for each part ID that split prints to stdout.
              s >> m >> reduce()   # Define the task dependencies.
      # Create the workflow.
      w.create()
      
    3. Submit the workflow.

      python map-reduce.py
    4. After the workflow runs, you can view the task DAG and its results in the workflow console. On the WORKFLOW DETAILS page, the DAG view shows that the split node, the four parallel map nodes, and the reduce node have all executed successfully (indicated by a green check mark).
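    The fan-out is what lets this pattern scale: with_param=s.result makes Argo create one map task per element of the JSON list that split prints, substituting each element for {{item}}. The following simplified model (plain Python, not Argo's implementation) shows how the printed part IDs expand into per-task parameters and artifact keys; expand_fan_out and the workflow name map-reduce-abcde are hypothetical.

```python
import json


def expand_fan_out(split_stdout, workflow_name):
    """Simplified model of with_param: build one map-task spec per item in the JSON list."""
    key_template = "{{workflow.name}}/parts/{{item}}.json"  # same key pattern as the map step's input artifact
    tasks = []
    for item in json.loads(split_stdout):
        key = key_template.replace("{{workflow.name}}", workflow_name).replace("{{item}}", str(item))
        tasks.append({"part_id": str(item), "part_key": key})
    return tasks


tasks = expand_fan_out('["0", "1", "2", "3"]', "map-reduce-abcde")
print(len(tasks))            # 4
print(tasks[0]["part_key"])  # map-reduce-abcde/parts/0.json
```

    In this model, raising the num_parts workflow parameter only lengthens the list that split prints, so the number of map tasks grows without any change to the templates.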
