Container Service for Kubernetes: Build large-scale Argo Workflows with the Python SDK

Last Updated: Dec 25, 2024

Argo Workflows is widely used in scenarios such as scheduled tasks, machine learning, and extract, transform, and load (ETL). Defining workflows in YAML can be challenging if you are not familiar with Kubernetes. The Hera Python SDK offers a user-friendly alternative, enabling the construction of workflows using Python code. It supports complex task scenarios, is easy to test, and integrates seamlessly with the Python ecosystem.

Introduction

Argo Workflows uses YAML to define workflows, providing clear and concise configurations. However, for users unfamiliar with YAML, the strict indentation and hierarchical structure can complicate workflow configuration.

Hera is a Python SDK that simplifies developing and submitting workflows. Because you define workflows in Python rather than YAML, Hera helps you avoid the syntax errors that tend to appear in complex YAML workflows. The Hera Python SDK offers several benefits:

  • Code simplicity: Workflows are written as concise Python code.

  • Seamless integration with the Python ecosystem: Python functions act as templates, so you can use Python frameworks, libraries, and tools in your workflows.

  • Testability: You can test workflow functions directly with Python testing frameworks, which improves code quality and maintainability.
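The testability point can be illustrated with a minimal sketch. It assumes the function body contains no Hera-specific calls. In a workflow file the same function would carry Hera's @script() decorator, which is omitted here so that the sketch runs without Hera installed. The names EchoTest and run_tests are hypothetical.

```python
import io
import unittest
from contextlib import redirect_stdout


def echo(message: str) -> None:
    """Plain function body; in a workflow file it would carry Hera's @script() decorator."""
    print(message)


class EchoTest(unittest.TestCase):
    def test_prints_message(self) -> None:
        # Capture stdout so the printed message can be asserted on.
        buffer = io.StringIO()
        with redirect_stdout(buffer):
            echo("hello")
        self.assertEqual(buffer.getvalue(), "hello\n")


def run_tests() -> unittest.TestResult:
    """Load and run EchoTest, returning the result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(EchoTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

Calling run_tests() executes the test case locally, without a cluster or an Argo Server.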

Prerequisites

  • The Argo console and components are installed, and you have obtained the access credentials and the Argo Server IP address. For instructions, see Enable batch task orchestration.

  • Hera is installed. Run the following command to install it:

    pip install hera-workflows
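The sample scripts in this topic hard-code the Argo Server address and token. The following is a minimal sketch of reading them from environment variables instead. The variable names ARGO_SERVER_IP and ARGO_TOKEN and the helper argo_settings are assumptions made for this illustration, not part of Hera or Argo. Port 2746 is taken from the sample scripts.

```python
import os
from typing import Mapping, Optional


def argo_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build connection settings from environment variables.

    ARGO_SERVER_IP and ARGO_TOKEN are hypothetical names chosen for this sketch.
    """
    env = os.environ if env is None else env
    ip = env.get("ARGO_SERVER_IP")
    token = env.get("ARGO_TOKEN")
    if not ip or not token:
        raise RuntimeError("Set ARGO_SERVER_IP and ARGO_TOKEN before submitting workflows.")
    # The sample scripts reach the Argo Server over HTTPS on port 2746.
    return {"host": f"https://{ip}:2746", "token": token}
```

The returned values could then be assigned to global_config.host and global_config.token in place of the literal strings, which keeps the token out of source control.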

Scenario 1: Simple DAG diamond

In Argo Workflows, a directed acyclic graph (DAG) defines complex task dependencies. In a diamond structure, several tasks run in parallel and their results converge in a subsequent task, which makes the structure well suited to merging data streams or processing results. The following example shows how to define a diamond-structured workflow using Hera: task A runs first, tasks B and C run in parallel after task A completes, and task D runs after both B and C complete.

  1. Use the following sample code to create a file named simpleDAG.py:

    # Import required libraries.
    from hera.workflows import DAG, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    
    # Specify the endpoint and token.
    global_config.host = "https://${IP}:2746"
    global_config.token = "abcdefgxxxxxx"  # Enter the token you retrieved.
    global_config.verify_ssl = False  # Skip TLS certificate verification.
    
    # The decorator function script is the key feature of Hera that enables Python-like function orchestration by using Hera.
    # You can call the function within a Hera context manager, such as a Workflow or Steps context.
    # The function still runs as normal outside of any Hera context, which means that you can write unit tests for the given function.
    # This example prints the input message.
    @script()
    def echo(message: str):
        print(message)
    
    # Orchestrate the workflow. Workflow is the primary resource in Argo and a key class in Hera. It is responsible for storing templates, specifying entry points, and running templates.
    with Workflow(
        generate_name="dag-diamond-",
        entrypoint="diamond",
        namespace="argo",
    ) as w:
        with DAG(name="diamond"):
            A = echo(name="A", arguments={"message": "A"})  # Create task A from the echo template.
            B = echo(name="B", arguments={"message": "B"})
            C = echo(name="C", arguments={"message": "C"})
            D = echo(name="D", arguments={"message": "D"})
            A >> [B, C] >> D      # Define dependencies: Tasks B and C depend on Task A, and Task D depends on Tasks B and C.
    # Create the workflow.
    w.create()
  2. Run the following command to submit the workflow:

    python simpleDAG.py
  3. After the workflow starts running, view the DAG task process and result in the Workflow Console (Argo).

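The ordering implied by A >> [B, C] >> D can be checked locally with a short sketch. It uses the graphlib module from the Python standard library (Python 3.9 or later), not Hera or Argo, and only illustrates the dependency order. It does not represent how Argo schedules tasks internally. The helper name execution_waves is hypothetical.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of tasks it depends on, mirroring A >> [B, C] >> D.
dependencies = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}


def execution_waves(deps):
    """Group tasks into waves; tasks in the same wave have no unmet dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # Tasks whose dependencies are all done.
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(dependencies)
print(waves)  # [['A'], ['B', 'C'], ['D']]
```

Tasks B and C land in the same wave, which corresponds to the two parallel branches of the diamond.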

Scenario 2: MapReduce

To process MapReduce-style data in Argo Workflows, DAG templates are used to manage and coordinate tasks, simulating the Map and Reduce phases. The following example shows how to use Hera to create a simple MapReduce workflow for counting words in text files, with each step defined as a Python function for integration with the Python ecosystem.

  1. Configure artifacts. For instructions on related operations, see Configure artifacts.

  2. Use the following sample code to create a file named map-reduce.py:

    from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    # Specify the endpoint and token.
    global_config.host = "https://${IP}:2746"
    global_config.token = "abcdefgxxxxxx"  # Enter the token you retrieved.
    global_config.verify_ssl = False  # Skip TLS certificate verification.
    
    # When you use the decorator function script, you must pass the script parameters to the script decorator. These parameters include image, inputs, outputs, and resources.
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=Parameter(name="num_parts"),
        outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
    )
    def split(num_parts: int) -> None:  # Create multiple files based on the num_parts input parameter. Each file contains the foo key and a part number as the value.
        import json
        import os
        import sys
    
        os.mkdir("/mnt/out")
    
        part_ids = list(map(lambda x: str(x), range(num_parts)))
        for i, part_id in enumerate(part_ids, start=1):
            with open("/mnt/out/" + part_id + ".json", "w") as f:
                json.dump({"foo": i}, f)
        json.dump(part_ids, sys.stdout)
    
    # Define the image, inputs, and outputs parameters in the script decorator.
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),],
        outputs=OSSArtifact(
            name="part",
            path="/mnt/out/part.json",
            archive=NoneArchiveStrategy(),
            key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json",
        ),
    )
    def map_() -> None:  # Generate new files based on the number of files that contain foo. Each new file contains the bar key and a value that equals the result of multiplying the corresponding part number by 2.
        import json
        import os
    
        os.mkdir("/mnt/out")
        with open("/mnt/in/part.json") as f:
            part = json.load(f)
        with open("/mnt/out/part.json", "w") as f:
            json.dump({"bar": part["foo"] * 2}, f)
    
    # Define the image, inputs, and outputs parameters in the script decorator.
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
        outputs=OSSArtifact(
            name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
        ),
    )
    def reduce() -> None:   # Aggregate the value of the bar key for each part number.
        import json
        import os
    
        os.mkdir("/mnt/out")
    
        total = 0
        for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))):
            result = json.load(f)
            total = total + result["bar"]
        with open("/mnt/out/total.json", "w") as f:
            json.dump({"total": total}, f)
    
    # Orchestrate a workflow. Specify the workflow name, entry point, namespace, and global parameters.
    with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="argo", arguments=Parameter(name="num_parts", value="4")) as w:
        with DAG(name="main"):
            s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # Orchestrate templates.
            m = map_(
                with_param=s.result,
                arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),],
            )   # Specify input parameters and orchestrate templates.
            s >> m >> reduce()   # Define the dependencies of tasks.
    # Create the workflow.
    w.create()
    
  3. Run the following command to submit the workflow:

    python map-reduce.py
  4. After the workflow starts running, view the DAG task process and result in the Workflow Console (Argo).
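To see what result the workflow should produce, the data flow of the three steps can be reproduced locally. The following is a minimal sketch that mirrors the logic of split, map_, and reduce with a temporary directory in place of OSS artifacts. It does not use Hera or Argo, and the helper names map_part, reduce_results, and run_locally are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def split(out_dir: Path, num_parts: int) -> list:
    """Write one file per part: part i (counted from 1) holds {"foo": i}."""
    out_dir.mkdir(parents=True)
    part_ids = [str(i) for i in range(num_parts)]
    for value, part_id in enumerate(part_ids, start=1):
        (out_dir / f"{part_id}.json").write_text(json.dumps({"foo": value}))
    return part_ids


def map_part(in_file: Path, out_file: Path) -> None:
    """Read {"foo": n} and write {"bar": n * 2}."""
    part = json.loads(in_file.read_text())
    out_file.parent.mkdir(parents=True, exist_ok=True)
    out_file.write_text(json.dumps({"bar": part["foo"] * 2}))


def reduce_results(results_dir: Path) -> int:
    """Sum the bar values of all result files."""
    return sum(json.loads(p.read_text())["bar"] for p in results_dir.iterdir())


def run_locally(num_parts: int) -> int:
    """Run split, map, and reduce in sequence inside a temporary directory."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        part_ids = split(root / "parts", num_parts)
        for part_id in part_ids:
            map_part(root / "parts" / f"{part_id}.json", root / "results" / f"{part_id}.json")
        return reduce_results(root / "results")


print(run_locally(4))  # 2 + 4 + 6 + 8 = 20
```

With num_parts set to 4, as in the workflow arguments, the foo values are 1 through 4, the bar values are 2, 4, 6, and 8, and the total is 20.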

References

  • Hera documentation

  • Sample YAML deployment configurations

    • For more information about how to use YAML files to deploy a simple-diamond workflow, see dag-diamond.yaml.

    • For more information about how to use YAML files to deploy a MapReduce workflow, see map-reduce.yaml.

Contact us

If you have suggestions or questions about this product, join the DingTalk group 35688562 to contact us.