すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:OSS へのファイルのアップロードによるワークフローのトリガー

最終更新日:Nov 09, 2025

このトピックでは、Alibaba Cloud Object Storage Service (OSS) と Alibaba Cloud Simple Message Queue (旧称:MNS) を統合する方法について説明します。OSS にファイルをアップロードして、ファイルを処理して結果を生成するワークフローを自動的にトリガーできます。

前提条件

ステップ 1: イベントバスの作成

名前空間内のイベント駆動型ワークフローは、イベントバスを共有できます。すでにイベントバスを作成している場合は、このステップをスキップしてステップ 2: イベントソースの作成に進むことができます。

説明
  • Simple Message Queue (旧称:MNS) を使用してイベントバスを作成する場合、Pod は作成されません。

  • トリガー機能を使用するには、NATS を使用して EventBus を作成する必要があります。Simple Message Queue (旧称:MNS) メソッドは、オープンソースの Argo Events Sensor Trigger をサポートしていません。

方法 1: NATS の使用

  1. event-bus.yaml ファイルを作成します。次のコードは一例です。

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      nats:
        native:
          replicas: 3
          auth: token
  2. 次のコマンドを実行して EventBus を作成します。

    kubectl apply -f event-bus.yaml
    説明

    コマンドが正常に実行されると、デフォルトの名前空間にイベントバス Pod が作成されます。後続のすべての操作は、同じ名前空間で実行する必要があります。

  3. 次のコマンドを実行して、イベントバス Pod が開始されたことを確認します。

    kubectl get pod

方法 2: Simple Message Queue (旧称:MNS) の使用

  1. Simple Message Queue (旧称:MNS) コンソールにログインします。

  2. [Topic リスト] ページで、argoeventbus という名前の Topic を作成します。[Topic 詳細] ページで、[エンドポイント] セクションからエンドポイントを取得します。

  3. RAM ユーザーを作成し、ユーザーに AliyunMNSFullAccess 権限を付与して、ユーザーの AccessKey ID と AccessKey Secret を取得します。

  4. 次のコマンドを実行して、AccessKey ID と AccessKey Secret を格納する Secret を作成します。

    kubectl create secret generic mns-secret\
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  5. event-bus-mns.yaml ファイルを作成します。次のコードは一例です。

    • topic: これを、ステップ 2 で作成した Simple Message Queue (旧称:MNS) Topic の名前に置き換えます。

    • endpoint: これを、ステップ 2 で取得したエンドポイントに置き換えます。

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      alimns:
        accessKey:
          key: accesskey
          name: mns-secret
        secretKey:
          key: secretkey
          name: mns-secret
        topic: argoeventbus  # Simple Message Queue (旧称:MNS) の Topic の名前。
        endpoint: http://165***368.mns.<region>.aliyuncs.com
  6. 次のコマンドを実行して event-bus-mns.yaml ファイルを適用します。

    kubectl apply -f event-bus-mns.yaml

ステップ 2: イベントソースの作成

  1. RAM ユーザーを作成し、ユーザーに AliyunMNSFullAccess 権限を付与して、ユーザーの AccessKey ID と AccessKey Secret を取得します。詳細については、「RAM ユーザーの作成」、「RAM ユーザーへの権限付与」、「AccessKey ペアの作成」、および「RAM ユーザーの AccessKey 情報の表示」をご参照ください。

  2. event-source.yaml ファイルを作成します。次のコードは一例です。

    • queue: これを Simple Message Queue (旧称:MNS) キューの名前に置き換えます。

    • endpoint: これを Simple Message Queue (旧称:MNS) のエンドポイントに置き換えます。

    apiVersion: argoproj.io/v1alpha1
    kind: EventSource
    metadata:
      name: ali-mns
    spec:
      mns:
        example:
          jsonBody: true
          accessKey:
            key: accesskey
            name: mns-secret
          secretKey:
            key: secretkey
            name: mns-secret
          queue: oss-event-queue # Simple Message Queue (旧称:MNS) キューの名前。
          waitTimeSeconds: 20
          endpoint: http://165***368.mns.<region>.aliyuncs.com # Simple Message Queue (旧称:MNS) のエンドポイント。
  3. 次のコマンドを実行してイベントソースを作成します。

    kubectl apply -f event-source.yaml
  4. 次のコマンドを実行して、イベントソース Pod が開始されたことを確認します。

    kubectl get pod

ステップ 3: イベントセンサーの作成

  1. event-sensor.yaml ファイルを作成し、ワークフロー定義をイベントセンサーに埋め込みます。次のコードは一例です。

    クリックしてサンプルコードを表示

    apiVersion: argoproj.io/v1alpha1
    kind: Sensor
    metadata:
      name: process-oss-file
    spec:
      template:
        serviceAccountName: default
      dependencies:
        - name: dep1
          eventSourceName: ali-mns
          eventName: example
      triggers:
        - template:
            name: process-oss-file-workflow
            k8s:
              operation: create
              source:
                resource:
                  apiVersion: argoproj.io/v1alpha1
                  kind: Workflow
                  metadata:
                    generateName: process-oss-file-
                    namespace: default
                  spec:
                    entrypoint: process-oss-file
                    volumes:
                    - name: workdir
                      persistentVolumeClaim:
                        claimName: pvc-oss
                    arguments:
                      parameters:
                      - name: message
                        # これは上書きされるべき値です
                        value: event message
                    templates:
                    - name: process-oss-file
                      steps:
                      - - name: parse-event-body
                          template: parse-event-body
                      - - name: process-file
                          template: process-file
                          arguments:
                            parameters:
                            - name: file-name
                              value: "{{steps.parse-event-body.outputs.parameters.file-name}}"
                    - name: parse-event-body
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        command: [sh, -c]
                        args:
                        - echo "Event body:";
                          echo {{workflow.parameters.message}} | base64 -d;
                          TriggerFileName=$(echo {{workflow.parameters.message}} | base64 -d | jq .events[0].oss.object.key | cut -c2- | rev | cut -c2- |rev);
                          echo "" && echo "TriggerFileName from event is $TriggerFileName";
                          Tmp=${TriggerFileName%%.complete} && DataFileName=${Tmp##*/};
                          echo "DataFileName after cutting .complete is $DataFileName, and pass file name to next step";
                          echo $DataFileName > /tmp/file-name.txt
                      outputs:
                        parameters:
                        - name: file-name
                          valueFrom:
                            path: /tmp/file-name.txt
                    - name: process-file
                      inputs:
                        parameters:
                          - name: file-name
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        imagePullPolicy: Always
                        command: [sh, -c]
                        args:
                        - echo "Show data-file:" && echo "";
                          ls -l /mnt/vol/{{inputs.parameters.file-name}};
                          echo "Content of data file:" && echo "";
                          cat /mnt/vol/{{inputs.parameters.file-name}} ;
                          echo "" && echo "Finished" ;
                        volumeMounts:
                        - name: workdir
                          mountPath: /mnt/vol
              parameters:
                - src:
                    dependencyName: dep1
                    dataKey: body
                  dest: spec.arguments.parameters.0.value
  2. 次のコマンドを実行してイベントセンサーを作成します。

    kubectl apply -f event-sensor.yaml
  3. 次のコマンドを実行して、イベントセンサー Pod が開始されたことを確認します。

    kubectl get pod
説明

Simple Message Queue (旧称:MNS) を使用して EventBus を作成すると、イベントセンサーの作成後に、対応する Simple Message Queue (旧称:MNS) キューが自動的に作成されます。キューの名前は、ackone-argowf-<namespace>-<sensor-name>-<sensor-uid> というフォーマットになります。

ステップ 4: OSS へのファイルのアップロードによるワークフローのトリガーの確認

  1. OSS コンソールにログインします。

  2. ワークフローをトリガーするには、次の 2 つのファイルを OSS バケットにアップロードします。これらのファイルは事前に準備しておく必要があります。

    • datafile: カスタムコンテンツを含むテキスト形式のデータファイル。

    • datafile.complete: トリガーファイル。これは空のファイルでかまいません。

  3. ワークフロークラスターで次のコマンドを実行して、ワークフローのステータスを表示します。

    argo list

    予想される出力は次のとおりです。

    NAME STATUS AGE DURATION PRIORITY
    process-oss-file-kmb4k Running 13s 13s 0
  4. 次のコマンドを実行して、ワークフローのログを取得します。

    argo logs process-oss-file-kmb4k
    重要
    • コマンド内のワークフロー名は、前のステップで返された名前と一致する必要があります。名前 ali-mns-workflow-5prz7 は一例です。お使いの環境の実際の名前で置き換えてください。

    • メッセージの内容は Base64 でエンコードされています。

    予想される出力は次のとおりです。

    image.png

ステップ 5: イベント関連リソースのクリーンアップ

  1. 次のコマンドを実行して、イベント関連のリソースをクリーンアップします。

    kubectl delete sensor process-oss-file
    kubectl delete eventsource ali-mns
    kubectl delete eventbus default
  2. 次のコマンドを実行して Pod を表示し、すべてのリソースが削除されたことを確認します。

    kubectl get pod