This topic describes how to integrate a Message Service (MNS) topic into a task step that uses the wait-for-callback mode and how to publish messages to that topic. After the MNS topic receives a message, the subscriber calls the ReportTaskSucceeded or ReportTaskFailed operation to report the task status back to the flow.

How it works

After an application is deployed, the application is executed based on the following steps:

  1. Execute the flow. The task step publishes a message to the MNS topic. The TaskToken of the task step is placed in the message body and sent to the topic.
  2. The task step of the flow is suspended and waits for the task callback.
  3. After the MNS topic receives the message, it pushes the message, including the TaskToken, over HTTP to the HTTP trigger of the function in Function Compute. This push triggers the function.
  4. The function in Function Compute obtains the TaskToken and calls ReportTaskSucceeded to report the task status.
  5. Then, the flow continues.
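
The steps above can be sketched as a small in-memory simulation. This is only a toy model of the wait-for-callback pattern, not the service itself: the `ToyFlow` class, the `subscriber` function, and the status strings are hypothetical names chosen for illustration.

```python
import json
import uuid


class ToyFlow:
    """Hypothetical in-memory stand-in for a flow with one waitForCallback step."""

    def __init__(self):
        self.pending = {}   # task token -> name of the suspended step
        self.topic = []     # messages "published" to the toy topic
        self.status = "Ready"
        self.output = None

    def start(self, message_body):
        # Steps 1 and 2: publish the message with the task token, then suspend.
        token = uuid.uuid4().hex
        self.pending[token] = "mns-topic-task"
        self.topic.append(json.dumps({"messageBody": message_body, "taskToken": token}))
        self.status = "Suspended"

    def report_task_succeeded(self, token, output):
        # Steps 4 and 5: a known token resumes the suspended step.
        if token not in self.pending:
            raise KeyError("unknown or already used task token")
        del self.pending[token]
        self.output = json.loads(output)
        self.status = "Succeeded"


def subscriber(flow, raw_message):
    # Steps 3 and 4: the subscriber extracts the token and reports back.
    message = json.loads(raw_message)
    flow.report_task_succeeded(message["taskToken"], '{"status": "success"}')


flow = ToyFlow()
flow.start("hello world")
status_while_waiting = flow.status
subscriber(flow, flow.topic[0])
print(status_while_waiting, "->", flow.status, flow.output)
```

The point of the sketch is that the token is the only link between the suspended step and the callback: whoever holds the token can resume the step, so it has to travel with the message.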

Deploy an application

  1. In the Serverless workflow console, click Create Flow. On the page that appears, select Sample Project and Task MNS Topics, and then click Next Step.
  2. On the Create Application page, create an application corresponding to the template, and then click Deploy.

    Where:

    Application Name: Enter a name for the application. The name must be unique in the same account.

    TopicName: Enter a name for the topic. If the specified MNS topic does not exist, the system automatically creates it.

    After you click Deploy, all resources that you created in the application are displayed.
  3. Execute the flow.

    Input of the execution

    {
      "messageBody": "hello world"
    }

Application code

  1. Orchestrate a flow of the MNS topic.

    Encapsulate the TaskToken of the task step in the MessageBody of the message so that the subscriber can use it for the subsequent callback. The output specified in ReportTaskSucceeded is read through outputMappings.

    version: v1
    type: flow
    steps:
      - type: task
        name: mns-topic-task
        resourceArn: acs:mns:::/topics/<topic>/messages
        pattern: waitForCallback
        inputMappings:
          - target: messageBody
            source: $input.messageBody
          - target: taskToken
            source: $context.task.token
        outputMappings:
          - target: status
            source: $local.status
        serviceParams:
          MessageBody: $
  2. Call back the task step from the function that is deployed in Function Compute.

    Read the TaskToken that is encapsulated in MessageBody, call ReportTaskSucceeded with that TaskToken, and set the output to {"status":"success"}.

    # Imports assume the aliyun-python-sdk-core and aliyun-python-sdk-fnf packages.
    import json

    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkfnf.request.v20190315.ReportTaskSucceededRequest import ReportTaskSucceededRequest


    def handler(environ, start_response):
        # Get request body
        try:
            request_body_size = int(environ.get('CONTENT_LENGTH', 0))
        except ValueError:
            request_body_size = 0
        request_body = environ['wsgi.input'].read(request_body_size)
        print('Request body: {}'.format(request_body))

        body = json.loads(request_body)
        message_body_str = body['Message']

        # Read MessageBody and TaskToken from message body
        message_body = json.loads(message_body_str)
        task_token = message_body['taskToken']
        ori_message_body = message_body['messageBody']
        print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body))

        # Init fnf client use sts token
        context = environ['fc.context']
        creds = context.credentials
        sts_creds = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
        fnf_client = AcsClient(credential=sts_creds, region_id=context.region)

        # Report task succeeded to serverless workflow
        req = ReportTaskSucceededRequest()
        req.set_TaskToken(task_token)
        req.set_Output('{"status": "success"}')
        resp = fnf_client.do_action_with_exception(req)
        print('Report task response: {}'.format(resp))

        # Response to http request
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        return [b'OK']
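
To make the two-level JSON decoding in the handler easier to follow, the sketch below rebuilds the payload locally and parses it again without calling any Alibaba Cloud service. It assumes, based on how the handler reads the request, that the published message body is a JSON object with `messageBody` and `taskToken` keys and that the push request carries it as a string in a `Message` field. The helper names `build_message_body` and `parse_notification` and the token value are hypothetical.

```python
import json


def build_message_body(flow_input, task_token):
    # Mirrors the inputMappings of the flow: both targets are filled, and
    # "MessageBody: $" is assumed to send the whole mapped input as JSON.
    mapped = {"messageBody": flow_input["messageBody"], "taskToken": task_token}
    return json.dumps(mapped)


def parse_notification(request_body):
    # Same decoding as the handler: the HTTP body is JSON, and its "Message"
    # field holds the published message body as a JSON string.
    body = json.loads(request_body)
    message_body = json.loads(body["Message"])
    return message_body["taskToken"], message_body["messageBody"]


# Simulated push request; only the "Message" field read by the handler is modeled.
published = build_message_body({"messageBody": "hello world"}, "token-123")
request_body = json.dumps({"Message": published}).encode("utf-8")

task_token, original_body = parse_notification(request_body)
print(task_token, original_body)
```

Keeping the parsing in a separate function like this makes it possible to test the decoding logic on its own, before the function is deployed behind the HTTP trigger.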

References

For more information about how to use task steps to orchestrate MNS topics, see task-mns-topics.