All Products
Search
Document Center

Container Service for Kubernetes:Use Message Service to trigger event-driven workflows

Last Updated:Feb 29, 2024

You can use Message Service queues to ingest events from different event sources and trigger workflows in workflow clusters. When an event occurs, such as an Object Storage Service (OSS) event or EventBridge event, the event is delivered to Message Service. Argo then automatically triggers a workflow based on the trigger condition that the event matches.

Prerequisites

Step 1: Create an event bus

An event bus can be shared by event-driven workflows in the same namespace. You can use NATS or Message Service to create an event bus. If you already have an event bus, proceed to Step 2: Create an event source.

Use NATS

  1. Create a file named event-bus.yaml. Sample code of the event bus:

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      nats:
        native:
          replicas: 3
          auth: token
  2. Run the following command to create an event bus:

    kubectl apply -f event-bus.yaml
    Note

    After you run the command, an event bus pod is created in the default namespace. Perform the subsequent operations in the same namespace.

  3. Run the following command to check whether the event bus pod runs as normal:

    kubectl get pod

Use Message Service

  1. Log on to the MNS console.

  2. On the Topics page, create a topic named argoeventbus. On the Topic Details page, obtain the endpoint in the Endpoint section.

  3. Log on to the RAM console as a RAM user who has administrative rights.

  4. Create a Resource Access Management (RAM) user and grant the AliyunMNSFullAccess permission to the RAM user. Then, obtain the AccessKey ID and AccessKey secret of the RAM user.

    For more information, see Create a RAM user, Grant permissions to a RAM user, Create an AccessKey pair, and View the information about AccessKey pairs of a RAM user.

  5. Run the following command to create a Secret to store the AccessKey pair:

    kubectl create secret generic mns-secret\
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  6. Create a file named event-bus-mns.yaml and set the parameters to the actual values.

    • topic: Replace the value with the name of the topic created in Step 2.

    • endpoint: Replace the value with the endpoint that you obtained in Step 2.

    apiVersion: argoproj.io/v1alpha1
    kind: EventBus
    metadata:
      name: default
    spec:
      alimns:
        accessKey:
          key: accesskey
          name: mns-secret
        secretKey:
          key: secretkey
          name: mns-secret
        topic: argoeventbus  # The name of the topic that you created. 
        endpoint: http://165***368.mns.<region>.aliyuncs.com  # The endpoint that you obtained.
  7. Run the following command to apply the event-bus-mns.yaml file to create an event bus:

    kubectl apply -f event-bus-mns.yaml

Step 2: Create an event source

  1. Log on to the MNS console.

  2. On the Queues page, create a queue named test-event-queue and obtain the endpoint in the Endpoint section of the Queue Details page.

    Note

    If you use Message Service to create the event bus, you can skip steps 3 to 5 and proceed to Step 6.

  3. Log on to the RAM console as a RAM user who has administrative rights.

  4. Create a RAM user and grant the AliyunMNSFullAccess permission to the RAM user. Then, obtain the AccessKey ID and AccessKey secret of the RAM user.

    For more information, see Create a RAM user, Grant permissions to a RAM user, Create an AccessKey pair, and View the information about AccessKey pairs of a RAM user.

  5. Run the following command to create a Secret to store the AccessKey pair:

    kubectl create secret generic mns-secret\
      --from-literal=accesskey=*** \
      --from-literal=secretkey=***
  6. Create a file named event-source.yaml and set the parameters to the actual values.

    • topic: Replace the value with the name of the topic that you created in Step 2.

    • endpoint: Replace the value with the endpoint that you obtained in Step 2.

    apiVersion: argoproj.io/v1alpha1
    kind: EventSource
    metadata:
      name: ali-mns
    spec:
      mns:
        example:
          jsonBody: true
          accessKey:
            key: accesskey
            name: mns-secret
          secretKey:
            key: secretkey
            name: mns-secret
          queue: test-event-queue # The name of the queue that you created. 
          waitTimeSeconds: 20
          endpoint: http://165***368.mns.<region>.aliyuncs.com # The endpoint that you obtained.

  7. Run the following command to apply the event-source.yaml file to create an event source:

    kubectl apply -f event-source.yaml
  8. Run the following command to check whether the event source pod runs as normal:

    kubectl get pod

Step 3: Create an event sensor

  1. Create a file named event-sensor.yaml and nest the workflow definition in the event sensor. The following code block shows a sample event sensor:

    View sample code

    apiVersion: argoproj.io/v1alpha1
    kind: Sensor
    metadata:
      name: ali-mns
    spec:
      template:
        serviceAccountName: default
      dependencies:
        - name: test-dep
          eventSourceName: ali-mns    # Match the name of the event source. 
          eventName: example          # Match the name of the event in the event source. 
      triggers:
        - template:
            name: mns-workflow
            k8s:
              operation: create
              source:
                resource:
                  apiVersion: argoproj.io/v1alpha1    # Nest the workflow definition. 
                  kind: Workflow
                  metadata:
                    generateName: ali-mns-workflow-
                  spec:
                    entrypoint: whalesay
                    arguments:
                      parameters:        # Delivery the event content through parameters. 
                        - name: message
                          # this is the value that should be overridden
                          value: hello world
                    templates:
                      - name: whalesay
                        inputs:
                          parameters:
                            - name: message
                        container:
                          image: docker/whalesay:latest
                          command: [cowsay]
                          args: ["{{inputs.parameters.message}}"]
              parameters:
                - src:        # Parse the event content and deliver it to the workflow. 
                    dependencyName: test-dep
                    dataKey: body
                  dest: spec.arguments.parameters.0.value
  2. Run the following command to apply the event-sensor.yaml file to create an event sensor:

    kubectl apply -f event-sensor.yaml
  3. Run the following command to check whether the event sensor pod runs as normal:

    kubectl get pod
Note

A Message Service queue is automatically created after the event sensor is created if you use Message Service to create an event bus. The name of the queue is in the ackone-argowf-<namespace>-<sensor-name>-<sensor-uid> format.

Step 4: Send messages to Message Service to trigger the workflow

  1. Log on to the MNS console.

  2. On the Queues page, find the queue named test-event-queue and choose More > Send Messages in the Actions column of the queue.

  3. On the Quick Experience page, enter the message content test trigger argo workflow and click Send Message.

  4. Run the following command to query the status of the workflow in the workflow cluster:

    argo list

    Expected output:

    NAME                     STATUS    AGE   DURATION   PRIORITY
    ali-mns-workflow-5prz7   Running   6s    6s         0
  5. Print the workflow log to view the message content.

    argo logs ali-mns-workflow-5prz7
    Important
    • The workflow name in the command must be the same as the workflow name returned in the previous step. ali-mns-workflow-5prz7 is an example. Replace it with the actual name.

    • The message content is encoded by using Base64.

    Expected output:image.png

Step 5: Delete event-related resources

  1. Run the following commands to delete event-related resources.

    • Delete the event sensor.

      kubectl delete sensor ali-mns
    • Delete the event source.

      kubectl delete eventsource ali-mns
    • Delete the event bus.

      kubectl delete eventbus default
  2. Run the following command to query the status of the pods. Make sure that all resources are deleted.

    kubectl get pod