全部產品
Search
文件中心

Container Service for Kubernetes:通過向OSS上傳檔案觸發工作流程

更新時間:Sep 27, 2025

本文介紹如何整合阿里雲Object Storage Service與阿里雲輕量訊息佇列(原 MNS),通過將檔案上傳至Object Storage Service中,自動觸發工作流程處理檔案,並產生結果。

前提條件

步驟一:建立Event Bus

Event Bus可以被命名空間中的事件驅動工作流程共用。如果已經建立,可跳過該步驟,直接執行步驟二:建立Event Source

說明
  • 使用輕量訊息佇列(原 MNS)方式建立Event Bus時,不會建立Pod。

  • 如需使用Trigger功能,請使用NATS方式建立EventBus。目前輕量訊息佇列(原 MNS)方式不支援開源Argo Event的Sensor Trigger。

方式一:使用NATS

  1. 建立event-bus.yaml檔案。Event Bus範例程式碼如下所示:

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      nats:
        native:
          replicas: 3
          auth: token
  2. 執行以下命令,建立EventBus。

    kubectl apply -f event-bus.yaml
    說明

    命令執行成功後,會在default命名空間下建立Event Bus Pod。後續操作需在同一命名空間下。

  3. 執行以下命令,查看Event Bus Pod是否正常啟動。

    kubectl get pod

方式二:使用輕量訊息佇列(原 MNS)

  1. 登入輕量訊息佇列(原 MNS)控制台

  2. 主題列表頁面建立主題argoeventbus,並在主題詳情頁面的存取點地區擷取Endpoint。

  3. 使用Resource Access Management員登入RAM控制台

  4. 建立RAM使用者,授權AliyunMNSFullAccess許可權,並擷取RAM使用者的AK和SK。

  5. 執行以下命令,建立Secret用於儲存AK和SK。

    kubectl create secret generic mns-secret\
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  6. 建立event-bus-mns.yaml檔案,EventBus範例程式碼如下所示:

    • topic:需替換為2中建立的輕量訊息佇列(原 MNS)中的主題名稱。

    • endpoint:需替換為2中擷取的Endpoint。

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      alimns:
        accessKey:
          key: accesskey
          name: mns-secret
        secretKey:
          key: secretkey
          name: mns-secret
        topic: argoeventbus  # 對應輕量訊息佇列(原 MNS)中的主題名稱。
        endpoint: http://165***368.mns.<region>.aliyuncs.com
  7. 執行以下命令,建立event-bus-mns.yaml

    kubectl apply -f event-bus-mns.yaml

步驟二:建立Event Source

  1. 使用Resource Access Management員登入RAM控制台

  2. 建立RAM使用者,為其授予AliyunMNSFullAccess許可權,並擷取RAM使用者的AK和SK。具體操作,請參見建立RAM使用者為RAM使用者授權建立AccessKey查看RAM使用者的AccessKey資訊

  3. 建立event-source.yaml檔案,Event Source範例程式碼如下所示:

    • queue:需替換為輕量訊息佇列(原 MNS)的名稱。

    • endpoint:需替換為輕量訊息佇列(原 MNS)的存取點。

    apiVersion: argoproj.io/v1alpha1
    kind: EventSource
    metadata:
      name: ali-mns
    spec:
      mns:
        example:
          jsonBody: true
          accessKey:
            key: accesskey
            name: mns-secret
          secretKey:
            key: secretkey
            name: mns-secret
          queue: oss-event-queue # 輕量訊息佇列(原 MNS)名稱。
          waitTimeSeconds: 20
          endpoint: http://165***368.mns.<region>.aliyuncs.com # 輕量訊息佇列(原 MNS)存取點。
  4. 執行以下命令,建立Event Source。

    kubectl apply -f event-source.yaml
  5. 執行以下命令,查看Event Source Pod是否正常啟動。

    kubectl get pod

步驟三:建立Event Sensor

  1. 建立event-sensor.yaml檔案,在Event Sensor中嵌入待執行的工作流程定義。範例程式碼如下所示:

    展開查看範例程式碼

    apiVersion: argoproj.io/v1alpha1
    kind: Sensor
    metadata:
      name: process-oss-file
    spec:
      template:
        serviceAccountName: default
      dependencies:
        - name: dep1
          eventSourceName: ali-mns
          eventName: example
      triggers:
        - template:
            name: process-oss-file-workflow
            k8s:
              operation: create
              source:
                resource:
                  apiVersion: argoproj.io/v1alpha1
                  kind: Workflow
                  metadata:
                    generateName: process-oss-file-
                    namespace: default
                  spec:
                    entrypoint: process-oss-file
                    volumes:
                    - name: workdir
                      persistentVolumeClaim:
                        claimName: pvc-oss
                    arguments:
                      parameters:
                      - name: message
                        # this is the value that should be overridden
                        value: event message
                    templates:
                    - name: process-oss-file
                      steps:
                      - - name: parse-event-body
                          template: parse-event-body
                      - - name: process-file
                          template: process-file
                          arguments:
                            parameters:
                            - name: file-name
                              value: "{{steps.parse-event-body.outputs.parameters.file-name}}"
                    - name: parse-event-body
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        command: [sh, -c]
                        args:
                        - echo "Event body:";
                          echo {{workflow.parameters.message}} | base64 -d;
                          TriggerFileName=$(echo {{workflow.parameters.message}} | base64 -d | jq .events[0].oss.object.key | cut -c2- | rev | cut -c2- |rev);
                          echo "" && echo "TriggerFileName from event is $TriggerFileName";
                          Tmp=${TriggerFileName%%.complete} && DataFileName=${Tmp##*/};
                          echo "DataFileName after cutting .complete is $DataFileName, and pass file name to next step";
                          echo $DataFileName > /tmp/file-name.txt
                      outputs:
                        parameters:
                        - name: file-name
                          valueFrom:
                            path: /tmp/file-name.txt
                    - name: process-file
                      inputs:
                        parameters:
                          - name: file-name
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        imagePullPolicy: Always
                        command: [sh, -c]
                        args:
                        - echo "Show data-file:" && echo "";
                          ls -l /mnt/vol/{{inputs.parameters.file-name}};
                          echo "Content of data file:" && echo "";
                          cat /mnt/vol/{{inputs.parameters.file-name}} ;
                          echo "" && echo "Finished" ;
                        volumeMounts:
                        - name: workdir
                          mountPath: /mnt/vol
              parameters:
                - src:
                    dependencyName: dep1
                    dataKey: body
                  dest: spec.arguments.parameters.0.value
  2. 執行以下命令,建立Event Sensor。

    kubectl apply -f event-sensor.yaml
  3. 執行以下命令,查看Event Sensor Pod是否正常啟動。

    kubectl get pod
說明

使用輕量訊息佇列(原 MNS)方式建立Eventbus時,在Event Sensor建立完成後,會自動建立一個輕量訊息佇列(原 MNS)與之對應,隊列命名格式為:ackone-argowf-<namespace>-<sensor-name>-<sensor-uid>

步驟四:驗證向OSS上傳檔案觸發工作流程

  1. 登入OSS管理主控台

  2. 向OSS Bucket中上傳以下2個檔案(該檔案需自備),觸發工作流程運行。

    • datafile:資料檔案,文字格式設定,內容自訂。

    • datafile.complete:trigger檔案,可以是空檔案。

  3. 執行以下命令,在工作流程叢集中查看工作流程運行情況。

    argo list

    預期輸出如下:

    NAME STATUS AGE DURATION PRIORITY
    process-oss-file-kmb4k Running 13s 13s 0
  4. 執行以下命令,擷取工作流程日誌。

    argo logs process-oss-file-kmb4k
    重要
    • 該命令中的工作流程名稱必須和上一步驟中返回的工作流程名稱一致,ali-mns-workflow-5prz7僅為樣本值,請您修改為實際環境中的傳回值。

    • 訊息內容使用Base64編碼。

    預期輸出如下:

    image.png

步驟五:清除Event相關資源

  1. 依次執行以下命令,清除Event相關資源。

    kubectl delete sensor process-oss-file
    kubectl delete eventsource ali-mns
    kubectl delete eventbus default
  2. 執行以下命令查看Pod,確認所有資源已清除。

    kubectl get pod