全部產品
Search
文件中心

CloudFlow:Simple Message Queue (formerly MNS)主題整合和訊息發布

更新時間:Oct 25, 2024

本文介紹了如何使用任務步驟的等待回調(waitForCallback)模式整合Simple Message Queue (formerly MNS)主題,並發布訊息到主題。Simple Message Queue (formerly MNS)主題接收到訊息後,調用工作流程ReportTaskSucceeded或ReportTaskFailed API回調任務狀態。

架構原理

應用部署後執行流程如下:

  1. 執行工作流程,任務步驟發布訊息到MNS主題。任務步驟的TaskToken會被放入訊息體一起發送到主題。

  2. 工作流程工作步驟暫停執行,等待任務回調。

  3. Simple Message Queue (formerly MNS)主題接收到訊息後,將訊息和TaskToken通過HTTP推送發送到Function ComputeFC的函數HTTP觸發器,觸發函數執行。

  4. Function Compute函數最終擷取到TaskToken,並調用ReportTaskSucceeded - 彙報指定的任務執行成功來報告工作狀態。

  5. 流程繼續執行。

image

部署應用

  1. 登入Serverless工作流程控制台

  2. 流程頁面,單擊建立流程

  3. 建立流程頁面,選擇樣本專案 > 任務步驟編排Simple Message Queue (formerly MNS)主題模板,單擊下一步

    2

  4. 建立應用頁面,配置相關資訊,建立模板對應的應用,並單擊部署

    3

    • 應用程式名稱:自訂參數,同一帳號下必須唯一。

    • TopicName:自訂參數,如果對應Simple Message Queue (formerly MNS)主題不存在會自動建立。

    單擊部署後,會顯示應用下建立的所有資源。4

  5. 執行工作流程。

    執行以下命令。

    {
       "messageBody": "hello world"
    }

    執行成功後,您可以看到執行結果的狀態。

    5

應用代碼

  1. 編排Simple Message Queue (formerly MNS)主題的工作流程。

    將任務步驟回調的TaskToken封裝在訊息的MessageBody中,用於後續的回調。outputMappings中讀取ReportTaskSucceeded設定的output

    version: v1
    type: flow
    steps:
     - type: task
     name: mns-topic-task
     resourceArn: acs:mns:::/topics/<topic>/messages
     pattern: waitForCallback
     inputMappings:
     - target: messageBody
     source: $input.messageBody
     - target: taskToken
     source: $context.task.token
     outputMappings:
     - target: status
     source: $local.status
     serviceParams:
     MessageBody: $
  2. 回調任務步驟的FC函數。

    讀取MessageBody中封裝的TaskToken,回調任務狀態設定output{"status":"success"}

    def handler(environ, start_response):
     # Get request body
     try:
     request_body_size = int(environ.get('CONTENT_LENGTH',
    0))
     except ValueError:
     request_body_size = 0
     request_body =
    environ['wsgi.input'].read(request_body_size)
     print('Request body:
    {}'.format(request_body))
    
     body = json.loads(request_body)
     message_body_str =
    body['Message']
    
     # Read MessageBody and TaskToken from
    message body
     message_body =
    json.loads(message_body_str)
     task_token =
    message_body['taskToken']
     ori_message_body =
    message_body['messageBody']
     print('Task token: {}\norigin message
    body: {}'.format(task_token, ori_message_body))
    
     # Init fnf client use sts token
     context = environ['fc.context']
     creds = context.credentials
     sts_creds =
    StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
     fnf_client =
    AcsClient(credential=sts_creds, region_id=context.region)
    
     # Report task succeeded to serverless
    workflow
     req =
    ReportTaskSucceededRequest()
     req.set_TaskToken(task_token)
     req.set_Output('{"status":
    "success"}')
     resp =
    fnf_client.do_action_with_exception(req)
     print('Report task response:
    {}'.format(resp))
    
     # Response to http request
     status = '200 OK'
     response_headers = [('Content-type',
    'text/plain')]
     start_response(status,
    response_headers)
     return [b'OK']

更多資訊

任務步驟編排Simple Message Queue (formerly MNS)主題應用的更多資訊,請參見task-mns-topics應用代碼