分散モードでは、Map ステートの各反復が独立したサブワークフロー実行として動作し、ビッグデータ処理や並列計算に適した高同時並列処理を実現します。
単一の Map ステートでワークロードを逐次的に処理できない場合は、分散モードを使用してください。分散モードでは、同時実行される複数のサブワークフロー実行(それぞれに独自のライフサイクルを持つ)に処理を分散し、インライン入力または Object Storage Service (OSS) バケットから並列でデータを処理できます。
基本概念
| 用語 | 定義 |
|---|---|
| 分散モード | サブワークフロー実行を通じて大量のデータを同時に処理する Map ステートの処理モードです。OSS バケット内のオブジェクトをソースデータとして使用できます。 |
| サブワークフロー実行 | 分散型 Map ステートの 1 回の反復です。定義は Processor フィールドから取得され、各サブワークフローは 1 つのアイテム(または 1 つのバッチ)を入力として受け取ります。サブワークフローは標準モードまたは Express モードで実行できます。詳細については、「標準モードと Express モード」をご参照ください。 |
| フォールトトレランスポリシー | サブワークフロー実行が失敗した場合、Map ステートは失敗として終了します。複数のアイテムを含む Map ステートでは、個別のサブワークフロー実行が失敗しても Map ステートが残りのアイテムの処理を継続できるように、フォールトトレランスポリシーを設定できます。 |
仕組み
Map ステートは入力を受信し、
ItemsPathを使用して配列のアイテムを抽出するか、ItemReaderを使用して OSS データソースからアイテムを読み取ります。ItemConstructorが設定されている場合、各アイテムが変換されます。ItemBatcherが設定されている場合、アイテムがバッチにグループ化されます。各アイテムまたはバッチは、
Processorで定義されたサブワークフロー実行に渡され、ProcessorConfigで指定されたモードで実行されます。すべてのサブワークフロー実行が完了すると、Map ステートは結果を JSON 配列に集約するか、
ResultWriterを使用して OSS に書き込みます。
ItemsPath、ItemConstructor、および ItemBatcher がすべて設定されている場合、これらは ItemsPath → ItemConstructor → ItemBatcher の順序で実行されます。
基本例
以下のワークフローは、$Input.Items からアイテムを読み取り、Express モードで Pass ステートを介して各アイテムを処理する分散型 Map ステートを定義しています。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_CSV
SourceParameters:
Bucket: example-bucket
ObjectName: example-object.csv
End: true状態機械の入力:
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"}
]
}これにより、以下の定義で 3 つのサブワークフロー実行が作成されます。
Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: trueサブワークフローの入力:
// サブワークフロー 1
{"key_1":"value_1"}
// サブワークフロー 2
{"key_2":"value_2"}
// サブワークフロー 3
{"key_3":"value_3"}すべてのサブワークフローが完了すると、Map ステートの出力はすべてのサブワークフロー出力の JSON 配列になります。
{
"Items": [
{
"key_1": "value_1"
},
{
"key_2": "value_2"
},
{
"key_3": "value_3"
}
]
}フィールドリファレンス
分散モードの Map ステートで使用可能なすべてのフィールドを以下に示します。
| フィールド | タイプ | 必須 | 説明 | 例 |
|---|---|---|---|---|
| Name | string | はい | ステートの名前です。 | my-state-name |
| Description | string | いいえ | ステートの説明です。 | ここに記述します |
| Type | string | はい | ステートのタイプです。 | Map |
| InputConstructor | map[string]any | いいえ | 入力コンストラクターです。 | 「入力と出力」をご参照ください。 |
| ItemsPath | string | はい | 入力から配列を抽出する式です。 | 「ItemsPath」をご参照ください。 |
| ItemBatcher | ItemBatcher | いいえ | 複数のアイテムをバッチにまとめて、サブワークフローの入力とします。 | 「ItemBatcher」をご参照ください。 |
| ItemReader | ItemReader | いいえ | OSS バケットからデータを読み取ります。 | 「ItemReader」をご参照ください。 |
| ItemConstructor | ItemConstructor | いいえ | 元の入力値を参照するために $Item を使用してアイテムを変換します。 | 「ItemConstructor」をご参照ください。 |
| ResultWriter | ResultWriter | いいえ | サブワークフローの結果を指定された OSS バケットに書き込みます。 | 「ResultWriter」をご参照ください。 |
| MaxConcurrency | int | いいえ | 同時実行されるサブワークフロー実行の最大数です。 | 40 |
| MaxItems | MaxItems | いいえ | Map ステートが処理するアイテムの最大数です。 | 「MaxItems」をご参照ください。 |
| ToleratedFailurePercentage | ToleratedFailurePercentage | いいえ | 許容される失敗したサブワークフロー実行の割合です。 | 「ToleratedFailurePercentage」をご参照ください。 |
| ToleratedFailureCount | ToleratedFailureCount | いいえ | 許容される失敗したサブワークフロー実行の数です。 | 「ToleratedFailureCount」をご参照ください。 |
| Processor | Processor | はい | サブワークフローを定義する Map プロセッサです。 | 「Processor」をご参照ください。 |
| ProcessorConfig | ProcessorConfig | はい | プロセッサの構成です。 | 「ProcessorConfig」をご参照ください。 |
| OutputConstructor | map[string]any | いいえ | 出力コンストラクターです。 | 「OutputConstructor」をご参照ください。 |
| Next | string | いいえ | このステートが完了した後に実行される次のステートです。End が true の場合は空欄にしてください。 | my-next-state |
| End | bool | いいえ | このステートが現在のスコープの終端ステートかどうかを示します。 | true |
| Retry | Retry | いいえ | エラーのリトライポリシーです。 | 「エラー処理」をご参照ください。 |
| Catch | Catch | いいえ | エラーのキャッチポリシーです。 | 「エラー処理」をご参照ください。 |
フィールドの詳細
ItemsPath
Map ステートの入力から配列を抽出します。配列の各要素が 1 つのサブワークフロー実行の入力となります。$Context または $Input 変数を使用してデータを参照します。
$Input.FieldAProcessor
各アイテムまたはバッチを処理するサブワークフローを定義します。
| フィールド | タイプ | 必須 | 説明 | 例 |
|---|---|---|---|---|
| States | array | はい | サブワークフロー内のステートです。 | 下記の例をご参照ください。 |
| StartAt | string | はい | 最初に実行されるステートです。 | my start task |
Processor:
StartAt: Pass1
States:
- Type: Pass
Name: Pass1
End: trueProcessorConfig
サブワークフローの実行モードを指定します。
| フィールド | タイプ | 必須 | 説明 | 例 |
|---|---|---|---|---|
| ExecutionMode | string | はい | サブワークフローの実行モードです。 | Express |
ItemReader
OSS バケットから入力データを読み取り、より大規模な入力をサポートします。
| フィールド | タイプ | 必須 | 説明 | 例 |
|---|---|---|---|---|
| SourceType | string | はい | データソースのタイプです。有効な値:OSS_CSV、OSS_JSON_LIST、OSS_OBJECTS、OSS_INVENTORY_FILES。 | OSS_CSV |
| SourceParameters | SourceParameters | いいえ | データソースの場所パラメーターです。 | 「SourceParameters」をご参照ください。 |
| ReaderConfig | ReaderConfig | いいえ | リーダーの構成です。 | 「ReaderConfig」をご参照ください。 |
SourceParameters
| フィールド | タイプ | 必須 | 説明 | 例 |
|---|---|---|---|---|
| Bucket | string | いいえ | OSS バケット名です。 | example-bucket |
| ObjectName | string | いいえ | オブジェクト名です。 | object_name_1 |
| Prefix | string | いいえ | オブジェクト名のプレフィックスフィルターです。空欄の場合、一致するすべてのオブジェクトが返されます。 | example-prefix |
ReaderConfig
| フィールド | タイプ | 必須 | 説明 | 例 |
|---|---|---|---|---|
| CSVHeaders | []string | いいえ | CSV ファイルの最初の行にある列ヘッダーです。 | ColA,ColB,ColC |
OSS から CSV ファイルを読み取る
OSS バケットに格納された CSV ファイルからデータを読み取ります。この例では、example-object.csv が example-bucket に格納されています。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_CSV
SourceParameters:
Bucket: example-bucket
ObjectName: example-object.csv
End: trueCSV ファイルの内容:
ColA,ColB,ColC
col_a_1,col_b_1,col_c_1各行がサブワークフローの入力になります。
{
"ColA": "col_a_1",
"ColB": "col_b_1",
"ColC": "col_c_1"
}OSS から JSON 配列を読み取る
JSON ファイルには JSON 配列が含まれている必要があります。
example-object.json が example-bucket に格納されている JSON 配列を読み取ります。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_JSON_LIST
SourceParameters:
Bucket: example-bucket
ObjectName: example-object.json
End: trueJSON ファイルの内容:
[
{
"key_1": "value_1"
}
]各要素がサブワークフローの入力になります。
{
"key_1": "value_1"
}OSS からオブジェクトを読み取る
特定のプレフィックスを持つオブジェクトを一覧表示し、そのメタデータをサブワークフローの入力として渡します。この例では、example-prefix で始まるオブジェクトを example-bucket から読み取ります。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_OBJECTS
SourceParameters:
Bucket: example-bucket
Prefix: example-prefix
End: trueこのバケット構造の場合:
example-bucket
└── example-prefix/object_1各オブジェクトのメタデータがサブワークフローの入力になります。
{
"XMLName": {
"Space": "",
"Local": "Contents"
},
"Key": "example-prefix/object_1",
"Type": "Normal",
"Size": 268435,
"ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
"Owner": {
"XMLName": {
"Space": "",
"Local": ""
},
"ID": "",
"DisplayName": ""
},
"LastModified": "2024-01-01T01:01:01Z",
"StorageClass": "Standard",
"RestoreInfo": ""
}OSS 在庫ファイルを読み取る
OSS 在庫マニフェストからアイテムを読み取ります。この例では、inventory/2024-01-01T01-01Z/manifest.json を example-bucket から読み取ります。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_INVENTORY_FILES
SourceParameters:
Bucket: example-bucket
ObjectName: inventory/2024-01-01T01-01Z/manifest.json
ItemConstructor:
Key.$: $Item.Key
End: true在庫ファイルの内容:
"example-bucket","object_name_1"
"example-bucket","object_name_2"最初のサブワークフローの入力:
{
"Bucket": "example-bucket",
"Key": "object_name_1"
}ItemBatcher
複数のアイテムをバッチにグループ化します。各バッチが 1 つのサブワークフロー実行の入力となります。
| フィールド | タイプ | 必須 | 説明 | 例 |
|---|---|---|---|---|
| MaxItemsPerBatch | int | いいえ | 1 バッチあたりの最大アイテム数です。 | 「例」をご参照ください。 |
| MaxInputBytesPerBatch | int | いいえ | 1 バッチあたりの最大バイト数です。単位:バイト。 | 「例」をご参照ください。 |
| BatchInput | map[string]any | いいえ | 各バッチの入力に含める追加データです。 | 「例」をご参照ください。 |
アイテム数によるバッチ処理
MaxItemsPerBatch を設定して、各バッチに含めるアイテム数を制御します。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxItemsPerBatch: 2
End: true状態機械の入力:
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"},
{"key_4":"value_4"},
{"key_5":"value_5"}
]
}これにより、3 つのサブワークフロー実行が生成されます。
// サブワークフロー 1
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"}
]
}
// サブワークフロー 2
{
"Items": [
{"key_1":"value_3"},
{"key_2":"value_4"}
]
}
// サブワークフロー 3
{
"Items": [
{"key_1":"value_5"}
]
}バイトサイズによるバッチ処理
MaxInputBytesPerBatch を設定して、各バッチのサイズをバイト単位で制限します。
ItemBatcher は各バッチにメタデータキーを追加します。バッチの合計サイズには、これらの追加キーも含まれます。
MaxInputBytesPerBatchの単位はバイトです。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 50
End: true状態機械の入力:
{
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}これにより、3 つのサブワークフロー実行が生成されます。
// サブワークフロー 1
{
"Items":[
{"Key":1},
{"key":2}
]
}
// サブワークフロー 2
{
"Items":[
{"Key":3},
{"key":4}
]
}
// サブワークフロー 3
{
"Items":[
{"Key":5}
]
}バッチに共有データを追加
BatchInput を使用して、状態機械の入力から追加データをすべてのバッチに含めます。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 70
BatchInput:
InputKey.$: $Input.Key
End: true状態機械の入力:
{
"Key":"value",
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}各サブワークフローは、BatchInput データとバッチ化されたアイテムの両方を受け取ります。
// サブワークフロー 1
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":1},
{"key":2}
]
}
// サブワークフロー 2
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":3},
{"key":4}
]
}
// サブワークフロー 3
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":5}
]
}ItemConstructor
サブワークフロー実行に渡される前に、各アイテムを変換します。$Item を使用して元のアイテムを参照し、$Input を使用して状態機械の入力を参照します。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 200
BatchInput:
InputKey.$: $Input.Key
ItemConstructor:
ConstructedKey.$: $Item.Key
InputKey.$: $Input.Key
End: true状態機械の入力:
{
"Key":"value",
"Items":[
{"Key":1},
{"Key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}各アイテムはバッチ処理の前に ItemConstructor によって変換されます。
// サブワークフロー 1
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 1
},
{
"InputKey": "value",
"ConstructedKey": 2
},
{
"InputKey": "value",
"ConstructedKey": 3
}
]
}
// サブワークフロー 2
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 4
},
{
"InputKey": "value",
"ConstructedKey": 5
}
]
}ResultWriter
サブワークフローの結果を OSS バケットに書き込みます。すべてのサブワークフロー実行の出力を結合した結果が状態出力のサイズ制限を超える可能性がある場合は、ResultWriter を設定してください。
| フィールド | タイプ | 必須 | 説明 | 例 |
|---|---|---|---|---|
| Parameters | Parameters | はい | 結果の OSS 送信先です。 | 以下をご参照ください。 |
Parameters
| フィールド | タイプ | 必須 | 説明 | 例 |
|---|---|---|---|---|
| Bucket | string | はい | 送信先の OSS バケット。 | example-bucket |
| Prefix | string | はい | 結果ファイルのオブジェクト名プレフィックスです。 | example-prefix/ |
ステートの入力および出力にはサイズ制限があります。アイテム数が多い Map ステートでは、結合された出力がこの制限を超える可能性があります。その場合は、ResultWriter を設定して結果を OSS に保存してください。
以下の例では、ResultWriter とフォールトトレランスを組み合わせています。Map ステートは最大 30% の失敗を許容し、すべての結果(成功および失敗)を OSS に書き込みます。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
ItemConstructor:
Key.$: $Item.Key
FailedValue.$: $Input.FailedValue
ToleratedFailurePercentage: 30
Processor:
StartAt: Choice
States:
- Type: Choice
Name: Choice
Branches:
- Condition: $Input.Key > $Input.FailedValue
Next: Fail
Default: Succeed
- Type: Succeed
Name: Succeed
End: true
- Type: Fail
Name: Fail
Code: MockError
End: true
ResultWriter:
Parameters:
Bucket: example-bucket
Prefix: example-prefix/
End: true状態機械の入力:
{
"FailedValue": 4,
"Items": [
{"Key": 1},
{"Key": 2},
{"Key": 3},
{"Key": 4},
{"Key": 5}
]
}結果ファイルの構造
ResultWriter は、指定された OSS の場所に以下の 3 種類のファイルを生成します。
manifest.json (example-prefix/map-run-name/manifest.json):
{
"DestinationBucket": "example-bucket",
"MapRunName": "map-run-name",
"ResultFiles": {
"FAILED": [
{
"ObjectName": "example-prefix/map-run-name/FAILED_0.json",
"Size": 262
}
],
"SUCCEED": [
{
"ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
"Size": 1057
}
]
}
}FAILED_0.json (example-prefix/map-run-name/FAILED_0.json):
[
{
"ExecutionName": "execution-name-5",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":5}",
"Output": "{\"ErrorCode\":\"MockError\"}",
"Status": "Failed",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]SUCCEED_0.json (example-prefix/map-run-name/SUCCEED_0.json):
[
{
"ExecutionName": "execution-name-1",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":1}",
"Output": "{\"FailedValue\":4,\"Key\":1}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-2",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":2}",
"Output": "{\"FailedValue\":4,\"Key\":2}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-3",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":3}",
"Output": "{\"FailedValue\":4,\"Key\":3}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-4",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":4}",
"Output": "{\"FailedValue\":4,\"Key\":4}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]MaxItems
Map ステートが処理するアイテム数の上限を設定します。データソースに MaxItems より多くのアイテムが含まれている場合、最初の MaxItems アイテムのみが処理されます。
たとえば、OSS バケットに 10,000 個のオブジェクトが含まれており、MaxItems が 1,000 に設定されている場合、Map ステートは 1,000 個のオブジェクトのみを処理します。
MaxConcurrency
同時に実行されるサブワークフロー実行の数を制限します。たとえば、Map ステートが 10,000 個のアイテムを処理しており、MaxConcurrency が 100 に設定されている場合、同時に実行されるサブワークフローは最大 100 個です。
ToleratedFailurePercentage
Map ステート自体が失敗する前に許容される、失敗したサブワークフロー実行の割合を設定します。たとえば、10,000 個のアイテムがあり、ToleratedFailurePercentage が 10 に設定されている場合、Map ステートは最大 1,000 件の失敗を許容します。
ToleratedFailureCount
Map ステート自体が失敗する前に許容される、失敗したサブワークフロー実行の数を設定します。たとえば、10,000 個のアイテムがあり、ToleratedFailureCount が 10 に設定されている場合、Map ステートは最大 10 件の失敗を許容します。
クォータ制限
以下の表は、分散モードのデフォルトクォータ制限を示しています。クォータを引き上げるには、してください。チケットを送信してください。
| クォータ名 | 説明 | デフォルト値 |
|---|---|---|
| MaxOpenMapRun | アカウントおよびリージョンごとに同時に実行できる分散型 Map ステートの最大数 | 10 |
| MaxConcurrency | MapRun ごとの同時実行されるサブワークフロー実行の最大数 | 300 |
| MaxItems | MapRun ごとの最大アイテム数 | 10,000 |
必要な権限
分散モードで Map ステートを実行するには、以下の OSS 権限が必要です。これらの操作をロールポリシーで許可してください。詳細については、「ポリシーの作成」をご参照ください。
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"oss:HeadObject",
"oss:GetObjectMeta",
"oss:GetObject",
"oss:PutObject",
"oss:ListObjectsV2",
"oss:ListObjects",
"oss:InitiateMultipartUpload",
"oss:UploadPart",
"oss:CompleteMultipartUpload",
"oss:AbortMultipartUpload",
"oss:ListMultipartUploads",
"oss:ListParts"
],
"Resource": "*"
}
]
}