This topic describes how to integrate a Message Service (MNS) topic into a task step that uses the wait-for-callback mode and publish messages to that MNS topic. After the MNS topic receives a message, the ReportTaskSucceeded or ReportTaskFailed operation is called to report the task status back to the flow.
How it works
After an application is deployed, the application is executed based on the following steps:
- Execute the flow. The task step publishes a message to the MNS topic. The TaskToken of the task step is placed in the message body and sent to the topic.
- The task step of the flow is suspended and waits for the task callback.
- After the MNS topic receives the message, the message and the TaskToken are pushed over HTTP to the HTTP trigger of the function in Function Compute, which triggers the function execution.
- The function in Function Compute obtains the TaskToken and calls ReportTaskSucceeded to report the task status.
- Then, the flow continues.
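The steps above can be sketched as a local, in-memory simulation. This is only an illustration of the wait-for-callback idea and does not call MNS, Function Compute, or Serverless Workflow. The class `WaitForCallbackStep` and the function `subscriber` are hypothetical names introduced for this sketch, and the random token stands in for the TaskToken that the real service generates.

```python
import json
import uuid


class WaitForCallbackStep:
    """In-memory stand-in for a task step that uses the wait-for-callback mode."""

    def __init__(self):
        self.pending = set()  # tokens of steps that are suspended
        self.outputs = {}     # token -> output reported by the callback

    def publish(self, message_body):
        # Steps 1 and 2: put a fresh token in the message body, then suspend.
        token = uuid.uuid4().hex  # hypothetical stand-in for the real TaskToken
        self.pending.add(token)
        return json.dumps({"messageBody": message_body, "taskToken": token})

    def report_task_succeeded(self, token, output):
        # Step 4: only a token that is still pending can resume the step.
        if token not in self.pending:
            raise KeyError("unknown or already reported task token")
        self.pending.remove(token)
        self.outputs[token] = json.loads(output)
        return self.outputs[token]


def subscriber(step, published_message):
    # Step 3: the subscriber receives the message and reads the token back out.
    token = json.loads(published_message)["taskToken"]
    return step.report_task_succeeded(token, '{"status": "success"}')


step = WaitForCallbackStep()
message = step.publish("hello world")
result = subscriber(step, message)
print(result)
```

The point of the sketch is that the token is the only link between the suspended step and the callback: whoever holds the token can resume the step, and a token can be used only once.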
Deploy an application
- In the Serverless workflow console, click Create Flow. On the page that appears, select Sample Project and Task MNS Topics, and then click Next Step.
- On the Create Application page, create an application corresponding to the template, and then click Deploy.
Where:
Application Name: Enter a name for the application. The name must be unique in the same account.
TopicName: Enter a name for the topic. If the specified MNS topic does not exist, the system automatically creates it.
After you click Deploy, all resources that you created in the application are displayed.
- Execute the flow.
Input of the execution
{ "messageBody": "hello world" }
Application code
- Orchestrate a flow of the MNS topic.
Encapsulate the TaskToken of the task step in the MessageBody of the message for the subsequent callback. Use outputMappings to read the output that is specified in ReportTaskSucceeded.

    version: v1
    type: flow
    steps:
      - type: task
        name: mns-topic-task
        resourceArn: acs:mns:::/topics/<topic>/messages
        pattern: waitForCallback
        inputMappings:
          - target: messageBody
            source: $input.messageBody
          - target: taskToken
            source: $context.task.token
        outputMappings:
          - target: status
            source: $local.status
        serviceParams:
          MessageBody: $
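To make the mappings in the flow definition more concrete, the following sketch mimics them in plain Python. It assumes that `MessageBody: $` serializes the mapped input (`messageBody` and `taskToken`) as a JSON string, which is consistent with how the callback function parses the message but is not stated explicitly in this topic. The function names and the token value `example-token` are hypothetical.

```python
import json


def apply_input_mappings(flow_input, task_token):
    # Mirrors the two inputMappings entries in the flow definition.
    return {
        "messageBody": flow_input["messageBody"],  # source: $input.messageBody
        "taskToken": task_token,                   # source: $context.task.token
    }


def apply_output_mappings(reported_output):
    # Mirrors outputMappings: status is read from $local.status, where the
    # local values are assumed to come from the ReportTaskSucceeded output.
    local = json.loads(reported_output)
    return {"status": local["status"]}


mapped_input = apply_input_mappings({"messageBody": "hello world"}, "example-token")
# Assumption: "MessageBody: $" publishes the whole mapped input as JSON text.
message_body = json.dumps(mapped_input)
step_output = apply_output_mappings('{"status": "success"}')
print(message_body)
print(step_output)
```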
- Implement the callback of the task step in the function that is deployed in Function Compute.
Read the TaskToken that is encapsulated in MessageBody, call ReportTaskSucceeded with the TaskToken, and set output to {"status":"success"}.

    import json

    from aliyunsdkcore.auth.credentials import StsTokenCredential
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkfnf.request.v20190315.ReportTaskSucceededRequest import ReportTaskSucceededRequest


    def handler(environ, start_response):
        # Get request body
        try:
            request_body_size = int(environ.get('CONTENT_LENGTH', 0))
        except ValueError:
            request_body_size = 0
        request_body = environ['wsgi.input'].read(request_body_size)
        print('Request body: {}'.format(request_body))

        body = json.loads(request_body)
        message_body_str = body['Message']

        # Read MessageBody and TaskToken from message body
        message_body = json.loads(message_body_str)
        task_token = message_body['taskToken']
        ori_message_body = message_body['messageBody']
        print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body))

        # Init fnf client use sts token
        context = environ['fc.context']
        creds = context.credentials
        sts_creds = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
        fnf_client = AcsClient(credential=sts_creds, region_id=context.region)

        # Report task succeeded to serverless workflow
        req = ReportTaskSucceededRequest()
        req.set_TaskToken(task_token)
        req.set_Output('{"status": "success"}')
        resp = fnf_client.do_action_with_exception(req)
        print('Report task response: {}'.format(resp))

        # Response to http request
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        start_response(status, response_headers)
        return [b'OK']
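The handler above assumes that every pushed request is well formed. As a minimal sketch, the parsing step could be separated into a small function that can be tested locally without Function Compute or the SDK. The function name `extract_callback_fields` and the sample values are hypothetical. The envelope layout (a JSON body whose `Message` field holds the JSON-encoded message body) is taken from the handler code in this topic.

```python
import json


def extract_callback_fields(request_body):
    """Return (task_token, original_message_body) from a pushed request body."""
    try:
        envelope = json.loads(request_body)      # outer push envelope
        inner = json.loads(envelope["Message"])  # message body published by the task step
        return inner["taskToken"], inner["messageBody"]
    except (ValueError, KeyError, TypeError) as exc:
        # json.JSONDecodeError is a subclass of ValueError, so bad JSON lands here too.
        raise ValueError("malformed push body: {!r}".format(exc))


# Build a sample body in the same shape that the handler expects.
sample = json.dumps({
    "Message": json.dumps({"taskToken": "example-token", "messageBody": "hello world"})
}).encode("utf-8")

token, original = extract_callback_fields(sample)
print(token, original)
```

When parsing fails, the function could report the failure with ReportTaskFailed instead of leaving the task step suspended. That call is not shown here.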
References
For more information about how to use task steps to orchestrate MNS topics, see task-mns-topics.