All Products
Search
Document Center

CloudFlow:Integrate Simple Message Queue (formerly MNS) queues

Last Updated:Dec 05, 2024

Serverless Workflow allows you to send a message to a specific Simple Message Queue (formerly MNS) (SMQ) queue in a task step. This topic describes the scenarios, common parameters, integration modes, and permission configurations for integrating SMQ queues.

Scenarios

You can integrate SMQ queues in task steps in request-response or wait-for-callback mode. The following table describes the scenarios of these two modes.

Integration mode

Value for the pattern parameter

Scenario

Description

Request-response mode

requestResponse

Event notification

Services other than flow execution are persistently notified, and the flow execution is irrelevant to how the notified services process the message.

Wait-for-callback mode

waitForCallback

Orchestration of custom task types

After a task executor in an environment, such as an Elastic Compute Service (ECS) instance or a server in an on-premises data center, receives a message sent to a queue, the task executor calls back Serverless Workflow after it executes the relevant task. For more information, see Best practices.

Common parameters

In the following example, Serverless Workflow sends a message to the queue specified by {queue-name} in resourceArn. {queue-name} is replaced with the actual queue name. The body and parameters of the message are specified in serviceParams. The following parameters are supported:

  • MessageBody: the body of the message. This parameter is required. MessageBody: $ specifies that the message body is generated by using the inputMappings parameter. The following content describes how a message body is generated when a StepEntered event of a task step is triggered:

    • The input object is {"key": "value_1"}.

    • The mapped input is {"key_1": "value_1", "key_2": "value"}.

    • The task step uses the {"key_1": "value_1", "key_2": "value"} string as the message body and sends it to the SMQ queue.

  • Priority: the priority of the message. This parameter is required.

  • DelaySeconds: the message latency, in seconds. This parameter is optional.

For more information, see SendMessage.

version: v1
type: flow
steps:
 - type: task
   name: Task_1
   resourceArn: acs:mns:::/queues/{queue-name}/messages  # The task step sends a message to the SMQ queue named {queue-name} within the same account in the same region. 
   pattern: requestResponse                              # The task step ends after the message is sent to the SMQ queue. 
   inputMappings:
      - target: key_1
        source: $input.key
      - target: key_2
        source: value
   serviceParams:     # The service integration parameters. 
       MessageBody: $  # The mapped input is used as the body of the message that you want to send. 
       DelaySeconds: 0 # The message latency, in seconds. 
       Priority: 1     # The priority of the SMQ queue.

Integration modes

  • Request-response mode

    In the task step, pattern: requestResponse specifies the request-response mode. In this mode, Serverless Workflow sends a message to the queue specified by {queue-name} in resourceArn. After the message is sent, the task step is successful. If the message fails to be sent, the task step fails or is retried based on the retry configurations.

    version: v1
    type: flow
    steps:
     - type: task
       name: mytask
       resourceArn: acs:mns:::/queues/{queue-name}/messages  # The task step sends a message to an SMQ queue within the same account in the same region. 
       pattern: requestResponse                              # The task step ends after the message is sent to the SMQ queue. 
       inputMappings:
          - target: key_3
            source: value
       serviceParams:     # The service integration parameters. 
            MessageBody: $  # {"key_3": "value"} is used as the body of the message that you want to send to the SMQ queue. 
            DelaySeconds: 0 # The message latency, in seconds. 
            Priority: 1     # The priority of the SMQ queue.

  • Wait-for-callback mode

    In the task step, pattern: waitForCallback specifies the wait-for-callback mode. Serverless Workflow sends a message to the queue specified by {queue-name} in resourceArn. After the message is sent, the task step suspends and waits for a callback. The task executor needs to use the corresponding task token to call back Serverless Workflow. If the callback result indicates that the call to the ReportTaskSuccceeded operation is successful, the task step is successful. If the callback result indicates that the ReportTaskFailed operation was called, the task step fails or is retried based on the retry configurations.

    The task token (taskToken) of a task step can be obtained from the context object of the task step. inputMappings is used to assign the task token to a field in the JSON object of the message body. In the following example, the context object in this task step is {"task":{"token":"my-token-1"}}. The mapped task input is {"task_token": "my-token-1", "key": "value"}, which is sent as the message body to the SMQ queue.

    version: v1
    type: flow
    steps:
     - type: task
       name: mytask
       resourceArn: acs:mns:::/queues/{queue-name}/messages  # The task step sends a message to the SMQ queue named {queue-name} within the same account in the same region. 
       pattern: waitForCallback  # The task step suspends after the message is sent to the SMQ queue and waits until it receives the callback. 
       inputMappings:
          - target: task_token
            source: $context.task.token  # Serverless Workflow obtains the task token from the context object. 
          - target: key
            source: value
       serviceParams:     # The service integration parameters. 
            MessageBody: $  # The mapped input is used as the body of the message that you want to send. 
            Priority: 1     # The priority of the SMQ queue.

Flow role configuration

Serverless Workflow assumes the Resource Access Management (RAM) role specified for a flow to obtain your temporary AccessKey pair to access SMQ on your behalf. Therefore, you must grant the flow role the SendMessage permission to send messages to SMQ. You can select a system policy or a custom policy to grant the flow role that you want Serverless Workflow to assume the permission to send messages to your SMQ queue.

  • System policy: Grant the AliyunMNSFullAccess system permission to the flow role.

  • Custom policy: If you allow the flow role to send messages to only the {queue-name} queue, create the following policy and attach it to the flow role:

    {
      "Version": "1",
      "Statement": [
        {
          "Action": "mns:SendMessage",
          "Resource": "acs:mns:*:*:/queues/{queue-name}/messages",
          "Effect": "Allow"
        }
      ]
    }