Container Service for Kubernetes: Upload objects to OSS to trigger workflows

Last Updated: Feb 29, 2024

This topic describes how to use Object Storage Service (OSS) together with Message Service to trigger workflows by uploading objects to OSS.

Prerequisites

Step 1: Configure notifications for OSS events

  1. Log on to the OSS console.

  2. Click Buckets, and then create an OSS bucket or select an existing OSS bucket. For more information about how to create an OSS bucket, see Create a bucket.

  3. On the Objects page, choose Data Processing > Event Notification, click Create Rule, configure the parameters, and then click OK. For more information about how to create an event notification rule, see Use event notifications to monitor object changes in real time.

    Parameter              Example
    Rule Name              upload-complete
    Event Type             PutObject, PostObject
    Resource Description   Select Prefix and Suffix and set the suffix to .complete. This way, uploading an object whose name ends in .complete triggers a workflow.
    Endpoint               Select Queue and set the name to oss-event-queue.

    After the rule is created, a topic is automatically created in Message Service.

  4. Log on to the MNS console.

  5. Click Queues and create a queue named oss-event-queue. For more information, see Create an MNS queue. After the queue is created, obtain the endpoint in the Endpoint section of the Queue Details page.

    Important

    The queue name must be the same as the queue name specified for the Endpoint parameter of the event notification rule in substep 3.
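The suffix filter configured in the event notification rule decides which uploads produce an event. The following shell sketch only illustrates that idea with a hypothetical helper, matches_complete_suffix. It is not how OSS evaluates rules, and the object names are made up.

```shell
# Hypothetical helper: succeeds when an object name ends in .complete,
# mirroring the suffix configured in the event notification rule.
matches_complete_suffix() {
  case "$1" in
    *.complete) return 0 ;;
    *) return 1 ;;
  esac
}

# Made-up object names, used only to show which ones pass the filter.
for name in datafile datafile.complete reports/2024.complete; do
  if matches_complete_suffix "$name"; then
    echo "$name: would trigger a workflow"
  else
    echo "$name: ignored by the rule"
  fi
done
```

Uploading the data file first and the .complete marker last means that the workflow starts only after the data is fully in place.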

Step 2: Create an event bus

An event bus can be shared by event-driven workflows in the same namespace. If an event bus is already created, proceed to Step 3: Create an event source.

Method 1: Use NATS

  1. Create a file named event-bus.yaml. Sample code of the event bus:

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      nats:
        native:
          replicas: 3
          auth: token
  2. Run the following command to create an event bus:

    kubectl apply -f event-bus.yaml

    Note

    After you run the command, an event bus pod is created in the default namespace. Perform the subsequent operations in the same namespace.

  3. Run the following command to check whether the event bus pod runs as normal:

    kubectl get pod
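Instead of rerunning kubectl get pod by hand, the readiness check can be scripted. This is a minimal sketch, assuming the event bus pods are the only pods in the current namespace at this point. The helper name wait_for_pods is hypothetical, and the 120-second default timeout is an arbitrary choice.

```shell
# Hypothetical helper: blocks until every pod in the current namespace
# reports the Ready condition, or fails once the timeout expires.
wait_for_pods() {
  timeout="${1:-120s}"
  kubectl wait --for=condition=Ready pod --all --timeout="$timeout"
}
```

For example, wait_for_pods 180s waits up to three minutes. Note that pods of completed workflows never become Ready, so this check is suited to a namespace that has not run workflows yet.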

Method 2: Use a Message Service queue

  1. Log on to the MNS console.

  2. On the Topics page, create a topic named argoeventbus. Obtain the endpoint in the Endpoint section of the Topic Details page.

  3. Log on to the RAM console as a RAM user who has administrative rights.

  4. Create a Resource Access Management (RAM) user, grant the AliyunMNSFullAccess permission to the RAM user, and obtain the AccessKey pair of the RAM user.

  5. Run the following command to create a Secret to store the AccessKey pair:

    kubectl create secret generic mns-secret \
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  6. Create a file named event-bus-mns.yaml. The following code block shows a sample event bus:

    • topic: Replace the value with the name of the topic that you created in substep 2.

    • endpoint: Replace the value with the topic endpoint that you obtained in substep 2.

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      alimns:
        accessKey:
          key: accesskey
          name: mns-secret
        secretKey:
          key: secretkey
          name: mns-secret
        topic: argoeventbus  # Specify the name of the topic that you created. 
        endpoint: http://165***368.mns.<region>.aliyuncs.com
  7. Run the following command to create an event bus:

    kubectl apply -f event-bus-mns.yaml
Note
  • No pod is created if you use a Message Service queue to create an event bus.

  • To use triggers, use NATS to create an event bus. Event buses created by using Message Service do not support sensor triggers provided by open source Argo Events.

Step 3: Create an event source

  1. Log on to the RAM console as a RAM user who has administrative rights.

  2. Create a RAM user, grant the AliyunMNSFullAccess permission to the RAM user, and obtain the AccessKey pair of the RAM user. For more information, see Create a RAM user, Grant permissions to a RAM user, Create an AccessKey pair, and View the information about AccessKey pairs of a RAM user.

  3. Run the following command to create a Secret to store the AccessKey pair:

    kubectl create secret generic mns-secret \
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  4. Create a file named event-source.yaml. The following code block shows a sample event source:

    • queue: Replace the value with the name of the queue that you created in substep 5 of Step 1.

    • endpoint: Replace the value with the queue endpoint that you obtained in substep 5 of Step 1.

    apiVersion: argoproj.io/v1alpha1
    kind: EventSource
    metadata:
      name: ali-mns
    spec:
      mns:
        example:
          jsonBody: true
          accessKey:
            key: accesskey
            name: mns-secret
          secretKey:
            key: secretkey
            name: mns-secret
          queue: oss-event-queue # Specify the name of the queue that you created in Step 1.
          waitTimeSeconds: 20
          endpoint: http://165***368.mns.<region>.aliyuncs.com # Specify the endpoint of the queue that you created in Step 1.
  5. Run the following command to create an event source:

    kubectl apply -f event-source.yaml
  6. Run the following command to check whether the event source pod runs as normal:

    kubectl get pod

Step 4: Create an event sensor

  1. Create a file named event-sensor.yaml and nest the workflow definition in the event sensor. Sample code of the event sensor:

    apiVersion: argoproj.io/v1alpha1
    kind: Sensor
    metadata:
      name: process-oss-file
    spec:
      template:
        serviceAccountName: default
      dependencies:
        - name: dep1
          eventSourceName: ali-mns
          eventName: example
      triggers:
        - template:
            name: process-oss-file-workflow
            k8s:
              operation: create
              source:
                resource:
                  apiVersion: argoproj.io/v1alpha1
                  kind: Workflow
                  metadata:
                    generateName: process-oss-file-
                    namespace: default
                  spec:
                    entrypoint: process-oss-file
                    volumes:
                    - name: workdir
                      persistentVolumeClaim:
                        claimName: pvc-oss
                    arguments:
                      parameters:
                      - name: message
                        # this is the value that should be overridden
                        value: event message
                    templates:
                    - name: process-oss-file
                      steps:
                      - - name: parse-event-body
                          template: parse-event-body
                      - - name: process-file
                          template: process-file
                          arguments:
                            parameters:
                            - name: file-name
                              value: "{{steps.parse-event-body.outputs.parameters.file-name}}"
                    - name: parse-event-body
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        command: [sh,-c]
                        args:
                        - echo "Event body:";
                          echo {{workflow.parameters.message}} | base64 -d;
                          TriggerFileName=$(echo {{workflow.parameters.message}} | base64 -d | jq .events[0].oss.object.key | cut -c2- | rev | cut -c2- |rev);
                          echo "" && echo "TriggerFileName from event is $TriggerFileName";
                          Tmp=${TriggerFileName%%.complete} && DataFileName=${Tmp##*/};
                          echo "DataFileName after cutting .complete is $DataFileName, and pass file name to next step";
                          echo $DataFileName > /tmp/file-name.txt
                      outputs:
                        parameters:
                        - name: file-name
                          valueFrom:
                            path: /tmp/file-name.txt
                    - name: process-file
                      inputs:
                        parameters:
                          - name: file-name
                      container:
                        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine
                        imagePullPolicy: Always
                        command: [sh,-c]
                        args:
                        - echo "Show data-file:" && echo "";
                          ls -l /mnt/vol/{{inputs.parameters.file-name}};
                          echo "Content of data file:" && echo "";
                          cat /mnt/vol/{{inputs.parameters.file-name}} ;
                          echo "" && echo "Finished" ;
                        volumeMounts:
                        - name: workdir
                          mountPath: /mnt/vol
              parameters:
                - src:
                    dependencyName: dep1
                    dataKey: body
                  dest: spec.arguments.parameters.0.value
  2. Run the following command to create an event sensor:

    kubectl apply -f event-sensor.yaml
  3. Run the following command to check whether the event sensor pod runs as normal:

    kubectl get pod
Note

A Message Service queue is automatically created after an event sensor is created if you use Message Service to create an event bus. The name of the queue is in the ackone-argowf-<namespace>-<sensor-name>-<sensor-uid> format.
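The parse-event-body template turns the key of the trigger object into the name of the data file by using two shell parameter expansions. The sketch below replays only that part with a hypothetical object key, input/datafile.complete. In the workflow, the key comes from the decoded event message.

```shell
# Hypothetical object key, standing in for events[0].oss.object.key.
TriggerFileName="input/datafile.complete"

# Strip the .complete suffix, then drop everything up to the last slash.
Tmp=${TriggerFileName%%.complete}
DataFileName=${Tmp##*/}

echo "DataFileName is $DataFileName"
```

Because the directory part is dropped, the process-file step looks for the data file directly under the /mnt/vol mount path.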

Step 5: Upload objects to OSS to trigger the workflow

  1. Log on to the OSS console.

  2. To trigger the workflow, upload the following objects to the OSS bucket that you selected in Step 1: Configure notifications for OSS events:

    • datafile: a text file that contains custom content.

    • datafile.complete: an empty trigger file.

  3. Run the following command to query the status of the workflow in the workflow cluster:

    argo list

    Expected output:

    NAME                     STATUS    AGE   DURATION   PRIORITY
    process-oss-file-kmb4k   Running   13s   13s        0
  4. Run the following command to print the workflow log:

    argo logs process-oss-file-kmb4k

    Important

    • The workflow name in the command must be the same as the workflow name returned in the previous step. process-oss-file-kmb4k is an example. Replace it with the actual name.

    • The message content is encoded by using Base64.

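    Expected output: the log shows the decoded event body, the trigger file name parsed from the event, and the content of datafile.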
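Because the message passed to the workflow is Base64-encoded, it can help to decode a message by hand when debugging. This is a minimal sketch with a made-up payload. Only the events[0].oss.object.key path is taken from the sensor definition in Step 4, and real events contain more fields.

```shell
# Made-up payload that mimics the shape the workflow reads with jq.
payload='{"events":[{"oss":{"object":{"key":"datafile.complete"}}}]}'

# Encode the payload as a single Base64 line, then decode it again
# the same way the parse-event-body step does with base64 -d.
encoded=$(printf '%s' "$payload" | base64 | tr -d '\n')
decoded=$(printf '%s' "$encoded" | base64 -d)

echo "$decoded"
```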

Step 6: Delete event-related resources

  1. Run the following command to delete event-related resources:

    kubectl delete sensor process-oss-file
    kubectl delete eventsource ali-mns
    kubectl delete eventbus default
  2. Run the following command to query pods. Make sure that all resources are deleted.

    kubectl get pod
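kubectl get pod shows only pods. To also confirm that the custom resources themselves are gone, a check along the following lines may help. The helper name list_event_resources is hypothetical, and the resource types are the ones used in the delete commands in substep 1.

```shell
# Hypothetical helper: lists any Sensor, EventSource, or EventBus objects
# that still exist in the current namespace.
list_event_resources() {
  kubectl get sensor,eventsource,eventbus
}
```

If the cleanup succeeded, calling list_event_resources reports that no resources were found.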