本文介紹了如何使用任務步驟的等待回調(waitForCallback)模式整合Simple Message Queue (formerly MNS)主題,並發布訊息到主題。Simple Message Queue (formerly MNS)主題接收到訊息後,調用工作流程ReportTaskSucceeded或ReportTaskFailed API回調任務狀態。
架構原理
應用部署後執行流程如下:
執行工作流程,任務步驟發布訊息到MNS主題。任務步驟的
TaskToken會被放入訊息體一起發送到主題。工作流程工作步驟暫停執行,等待任務回調。
Simple Message Queue (formerly MNS)主題接收到訊息後,將訊息和
TaskToken通過HTTP推送發送到Function ComputeFC的函數HTTP觸發器,觸發函數執行。Function Compute函數最終擷取到
TaskToken,並調用ReportTaskSucceeded - 彙報指定的任務執行成功來報告工作狀態。流程繼續執行。

部署應用
在流程頁面,單擊建立流程。
在建立流程頁面,選擇模板,單擊下一步。

在建立應用頁面,配置相關資訊,建立模板對應的應用,並單擊部署。

應用程式名稱:自訂參數,同一帳號下必須唯一。
TopicName:自訂參數,如果對應Simple Message Queue (formerly MNS)主題不存在會自動建立。
單擊部署後,會顯示應用下建立的所有資源。

執行工作流程。
執行以下命令。
{ "messageBody": "hello world" }執行成功後,您可以看到執行結果的狀態。

應用代碼
編排Simple Message Queue (formerly MNS)主題的工作流程。
將任務步驟回調的
TaskToken封裝在訊息的MessageBody中,用於後續的回調。outputMappings中讀取ReportTaskSucceeded設定的output。version: v1 type: flow steps: - type: task name: mns-topic-task resourceArn: acs:mns:::/topics/<topic>/messages pattern: waitForCallback inputMappings: - target: messageBody source: $input.messageBody - target: taskToken source: $context.task.token outputMappings: - target: status source: $local.status serviceParams: MessageBody: $回調任務步驟的FC函數。
讀取
MessageBody中封裝的TaskToken,回調任務狀態設定output為{"status":"success"}。def handler(environ, start_response): # Get request body try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except ValueError: request_body_size = 0 request_body = environ['wsgi.input'].read(request_body_size) print('Request body: {}'.format(request_body)) body = json.loads(request_body) message_body_str = body['Message'] # Read MessageBody and TaskToken from message body message_body = json.loads(message_body_str) task_token = message_body['taskToken'] ori_message_body = message_body['messageBody'] print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body)) # Init fnf client use sts token context = environ['fc.context'] creds = context.credentials sts_creds = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) fnf_client = AcsClient(credential=sts_creds, region_id=context.region) # Report task succeeded to serverless workflow req = ReportTaskSucceededRequest() req.set_TaskToken(task_token) req.set_Output('{"status": "success"}') resp = fnf_client.do_action_with_exception(req) print('Report task response: {}'.format(resp)) # Response to http request status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [b'OK']
更多資訊
任務步驟編排Simple Message Queue (formerly MNS)主題應用的更多資訊,請參見task-mns-topics應用代碼。