
CloudFlow: Distributed mode

Last Updated: Mar 10, 2026

Distributed mode runs each iteration of a Map state as an independent sub-workflow execution, enabling high-concurrency parallel processing suitable for big data processing and parallel computing.

Use distributed mode when a single Map state cannot handle your workload sequentially. Distributed mode fans out work across concurrent sub-workflow executions -- each with its own lifecycle -- and can process data from inline input or Object Storage Service (OSS) buckets in parallel.

Key concepts

Term | Definition
---- | ----------
Distributed mode | A Map state processing mode that concurrently processes large amounts of data through sub-workflow executions. Objects in OSS buckets can be used as the source data.
Sub-workflow execution | A single iteration of a distributed Map state. The definition comes from the Processor field, and each sub-workflow receives one item (or one batch) as input. Sub-workflows can run in standard mode or express mode. For details, see Standard mode and express mode.
Fault tolerance policy | By default, when a sub-workflow execution fails, the Map state ends in failure. For Map states that process multiple items, you can configure fault tolerance policies that allow the Map state to continue processing the remaining items when individual sub-workflow executions fail.

How it works

  1. The Map state receives input and extracts an array of items using ItemsPath (or reads items from an OSS data source using ItemReader).

  2. If ItemConstructor is configured, each item is transformed.

  3. If ItemBatcher is configured, items are grouped into batches.

  4. Each item or batch is passed to a sub-workflow execution defined by Processor, running in the mode specified by ProcessorConfig.

  5. After all sub-workflow executions complete, the Map state collects results into a JSON array (or writes them to OSS using ResultWriter).

Important

When ItemsPath, ItemConstructor, and ItemBatcher are all configured, they run in this order: ItemsPath, then ItemConstructor, then ItemBatcher.
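The processing order above can be sketched in a few lines of Python. This is an illustrative model only, not the CloudFlow runtime; the function and parameter names (`run_map_pipeline`, `item_constructor`, `max_items_per_batch`) are invented for the sketch:

```python
def run_map_pipeline(state_input, items_path_key, item_constructor=None,
                     max_items_per_batch=None):
    # 1. ItemsPath: extract the array of items from the state input.
    items = state_input[items_path_key]

    # 2. ItemConstructor: optionally transform each item.
    if item_constructor:
        items = [item_constructor(item, state_input) for item in items]

    # 3. ItemBatcher: optionally group items into fixed-size batches,
    #    each batch becoming one sub-workflow input under "Items".
    if max_items_per_batch:
        return [{"Items": items[i:i + max_items_per_batch]}
                for i in range(0, len(items), max_items_per_batch)]
    return items

state_input = {"Items": [{"key_1": "value_1"}, {"key_2": "value_2"},
                         {"key_3": "value_3"}]}
# Three items batched two at a time yield two sub-workflow inputs.
batches = run_map_pipeline(state_input, "Items", max_items_per_batch=2)
```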

Basic example

The following workflow defines a distributed Map state that reads items from $Input.Items and runs each item through a Pass state in Express mode:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    End: true

State machine input:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"}
  ]
}

This creates three sub-workflow executions, each running the following definition:

Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
  - Type: Pass
    Name: Pass
    End: true

Sub-workflow inputs:

// Sub-workflow 1
{"key_1":"value_1"}

// Sub-workflow 2
{"key_2":"value_2"}

// Sub-workflow 3
{"key_3":"value_3"}

After all sub-workflows complete, the Map state collects all sub-workflow outputs into a JSON array under the Items key:

{
    "Items": [
        {
            "key_1": "value_1"
        },
        {
            "key_2": "value_2"
        },
        {
            "key_3": "value_3"
        }
    ]
}

Field reference

All fields available in a distributed mode Map state:

Field | Type | Required | Description | Example
----- | ---- | -------- | ----------- | -------
Name | string | Yes | The name of the state. | my-state-name
Description | string | No | The description of the state. | describe it here
Type | string | Yes | The type of the state. | Map
InputConstructor | map[string]any | No | The input constructor. | See Inputs and outputs.
ItemsPath | string | Yes, unless ItemReader is configured | Expression to extract an array from the input. | See ItemsPath.
ItemBatcher | ItemBatcher | No | Combines multiple items into a batch as sub-workflow input. | See ItemBatcher.
ItemReader | ItemReader | No | Reads data from OSS buckets. | See ItemReader.
ItemConstructor | ItemConstructor | No | Transforms items using $Item to reference original input values. | See ItemConstructor.
ResultWriter | ResultWriter | No | Writes sub-workflow results to a specified OSS bucket. | See ResultWriter.
MaxConcurrency | int | No | Maximum number of concurrent sub-workflow executions. | 40
MaxItems | MaxItems | No | Maximum number of items the Map state processes. | See MaxItems.
ToleratedFailurePercentage | ToleratedFailurePercentage | No | Percentage of failed sub-workflow executions to tolerate. | See ToleratedFailurePercentage.
ToleratedFailureCount | ToleratedFailureCount | No | Number of failed sub-workflow executions to tolerate. | See ToleratedFailureCount.
Processor | Processor | Yes | The Map processor that defines the sub-workflow. | See Processor.
ProcessorConfig | ProcessorConfig | Yes | The processor configuration. | See ProcessorConfig.
OutputConstructor | map[string]any | No | The output constructor. | See OutputConstructor.
Next | string | No | The next state to run after this state completes. Leave blank if End is true. | my-next-state
End | bool | No | Whether this state is the terminal state of the current scope. | true
Retry | Retry | No | Error retry policy. | See Error handling.
Catch | Catch | No | Error catch policy. | See Error handling.

Field details

ItemsPath

Extracts an array from the Map state's input. Each element in the array becomes input for one sub-workflow execution. Reference data with $Context or $Input variables:

$Input.FieldA

Processor

Defines the sub-workflow that processes each item or batch.

Field | Type | Required | Description | Example
----- | ---- | -------- | ----------- | -------
States | array | Yes | The states in the sub-workflow. | See example below.
StartAt | string | Yes | The first state to run. | my start task
Processor:
  StartAt: Pass1
  States:
    - Type: Pass
      Name: Pass1
      End: true

ProcessorConfig

Specifies the execution mode for sub-workflows.

Field | Type | Required | Description | Example
----- | ---- | -------- | ----------- | -------
ExecutionMode | string | Yes | The execution mode for sub-workflows. | Express

ItemReader

Reads input data from OSS buckets, allowing inputs larger than can be passed inline in the state input.

Field | Type | Required | Description | Example
----- | ---- | -------- | ----------- | -------
SourceType | string | Yes | Data source type. Valid values: OSS_CSV, OSS_JSON_LIST, OSS_OBJECTS, OSS_INVENTORY_FILES. | OSS_CSV
SourceParameters | SourceParameters | No | Data source location parameters. | See SourceParameters.
ReaderConfig | ReaderConfig | No | Reader configuration. | See ReaderConfig.

SourceParameters

Field | Type | Required | Description | Example
----- | ---- | -------- | ----------- | -------
Bucket | string | No | The OSS bucket name. | example-bucket
ObjectName | string | No | The object name. | object_name_1
Prefix | string | No | Object name prefix filter. If left blank, all objects are matched. | example-prefix

ReaderConfig

Field | Type | Required | Description | Example
----- | ---- | -------- | ----------- | -------
CSVHeaders | []string | No | Column headers in the first row of the CSV file. | ColA,ColB,ColC

Read CSV files from OSS

Read data from a CSV file stored in an OSS bucket. In this example, example-object.csv is stored in example-bucket:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_CSV
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.csv
    End: true

CSV file content:

ColA,ColB,ColC
col_a_1,col_b_1,col_c_1

Each row becomes sub-workflow input:

{
  "ColA": "col_a_1",
  "ColB": "col_b_1",
  "ColC": "col_c_1"
}
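The header-to-row mapping that OSS_CSV performs can be illustrated with Python's standard csv module. This is a local stand-in for the reader, not the CloudFlow implementation; the in-memory string replaces the OSS object:

```python
import csv
import io

# Same content as example-object.csv above; io.StringIO stands in for
# downloading the object from OSS.
csv_content = "ColA,ColB,ColC\ncol_a_1,col_b_1,col_c_1\n"

# DictReader pairs each data row with the header row, producing one
# JSON-like object per row, i.e., one sub-workflow input per row.
rows = list(csv.DictReader(io.StringIO(csv_content)))
```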

Read JSON arrays from OSS

Important

The JSON file must contain a JSON array.

Read a JSON array from example-object.json in example-bucket:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_JSON_LIST
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.json
    End: true

JSON file content:

[
  {
    "key_1": "value_1"
  }
]

Each element becomes sub-workflow input:

{
  "key_1": "value_1"
}

Read objects from OSS

List objects with a specific prefix and pass their metadata as sub-workflow input. This example reads objects prefixed with example-prefix from example-bucket:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_OBJECTS
      SourceParameters:
        Bucket: example-bucket
        Prefix: example-prefix
    End: true

Given this bucket structure:

example-bucket
   └── example-prefix/object_1

Each object's metadata becomes sub-workflow input:

{
  "XMLName": {
    "Space": "",
    "Local": "Contents"
  },
  "Key": "example-prefix/object_1",
  "Type": "Normal",
  "Size": 268435,
  "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
  "Owner": {
    "XMLName": {
      "Space": "",
      "Local": ""
    },
    "ID": "",
    "DisplayName": ""
  },
  "LastModified": "2024-01-01T01:01:01Z",
  "StorageClass": "Standard",
  "RestoreInfo": ""
}

Read OSS inventory files

Read items from an OSS inventory manifest. This example reads inventory/2024-01-01T01-01Z/manifest.json from example-bucket:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_INVENTORY_FILES
      SourceParameters:
        Bucket: example-bucket
        ObjectName: inventory/2024-01-01T01-01Z/manifest.json
    ItemConstructor:
      Key.$: $Item.Key
    End: true

Inventory file content:

"example-bucket","object_name_1"
"example-bucket","object_name_2"

First sub-workflow input (ItemConstructor keeps only the constructed Key field):

{
  "Key": "object_name_1"
}

ItemBatcher

Groups multiple items into batches. Each batch becomes the input for a single sub-workflow execution.

Field | Type | Required | Description | Example
----- | ---- | -------- | ----------- | -------
MaxItemsPerBatch | int | No | Maximum items per batch. | See example.
MaxInputBytesPerBatch | int | No | Maximum bytes per batch. Unit: bytes. | See example.
BatchInput | map[string]any | No | Additional data to include in each batch's input. | See example.

Batch by item count

Set MaxItemsPerBatch to control how many items go into each batch:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxItemsPerBatch: 2
    End: true

State machine input:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"},
    {"key_4":"value_4"},
    {"key_5":"value_5"}
  ]
}

This produces three sub-workflow executions:

// Sub-workflow 1
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"}
  ]
}

// Sub-workflow 2
{
  "Items": [
    {"key_3":"value_3"},
    {"key_4":"value_4"}
  ]
}

// Sub-workflow 3
{
  "Items": [
    {"key_5":"value_5"}
  ]
}

Batch by byte size

Set MaxInputBytesPerBatch to limit the size of each batch in bytes.

Important
  • ItemBatcher adds metadata keys to each batch. The total batch size includes these additional keys.

MaxInputBytesPerBatch is specified in bytes.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 50
    End: true

State machine input:

{
  "Items":[
    {"Key":1},
{"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

This produces three sub-workflow executions:

// Sub-workflow 1
{
  "Items":[
    {"Key":1},
{"Key":2}
  ]
}

// Sub-workflow 2
{
  "Items":[
    {"Key":3},
{"Key":4}
  ]
}

// Sub-workflow 3
{
  "Items":[
    {"Key":5}
  ]
}
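The greedy principle behind byte-size batching can be sketched as follows. This is an illustration only: CloudFlow's exact size accounting includes the metadata keys it adds to each batch, so the sketch uses a smaller limit (40 bytes) than the example above to produce the same 2-2-1 split without that overhead. The function name `batch_by_bytes` is invented for the sketch:

```python
import json

def batch_by_bytes(items, max_bytes):
    """Append items to the current batch until adding one more would
    exceed max_bytes (measured on the serialized batch), then start a
    new batch."""
    batches, current = [], []
    for item in items:
        candidate = current + [item]
        if current and len(json.dumps({"Items": candidate})) > max_bytes:
            batches.append({"Items": current})
            current = [item]
        else:
            current = candidate
    if current:
        batches.append({"Items": current})
    return batches

items = [{"Key": i} for i in range(1, 6)]
batches = batch_by_bytes(items, 40)
# Produces batches of 2, 2, and 1 items, mirroring the split above.
```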

Add shared data to batches

Use BatchInput to include additional data from the state machine input in every batch:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 70
      BatchInput:
        InputKey.$: $Input.Key
    End: true

State machine input:

{
  "Key":"value",
  "Items":[
    {"Key":1},
{"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

Each sub-workflow receives both the BatchInput data and the batched items:

// Sub-workflow 1
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":1},
{"Key":2}
  ]
}

// Sub-workflow 2
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":3},
{"Key":4}
  ]
}

// Sub-workflow 3
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":5}
  ]
}

ItemConstructor

Transforms each item before it is passed to a sub-workflow execution. Use $Item to reference the original item and $Input to reference the state machine input:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 200
      BatchInput:
        InputKey.$: $Input.Key
    ItemConstructor:
      ConstructedKey.$: $Item.Key
      InputKey.$: $Input.Key
    End: true

State machine input:

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

Each item is transformed by ItemConstructor before batching:

// Sub-workflow 1
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 1
    },
    {
      "InputKey": "value",
      "ConstructedKey": 2
    },
    {
      "InputKey": "value",
      "ConstructedKey": 3
    }
  ]
}

// Sub-workflow 2
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 4
    },
    {
      "InputKey": "value",
      "ConstructedKey": 5
    }
  ]
}

ResultWriter

Writes sub-workflow results to an OSS bucket. Configure ResultWriter when the combined output of all sub-workflow executions may exceed the state output size limit.

Field | Type | Required | Description | Example
----- | ---- | -------- | ----------- | -------
Parameters | Parameters | Yes | OSS destination for results. | See below.

Parameters

Field | Type | Required | Description | Example
----- | ---- | -------- | ----------- | -------
Bucket | string | Yes | The destination OSS bucket. | example-bucket
Prefix | string | Yes | The object name prefix for result files. | example-prefix/
Note

State inputs and outputs have size limits. For Map states with many items, the combined output may exceed this limit. Configure ResultWriter to store results in OSS instead.

The following example combines ResultWriter with fault tolerance. The Map state tolerates up to 30% failure and writes all results (successes and failures) to OSS:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    ItemConstructor:
      Key.$: $Item.Key
      FailedValue.$: $Input.FailedValue
    ToleratedFailurePercentage: 30
    Processor:
      StartAt: Choice
      States:
        - Type: Choice
          Name: Choice
          Branches:
            - Condition: $Input.Key > $Input.FailedValue
              Next: Fail
          Default: Succeed
        - Type: Succeed
          Name: Succeed
          End: true
        - Type: Fail
          Name: Fail
          Code: MockError
          End: true
    ResultWriter:
      Parameters:
        Bucket: example-bucket
        Prefix: example-prefix/
    End: true

State machine input:

{
  "FailedValue": 4,
  "Items": [
    {"Key": 1},
    {"Key": 2},
    {"Key": 3},
    {"Key": 4},
    {"Key": 5}
  ]
}

Result file structure

ResultWriter produces three types of files in the specified OSS location:

manifest.json (example-prefix/map-run-name/manifest.json):

{
    "DestinationBucket": "example-bucket",
    "MapRunName": "map-run-name",
    "ResultFiles": {
        "FAILED": [
            {
                "ObjectName": "example-prefix/map-run-name/FAILED_0.json",
                "Size": 262
            }
        ],
        "SUCCEED": [
            {
                "ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
                "Size": 1057
            }
        ]
    }
}

FAILED_0.json (example-prefix/map-run-name/FAILED_0.json):

[
    {
        "ExecutionName": "execution-name-5",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":5}",
        "Output": "{\"ErrorCode\":\"MockError\"}",
        "Status": "Failed",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

SUCCEED_0.json (example-prefix/map-run-name/SUCCEED_0.json):

[
    {
        "ExecutionName": "execution-name-1",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":1}",
        "Output": "{\"FailedValue\":4,\"Key\":1}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-2",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":2}",
        "Output": "{\"FailedValue\":4,\"Key\":2}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-3",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":3}",
        "Output": "{\"FailedValue\":4,\"Key\":3}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-4",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":4}",
        "Output": "{\"FailedValue\":4,\"Key\":4}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]
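A consumer of these results typically starts from manifest.json, which indexes the result files by status. The sketch below parses a manifest with the structure shown above to locate the failed-result files; downloading the objects from OSS is out of scope, and the variable names are illustrative:

```python
import json

# A manifest with the same structure as the example above, inlined here
# in place of reading example-prefix/map-run-name/manifest.json from OSS.
manifest = json.loads("""{
  "DestinationBucket": "example-bucket",
  "MapRunName": "map-run-name",
  "ResultFiles": {
    "FAILED": [
      {"ObjectName": "example-prefix/map-run-name/FAILED_0.json", "Size": 262}
    ],
    "SUCCEED": [
      {"ObjectName": "example-prefix/map-run-name/SUCCEED_0.json", "Size": 1057}
    ]
  }
}""")

# Collect the object names of all failed-result files for follow-up
# processing (for example, retrying the failed items).
failed_objects = [entry["ObjectName"]
                  for entry in manifest["ResultFiles"].get("FAILED", [])]
```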

MaxItems

Limits the number of items the Map state processes. If the data source contains more items than MaxItems, only the first MaxItems items are processed.

For example, if an OSS bucket contains 10,000 objects and MaxItems is set to 1,000, the Map state processes only 1,000 objects.

MaxConcurrency

Limits the number of sub-workflow executions that run simultaneously. For example, if the Map state processes 10,000 items and MaxConcurrency is set to 100, at most 100 sub-workflows run at the same time.
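The effect of MaxConcurrency is that of a capped worker pool. In the sketch below, a ThreadPoolExecutor stands in for the CloudFlow scheduler and a counter records the peak number of simultaneously running stand-in sub-workflows; the names are invented for the illustration:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

MAX_CONCURRENCY = 4  # stand-in for the MaxConcurrency field
peak = 0             # highest number of sub-workflows seen running at once
running = 0
lock = threading.Lock()

def sub_workflow(item):
    """Stand-in for one sub-workflow execution; records concurrency."""
    global peak, running
    with lock:
        running += 1
        peak = max(peak, running)
    result = item  # ... real per-item work would happen here ...
    with lock:
        running -= 1
    return result

# The pool fans 100 items out across at most MAX_CONCURRENCY workers,
# preserving input order in the collected results.
with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as pool:
    results = list(pool.map(sub_workflow, range(100)))
```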

ToleratedFailurePercentage

Sets the percentage of sub-workflow executions that can fail before the Map state itself fails. For example, with 10,000 items and ToleratedFailurePercentage set to 10, the Map state tolerates up to 1,000 failures.

ToleratedFailureCount

Sets the number of sub-workflow executions that can fail before the Map state itself fails. For example, with 10,000 items and ToleratedFailureCount set to 10, the Map state tolerates up to 10 failures.
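The two thresholds can be expressed as a single helper. The formula is inferred from the examples above (10% of 10,000 items tolerates 1,000 failures); the function and parameter names are invented for the sketch, and the precedence between the two fields when both are set is an assumption:

```python
def tolerated_failures(total_items, percentage=None, count=None):
    """Maximum number of failed sub-workflow executions before the
    Map state itself fails, per ToleratedFailurePercentage /
    ToleratedFailureCount."""
    if count is not None:
        return count
    if percentage is not None:
        # Integer percentage of the total item count.
        return total_items * percentage // 100
    return 0  # default: a single failure fails the Map state

# 10,000 items with ToleratedFailurePercentage 10 tolerates 1,000 failures;
# with ToleratedFailureCount 10 it tolerates 10.
```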

Quota limits

The following table lists the default quota limits for distributed mode. To request a higher quota, submit a ticket.

Quota name | Description | Default value
---------- | ----------- | -------------
MaxOpenMapRun | Maximum number of distributed Map states running simultaneously per account per region | 10
MaxConcurrency | Maximum number of concurrent sub-workflow executions per MapRun | 300
MaxItems | Maximum number of items per MapRun | 10,000

Required permissions

Running Map states in distributed mode requires the following OSS permissions. Grant these actions in a role policy. For details, see Create a policy.

{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "oss:HeadObject",
                "oss:GetObjectMeta",
                "oss:GetObject",
                "oss:PutObject",
                "oss:ListObjectsV2",
                "oss:ListObjects",
                "oss:InitiateMultipartUpload",
                "oss:UploadPart",
                "oss:CompleteMultipartUpload",
                "oss:AbortMultipartUpload",
                "oss:ListMultipartUploads",
                "oss:ListParts"
            ],
            "Resource": "*"
        }
    ]
}