Container Compute Service: Use Argo Workflows SDK for Python to create large-scale workflows

Last Updated: Dec 27, 2024

Argo Workflows is widely used for scheduled tasks, machine learning, and ETL. If you are unfamiliar with Kubernetes, creating workflows from YAML can be difficult. Hera is an Argo Workflows SDK for Python that serves as an alternative to YAML: it provides an easy way to orchestrate and test complex workflows in Python, and it integrates seamlessly with the Python ecosystem.

Feature overview

Argo Workflows uses YAML files to configure workflows because YAML is clear and simple. However, YAML's strict indentation requirements and nested structure can make complex workflows even harder to configure if you are unfamiliar with YAML.

Hera is an Argo Workflows SDK for Python for creating and submitting workflows to Argo Workflows. Hera aims to simplify workflow creation and submission and helps eliminate YAML syntax errors when orchestrating complex workflows. Hera provides the following advantages:

  • Simplicity: Hera provides intuitive and easy-to-use code to greatly improve development efficiency.

  • Simple Python ecosystem integration: Each function is a template that integrates seamlessly with frameworks in the Python ecosystem, which also provides a wide range of libraries and tools.

  • Testability: A function decorated with Hera still runs as a normal Python function, so you can test it with standard Python testing frameworks to improve code quality and maintainability, as shown in the sketch after this list.
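
Because a function decorated with @script still runs as a normal Python function outside of a Hera context, you can unit test it with standard tools such as pytest. The following is a minimal sketch under that assumption; the double function and the test are illustrative and not part of the scenarios below.

    # A minimal, hypothetical example of testing a Hera script template with pytest.
    # Outside a Workflow/DAG context, calling the decorated function simply runs its body.
    from hera.workflows import script

    @script()
    def double(x: int):
        print(x * 2)

    def test_double(capsys):
        double(4)                                      # Runs as a plain Python function.
        assert capsys.readouterr().out.strip() == "8"  # The printed output is "8".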

Prerequisites

  • The Argo components and console are installed, and the credentials and IP address of the Argo Server are obtained. For more information, see Enable batch task orchestration.

  • Hera is installed.

    pip install hera-workflows

Scenario 1: Simple DAG Diamond

Argo Workflows uses directed acyclic graphs (DAGs) to define complex dependencies between the tasks in a workflow. The Diamond structure is commonly used in workflows: the execution results of multiple tasks that run in parallel are aggregated as the input of a subsequent task, which makes it efficient for aggregating data flows and execution results. The following sample code shows how to use Hera to orchestrate a Diamond workflow in which Task B and Task C run in parallel after Task A completes, and Task D runs after both Task B and Task C complete.

  1. Create a file named simpleDAG.py and copy the following content to the file:

    # Import the required packages. 
    from hera.workflows import DAG, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    
    # Specify the endpoint and token of the workflow cluster. 
    global_config.host = "https://${IP}:2746"
    global_config.token = "abcdefgxxxxxx"  # Enter the token you obtained. 
    global_config.verify_ssl = False  # Skip SSL certificate verification.
    
    # The script decorator is the key to enabling Python-like function orchestration by using Hera. 
    # You can call the function within a Hera context manager such as a Workflow or Steps context. 
    # The function still runs as normal outside Hera contexts, which means that you can write unit tests on the given function. 
    # The following code provides a sample input. 
    @script()
    def echo(message: str):
        print(message)
    
    # Orchestrate a workflow. The Workflow is the main resource in Argo and a key class of Hera. The Workflow is responsible for storing templates, setting entry points, and running templates. 
    with Workflow(
        generate_name="dag-diamond-",
        entrypoint="diamond",
        namespace="argo",
    ) as w:
        with DAG(name="diamond"):
            A = echo(name="A", arguments={"message": "A"})  # Create a template. 
            B = echo(name="B", arguments={"message": "B"})
            C = echo(name="C", arguments={"message": "C"})
            D = echo(name="D", arguments={"message": "D"})
            A >> [B, C] >> D      # Define dependencies. In this example, Task A is the dependency of Task B and Task C. Task B and Task C are the dependencies of Task D. 
    # Create the workflow. 
    w.create()
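    # Optional: to inspect the manifest that Hera generates, you can also render the
    # workflow as YAML, for example print(w.to_yaml()). This is a sketch: Workflow.to_yaml()
    # is available in recent hera-workflows releases and requires PyYAML to be installed.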
  2. Run the following command to submit the workflow:

    python simpleDAG.py
  3. After the workflow starts running, you can go to the Workflow Console (Argo) to view the DAG process and the result.


Scenario 2: MapReduce

In Argo Workflows, the key to processing data in the MapReduce style is to use DAG templates to organize and coordinate multiple tasks that simulate the Map and Reduce phases. The following sample code shows how to use Hera to orchestrate a simple MapReduce-style workflow: a split step generates numbered parts, a map step processes each part in parallel, and a reduce step aggregates the results. Each step is defined as a Python function so that it integrates with the Python ecosystem.

  1. Configure artifacts for the workflow cluster. For more information, see Configure artifacts.

  2. Create a file named map-reduce.py and copy the following content to the file:


    from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    # Specify the endpoint and token of the workflow cluster. 
    global_config.host = "https://${IP}:2746"
    global_config.token = "abcdefgxxxxxx"  # Enter the token you obtained. 
    global_config.verify_ssl = False  # Skip SSL certificate verification.
    
    # In the script decorator, specify the script configuration, such as the image, inputs, outputs, and resources.
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=Parameter(name="num_parts"),
        outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
    )
    def split(num_parts: int) -> None:  # Create multiple files based on the num_parts input parameter. Each file contains the foo key and a part number as the value.
        import json
        import os
        import sys
    
        os.mkdir("/mnt/out")
    
        part_ids = list(map(lambda x: str(x), range(num_parts)))
        for i, part_id in enumerate(part_ids, start=1):
            with open("/mnt/out/" + part_id + ".json", "w") as f:
                json.dump({"foo": i}, f)
        json.dump(part_ids, sys.stdout)
    
    # Define the image, inputs, and outputs parameters in the script decorator.
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),],
        outputs=OSSArtifact(
            name="part",
            path="/mnt/out/part.json",
            archive=NoneArchiveStrategy(),
            key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json",
        ),
    )
    def map_() -> None:  # For each input part, generate a new file that contains the bar key, whose value is the part's foo value multiplied by 2.
        import json
        import os
    
        os.mkdir("/mnt/out")
        with open("/mnt/in/part.json") as f:
            part = json.load(f)
        with open("/mnt/out/part.json", "w") as f:
            json.dump({"bar": part["foo"] * 2}, f)
    
    # Define the image, inputs, and outputs parameters in the script decorator.
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
        outputs=OSSArtifact(
            name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
        ),
    )
    def reduce() -> None:   # Sum the bar values of all parts and write the total.
        import json
        import os
    
        os.mkdir("/mnt/out")
    
        total = 0
        for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))):
            result = json.load(f)
            total = total + result["bar"]
        with open("/mnt/out/total.json", "w") as f:
            json.dump({"total": total}, f)
    
    # Orchestrate a workflow. Specify the workflow name, entry point, namespace, and global parameters. 
    with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="argo", arguments=Parameter(name="num_parts", value="4")) as w:
        with DAG(name="main"):
            s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # Orchestrate templates. 
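            # s.result refers to the stdout of the split step (the JSON list of part IDs);
            # with_param in the next call fans out one map_ task per item in that list.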
            m = map_(
                with_param=s.result,
                arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),],
            )   # Specify input parameters and orchestrate templates. 
            s >> m >> reduce()   # Define the dependencies of tasks. 
    # Create the workflow. 
    w.create()
    
  3. Run the following command to submit the workflow:

    python map-reduce.py
  4. After the workflow starts running, you can go to the Workflow Console (Argo) to view the DAG process and the result.
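
As a quick sanity check of the result that the console should show for the default num_parts=4, the following sketch mirrors the arithmetic of the three steps locally. It is only an illustration of the Python logic above and does not submit anything to the cluster.

    # Hypothetical local check of the MapReduce arithmetic for num_parts=4:
    # split() writes parts 0..3 with foo = 1..4, map_() doubles each foo into bar,
    # and reduce() sums the bar values into total.json.
    num_parts = 4
    foo_values = range(1, num_parts + 1)           # Values written by split().
    bar_values = [foo * 2 for foo in foo_values]   # Values written by map_().
    print({"total": sum(bar_values)})              # reduce() writes {"total": 20}.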

References

  • The Hera documentation:

  • Sample YAML deployment configurations:

    • For more information about how to use YAML files to deploy simple-diamond, see dag-diamond.yaml.

    • For more information about how to use YAML files to deploy map-reduce, see map-reduce.yaml.

Contact us

If you have suggestions or questions about this product, join the DingTalk group 35688562 to contact us.