全部產品
Search
文件中心

Container Service for Kubernetes:使用PythonSDK構建大規模Argo Workflows

更新時間:Mar 24, 2026

Argo Workflows廣泛應用於定時任務、機器學習和ETL資料處理等情境,但當對Kubernetes不太熟練時,YAML定義工作流程可能會增加學習難度。Hera Python SDK提供了一種簡潔易用的替代方案,允許以Python代碼構建工作流程,支援複雜任務情境,易於測試,並與Python生態無縫整合。

功能介紹

Argo Workflows主要依賴YAML來定義工作流程,以實現配置的清晰與簡潔。但當資料科學家不熟悉YAML時,在複雜的工作流程設計中,YAML的嚴格縮排要求及層次化的結構可能會增加配置難度。

Hera是一個專為構建和提交Argo工作流程設計的Python SDK架構,旨在簡化工作流程的構建和提交。在處理複雜工作流程時,使用Hera可以有效避免YAML可能產生的語法錯誤。使用Hera PythonSDK還具有以下優勢。

  • 代碼簡潔性:Hera提供了易於理解和編寫的代碼,可提升開發效率。

  • Python生態整合簡單:每個Function就是一個Template,與Python生態中的各種架構無縫整合,提供了豐富的Python庫和工具。

  • 可測試性:可直接利用Python的測試架構,有助於提高代碼的品質和可維護性。

前提條件

  • 已安裝Argo組件和控制台,並擷取訪問憑證和Argo Server 訪問IP。具體操作,請參見啟用批量任務編排能力

  • 已安裝Hera。

    pip install hera-workflows

情境一:Simple DAG Diamond

在Argo Workflows中,DAG(有向非循環圖)常用於定義複雜的任務依賴關係,其中Diamond結構是一種常見的工作流程模式,可以實現多個任務並存執行後,並將結果匯聚到一個共同的後續任務。這種結構適用於需要合并不同資料流或處理結果的情境。以下展示如何使用Hera定義一個具有Diamond結構的工作流程,其中兩個任務taskA和taskB並行運行,它們的輸出共同作為輸入傳遞給taskC。

  1. 使用以下內容,建立simpleDAG.py。

    # 匯入相關包。
    from hera.workflows import DAG, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    
    # 配置訪問地址和Token。
    global_config.host = "https://${IP}:2746"
    global_config.token = "abcdefgxxxxxx"  # 填入之前擷取的Token。
    global_config.verify_ssl = ""
    
    # 裝飾器函數script是Hera實現近乎原生的Python函數編排的關鍵功能。
    # 它允許您在Hera上下文管理器(例如Workflow或Steps上下文)下調用該函數。
    # 該函數在任何Hera上下文之外仍將正常運行,這意味著您可以在給定函數上編寫單元測試。
    # 該樣本是列印輸入的資訊。
    @script()
    def echo(message: str):
        print(message)
    
    # 構建Workflow,Workflow是Argo中的主要資源,也是Hera的關鍵類,負責儲存模板、設定進入點和運行模板。
    with Workflow(
        generate_name="dag-diamond-",
        entrypoint="diamond",
        namespace="argo",
    ) as w:
        with DAG(name="diamond"):
            A = echo(name="A", arguments={"message": "A"})  # 構建Template。
            B = echo(name="B", arguments={"message": "B"})
            C = echo(name="C", arguments={"message": "C"})
            D = echo(name="D", arguments={"message": "D"})
            A >> [B, C] >> D      # 構建依賴關係,B、C任務依賴A,D依賴B和C。
    # 建立Workflow。
    w.create()
  2. 執行以下命令, 提交工作流程。

    python simpleDAG.py
  3. 工作流程運行後,在工作流程控制台(Argo)查看任務DAG流程與運行結果。

    image

情境二:Map-Reduce

在Argo Workflows中實現MapReduce風格的資料處理時,需要有效利用其DAG模板,以組織和協調多個任務,從而類比Map和Reduce階段。以下展示如何使用Hera構建一個簡單的MapReduce工作流程,用於處理文字檔的單詞計數任務。每一步都是一個Python函數,便於和Python生態進行整合。

  1. 配置Artifacts,相關操作,請參見配置Artifacts

  2. 使用以下內容,建立map-reduce.py。

    展開查看代碼內容

    from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
    from hera.shared import global_config
    import urllib3
    
    urllib3.disable_warnings()
    # 設定訪問地址。
    global_config.host = "https://${IP}:2746"
    global_config.token = "abcdefgxxxxxx"  # 填入之前擷取的Token。
    global_config.verify_ssl = ""
    
    # 使用script裝飾函數時,將script參數傳遞給script裝飾器。這包括image、inputs、outputs、resources等。
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=Parameter(name="num_parts"),
        outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
    )
    def split(num_parts: int) -> None:  # 根據輸入參數num_parts建立多個檔案,檔案中寫入foo字元和parts編號
        import json
        import os
        import sys
    
        os.mkdir("/mnt/out")
    
        part_ids = list(map(lambda x: str(x), range(num_parts)))
        for i, part_id in enumerate(part_ids, start=1):
            with open("/mnt/out/" + part_id + ".json", "w") as f:
                json.dump({"foo": i}, f)
        json.dump(part_ids, sys.stdout)
    
    # script中定義image、inputs、outputs
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),],
        outputs=OSSArtifact(
            name="part",
            path="/mnt/out/part.json",
            archive=NoneArchiveStrategy(),
            key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json",
        ),
    )
    def map_() -> None:  # 根據檔案中foo字元的個數,產生新檔案,將foo內容parts編號乘以2,寫入bar內容
        import json
        import os
    
        os.mkdir("/mnt/out")
        with open("/mnt/in/part.json") as f:
            part = json.load(f)
        with open("/mnt/out/part.json", "w") as f:
            json.dump({"bar": part["foo"] * 2}, f)
    
    # script中定義image、inputs、outputs、resources
    @script(
        image="mirrors-ssl.aliyuncs.com/python:alpine",
        inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
        outputs=OSSArtifact(
            name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
        ),
    )
    def reduce() -> None:   # 計算每個parts對應bar值的總和。
        import json
        import os
    
        os.mkdir("/mnt/out")
    
        total = 0
        for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))):
            result = json.load(f)
            total = total + result["bar"]
        with open("/mnt/out/total.json", "w") as f:
            json.dump({"total": total}, f)
    
    # 構建workflow,輸入name、設定進入點、namespace、全域參數等。
    with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="argo", arguments=Parameter(name="num_parts", value="4")) as w:
        with DAG(name="main"):
            s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # 構建Templetes。
            m = map_(
                with_param=s.result,
                arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),],
            )   # 輸入參數並構建templetes。
            s >> m >> reduce()   # 構建任務依賴關係。
    # 建立工作流程。
    w.create()
    
  3. 執行以下命令,提交工作流程。

    python map-reduce.py
  4. 工作流程運行後,您可以在工作流程控制台(Argo)查看任務DAG流程與運行結果。image

相關文檔

  • Hera相關文檔。

    • 如果您需要詳細瞭解Hera相關資訊,請參見Hera概述

    • 若您想學習如何設定和使用Hera來進行LLM的訓練過程,請參見Train LLM with Hera

  • YAML部署樣本。

    • 如果您想瞭解以YAML的方式部署simple-diamond,請參見dag-diamond.yaml

    • 如果您想瞭解以YAML的方式部署map-reduce,請參見map-reduce.yaml

聯絡我們

若您有任何產品建議或疑問,請加入DingTalk群(DingTalk群號:35688562)聯絡我們。