このトピックでは、待機コールバックモードを使用するタスクステップで Simple Message Queue (formerly MNS) (SMQ) トピックを統合し、トピックにメッセージをパブリッシュする方法について説明します。 SMQ トピックがメッセージを受信した後、ReportTaskSucceeded または ReportTaskFailed オペレーションが呼び出され、タスクステータスがコールバックされます。
タスクステップのしくみ
アプリケーションがデプロイされると、アプリケーションは次の手順に基づいて実行されます。
フローを実行します。タスクステップは SMQ トピックにメッセージをパブリッシュします。タスクステップの
task token
がメッセージ本文に配置され、トピックに送信されます。フローのタスクステップは一時停止し、タスクコールバックを待機します。
SMQ トピックがメッセージを受信すると、メッセージと
task token
は、HTTP 経由で Function Compute の関数の HTTP トリガーにプッシュされ、実行がトリガーされます。詳細については、「HTTP」をご参照ください。Function Compute の関数は
task token
を取得し、ReportTaskSucceeded オペレーションを呼び出してタスクステータスを報告します。フローは続行されます。
アプリケーションのデプロイ
Serverless Workflow コンソールにログインします。
[フロー] ページで、[フローの作成] をクリックします。
[フローの作成] ページで、[サンプルプロジェクト] と [タスク SMQ トピック] を選択し、[次のステップ] をクリックします。
[アプリケーションの作成] ページで、テンプレートに基づいてアプリケーションを作成し、[デプロイ] をクリックします。
アプリケーション名: アプリケーションの名前を指定します。名前は同じアカウント内で一意である必要があります。
TopicName: トピックの名前を指定します。指定された SMQ トピックが存在しない場合、システムは自動的に作成します。
[デプロイ] をクリックすると、アプリケーションで作成したすべてのリソースが表示されます。
フローを実行します。
次のコードを実行します。
{ "messageBody": "hello world" // メッセージ本文 }
実行が成功すると、実行結果のステータスを表示できます。
アプリケーションコード
SMQ トピックのフローをオーケストレーションします。
タスクステップでコールバックされた
task token
を、後続のコールバックのためにメッセージのmessage body
にカプセル化します。output
で指定された ReportTaskSucceeded をoutputMappings
から読み取ります。version: v1 type: flow steps: - type: task name: mns-topic-task resourceArn: acs:mns:::/topics/<topic>/messages pattern: waitForCallback inputMappings: - target: messageBody source: $input.messageBody - target: taskToken source: $context.task.token outputMappings: - target: status source: $local.status serviceParams: MessageBody: $
Function Compute にデプロイされているタスクステップの関数をコールバックします。
message body
にカプセル化されているtask token
を読み取り、コールバックタスクステータスのoutput
を{"status":"success"}
に設定します。def handler(environ, start_response): # リクエストボディを取得する try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) except ValueError: request_body_size = 0 request_body = environ['wsgi.input'].read(request_body_size) print('Request body: {}'.format(request_body)) body = json.loads(request_body) message_body_str = body['Message'] # メッセージボディから MessageBody と TaskToken を読み取る message_body = json.loads(message_body_str) task_token = message_body['taskToken'] ori_message_body = message_body['messageBody'] print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body)) # sts トークンを使用して fnf クライアントを初期化する context = environ['fc.context'] creds = context.credentials sts_creds = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token) fnf_client = AcsClient(credential=sts_creds, region_id=context.region) # Serverless Workflow にタスクの成功を報告する req = ReportTaskSucceededRequest() req.set_TaskToken(task_token) req.set_Output('{"status": "success"}') resp = fnf_client.do_action_with_exception(req) print('Report task response: {}'.format(resp)) # HTTP リクエストに応答する status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) return [b'OK']
参照
タスクステップを使用して SMQ トピックをオーケストレーションする方法の詳細については、GitHub をご覧ください。