このトピックでは、Argo Workflowsを使用してOSS内のオブジェクトを段階的にマージする方法と、データをバッチ処理する方法について説明します。この方法は通常、高精度マップ処理やアニメーションレンダリングで使用されます。Argo Workflowsを使用してタスクをオーケストレーションすることで、データ処理の効率と並行性を大幅に向上させることができます。
前提条件
コンポーネントとAlibaba Cloud Argo CLIがインストールされていること。詳細については、バッチタスクオーケストレーションを有効にするを参照してください。
OSSボリュームが作成されていること。詳細については、ボリュームを使用するを参照してください。
ステップ1:データの準備
次の内容に基づいて、prepare-data.yamlという名前のファイルを作成します。
apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: aggregation-prepare-data- namespace: argo spec: entrypoint: main volumes: - name: workdir persistentVolumeClaim: claimName: pvc-oss templates: - name: main dag: tasks: - name: prepare-data template: create-file - name: create-file script: image: mirrors-ssl.aliyuncs.com/python:alpine command: - python source: | import os import sys import random os.makedirs("/mnt/vol/aggregation-demo/l1/", exist_ok=True) for i in range(32): with open('/mnt/vol/aggregation-demo/l1/' + str(i) + '.txt', 'w') as conbine_file: combined_content = random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ') conbine_file.write(combined_content) volumeMounts: - name: workdir mountPath: /mnt/vol
次のコマンドを実行して、ワークフローを送信します。
argo submit prepare-data.yaml
ワークフローの実行後、
aggregation-demo/l1
という名前のディレクトリがOSSボリュームに作成され、32個のファイルが生成されます。各ファイルにはランダムな文字列が含まれています。
ステップ2:データのバッチ処理
process-data.yamlという名前のYAMLファイルを作成して、データ処理ワークフローをデプロイします。
apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: process-data- # データ処理ワークフロー namespace: argo spec: entrypoint: main volumes: # OSSボリュームをマウントします。 - name: workdir persistentVolumeClaim: claimName: pvc-oss arguments: parameters: - name: numbers value: "16" templates: - name: main steps: - - name: process-data-l1 # ステップ1. 16個のPodを起動して32個のファイルをマージします。 template: process-data arguments: parameters: - name: file_number value: "{{item}}" - name: level value: "1" withSequence: count: "{{workflow.parameters.numbers}}" - - name: process-data-l2 # ステップ2. 8個のPodを起動して16個のファイルをマージします。このステップは、ステップ1が完了した後に実行されます。 template: process-data arguments: parameters: - name: file_number value: "{{item}}" - name: level value: "2" withSequence: count: "{{=asInt(workflow.parameters.numbers)/2}}" - - name: merge-data # ステップ3. 1つのPodを起動して8個のファイルをマージします。このステップは、ステップ2が完了した後に実行されます。 template: merge-data arguments: parameters: - name: number value: "{{=asInt(workflow.parameters.numbers)/2}}" - name: process-data # データ処理 inputs: parameters: - name: file_number - name: level container: image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd # VPCインターネットアクセスを有効にしてイメージをプルします。 imagePullPolicy: Always command: [python3] # コマンド args: ["process.py", "{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"]# Podによって処理されるファイルを指定する入力パラメータを受け取ります。 volumeMounts: - name: workdir mountPath: /mnt/vol - name: merge-data # データマージ inputs: parameters: - name: number container: image: serverlessargo-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflow-examples/python:3.11-amd imagePullPolicy: Always command: [python3] args: ["merge.py", "{{inputs.parameters.number}}"] # 処理するファイルの数を指定する入力パラメータを受け取ります。 volumeMounts: - name: workdir mountPath: /mnt/vol
次のコマンドを実行して、ワークフローを送信します。
argo submit process-data.yaml -n argo
次のコマンドを実行して、マウント結果を表示します。
argo get @latest -n argo
期待される結果:
argo get @latest -n argo Name: process-data-8sn2q Namespace: argo ServiceAccount: unset (will run with the default ServiceAccount) Status: Succeeded Conditions: PodRunning False Completed True Created: Thu Dec 12 13:15:39 +0800 (4 minutes ago) Started: Thu Dec 12 13:15:39 +0800 (4 minutes ago) Finished: Thu Dec 12 13:16:40 +0800 (3 minutes ago) Duration: 1 minute 1 seconds Progress: 25/25 ResourcesDuration: 4s*(1 cpu),2m51s*(100Mi memory) Parameters: numbers: 16 STEP TEMPLATE PODNAME DURATION MESSAGE ✔ process-data-8sn2q main ├─┬─✔ process-data-l1(0:0) process-data process-data-8sn2q-process-data-3064646785 17s │ ├─✔ process-data-l1(1:1) process-data process-data-8sn2q-process-data-140728989 20s │ ├─✔ process-data-l1(2:2) process-data process-data-8sn2q-process-data-499182361 18s │ ├─✔ process-data-l1(3:3) process-data process-data-8sn2q-process-data-3152865965 20s │ ├─✔ process-data-l1(4:4) process-data process-data-8sn2q-process-data-1363784105 16s │ ├─✔ process-data-l1(5:5) process-data process-data-8sn2q-process-data-3270437485 20s │ ├─✔ process-data-l1(6:6) process-data process-data-8sn2q-process-data-1788045361 16s │ ├─✔ process-data-l1(7:7) process-data process-data-8sn2q-process-data-913839549 20s │ ├─✔ process-data-l1(8:8) process-data process-data-8sn2q-process-data-1562179905 16s │ ├─✔ process-data-l1(9:9) process-data process-data-8sn2q-process-data-573517021 20s │ ├─✔ process-data-l1(10:10) process-data process-data-8sn2q-process-data-3769586203 16s │ ├─✔ process-data-l1(11:11) process-data process-data-8sn2q-process-data-3700909073 20s │ ├─✔ process-data-l1(12:12) process-data process-data-8sn2q-process-data-2818003295 19s │ ├─✔ process-data-l1(13:13) process-data process-data-8sn2q-process-data-278901825 20s │ ├─✔ process-data-l1(14:14) process-data process-data-8sn2q-process-data-3986961347 16s │ └─✔ process-data-l1(15:15) process-data process-data-8sn2q-process-data-2905592609 19s ├─┬─✔ process-data-l2(0:0) process-data process-data-8sn2q-process-data-2056515729 9s │ ├─✔ process-data-l2(1:1) process-data process-data-8sn2q-process-data-2141620461 4s │ ├─✔ process-data-l2(2:2) process-data process-data-8sn2q-process-data-352538601 9s │ ├─✔ process-data-l2(3:3) process-data process-data-8sn2q-process-data-2144734909 6s │ ├─✔ process-data-l2(4:4) process-data process-data-8sn2q-process-data-2290907961 8s │ ├─✔ process-data-l2(5:5) process-data process-data-8sn2q-process-data-4197561341 5s │ ├─✔ process-data-l2(6:6) process-data process-data-8sn2q-process-data-1620613249 9s │ └─✔ process-data-l2(7:7) process-data process-data-8sn2q-process-data-3126908173 10s └───✔ merge-data merge-data process-data-8sn2q-merge-data-1171626309 7s
コンソールで結果を表示します。
結果は、ワークフローが成功したことを示しています。OSSバケットを表示します。result.txtファイルが生成されます。
参考資料
SDK for Pythonを使用してワークフローを作成および送信する方法については、SDK for Pythonを使用して多数のArgoワークフローを作成するを参照してください。
お問い合わせ
この製品についてご提案やご質問がある場合は、DingTalkグループ35688562にご参加ください。