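This topic describes how to use Argo Workflows to merge files stored in OSS level by level, as an example of batch data processing. This approach is common in scenarios such as high-precision map processing and animation rendering. Orchestrating the tasks with Argo Workflows lets the independent merges at each level run in parallel pods, which increases processing concurrency and speed.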
Prerequisites
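You have installed the batch task orchestration components and the Alibaba Cloud Argo CLI as described in Enable batch task orchestration.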
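You have created an OSS volume as described in Use storage volumes. The workflows in this topic mount it through a persistent volume claim (PVC) named pvc-oss in the argo namespace.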
Step 1: Prepare the data
Create prepare-data.yaml with the following sample content.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: aggregation-prepare-data-
  namespace: argo
spec:
  entrypoint: main
  volumes:
  - name: workdir
    persistentVolumeClaim:
      claimName: pvc-oss
  templates:
  - name: main
    dag:
      tasks:
      - name: prepare-data
        template: create-file
  - name: create-file
    script:
      image: mirrors-ssl.aliyuncs.com/python:alpine
      command:
      - python
      source: |
        import os
        import random

        # Create the level-1 directory on the OSS volume.
        os.makedirs("/mnt/vol/aggregation-demo/l1/", exist_ok=True)
        # Write 32 files (0.txt to 31.txt), each containing one random uppercase letter.
        for i in range(32):
            with open('/mnt/vol/aggregation-demo/l1/' + str(i) + '.txt', 'w') as combined_file:
                combined_content = random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ')
                combined_file.write(combined_content)
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol
Run the following command to submit the workflow:
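argo submit prepare-data.yaml -n argo
After the workflow finishes, the aggregation-demo/l1 directory is created in OSS. It contains 32 files (0.txt to 31.txt), each holding a single random uppercase letter.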
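To spot-check the prepared input before moving on, you can validate the level-1 files with a short script. The following is a minimal sketch that is not part of the example: it assumes the OSS data is readable as a local directory (for example, from a pod that mounts pvc-oss at /mnt/vol, or from a local download of aggregation-demo/l1), and the function name check_level1 is hypothetical.

```python
import os
import string
import sys


def check_level1(directory, expected_count=32):
    """Return a list of problems found in the level-1 input files.

    Assumes the layout written by prepare-data.yaml: files 0.txt to
    (expected_count - 1).txt, each holding exactly one uppercase letter.
    """
    problems = []
    for i in range(expected_count):
        path = os.path.join(directory, f"{i}.txt")
        if not os.path.isfile(path):
            problems.append(f"missing {path}")
            continue
        with open(path) as f:
            content = f.read()
        # The length check also rejects empty files.
        if len(content) != 1 or content not in string.ascii_uppercase:
            problems.append(f"unexpected content in {path}: {content!r}")
    return problems


if __name__ == "__main__":
    # Hypothetical default: the mount path used inside the workflow pods.
    target = sys.argv[1] if len(sys.argv) > 1 else "/mnt/vol/aggregation-demo/l1"
    issues = check_level1(target)
    for issue in issues:
        print(issue)
    print("OK" if not issues else f"{len(issues)} problem(s) found")
```

Pass the directory as the first argument, for example `python check_level1.py ./aggregation-demo/l1` for a local copy; an empty problem list means all 32 files exist and each contains one letter.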
Step 2: Process the data in batches
Create process-data.yaml with the following sample content, which defines the data processing job in YAML.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: process-data-   # Data processing workflow.
  namespace: argo
spec:
  entrypoint: main
  volumes:                      # Mount the OSS volume.
  - name: workdir
    persistentVolumeClaim:
      claimName: pvc-oss
  arguments:
    parameters:
    - name: numbers
      value: "16"
  templates:
  - name: main
    steps:
    - - name: process-data-l1   # Level 1: start 16 pods that merge 32 files.
        template: process-data
        arguments:
          parameters:
          - name: file_number
            value: "{{item}}"
          - name: level
            value: "1"
        withSequence:
          count: "{{workflow.parameters.numbers}}"
    - - name: process-data-l2   # Level 2: start 8 pods that merge 16 files, after the previous step completes.
        template: process-data
        arguments:
          parameters:
          - name: file_number
            value: "{{item}}"
          - name: level
            value: "2"
        withSequence:
          count: "{{=asInt(workflow.parameters.numbers)/2}}"
    - - name: merge-data        # Final level: start 1 pod that merges 8 files, after the previous step completes.
        template: merge-data
        arguments:
          parameters:
          - name: number
            value: "{{=asInt(workflow.parameters.numbers)/2}}"
  - name: process-data          # Definition of the process-data template.
    inputs:
      parameters:
      - name: file_number
      - name: level
    container:
      image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd   # Make sure the VPC has Internet access so that the image can be pulled.
      imagePullPolicy: Always
      command: [python3]
      args: ["process.py", "{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"]   # Input parameters: which file this pod processes, and at which level.
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol
  - name: merge-data            # Definition of the merge-data template.
    inputs:
      parameters:
      - name: number
    container:
      image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd
      imagePullPolicy: Always
      command: [python3]
      args: ["merge.py", "{{inputs.parameters.number}}"]   # Input parameter: how many files to merge.
      volumeMounts:
      - name: workdir
        mountPath: /mnt/vol
Run the following command to submit the workflow:
argo submit process-data.yaml -n argo
Run the following command to view the result:
argo get @latest -n argo
Expected output:
Name:                process-data-8sn2q
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              Succeeded
Conditions:
 PodRunning          False
 Completed           True
Created:             Thu Dec 12 13:15:39 +0800 (4 minutes ago)
Started:             Thu Dec 12 13:15:39 +0800 (4 minutes ago)
Finished:            Thu Dec 12 13:16:40 +0800 (3 minutes ago)
Duration:            1 minute 1 seconds
Progress:            25/25
ResourcesDuration:   4s*(1 cpu),2m51s*(100Mi memory)
Parameters:
  numbers:           16

STEP                           TEMPLATE      PODNAME                                     DURATION  MESSAGE
 ✔ process-data-8sn2q          main
 ├─┬─✔ process-data-l1(0:0)    process-data  process-data-8sn2q-process-data-3064646785  17s
 │ ├─✔ process-data-l1(1:1)    process-data  process-data-8sn2q-process-data-140728989   20s
 │ ├─✔ process-data-l1(2:2)    process-data  process-data-8sn2q-process-data-499182361   18s
 │ ├─✔ process-data-l1(3:3)    process-data  process-data-8sn2q-process-data-3152865965  20s
 │ ├─✔ process-data-l1(4:4)    process-data  process-data-8sn2q-process-data-1363784105  16s
 │ ├─✔ process-data-l1(5:5)    process-data  process-data-8sn2q-process-data-3270437485  20s
 │ ├─✔ process-data-l1(6:6)    process-data  process-data-8sn2q-process-data-1788045361  16s
 │ ├─✔ process-data-l1(7:7)    process-data  process-data-8sn2q-process-data-913839549   20s
 │ ├─✔ process-data-l1(8:8)    process-data  process-data-8sn2q-process-data-1562179905  16s
 │ ├─✔ process-data-l1(9:9)    process-data  process-data-8sn2q-process-data-573517021   20s
 │ ├─✔ process-data-l1(10:10)  process-data  process-data-8sn2q-process-data-3769586203  16s
 │ ├─✔ process-data-l1(11:11)  process-data  process-data-8sn2q-process-data-3700909073  20s
 │ ├─✔ process-data-l1(12:12)  process-data  process-data-8sn2q-process-data-2818003295  19s
 │ ├─✔ process-data-l1(13:13)  process-data  process-data-8sn2q-process-data-278901825   20s
 │ ├─✔ process-data-l1(14:14)  process-data  process-data-8sn2q-process-data-3986961347  16s
 │ └─✔ process-data-l1(15:15)  process-data  process-data-8sn2q-process-data-2905592609  19s
 ├─┬─✔ process-data-l2(0:0)    process-data  process-data-8sn2q-process-data-2056515729  9s
 │ ├─✔ process-data-l2(1:1)    process-data  process-data-8sn2q-process-data-2141620461  4s
 │ ├─✔ process-data-l2(2:2)    process-data  process-data-8sn2q-process-data-352538601   9s
 │ ├─✔ process-data-l2(3:3)    process-data  process-data-8sn2q-process-data-2144734909  6s
 │ ├─✔ process-data-l2(4:4)    process-data  process-data-8sn2q-process-data-2290907961  8s
 │ ├─✔ process-data-l2(5:5)    process-data  process-data-8sn2q-process-data-4197561341  5s
 │ ├─✔ process-data-l2(6:6)    process-data  process-data-8sn2q-process-data-1620613249  9s
 │ └─✔ process-data-l2(7:7)    process-data  process-data-8sn2q-process-data-3126908173  10s
 └───✔ merge-data              merge-data    process-data-8sn2q-merge-data-1171626309    7s
You can also view the result in the console.

All tasks have completed successfully. In OSS, you can see that the final result file, result.txt, has been generated.
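The process.py and merge.py scripts run inside the example image, and their source is not shown in this topic. The following is a hypothetical sketch of the level-by-level merge, useful for reasoning about the workflow or testing locally; it is not the image's actual code. It assumes that the pod with file_number i at level L concatenates l{L}/{2i}.txt and l{L}/{2i+1}.txt into l{L+1}/{i}.txt, and that merge.py concatenates the first `number` files of l3 into result.txt. The directory names l2 and l3, the location of result.txt, and all function names are assumptions. The task_counts helper mirrors the withSequence counts in process-data.yaml: with numbers set to 16, the workflow runs 16 + 8 + 1 = 25 pods, which matches Progress: 25/25 in the expected output.

```python
import os


def process(base, file_number, level):
    """Hypothetical process.py: merge two level-`level` files into one level-`level + 1` file."""
    src = os.path.join(base, f"l{level}")
    dst = os.path.join(base, f"l{level + 1}")
    os.makedirs(dst, exist_ok=True)
    parts = []
    # Pod i at this level reads files 2i and 2i+1 (assumed naming scheme).
    for n in (2 * file_number, 2 * file_number + 1):
        with open(os.path.join(src, f"{n}.txt")) as f:
            parts.append(f.read())
    with open(os.path.join(dst, f"{file_number}.txt"), "w") as f:
        f.write("".join(parts))


def merge(base, number, level=3):
    """Hypothetical merge.py: concatenate the first `number` files of the last level into result.txt."""
    src = os.path.join(base, f"l{level}")
    parts = []
    for n in range(number):
        with open(os.path.join(src, f"{n}.txt")) as f:
            parts.append(f.read())
    with open(os.path.join(base, "result.txt"), "w") as f:
        f.write("".join(parts))


def task_counts(numbers):
    """Pods per step in process-data.yaml: process-data-l1, process-data-l2, merge-data."""
    return [numbers, numbers // 2, 1]


def run_locally(base, numbers=16):
    """Run every stage sequentially; in Argo, the items of each step run as parallel pods."""
    for i in range(numbers):
        process(base, i, 1)
    for i in range(numbers // 2):
        process(base, i, 2)
    merge(base, numbers // 2)
```

For example, calling run_locally("./aggregation-demo") on a local copy of the level-1 data writes l2, l3, and result.txt into that copy. In the workflow itself, each `- -` entry under steps is a step group: the withSequence items inside a group run in parallel, and the next group starts only after every pod in the previous one has finished, which is why level 2 waits for all level-1 outputs.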

Related documents
To build and submit workflows with the Python SDK, see Use the Python SDK to build large-scale Argo Workflows.
Contact us
If you have any product suggestions or questions, join the DingTalk group (group number: 35688562) to contact us.