Distributed mode runs each iteration of a Map state as an independent sub-workflow execution, enabling high-concurrency parallel processing of large data sets.
Use distributed mode when a single Map state cannot process your workload sequentially within an acceptable time. Distributed mode fans out work across concurrent sub-workflow executions, each with its own lifecycle, and can process data from inline input or Object Storage Service (OSS) buckets in parallel.
Key concepts
| Term | Definition |
|---|---|
| Distributed mode | A Map state processing mode that concurrently processes large amounts of data through sub-workflow executions. Objects in OSS buckets can be used as the source data. |
| Sub-workflow execution | A single iteration of a distributed Map state. The definition comes from the Processor field, and each sub-workflow receives one item (or one batch) as input. Sub-workflows can run in standard mode or express mode. For details, see Standard mode and express mode. |
| Fault tolerance policy | By default, the Map state fails as soon as any sub-workflow execution fails. For Map states that process multiple items, you can configure fault tolerance policies so the Map state continues processing the remaining items when individual sub-workflow executions fail. |
How it works
1. The Map state receives input and extracts an array of items using ItemsPath (or reads items from an OSS data source using ItemReader).
2. If ItemConstructor is configured, each item is transformed.
3. If ItemBatcher is configured, items are grouped into batches.
4. Each item or batch is passed to a sub-workflow execution defined by Processor, running in the mode specified by ProcessorConfig.
5. After all sub-workflow executions complete, the Map state collects results into a JSON array (or writes them to OSS using ResultWriter).
When ItemsPath, ItemConstructor, and ItemBatcher are all configured, they run in this order: ItemsPath, then ItemConstructor, then ItemBatcher.
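As a sketch, a Map state that sets all three fields (reusing field values from the examples in this topic) applies them in that order:

```yaml
- Type: Map
  Name: Map
  ProcessorConfig:
    ExecutionMode: Express
  ItemsPath: $Input.Items      # 1. extract the array of items
  ItemConstructor:
    Key.$: $Item.Key           # 2. transform each item
  ItemBatcher:
    MaxItemsPerBatch: 2        # 3. group transformed items into batches
  Processor:
    StartAt: Pass
    States:
      - Type: Pass
        Name: Pass
        End: true
  End: true
```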
Basic example
The following workflow defines a distributed Map state that reads items from $Input.Items and runs each item through a Pass state in Express mode:
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    End: true
```
State machine input:
```json
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"}
  ]
}
```
This creates three sub-workflow executions, each running the following definition:
```yaml
Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
  - Type: Pass
    Name: Pass
    End: true
```
Sub-workflow inputs:
```json
// Sub-workflow 1
{"key_1":"value_1"}
// Sub-workflow 2
{"key_2":"value_2"}
// Sub-workflow 3
{"key_3":"value_3"}
```
After all sub-workflows complete, the Map state output is a JSON array of all sub-workflow outputs:
```json
{
  "Items": [
    {
      "key_1": "value_1"
    },
    {
      "key_2": "value_2"
    },
    {
      "key_3": "value_3"
    }
  ]
}
```
Field reference
All fields available in a distributed mode Map state:
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| Name | string | Yes | The name of the state. | my-state-name |
| Description | string | No | The description of the state. | describe it here |
| Type | string | Yes | The type of the state. | Map |
| InputConstructor | map[string]any | No | The input constructor. | See Inputs and outputs. |
| ItemsPath | string | Yes (not required when ItemReader is configured) | Expression to extract an array from the input. | See ItemsPath. |
| ItemBatcher | ItemBatcher | No | Combines multiple items into a batch as sub-workflow input. | See ItemBatcher. |
| ItemReader | ItemReader | No | Reads data from OSS buckets. | See ItemReader. |
| ItemConstructor | ItemConstructor | No | Transforms items using $Item to reference original input values. | See ItemConstructor. |
| ResultWriter | ResultWriter | No | Writes sub-workflow results to a specified OSS bucket. | See ResultWriter. |
| MaxConcurrency | int | No | Maximum number of concurrent sub-workflow executions. | 40 |
| MaxItems | MaxItems | No | Maximum number of items the Map state processes. | See MaxItems. |
| ToleratedFailurePercentage | ToleratedFailurePercentage | No | Percentage of failed sub-workflow executions to tolerate. | See ToleratedFailurePercentage. |
| ToleratedFailureCount | ToleratedFailureCount | No | Number of failed sub-workflow executions to tolerate. | See ToleratedFailureCount. |
| Processor | Processor | Yes | The Map processor that defines the sub-workflow. | See Processor. |
| ProcessorConfig | ProcessorConfig | Yes | The processor configuration. | See ProcessorConfig. |
| OutputConstructor | map[string]any | No | The output constructor. | See OutputConstructor. |
| Next | string | No | The next state to run after this state completes. Leave blank if End is true. | my-next-state |
| End | bool | No | Whether this state is the terminal state of the current scope. | true |
| Retry | Retry | No | Error retry policy. | See Error handling. |
| Catch | Catch | No | Error catch policy. | See Error handling. |
Field details
ItemsPath
Extracts an array from the Map state's input. Each element in the array becomes input for one sub-workflow execution. Reference data with $Context or $Input variables:
```
$Input.FieldA
```
Processor
Defines the sub-workflow that processes each item or batch.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| States | array | Yes | The states in the sub-workflow. | See example below. |
| StartAt | string | Yes | The first state to run. | my start task |
```yaml
Processor:
  StartAt: Pass1
  States:
    - Type: Pass
      Name: Pass1
      End: true
```
ProcessorConfig
Specifies the execution mode for sub-workflows.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| ExecutionMode | string | Yes | The execution mode for sub-workflows. | Express |
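For example, the express-mode configuration used throughout this topic looks like this (a Standard value is assumed to select standard mode, mirroring the mode names in Key concepts):

```yaml
ProcessorConfig:
  ExecutionMode: Express
```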
ItemReader
Reads input data from OSS buckets, which supports much larger data sets than inline input allows.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| SourceType | string | Yes | Data source type. Valid values: OSS_CSV, OSS_JSON_LIST, OSS_OBJECTS, OSS_INVENTORY_FILES. | OSS_CSV |
| SourceParameters | SourceParameters | No | Data source location parameters. | See SourceParameters. |
| ReaderConfig | ReaderConfig | No | Reader configuration. | See ReaderConfig. |
SourceParameters
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| Bucket | string | No | The OSS bucket name. | example-bucket |
| ObjectName | string | No | The object name. | object_name_1 |
| Prefix | string | No | Object name prefix filter. If left blank, all objects in the bucket are listed. | example-prefix |
ReaderConfig
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| CSVHeaders | []string | No | Column headers in the first row of the CSV file. | ColA,ColB,ColC |
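For instance, a reader for the CSV example below might declare its column headers explicitly. This is a sketch, assuming CSVHeaders simply lists the column names in order:

```yaml
ItemReader:
  SourceType: OSS_CSV
  SourceParameters:
    Bucket: example-bucket
    ObjectName: example-object.csv
  ReaderConfig:
    CSVHeaders:
      - ColA
      - ColB
      - ColC
```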
Read CSV files from OSS
Read data from a CSV file stored in an OSS bucket. In this example, example-object.csv is stored in example-bucket:
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_CSV
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.csv
    End: true
```
CSV file content:
```
ColA,ColB,ColC
col_a_1,col_b_1,col_c_1
```
Each row becomes sub-workflow input:
```json
{
  "ColA": "col_a_1",
  "ColB": "col_b_1",
  "ColC": "col_c_1"
}
```
Read JSON arrays from OSS
The JSON file must contain a JSON array.
Read a JSON array from example-object.json in example-bucket:
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_JSON_LIST
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.json
    End: true
```
JSON file content:
```json
[
  {
    "key_1": "value_1"
  }
]
```
Each element becomes sub-workflow input:
```json
{
  "key_1": "value_1"
}
```
Read objects from OSS
List objects with a specific prefix and pass their metadata as sub-workflow input. This example reads objects prefixed with example-prefix from example-bucket:
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_OBJECTS
      SourceParameters:
        Bucket: example-bucket
        Prefix: example-prefix
    End: true
```
Given this bucket structure:
```
example-bucket
└── example-prefix/object_1
```
Each object's metadata becomes sub-workflow input:
```json
{
  "XMLName": {
    "Space": "",
    "Local": "Contents"
  },
  "Key": "example-prefix/object_1",
  "Type": "Normal",
  "Size": 268435,
  "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
  "Owner": {
    "XMLName": {
      "Space": "",
      "Local": ""
    },
    "ID": "",
    "DisplayName": ""
  },
  "LastModified": "2024-01-01T01:01:01Z",
  "StorageClass": "Standard",
  "RestoreInfo": ""
}
```
Read OSS inventory files
Read items from an OSS inventory manifest. This example reads inventory/2024-01-01T01-01Z/manifest.json from example-bucket:
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_INVENTORY_FILES
      SourceParameters:
        Bucket: example-bucket
        ObjectName: inventory/2024-01-01T01-01Z/manifest.json
    ItemConstructor:
      Key.$: $Item.Key
    End: true
```
Inventory file content:
```
"example-bucket","object_name_1"
"example-bucket","object_name_2"
```
First sub-workflow input:
```json
{
  "Bucket": "example-bucket",
  "Key": "object_name_1"
}
```
ItemBatcher
Groups multiple items into batches. Each batch becomes the input for a single sub-workflow execution.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| MaxItemsPerBatch | int | No | Maximum items per batch. | See example. |
| MaxInputBytesPerBatch | int | No | Maximum bytes per batch. Unit: byte. | See example. |
| BatchInput | map[string]any | No | Additional data to include in each batch's input. | See example. |
Batch by item count
Set MaxItemsPerBatch to control how many items go into each batch:
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxItemsPerBatch: 2
    End: true
```
State machine input:
```json
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"},
    {"key_4":"value_4"},
    {"key_5":"value_5"}
  ]
}
```
This produces three sub-workflow executions:
```json
// Sub-workflow 1
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"}
  ]
}
// Sub-workflow 2
{
  "Items": [
    {"key_3":"value_3"},
    {"key_4":"value_4"}
  ]
}
// Sub-workflow 3
{
  "Items": [
    {"key_5":"value_5"}
  ]
}
```
Batch by byte size
Set MaxInputBytesPerBatch to limit the size of each batch in bytes.
ItemBatcher adds metadata keys to each batch. The total batch size includes these additional keys.
The unit of MaxInputBytesPerBatch is bytes.
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 50
    End: true
```
State machine input:
```json
{
  "Items": [
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}
```
This produces three sub-workflow executions:
```json
// Sub-workflow 1
{
  "Items": [
    {"Key":1},
    {"Key":2}
  ]
}
// Sub-workflow 2
{
  "Items": [
    {"Key":3},
    {"Key":4}
  ]
}
// Sub-workflow 3
{
  "Items": [
    {"Key":5}
  ]
}
```
Add shared data to batches
Use BatchInput to include additional data from the state machine input in every batch:
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 70
      BatchInput:
        InputKey.$: $Input.Key
    End: true
```
State machine input:
```json
{
  "Key": "value",
  "Items": [
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}
```
Each sub-workflow receives both the BatchInput data and the batched items:
```json
// Sub-workflow 1
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {"Key":1},
    {"Key":2}
  ]
}
// Sub-workflow 2
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {"Key":3},
    {"Key":4}
  ]
}
// Sub-workflow 3
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {"Key":5}
  ]
}
```
ItemConstructor
Transforms each item before it is passed to a sub-workflow execution. Use $Item to reference the original item and $Input to reference the state machine input:
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 200
      BatchInput:
        InputKey.$: $Input.Key
    ItemConstructor:
      ConstructedKey.$: $Item.Key
      InputKey.$: $Input.Key
    End: true
```
State machine input:
```json
{
  "Key": "value",
  "Items": [
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}
```
Each item is transformed by ItemConstructor before batching:
```json
// Sub-workflow 1
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 1
    },
    {
      "InputKey": "value",
      "ConstructedKey": 2
    },
    {
      "InputKey": "value",
      "ConstructedKey": 3
    }
  ]
}
// Sub-workflow 2
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 4
    },
    {
      "InputKey": "value",
      "ConstructedKey": 5
    }
  ]
}
```
ResultWriter
Writes sub-workflow results to an OSS bucket. Configure ResultWriter when the combined output of all sub-workflow executions may exceed the state output size limit.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| Parameters | Parameters | Yes | OSS destination for results. | See below. |
Parameters
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| Bucket | string | Yes | The destination OSS bucket. | example-bucket |
| Prefix | string | Yes | The object name prefix for result files. | example-prefix/ |
State inputs and outputs are subject to size limits. For Map states with many items, the combined output may exceed the output limit. Configure ResultWriter to store results in OSS instead.
The following example combines ResultWriter with fault tolerance. The Map state tolerates up to 30% failure and writes all results (successes and failures) to OSS:
```yaml
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    ItemConstructor:
      Key.$: $Item.Key
      FailedValue.$: $Input.FailedValue
    ToleratedFailurePercentage: 30
    Processor:
      StartAt: Choice
      States:
        - Type: Choice
          Name: Choice
          Branches:
            - Condition: $Input.Key > $Input.FailedValue
              Next: Fail
          Default: Succeed
        - Type: Succeed
          Name: Succeed
          End: true
        - Type: Fail
          Name: Fail
          Code: MockError
          End: true
    ResultWriter:
      Parameters:
        Bucket: example-bucket
        Prefix: example-prefix/
    End: true
```
State machine input:
```json
{
  "FailedValue": 4,
  "Items": [
    {"Key": 1},
    {"Key": 2},
    {"Key": 3},
    {"Key": 4},
    {"Key": 5}
  ]
}
```
Result file structure
ResultWriter produces three types of files in the specified OSS location:
manifest.json (example-prefix/map-run-name/manifest.json):
```json
{
  "DestinationBucket": "example-bucket",
  "MapRunName": "map-run-name",
  "ResultFiles": {
    "FAILED": [
      {
        "ObjectName": "example-prefix/map-run-name/FAILED_0.json",
        "Size": 262
      }
    ],
    "SUCCEED": [
      {
        "ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
        "Size": 1057
      }
    ]
  }
}
```
FAILED_0.json (example-prefix/map-run-name/FAILED_0.json):
```json
[
  {
    "ExecutionName": "execution-name-5",
    "FlowName": "example",
    "Input": "{\"FailedValue\":4,\"Key\":5}",
    "Output": "{\"ErrorCode\":\"MockError\"}",
    "Status": "Failed",
    "StartedTime": "rfc3339-format-time-string",
    "StoppedTime": "rfc3339-format-time-string"
  }
]
```
SUCCEED_0.json (example-prefix/map-run-name/SUCCEED_0.json):
```json
[
  {
    "ExecutionName": "execution-name-1",
    "FlowName": "example",
    "Input": "{\"FailedValue\":4,\"Key\":1}",
    "Output": "{\"FailedValue\":4,\"Key\":1}",
    "Status": "Succeeded",
    "StartedTime": "rfc3339-format-time-string",
    "StoppedTime": "rfc3339-format-time-string"
  },
  {
    "ExecutionName": "execution-name-2",
    "FlowName": "example",
    "Input": "{\"FailedValue\":4,\"Key\":2}",
    "Output": "{\"FailedValue\":4,\"Key\":2}",
    "Status": "Succeeded",
    "StartedTime": "rfc3339-format-time-string",
    "StoppedTime": "rfc3339-format-time-string"
  },
  {
    "ExecutionName": "execution-name-3",
    "FlowName": "example",
    "Input": "{\"FailedValue\":4,\"Key\":3}",
    "Output": "{\"FailedValue\":4,\"Key\":3}",
    "Status": "Succeeded",
    "StartedTime": "rfc3339-format-time-string",
    "StoppedTime": "rfc3339-format-time-string"
  },
  {
    "ExecutionName": "execution-name-4",
    "FlowName": "example",
    "Input": "{\"FailedValue\":4,\"Key\":4}",
    "Output": "{\"FailedValue\":4,\"Key\":4}",
    "Status": "Succeeded",
    "StartedTime": "rfc3339-format-time-string",
    "StoppedTime": "rfc3339-format-time-string"
  }
]
```
MaxItems
Limits the number of items the Map state processes. If the data source contains more items than MaxItems, only the first MaxItems items are processed.
For example, if an OSS bucket contains 10,000 objects and MaxItems is set to 1,000, the Map state processes only 1,000 objects.
MaxConcurrency
Limits the number of sub-workflow executions that run simultaneously. For example, if the Map state processes 10,000 items and MaxConcurrency is set to 100, at most 100 sub-workflows run at the same time.
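Both limits are set directly on the Map state. A minimal fragment, assuming plain integer values as listed in the field reference (Processor, ProcessorConfig, and an item source are omitted for brevity):

```yaml
- Type: Map
  Name: Map
  MaxItems: 1000       # process at most the first 1,000 items
  MaxConcurrency: 100  # run at most 100 sub-workflows at the same time
```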
ToleratedFailurePercentage
Sets the percentage of sub-workflow executions that can fail before the Map state itself fails. For example, with 10,000 items and ToleratedFailurePercentage set to 10, the Map state tolerates up to 1,000 failures.
ToleratedFailureCount
Sets the number of sub-workflow executions that can fail before the Map state itself fails. For example, with 10,000 items and ToleratedFailureCount set to 10, the Map state tolerates up to 10 failures.
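Like ToleratedFailurePercentage in the ResultWriter example above, this field sits directly on the Map state. A minimal fragment (Processor, ProcessorConfig, and an item source are omitted for brevity):

```yaml
- Type: Map
  Name: Map
  ToleratedFailureCount: 10  # tolerate up to 10 failed sub-workflow executions
```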
Quota limits
The following table lists the default quota limits for distributed mode. To request a higher quota, submit a ticket.
| Quota name | Description | Default value |
|---|---|---|
| MaxOpenMapRun | Maximum number of distributed Map states running simultaneously per account per region | 10 |
| MaxConcurrency | Maximum number of concurrent sub-workflow executions per MapRun | 300 |
| MaxItems | Maximum number of items per MapRun | 10,000 |
Required permissions
Running Map states in distributed mode requires the following OSS permissions. Grant these actions in a role policy. For details, see Create a policy.
```json
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "oss:HeadObject",
        "oss:GetObjectMeta",
        "oss:GetObject",
        "oss:PutObject",
        "oss:ListObjectsV2",
        "oss:ListObjects",
        "oss:InitiateMultipartUpload",
        "oss:UploadPart",
        "oss:CompleteMultipartUpload",
        "oss:AbortMultipartUpload",
        "oss:ListMultipartUploads",
        "oss:ListParts"
      ],
      "Resource": "*"
    }
  ]
}
```