CloudFlow: Integrate MNS topics to publish messages

Last Updated: Oct 30, 2023

This topic describes how to integrate a Message Service (MNS) topic into a task step that runs in wait-for-callback mode and how to publish messages to the topic. After the MNS topic receives a message, the ReportTaskSucceeded or ReportTaskFailed operation is called to report the task status back to the flow.

How it works

After the application is deployed, it runs as follows:

  1. The flow is executed. The task step places its TaskToken in the message body and publishes the message to the MNS topic.
  2. The task step is suspended and waits for the callback.
  3. After the MNS topic receives the message, it pushes the message, including the TaskToken, over HTTP to the HTTP trigger of a function in Function Compute, which triggers the function.
  4. The function obtains the TaskToken and calls ReportTaskSucceeded to report the task status.
  5. The flow resumes and continues.
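
The steps above can be sketched as a small in-memory simulation. This is only an illustration of the wait-for-callback pattern, not the CloudFlow or MNS implementation: the `Flow`, `Topic`, and `make_subscriber` names are hypothetical, and the token is a random placeholder rather than a real TaskToken.

```python
import json
import uuid


class Topic:
    """Toy stand-in for an MNS topic that pushes to one subscriber."""

    def __init__(self):
        self.subscriber = None

    def publish(self, message):
        # Step 3: push the received message to the subscriber.
        self.subscriber(message)


class Flow:
    """Toy stand-in for a flow with a single wait-for-callback task step."""

    def __init__(self, topic):
        self.topic = topic
        self.pending = {}  # task token -> state of the suspended step
        self.output = None

    def execute(self, flow_input):
        # Step 1: publish the message together with a fresh task token.
        token = uuid.uuid4().hex
        # Step 2: the step is now suspended until the callback arrives.
        self.pending[token] = "suspended"
        self.topic.publish(json.dumps({
            "messageBody": flow_input["messageBody"],
            "taskToken": token,
        }))

    def report_task_succeeded(self, token, output):
        # Step 5: a callback with a known token resumes the flow.
        if self.pending.pop(token, None) is None:
            raise KeyError("unknown task token")
        self.output = json.loads(output)


def make_subscriber(flow):
    def on_message(message):
        # Step 4: extract the token and report success.
        token = json.loads(message)["taskToken"]
        flow.report_task_succeeded(token, '{"status": "success"}')
    return on_message


topic = Topic()
flow = Flow(topic)
topic.subscriber = make_subscriber(flow)
flow.execute({"messageBody": "hello world"})
print(flow.output)
```

In the real service the callback arrives asynchronously. The simulation calls the subscriber inline only to keep the example short.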

Deploy an application

  1. In the Serverless workflow console, click Create Flow. On the page that appears, select Sample Project and Task MNS Topics, and then click Next Step.
  2. On the Create Application page, create an application corresponding to the template, and then click Deploy.

    Where:

    Application Name: Enter a name for the application. The name must be unique in the same account.

    TopicName: Enter a name for the topic. If the specified MNS topic does not exist, the system automatically creates it.

    After you click Deploy, all resources that you created in the application are displayed.
  3. Execute the flow.

    Input of the execution

    {
        "messageBody": "hello world"
    }
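
For the execution input above, the message published to the topic carries both the original messageBody and the TaskToken, because the function later reads both keys from the message. A minimal sketch of that payload, assuming the mapped input is serialized as JSON. The `build_message_body` helper and the `<task-token>` placeholder are hypothetical and only illustrate the shape:

```python
import json


def build_message_body(execution_input, task_token):
    """Combine the execution input and the task token into one JSON string."""
    mapped = {
        "messageBody": execution_input["messageBody"],  # from the execution input
        "taskToken": task_token,                        # token of the task step
    }
    return json.dumps(mapped)


payload = build_message_body({"messageBody": "hello world"}, "<task-token>")
print(payload)
```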

Application code

  1. Orchestrate a flow of the MNS topic.

    Place the TaskToken of the task step in the MessageBody of the message so that it can be used for the subsequent callback. The output specified in ReportTaskSucceeded is read through outputMappings.

    version: v1
    type: flow
    steps:
      - type: task
        name: mns-topic-task
        resourceArn: acs:mns:::/topics/<topic>/messages
        pattern: waitForCallback
        inputMappings:
          - target: messageBody
            source: $input.messageBody
          - target: taskToken
            source: $context.task.token
        outputMappings:
          - target: status
            source: $local.status
        serviceParams:
          MessageBody: $
  2. Call back the function of the task step that is deployed in Function Compute.

    Read the TaskToken that is encapsulated in MessageBody, call ReportTaskSucceeded with that TaskToken, and set the output to {"status": "success"}.

    import json

    # Imports assumed from the aliyun-python-sdk-core and aliyun-python-sdk-fnf packages.
    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkfnf.request.v20190315.ReportTaskSucceededRequest import ReportTaskSucceededRequest


    def handler(environ, start_response):
        # Read the request body pushed by the MNS topic.
        try:
            request_body_size = int(environ.get('CONTENT_LENGTH', 0))
        except ValueError:
            request_body_size = 0
        request_body = environ['wsgi.input'].read(request_body_size)
        print('Request body: {}'.format(request_body))

        body = json.loads(request_body)
        message_body_str = body['Message']

        # Read the original message body and the TaskToken from the message body.
        message_body = json.loads(message_body_str)
        task_token = message_body['taskToken']
        ori_message_body = message_body['messageBody']
        print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body))

        # Initialize the client with the STS token from the function context.
        context = environ['fc.context']
        creds = context.credentials
        sts_creds = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
        fnf_client = AcsClient(credential=sts_creds, region_id=context.region)

        # Report to Serverless Workflow that the task succeeded.
        req = ReportTaskSucceededRequest()
        req.set_TaskToken(task_token)
        req.set_Output('{"status": "success"}')
        resp = fnf_client.do_action_with_exception(req)
        print('Report task response: {}'.format(resp))

        # Respond to the HTTP request.
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        return [b'OK']
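
The parsing part of the handler can be exercised locally without any Alibaba Cloud SDK. The sketch below uses only the standard library and assumes the pushed request has the shape the handler reads: a JSON body whose Message field is itself a JSON string. The `extract_task_token` helper and the sample values are hypothetical.

```python
import io
import json


def extract_task_token(environ):
    """Return (task_token, original_message_body) from a WSGI request."""
    try:
        size = int(environ.get("CONTENT_LENGTH", 0))
    except ValueError:
        size = 0
    # The outer body is JSON; its "Message" field holds the published JSON string.
    body = json.loads(environ["wsgi.input"].read(size))
    message = json.loads(body["Message"])
    return message["taskToken"], message["messageBody"]


# Build a fake push request shaped like the one the handler expects.
inner = json.dumps({"messageBody": "hello world", "taskToken": "<task-token>"})
raw = json.dumps({"Message": inner}).encode("utf-8")
environ = {"CONTENT_LENGTH": str(len(raw)), "wsgi.input": io.BytesIO(raw)}

token, original = extract_task_token(environ)
print(token, original)
```

Separating the parsing from the SDK call makes it possible to check the token extraction before the function is deployed.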

References

For more information about how to use task steps to orchestrate MNS topics, see task-mns-topics.