すべてのプロダクト
Search
ドキュメントセンター

CloudFlow:分散マルチステップトランザクションを確実に処理する

最終更新日:Jan 12, 2025

このトピックでは、Serverless Workflow を使用して、分散トランザクションが複雑なフローで確実に処理されるようにする方法について説明します。ビジネスロジックに集中できます。

概要

E コマース Web サイト、ホテル予約、航空券予約などの注文管理に関連する複雑なシナリオでは、アプリケーションは複数のリモートサービスにアクセスする必要があり、トランザクションの操作セマンティクスに対する高い要件があります。つまり、すべての手順が中間状態なしで成功または失敗する必要があります。トラフィックが少なく、データストレージが一元化されているアプリケーションでは、リレーショナルデータベースの原子性、一貫性、分離性、耐久性(ACID)プロパティにより、トランザクションが確実に処理されます。ただし、トラフィック量の多いシナリオでは、通常、高可用性とスケーラビリティのために分散マイクロサービスが使用されます。マルチステップトランザクションの信頼性の高い処理を保証するために、サービスプロバイダーは通常、キューと永続メッセージを導入し、フローの状態を分散アーキテクチャに表示する必要があります。これにより、開発と O&M のコストが追加されます。前述の問題を解決するために、Serverless Workflow は、複雑なフローにおける分散トランザクションの信頼性の高い処理を保証します。

シナリオ

アプリケーションが電車のチケット、飛行機、ホテルの予約機能を提供し、トランザクションが 3 つの手順で確実に処理されることを保証するとします。この機能を実装するには、3 つのリモートコールが必要です(たとえば、12306 API を呼び出して電車のチケットを予約する必要があります)。3 つの呼び出しすべてが成功した場合、注文は成功です。ただし、3 つのリモートコールのいずれかが失敗する可能性があります。したがって、アプリケーションは、完了した操作をロールバックするために、さまざまな障害シナリオの補正ロジックを備えている必要があります。次の図は詳細を示しています。

  • BuyTrainTicket が成功したが ReserveFlight が失敗した場合、アプリケーションは CancelTrainTicket を呼び出し、注文が失敗したことをユーザーに通知します。
  • BuyTrainTicket と ReserveFlight の両方が成功したが ReserveHotel が失敗した場合、アプリケーションは CancelFlight と CancelTrainTicket を呼び出し、注文が失敗したことをユーザーに通知します。
longtxn-saga_train_flight_hotel

Serverless Workflow での実装

次の例では、Function Compute にデプロイされた関数が Serverless Workflow のフローに編成され、3 つの手順で信頼性の高いマルチステップの複雑なフローを実装します。

  1. Function Compute で関数を作成する
  2. フローを作成する
  3. フローを実行し、結果を表示する

手順 1:Function Compute で BuyTrainTicket、ReserveFlight、ReserveHotel 操作をシミュレートする関数を作成する

Python 2.7 で関数を作成します。詳細については、「関数をすばやく作成する」をご参照ください。Function Compute のサービスと関数にそれぞれ次の名前を付けることをお勧めします。
  • サービス:fnf-demo
  • 関数:Operation

Operation 関数は、ReserveFlight や ReserveHotel などの操作をシミュレートします。Operation の結果(成功または失敗)は入力によって決まります。

import json
import logging
import uuid

def handler(event, context):
  evt = json.loads(event)
  logger = logging.getLogger()
  id = uuid.uuid4()
  op = "operation"
  if 'operation' in evt:
    op = evt['operation']
    if op in evt:
      result = evt[op]
      if result == False:
        logger.info("%s failed" % op) # 操作が失敗しました
        exit()
  logger.info("%s succeeded, id %s" % (op, id)) # 操作が成功しました、ID
  return '{"%s":"success", "%s_txnID": "%s"}' % (op, op, id)         

手順 2:フローを作成する

Serverless Workflow コンソールで、次の手順を実行してフローを作成します。

  1. フローの [resource Access Management ( RAM ) ユーザー] を構成します。
    {
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "fnf.aliyuncs.com"
                    ]
                }
            }
        ],
        "Version": "1"
    }                               
  2. フローを定義します。
    version: v1
    type: flow
    steps:
      - type: task
        resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation
        name: BuyTrainTicket
        inputMappings:
        - target: operation
          source: buy_train_ticket
        - target: buy_train_ticket
          source: $input.buy_train_ticket_result
        catch: 
        - errors:
          - FC.Unknown
          goto: OrderFailed # 注文失敗
      - type: task
        resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation
        name: ReserveFlight
        inputMappings:
        - target: operation
          source: reserve_flight
        - target: reserve_flight
          source: $input.reserve_flight_result
        catch:  # ReserveFlight タスクによってスローされた FC.Unknown エラーがキャプチャされると、Serverless Workflow は CancelTrainTicket タスクにジャンプします。
        - errors:
          - FC.Unknown
          goto: CancelTrainTicket
      - type: task
        resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation
        name: ReserveHotel
        inputMappings:
        - target: operation
          source: reserve_hotel
        - target: reserve_hotel
          source: $input.reserve_hotel_result
        retry:  # Serverless Workflow は、FC.Unknown エラーが発生した場合、指数バックオフモードでタスクステップを最大 3 回再試行します。最初の再試行間隔は 1 秒で、残りの再試行では次の再試行間隔は前の再試行間隔の 2 倍になります。
        - errors:
          - FC.Unknown
          intervalSeconds: 1
          maxAttempts: 3
          multiplier: 2
        catch:  # ReserveHotel タスクによってスローされた FC.Unknown エラーがキャプチャされると、Serverless Workflow は CancelFlight タスクにジャンプします。
          - errors:
            - FC.Unknown
            goto: CancelFlight
      - type: succeed
        name: OrderSucceeded # 注文成功
      - type: task
        resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation
        name: CancelFlight
        inputMappings:
        - target: operation
          source: cancel_flight
        - target: reserve_flight_txnID
          source: $local.reserve_flight_txnID
      - type: task
        resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation
        name: CancelTrainTicket
        inputMappings:
        - target: operation
          source: cancel_train_ticket
        - target: reserve_flight_txnID
          source: $local.reserve_flight_txnID
      - type: fail
        name: OrderFailed # 注文失敗                             

手順 3:フローを実行し、結果を表示する

コンソールで作成したフローを実行します。 [StartExecution] 操作の入力は JSON 形式である必要があります。次の JSON オブジェクトは、各ステップの成功または失敗をシミュレートできます。たとえば、"reserve_hotel_result":"fail" はホテルの予約に失敗したことを示します。 [StartExecution] は非同期操作です。操作が呼び出されると、Serverless Workflow はフロー実行ステータスを照会するための実行名を返します。

{
  "buy_train_ticket_result":"success",
  "reserve_flight_result":"success",
  "reserve_hotel_result":"fail"
}                       

フロー実行の開始後、[サーバーレス ワークフロー] コンソールで、ターゲット実行名をクリックします。表示されるページの [定義とビジュアル ワークフロー] セクションで、実行プロセスと結果を確認します。 次の図に示すように、"reserve_hotel_result":"fail" が原因で、ReserveHotel が失敗し、[サーバーレス ワークフロー] はフロー定義に基づいて CancelFlight と CancelTrainTicket を順番に呼び出します。[サーバーレス ワークフロー] では、各ステップは永続的です。このように、ネットワークの中断や予期しないプロセスの終了などの障害は、フロー内のトランザクションに影響を与えません。

Screen Shot 2019-06-26 at 12.14.50 PM

各フローの実行に対して実行イベントが生成されます。コンソールで、または SDK またはコマンドラインインターフェース(CLI)を使用して、GetExecutionHistory 操作を呼び出して、実行イベントを照会できます。

Screen Shot 2019-06-26 at 12.17.26 PM

エラー処理と再試行

  1. 前の例では、ReserveFlight と ReserveHotel のリモートコールは、ネットワークまたはサービスエラーが原因で失敗します。一時的なエラーが発生した場合に再試行すると、注文フローの成功率が向上する可能性があります。Serverless Workflow はタスクステップを自動的に再試行します。たとえば、次のコードに基づいて ReserveHotel ステップを定義して、FC.Unknown がキャプチャされた後に指数バックオフモードでステップを再試行します。最大再試行回数後に サーバーレス ワークフローReserveHotel がまだ失敗する場合は、ステップの catch 定義に基づいて、Serverless Workflow は CancelFlight
      - type: task
        resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation
        name: ReserveHotel
        inputMappings:
        - target: operation
          source: reserve_hotel
        retry:  # サーバーレスワークフローは、FC.Unknown エラーが発生した場合、タスクステップを指数バックオフモードで最大 3 回再試行します。最初の再試行間隔は 1 秒で、残りの再試行では次の再試行間隔が前の再試行間隔の 2 倍になります。
        - errors:
          - FC.Unknown
          intervalSeconds: 1
          maxAttempts: 3
          multiplier: 2
        catch:  # ReserveHotel タスクによってスローされた FC.Unknown エラーがキャプチャされると、サーバーレスワークフローは CancelFlight タスクにジャンプします。
          - errors:
            - FC.Unknown
            goto: CancelFlight           
    関数によってスローされた FC.Unknown エラーをキャプチャし、 操作にジャンプして、定義された補正ロジックを実装します。
  2. 次の図は、retry パラメーターが定義された後、ReserveHotel タスクステップが指定された最大回数だけ再試行されることを示しています。Screen Shot 2019-06-26 at 12.19.55 PM

ステップ間のデータ転送

  1. ReserveHotel が失敗した後、CancelFlight と CancelTrainTicket が呼び出されます。これら 2 つのタスクをキャンセルするには、ReserveFlight と BuyTrainTicket によって返されたトランザクション ID(txnID)が必要です。次のセクションでは、inputMapping オブジェクトを使用して、前のステップの出力を CancelFlight ステップに渡す方法について説明します。
      - type: task
        resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation
        name: CancelFlight
        inputMappings:
        - target: operation
          source: cancel_flight
        - target: reserve_flight_txnID
          source: $local.reserve_flight_txnID
                        
  2. フローの各ステップの出力は、StepExited イベントの EventDetail の local オブジェクトに格納されます。
      {  
         "input":{
            "operation":"reserve_hotel",
            "reserve_hotel_result":"fail"
         },
         "local":{
            "buy_train_ticket":"success",
            "buy_train_ticket_txnID":"d37412b3-bb68-4d04-9d90-c8c15643d45e",
            "reserve_flight_result":"success",
            "reserve_flight_txnID":"024caecf-cfa3-43a6-b561-9b6fe0571b55"
         },
         "resourceArn":"acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation",
         "cause":"{\"errorMessage\":\"Process exited unexpectedly before completing request (duration: 12ms, maxMemoryUsage: 9.18MB)\"}",
         "error":"FC.Unknown",
         "retryCount":3,
         "goto":"CancelFlight"
      }         
  3. EventDetailinputMappings に基づいて、CancelFlight ステップの入力は次の JSON オブジェクトに変換されます。このようにして、CancelFlight 関数の入力には reserve_flight_txnID フィールドが含まれます。
      "input":{
        "operation":"cancel_flight",
        "reserve_flight_txnID":"024caecf-cfa3-43a6-b561-9b6fe0571b55"
      }