すべてのプロダクト
Search
ドキュメントセンター

CloudFlow:Simple Message Queue (formerly MNS) トピックを統合してメッセージをパブリッシュする

最終更新日:Jan 12, 2025

このトピックでは、待機コールバックモードを使用するタスクステップで Simple Message Queue (formerly MNS) (SMQ) トピックを統合し、トピックにメッセージをパブリッシュする方法について説明します。 SMQ トピックがメッセージを受信した後、ReportTaskSucceeded または ReportTaskFailed オペレーションが呼び出され、タスクステータスがコールバックされます。

タスクステップのしくみ

アプリケーションがデプロイされると、アプリケーションは次の手順に基づいて実行されます。

  1. フローを実行します。タスクステップは SMQ トピックにメッセージをパブリッシュします。タスクステップの task token がメッセージ本文に配置され、トピックに送信されます。

  2. フローのタスクステップは一時停止し、タスクコールバックを待機します。

  3. SMQ トピックがメッセージを受信すると、メッセージと task token は、HTTP 経由で Function Compute の関数の HTTP トリガーにプッシュされ、実行がトリガーされます。詳細については、「HTTP」をご参照ください。

  4. Function Compute の関数は task token を取得し、ReportTaskSucceeded オペレーションを呼び出してタスクステータスを報告します。

  5. フローは続行されます。

image

アプリケーションのデプロイ

  1. Serverless Workflow コンソールにログインします。

  2. [フロー] ページで、[フローの作成] をクリックします。

  3. [フローの作成] ページで、[サンプルプロジェクト][タスク SMQ トピック] を選択し、[次のステップ] をクリックします。

  4. [アプリケーションの作成] ページで、テンプレートに基づいてアプリケーションを作成し、[デプロイ] をクリックします。

    • アプリケーション名: アプリケーションの名前を指定します。名前は同じアカウント内で一意である必要があります。

    • TopicName: トピックの名前を指定します。指定された SMQ トピックが存在しない場合、システムは自動的に作成します。

    [デプロイ] をクリックすると、アプリケーションで作成したすべてのリソースが表示されます。4

  5. フローを実行します。

    次のコードを実行します。

    {
       "messageBody": "hello world" // メッセージ本文
    }

    実行が成功すると、実行結果のステータスを表示できます。

    5

アプリケーションコード

  1. SMQ トピックのフローをオーケストレーションします。

    タスクステップでコールバックされた task token を、後続のコールバックのためにメッセージの message body にカプセル化します。 output で指定された ReportTaskSucceeded を outputMappings から読み取ります。

    version: v1
    type: flow
    steps:
     - type: task
     name: mns-topic-task
     resourceArn: acs:mns:::/topics/<topic>/messages
     pattern: waitForCallback
     inputMappings:
     - target: messageBody
     source: $input.messageBody
     - target: taskToken
     source: $context.task.token
     outputMappings:
     - target: status
     source: $local.status
     serviceParams:
     MessageBody: $
  2. Function Compute にデプロイされているタスクステップの関数をコールバックします。

    message body にカプセル化されている task token を読み取り、コールバックタスクステータスの output{"status":"success"} に設定します。

    def handler(environ, start_response):
     # リクエストボディを取得する
     try:
     request_body_size = int(environ.get('CONTENT_LENGTH', 0))
     except ValueError:
     request_body_size = 0
     request_body = environ['wsgi.input'].read(request_body_size)
     print('Request body: {}'.format(request_body))
    
     body = json.loads(request_body)
     message_body_str = body['Message']
    
     # メッセージボディから MessageBody と TaskToken を読み取る
     message_body = json.loads(message_body_str)
     task_token = message_body['taskToken']
     ori_message_body = message_body['messageBody']
     print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body))
    
     # sts トークンを使用して fnf クライアントを初期化する
     context = environ['fc.context']
     creds = context.credentials
     sts_creds = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
     fnf_client = AcsClient(credential=sts_creds, region_id=context.region)
    
     # Serverless Workflow にタスクの成功を報告する
     req = ReportTaskSucceededRequest()
     req.set_TaskToken(task_token)
     req.set_Output('{"status": "success"}')
     resp = fnf_client.do_action_with_exception(req)
     print('Report task response: {}'.format(resp))
    
     # HTTP リクエストに応答する
     status = '200 OK'
     response_headers = [('Content-type', 'text/plain')]
     start_response(status, response_headers)
     return [b'OK']

参照

タスクステップを使用して SMQ トピックをオーケストレーションする方法の詳細については、GitHub をご覧ください。