すべてのプロダクト
Search
ドキュメントセンター

CloudFlow:分散モード

最終更新日:Mar 11, 2026

分散モードでは、Map ステートの各反復が独立したサブワークフロー実行として動作し、ビッグデータ処理や並列計算に適した高同時並列処理を実現します。

単一の Map ステートでワークロードを逐次的に処理できない場合は、分散モードを使用してください。分散モードでは、同時実行される複数のサブワークフロー実行(それぞれに独自のライフサイクルを持つ)に処理を分散し、インライン入力または Object Storage Service (OSS) バケットから並列でデータを処理できます。

基本概念

用語定義
分散モードサブワークフロー実行を通じて大量のデータを同時に処理する Map ステートの処理モードです。OSS バケット内のオブジェクトをソースデータとして使用できます。
サブワークフロー実行分散型 Map ステートの 1 回の反復です。定義は Processor フィールドから取得され、各サブワークフローは 1 つのアイテム(または 1 つのバッチ)を入力として受け取ります。サブワークフローは標準モードまたは Express モードで実行できます。詳細については、「標準モードと Express モード」をご参照ください。
フォールトトレランスポリシーサブワークフロー実行が失敗した場合、Map ステートは失敗として終了します。複数のアイテムを含む Map ステートでは、個別のサブワークフロー実行が失敗しても Map ステートが残りのアイテムの処理を継続できるように、フォールトトレランスポリシーを設定できます。

仕組み

  1. Map ステートは入力を受信し、ItemsPath を使用して配列のアイテムを抽出するか、ItemReader を使用して OSS データソースからアイテムを読み取ります。

  2. ItemConstructor が設定されている場合、各アイテムが変換されます。

  3. ItemBatcher が設定されている場合、アイテムがバッチにグループ化されます。

  4. 各アイテムまたはバッチは、Processor で定義されたサブワークフロー実行に渡され、ProcessorConfig で指定されたモードで実行されます。

  5. すべてのサブワークフロー実行が完了すると、Map ステートは結果を JSON 配列に集約するか、ResultWriter を使用して OSS に書き込みます。

重要

ItemsPathItemConstructor、および ItemBatcher がすべて設定されている場合、これらは ItemsPath → ItemConstructor → ItemBatcher の順序で実行されます。

基本例

以下のワークフローは、$Input.Items からアイテムを読み取り、Express モードで Pass ステートを介して各アイテムを処理する分散型 Map ステートを定義しています。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_CSV
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.csv
    End: true

状態機械の入力:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"}
  ]
}

これにより、以下の定義で 3 つのサブワークフロー実行が作成されます。

Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
  - Type: Pass
    Name: Pass
    End: true

サブワークフローの入力:

// サブワークフロー 1
{"key_1":"value_1"}

// サブワークフロー 2
{"key_2":"value_2"}

// サブワークフロー 3
{"key_3":"value_3"}

すべてのサブワークフローが完了すると、Map ステートの出力はすべてのサブワークフロー出力の JSON 配列になります。

{
    "Items": [
        {
            "key_1": "value_1"
        },
        {
            "key_2": "value_2"
        },
        {
            "key_3": "value_3"
        }
    ]
}

フィールドリファレンス

分散モードの Map ステートで使用可能なすべてのフィールドを以下に示します。

フィールドタイプ必須説明
Namestringはいステートの名前です。my-state-name
Descriptionstringいいえステートの説明です。ここに記述します
Typestringはいステートのタイプです。Map
InputConstructormap[string]anyいいえ入力コンストラクターです。入力と出力」をご参照ください。
ItemsPathstringはい入力から配列を抽出する式です。ItemsPath」をご参照ください。
ItemBatcherItemBatcherいいえ複数のアイテムをバッチにまとめて、サブワークフローの入力とします。ItemBatcher」をご参照ください。
ItemReaderItemReaderいいえOSS バケットからデータを読み取ります。ItemReader」をご参照ください。
ItemConstructorItemConstructorいいえ元の入力値を参照するために $Item を使用してアイテムを変換します。ItemConstructor」をご参照ください。
ResultWriterResultWriterいいえサブワークフローの結果を指定された OSS バケットに書き込みます。ResultWriter」をご参照ください。
MaxConcurrencyintいいえ同時実行されるサブワークフロー実行の最大数です。40
MaxItemsMaxItemsいいえMap ステートが処理するアイテムの最大数です。MaxItems」をご参照ください。
ToleratedFailurePercentageToleratedFailurePercentageいいえ許容される失敗したサブワークフロー実行の割合です。ToleratedFailurePercentage」をご参照ください。
ToleratedFailureCountToleratedFailureCountいいえ許容される失敗したサブワークフロー実行の数です。ToleratedFailureCount」をご参照ください。
ProcessorProcessorはいサブワークフローを定義する Map プロセッサです。Processor」をご参照ください。
ProcessorConfigProcessorConfigはいプロセッサの構成です。ProcessorConfig」をご参照ください。
OutputConstructormap[string]anyいいえ出力コンストラクターです。OutputConstructor」をご参照ください。
Nextstringいいえこのステートが完了した後に実行される次のステートです。End が true の場合は空欄にしてください。my-next-state
Endboolいいえこのステートが現在のスコープの終端ステートかどうかを示します。true
RetryRetryいいえエラーのリトライポリシーです。エラー処理」をご参照ください。
CatchCatchいいえエラーのキャッチポリシーです。エラー処理」をご参照ください。

フィールドの詳細

ItemsPath

Map ステートの入力から配列を抽出します。配列の各要素が 1 つのサブワークフロー実行の入力となります。$Context または $Input 変数を使用してデータを参照します。

$Input.FieldA

Processor

各アイテムまたはバッチを処理するサブワークフローを定義します。

フィールドタイプ必須説明
Statesarrayはいサブワークフロー内のステートです。下記の例をご参照ください。
StartAtstringはい最初に実行されるステートです。my start task
Processor:
   StartAt: Pass1
   States:
     - Type: Pass
       Name: Pass1
       End: true

ProcessorConfig

サブワークフローの実行モードを指定します。

フィールドタイプ必須説明
ExecutionModestringはいサブワークフローの実行モードです。Express

ItemReader

OSS バケットから入力データを読み取り、より大規模な入力をサポートします。

フィールドタイプ必須説明
SourceTypestringはいデータソースのタイプです。有効な値:OSS_CSVOSS_JSON_LISTOSS_OBJECTSOSS_INVENTORY_FILESOSS_CSV
SourceParametersSourceParametersいいえデータソースの場所パラメーターです。SourceParameters」をご参照ください。
ReaderConfigReaderConfigいいえリーダーの構成です。ReaderConfig」をご参照ください。

SourceParameters

フィールドタイプ必須説明
BucketstringいいえOSS バケット名です。example-bucket
ObjectNamestringいいえオブジェクト名です。object_name_1
Prefixstringいいえオブジェクト名のプレフィックスフィルターです。空欄の場合、一致するすべてのオブジェクトが返されます。example-prefix

ReaderConfig

フィールドタイプ必須説明
CSVHeaders[]stringいいえCSV ファイルの最初の行にある列ヘッダーです。ColA,ColB,ColC

OSS から CSV ファイルを読み取る

OSS バケットに格納された CSV ファイルからデータを読み取ります。この例では、example-object.csvexample-bucket に格納されています。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_CSV
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.csv
    End: true

CSV ファイルの内容:

ColA,ColB,ColC
col_a_1,col_b_1,col_c_1

各行がサブワークフローの入力になります。

{
  "ColA": "col_a_1",
  "ColB": "col_b_1",
  "ColC": "col_c_1"
}

OSS から JSON 配列を読み取る

重要

JSON ファイルには JSON 配列が含まれている必要があります。

example-object.jsonexample-bucket に格納されている JSON 配列を読み取ります。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_JSON_LIST
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.json
    End: true

JSON ファイルの内容:

[
  {
    "key_1": "value_1"
  }
]

各要素がサブワークフローの入力になります。

{
  "key_1": "value_1"
}

OSS からオブジェクトを読み取る

特定のプレフィックスを持つオブジェクトを一覧表示し、そのメタデータをサブワークフローの入力として渡します。この例では、example-prefix で始まるオブジェクトを example-bucket から読み取ります。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_OBJECTS
      SourceParameters:
        Bucket: example-bucket
        Prefix: example-prefix
    End: true

このバケット構造の場合:

example-bucket
   └── example-prefix/object_1

各オブジェクトのメタデータがサブワークフローの入力になります。

{
  "XMLName": {
    "Space": "",
    "Local": "Contents"
  },
  "Key": "example-prefix/object_1",
  "Type": "Normal",
  "Size": 268435,
  "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
  "Owner": {
    "XMLName": {
      "Space": "",
      "Local": ""
    },
    "ID": "",
    "DisplayName": ""
  },
  "LastModified": "2024-01-01T01:01:01Z",
  "StorageClass": "Standard",
  "RestoreInfo": ""
}

OSS 在庫ファイルを読み取る

OSS 在庫マニフェストからアイテムを読み取ります。この例では、inventory/2024-01-01T01-01Z/manifest.jsonexample-bucket から読み取ります。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_INVENTORY_FILES
      SourceParameters:
        Bucket: example-bucket
        ObjectName: inventory/2024-01-01T01-01Z/manifest.json
    ItemConstructor:
      Key.$: $Item.Key
    End: true

在庫ファイルの内容:

"example-bucket","object_name_1"
"example-bucket","object_name_2"

最初のサブワークフローの入力:

{
  "Bucket": "example-bucket",
  "Key": "object_name_1"
}

ItemBatcher

複数のアイテムをバッチにグループ化します。各バッチが 1 つのサブワークフロー実行の入力となります。

フィールドタイプ必須説明
MaxItemsPerBatchintいいえ1 バッチあたりの最大アイテム数です。」をご参照ください。
MaxInputBytesPerBatchintいいえ1 バッチあたりの最大バイト数です。単位:バイト。」をご参照ください。
BatchInputmap[string]anyいいえ各バッチの入力に含める追加データです。」をご参照ください。

アイテム数によるバッチ処理

MaxItemsPerBatch を設定して、各バッチに含めるアイテム数を制御します。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxItemsPerBatch: 2
    End: true

状態機械の入力:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"},
    {"key_4":"value_4"},
    {"key_5":"value_5"}
  ]
}

これにより、3 つのサブワークフロー実行が生成されます。

// サブワークフロー 1
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"}
  ]
}

// サブワークフロー 2
{
  "Items": [
    {"key_1":"value_3"},
    {"key_2":"value_4"}
  ]
}

// サブワークフロー 3
{
  "Items": [
    {"key_1":"value_5"}
  ]
}

バイトサイズによるバッチ処理

MaxInputBytesPerBatch を設定して、各バッチのサイズをバイト単位で制限します。

重要
  • ItemBatcher は各バッチにメタデータキーを追加します。バッチの合計サイズには、これらの追加キーも含まれます。

  • MaxInputBytesPerBatch の単位はバイトです。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 50
    End: true

状態機械の入力:

{
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

これにより、3 つのサブワークフロー実行が生成されます。

// サブワークフロー 1
{
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

// サブワークフロー 2
{
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

// サブワークフロー 3
{
  "Items":[
    {"Key":5}
  ]
}

バッチに共有データを追加

BatchInput を使用して、状態機械の入力から追加データをすべてのバッチに含めます。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 70
      BatchInput:
        InputKey.$: $Input.Key
    End: true

状態機械の入力:

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

各サブワークフローは、BatchInput データとバッチ化されたアイテムの両方を受け取ります。

// サブワークフロー 1
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

// サブワークフロー 2
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

// サブワークフロー 3
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":5}
  ]
}

ItemConstructor

サブワークフロー実行に渡される前に、各アイテムを変換します。$Item を使用して元のアイテムを参照し、$Input を使用して状態機械の入力を参照します。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 200
      BatchInput:
        InputKey.$: $Input.Key
    ItemConstructor:
      ConstructedKey.$: $Item.Key
      InputKey.$: $Input.Key
    End: true

状態機械の入力:

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

各アイテムはバッチ処理の前に ItemConstructor によって変換されます。

// サブワークフロー 1
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 1
    },
    {
      "InputKey": "value",
      "ConstructedKey": 2
    },
    {
      "InputKey": "value",
      "ConstructedKey": 3
    }
  ]
}

// サブワークフロー 2
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 4
    },
    {
      "InputKey": "value",
      "ConstructedKey": 5
    }
  ]
}

ResultWriter

サブワークフローの結果を OSS バケットに書き込みます。すべてのサブワークフロー実行の出力を結合した結果が状態出力のサイズ制限を超える可能性がある場合は、ResultWriter を設定してください。

フィールドタイプ必須説明
ParametersParametersはい結果の OSS 送信先です。以下をご参照ください。

Parameters

フィールドタイプ必須説明
Bucketstringはい送信先の OSS バケット。example-bucket
Prefixstringはい結果ファイルのオブジェクト名プレフィックスです。example-prefix/
説明

ステートの入力および出力にはサイズ制限があります。アイテム数が多い Map ステートでは、結合された出力がこの制限を超える可能性があります。その場合は、ResultWriter を設定して結果を OSS に保存してください。

以下の例では、ResultWriter とフォールトトレランスを組み合わせています。Map ステートは最大 30% の失敗を許容し、すべての結果(成功および失敗)を OSS に書き込みます。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    ItemConstructor:
      Key.$: $Item.Key
      FailedValue.$: $Input.FailedValue
    ToleratedFailurePercentage: 30
    Processor:
      StartAt: Choice
      States:
        - Type: Choice
          Name: Choice
          Branches:
            - Condition: $Input.Key > $Input.FailedValue
              Next: Fail
          Default: Succeed
        - Type: Succeed
          Name: Succeed
          End: true
        - Type: Fail
          Name: Fail
          Code: MockError
          End: true
    ResultWriter:
      Parameters:
        Bucket: example-bucket
        Prefix: example-prefix/
    End: true

状態機械の入力:

{
  "FailedValue": 4,
  "Items": [
    {"Key": 1},
    {"Key": 2},
    {"Key": 3},
    {"Key": 4},
    {"Key": 5}
  ]
}

結果ファイルの構造

ResultWriter は、指定された OSS の場所に以下の 3 種類のファイルを生成します。

manifest.json (example-prefix/map-run-name/manifest.json):

{
    "DestinationBucket": "example-bucket",
    "MapRunName": "map-run-name",
    "ResultFiles": {
        "FAILED": [
            {
                "ObjectName": "example-prefix/map-run-name/FAILED_0.json",
                "Size": 262
            }
        ],
        "SUCCEED": [
            {
                "ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
                "Size": 1057
            }
        ]
    }
}

FAILED_0.json (example-prefix/map-run-name/FAILED_0.json):

[
    {
        "ExecutionName": "execution-name-5",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":5}",
        "Output": "{\"ErrorCode\":\"MockError\"}",
        "Status": "Failed",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

SUCCEED_0.json (example-prefix/map-run-name/SUCCEED_0.json):

[
    {
        "ExecutionName": "execution-name-1",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":1}",
        "Output": "{\"FailedValue\":4,\"Key\":1}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-2",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":2}",
        "Output": "{\"FailedValue\":4,\"Key\":2}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-3",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":3}",
        "Output": "{\"FailedValue\":4,\"Key\":3}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-4",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":4}",
        "Output": "{\"FailedValue\":4,\"Key\":4}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

MaxItems

Map ステートが処理するアイテム数の上限を設定します。データソースに MaxItems より多くのアイテムが含まれている場合、最初の MaxItems アイテムのみが処理されます。

たとえば、OSS バケットに 10,000 個のオブジェクトが含まれており、MaxItems が 1,000 に設定されている場合、Map ステートは 1,000 個のオブジェクトのみを処理します。

MaxConcurrency

同時に実行されるサブワークフロー実行の数を制限します。たとえば、Map ステートが 10,000 個のアイテムを処理しており、MaxConcurrency が 100 に設定されている場合、同時に実行されるサブワークフローは最大 100 個です。

ToleratedFailurePercentage

Map ステート自体が失敗する前に許容される、失敗したサブワークフロー実行の割合を設定します。たとえば、10,000 個のアイテムがあり、ToleratedFailurePercentage が 10 に設定されている場合、Map ステートは最大 1,000 件の失敗を許容します。

ToleratedFailureCount

Map ステート自体が失敗する前に許容される、失敗したサブワークフロー実行の数を設定します。たとえば、10,000 個のアイテムがあり、ToleratedFailureCount が 10 に設定されている場合、Map ステートは最大 10 件の失敗を許容します。

クォータ制限

以下の表は、分散モードのデフォルトクォータ制限を示しています。クォータを引き上げるには、してください。チケットを送信してください。

クォータ名説明デフォルト値
MaxOpenMapRunアカウントおよびリージョンごとに同時に実行できる分散型 Map ステートの最大数10
MaxConcurrencyMapRun ごとの同時実行されるサブワークフロー実行の最大数300
MaxItemsMapRun ごとの最大アイテム数10,000

必要な権限

分散モードで Map ステートを実行するには、以下の OSS 権限が必要です。これらの操作をロールポリシーで許可してください。詳細については、「ポリシーの作成」をご参照ください。

{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "oss:HeadObject",
                "oss:GetObjectMeta",
                "oss:GetObject",
                "oss:PutObject",
                "oss:ListObjectsV2",
                "oss:ListObjects",
                "oss:InitiateMultipartUpload",
                "oss:UploadPart",
                "oss:CompleteMultipartUpload",
                "oss:AbortMultipartUpload",
                "oss:ListMultipartUploads",
                "oss:ListParts"
            ],
            "Resource": "*"
        }
    ]
}