
CloudFlow: Distributed mode

Last Updated: Mar 10, 2025

Distributed mode refers to the execution of Map states in a distributed environment. Distributed mode improves the efficiency and capacity of data processing and is suitable for big data processing and parallel computing.

Concepts

Distributed mode: A mode in which Object Storage Service (OSS) data is processed in Map states. In distributed mode, a large amount of data can be concurrently processed. Objects in OSS buckets can be used as the source data.

Sub-workflow execution: Executing a Map state in distributed mode can be considered as executing sub-workflows. The definition of each sub-workflow is the state transition definition of the Map processor, and the input of each sub-workflow is one item of the Map state. MapRun sub-workflows can be executed in standard mode or express mode. For more information, see Standard mode and express mode.

Fault tolerance policy: By default, the Map state ends in failure when a sub-workflow execution fails. For Map states that contain multiple items, you can configure fault tolerance policies so that the Map state continues to be executed even if some sub-workflow executions fail.

The following table describes the fields involved in distributed mode.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| Name | string | Yes | The name of the state. | my-state-name |
| Description | string | No | The description of the state. | describe it here |
| Type | string | Yes | The type of the state. | Map |
| InputConstructor | map[string]any | No | The input constructor. | See Inputs and outputs. |
| ItemsPath | string | Yes | The expression that is used to extract an array from the input. | See the ItemsPath section of this topic. |
| ItemBatcher | ItemBatcher | No | Combines multiple items into a batch, which is used as the input of a sub-workflow execution. | See the ItemBatcher section of this topic. |
| ItemReader | ItemReader | No | Reads data from OSS buckets. | See the ItemReader section of this topic. |
| ItemConstructor | ItemConstructor | No | Constructs items separately. You can use $Item to reference the original inputs. | See the ItemConstructor section of this topic. |
| ResultWriter | ResultWriter | No | Writes the results of sub-workflow executions to a specified OSS bucket. | See the ResultWriter section of this topic. |
| MaxConcurrency | int | No | Specifies the maximum number of concurrent sub-workflow executions. | 40 |
| MaxItems | MaxItems | No | Specifies the maximum number of items that can be executed by the Map state machine. | See the MaxItems section of this topic. |
| ToleratedFailurePercentage | ToleratedFailurePercentage | No | Specifies the percentage of failed items that can be tolerated in an operation. | See the ToleratedFailurePercentage section of this topic. |
| ToleratedFailureCount | ToleratedFailureCount | No | Specifies the number of failed items that can be tolerated in an operation. | See the ToleratedFailureCount section of this topic. |
| Processor | Processor | Yes | The Map processor. | See the Processor section of this topic. |
| ProcessorConfig | ProcessorConfig | Yes | The processor configurations. | See the ProcessorConfig section of this topic. |
| OutputConstructor | map[string]any | No | The output constructor. | See the OutputConstructor section of the "Inputs and outputs" topic. |
| Next | string | No | The next state to execute after the current state completes. Leave this field empty if the value of the End field is true. | my-next-state |
| End | bool | No | Specifies whether the state is the terminal state of the current scope. | true |
| Retry | Retry | No | Defines the error retry policy. | See Error handling. |
| Catch | Catch | No | Defines the error catch policy. | See Error handling. |

Usage limits

Quota limits

The following table describes the quota limits of distributed mode. If the default quota cannot meet your business requirements, you can submit a ticket to request a larger quota.

| Quota name | Description | Default value |
| --- | --- | --- |
| MaxOpenMapRun | The maximum number of distributed Map states that can be simultaneously executed within a single account in each region. | 10 |
| MaxConcurrency | The maximum number of concurrent sub-workflow executions within a single MapRun task. | 300 |
| MaxItems | The maximum number of items that can be read within a single MapRun task. | 10000 |

Role limits

To execute Map states in distributed mode, you must have the following permissions. For more information about how to create a role policy, see Create a policy.

{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "oss:HeadObject",
                "oss:GetObjectMeta",
                "oss:GetObject",
                "oss:PutObject",
                "oss:ListObjectsV2",
                "oss:ListObjects",
                "oss:InitiateMultipartUpload",
                "oss:UploadPart",
                "oss:CompleteMultipartUpload",
                "oss:AbortMultipartUpload",
                "oss:ListMultipartUploads",
                "oss:ListParts"
            ],
            "Resource": "*"
        }
    ]
}

Key fields in distributed mode

Important

If ItemConstructor, ItemsPath, and ItemBatcher are configured, they are executed in the sequence of ItemsPath, ItemConstructor, and ItemBatcher.

ItemsPath

The expression that is used to extract an array from the input. If a JSON array is returned after the ItemsPath expression is evaluated, the Map state can be executed. Each object in the array is passed to the Map processor for processing. You can use the $Context and $Input variables to extract arrays, as shown in the following example:

$Input.FieldA
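The Important note above states that ItemsPath, ItemConstructor, and ItemBatcher run in that order. The following Python sketch simulates that pipeline under simplifying assumptions (only `$Input.<field>` and `$Item.<field>` expressions, count-based batching); it illustrates the documented semantics and is not the service implementation.

```python
# Illustrative simulation of the distributed-mode input pipeline.
# Assumption: expressions are limited to "$Input.<field>" and "$Item.<field>".

def extract_items(state_input, items_path):
    """ItemsPath: extract the JSON array referenced by a $Input.<field> expression."""
    assert items_path.startswith("$Input.")
    return state_input[items_path[len("$Input."):]]

def construct_items(items, state_input, constructor):
    """ItemConstructor: build each item; "<name>.$" keys take expression values."""
    built_items = []
    for item in items:
        built = {}
        for key, expr in constructor.items():
            if key.endswith(".$"):
                root, _, field = expr.partition(".")
                built[key[:-2]] = (item if root == "$Item" else state_input)[field]
            else:
                built[key] = expr  # plain keys carry literal values
        built_items.append(built)
    return built_items

def batch_items(items, max_items_per_batch):
    """ItemBatcher: group constructed items into batches (count-based only here)."""
    return [items[i:i + max_items_per_batch]
            for i in range(0, len(items), max_items_per_batch)]

# The stages run in the documented order: ItemsPath -> ItemConstructor -> ItemBatcher.
state_input = {"Items": [{"Key": 1}, {"Key": 2}, {"Key": 3}]}
items = extract_items(state_input, "$Input.Items")
items = construct_items(items, state_input, {"ConstructedKey.$": "$Item.Key"})
batches = batch_items(items, 2)
```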

Processor

The Map processor. The following table describes the fields in Processor.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| StartAt | string | Yes | The state from which the flow starts to be executed. | my start task |
| States | array | Yes | The array of states that are contained in the flow. | See the following snippet. |

The following snippet shows an example of the Processor field:

Processor:
   StartAt: Pass1
   States:
     - Type: Pass
       Name: Pass1
       End: true

ProcessorConfig

The processor configurations. The following table describes the fields in ProcessorConfig.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| ExecutionMode | string | Yes | The execution mode of sub-workflow executions. Valid values: Standard and Express. | Express |

ItemReader

Distributed mode supports reading large inputs from OSS buckets. The following table describes the fields in ItemReader.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| SourceType | string | Yes | The type of the data source. Valid values: OSS_CSV, OSS_JSON_LIST, OSS_OBJECTS, and OSS_INVENTORY_FILES. | OSS_CSV |
| SourceParameters | SourceParameters | No | The parameters of the data source. | See the SourceParameters section of this topic. |
| ReaderConfig | ReaderConfig | No | The reader configurations. | See the ReaderConfig section of this topic. |

SourceParameters

The parameters of the data source. The following table describes the fields in SourceParameters.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| Bucket | string | No | The name of the bucket in which the source data resides. | example-bucket |
| ObjectName | string | No | The name of the object. | object_name_1 |
| Prefix | string | No | The prefix that the names of the objects you want to read must contain. If you leave this field empty, all objects in the bucket are read. This field is empty by default. | example-prefix |

ReaderConfig

The reader configurations. The following table describes the fields in ReaderConfig.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| CSVHeaders | []string | No | The column titles or field names contained in the first row of the CSV file. | ColA,ColB,ColC |

To read data from OSS buckets, use one of the following methods:

  • Read data from the CSV files stored in OSS buckets

    You can read data from CSV files by using Map states. For example, you have a CSV file named example-object.csv that is stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read data from the CSV file:

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_CSV
          SourceParameters:
            Bucket: example-bucket
            ObjectName: example-object.csv
        End: true

    The following snippet shows the content of the example-object.csv file:

    ColA,ColB,ColC
    col_a_1,col_b_1,col_c_1

    The following snippet shows the sample inputs of sub-workflow executions.

    {
      "ColA": "col_a_1",
      "ColB": "col_b_1",
      "ColC": "col_c_1"
    }
  • Read data from the JSON files stored in OSS buckets

    Important

JSON files stored in OSS buckets must contain JSON arrays.

    You can read JSON arrays from a specified OSS bucket. For example, you have a JSON file named example-object.json that is stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read JSON arrays from the OSS bucket:

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_JSON_LIST
          SourceParameters:
            Bucket: example-bucket
            ObjectName: example-object.json
        End: true

    The following snippet shows the content of the example-object.json file:

    [
      {
        "key_1": "value_1"
      }
    ]

    The following snippet shows the sample inputs of sub-workflow executions.

    {
      "key_1": "value_1"
    }
  • Read objects stored in OSS buckets

    You can read objects from a specified OSS bucket. For example, you have a list of objects whose names are prefixed with example-prefix. The objects are stored in an OSS bucket named example-bucket. The following snippet provides a sample definition of the state machine.

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_OBJECTS
          SourceParameters:
            Bucket: example-bucket
            Prefix: example-prefix
        End: true

    The following snippet shows that the example-prefix/object_1 object is stored in the example-bucket bucket:

    example-bucket
       ├── example-prefix/object_1

    The following snippet shows the sample inputs of sub-workflow executions.

    {
      "XMLName": {
        "Space": "",
        "Local": "Contents"
      },
      "Key": "example-prefix/object_1",
      "Type": "Normal",
      "Size": 268435,
      "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
      "Owner": {
        "XMLName": {
          "Space": "",
          "Local": ""
        },
        "ID": "",
        "DisplayName": ""
      },
      "LastModified": "2024-01-01T01:01:01Z",
      "StorageClass": "Standard",
      "RestoreInfo": ""
    }
  • Read inventories from OSS buckets

    You can read data from specified OSS buckets or objects by using Map states. For example, you have an inventory file named manifest.json that is stored in an OSS bucket named example-bucket. The path to the inventory file is inventory/2024-01-01T01-01Z/manifest.json. The following snippet provides a sample definition of the state machine.

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_INVENTORY_FILES
          SourceParameters:
            Bucket: example-bucket
            ObjectName: inventory/2024-01-01T01-01Z/manifest.json
        ItemConstructor:
          Key.$: $Item.Key
        End: true

    The following snippet shows the content of the inventory file:

    "example-bucket","object_name_1"  
    "example-bucket","object_name_2"

    The following snippet shows the sample input of the first sub-workflow execution.

    {
      "Bucket": "example-bucket",
      "Key": "object_name_1"
    }
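As a purely local illustration of the OSS_CSV behavior described above (the first row supplies the keys, each later row becomes one sub-workflow input), the following sketch parses an in-memory CSV string with the standard library; reading from an actual OSS object is out of scope here.

```python
import csv
import io

def csv_to_items(csv_text):
    """Turn CSV text into per-item dicts, mimicking how OSS_CSV maps the header
    row to keys and every following row to one sub-workflow input."""
    return [dict(row) for row in csv.DictReader(io.StringIO(csv_text))]

# Same content as the example-object.csv file shown earlier.
items = csv_to_items("ColA,ColB,ColC\ncol_a_1,col_b_1,col_c_1\n")
```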

ItemBatcher

ItemBatcher combines multiple items into a batch, which is used as the input of a sub-workflow execution. The following table describes the fields in ItemBatcher.

Field

Type

Required

Description

Example

MaxItemsPerBatch

int

No

The maximum number of items that can be processed in a single batch.

See the MaxItemsPerBatch section of this topic.

MaxInputBytesPerBatch

int

No

The maximum bytes of data that can be processed in a single batch.

See the MaxInputBytesPerBatch section of this topic.

BatchInput

map[string]any

No

The data set that is used as the input of a single batch.

See the BatchInput section of this topic.

MaxItemsPerBatch

MaxItemsPerBatch specifies the maximum number of items that can be processed in a batch. The following snippet provides an example on how to use MaxItemsPerBatch.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxItemsPerBatch: 2
    End: true

The following snippet shows the input executed by the state machine:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"},
    {"key_4":"value_4"},
    {"key_5":"value_5"}
  ]
}

The following snippet shows the sample inputs of sub-workflow executions.

# execution-1
# Sample input of the first sub-workflow execution
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"}
  ]
}

# execution-2
# Sample input of the second sub-workflow execution
{
  "Items": [
    {"key_3":"value_3"},
    {"key_4":"value_4"}
  ]
}

# execution-3
# Sample input of the third sub-workflow execution
{
  "Items": [
    {"key_5":"value_5"}
  ]
}
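The MaxItemsPerBatch grouping shown above can be approximated with a simple chunking helper. This sketch mirrors only the count-based behavior and wraps each batch in the {"Items": [...]} envelope seen in the sample inputs.

```python
def chunk_items(items, max_items_per_batch):
    """Sketch of MaxItemsPerBatch: each batch holds at most the configured
    number of items; the remainder forms a smaller final batch."""
    return [{"Items": items[i:i + max_items_per_batch]}
            for i in range(0, len(items), max_items_per_batch)]

items = [{f"key_{i}": f"value_{i}"} for i in range(1, 6)]
batches = chunk_items(items, 2)  # five items with MaxItemsPerBatch: 2 -> 3 batches
```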

MaxInputBytesPerBatch

MaxInputBytesPerBatch specifies the maximum bytes of data that can be processed in a single batch. The following snippet provides an example on how to use MaxInputBytesPerBatch.

Important
  • ItemBatcher adds extra keys, such as BatchInput, when it processes a batch. Therefore, the total size of a batch includes the size of these additional keys and inputs.

  • The unit of MaxInputBytesPerBatch is byte.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 50
    End: true

The following snippet shows the input executed by the state machine:

{
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

The following snippet shows the sample inputs of sub-workflow executions.

# execution-1
# Sample input of the first sub-workflow execution
{
  "Items":[
    {"Key":1},
    {"Key":2}
  ]
}

# execution-2
# Sample input of the second sub-workflow execution
{
  "Items":[
    {"Key":3},
    {"Key":4}
  ]
}

# execution-3
# Sample input of the third sub-workflow execution
{
  "Items":[
    {"Key":5}
  ]
}
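A greedy size-based grouping similar to MaxInputBytesPerBatch can be sketched as follows. This is an approximation: as the Important note above says, the service also counts the extra keys that ItemBatcher itself adds, so the sketch's batch boundaries need not match the service's.

```python
import json

def batch_by_bytes(items, max_input_bytes_per_batch):
    """Greedy sketch of MaxInputBytesPerBatch: start a new batch when adding the
    next item would push the serialized {"Items": [...]} envelope past the
    limit. The extra keys the real service adds are ignored here."""
    batches, current = [], []
    for item in items:
        candidate = current + [item]
        if current and len(json.dumps({"Items": candidate})) > max_input_bytes_per_batch:
            batches.append({"Items": current})
            current = [item]
        else:
            current = candidate
    if current:
        batches.append({"Items": current})
    return batches

batches = batch_by_bytes([{"Key": i} for i in range(1, 6)], 50)
```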

BatchInput

BatchInput provides a data set that is used as the input of a single batch. BatchInput supports extra inputs in addition to items. The following snippet provides an example on how to use BatchInput.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 70
      BatchInput:
        InputKey.$: $Input.Key
    End: true

The following snippet shows the input executed by the state machine:

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

The following snippet shows the sample inputs of sub-workflow executions.

# execution-1
# Sample input of the first sub-workflow execution
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":1},
    {"Key":2}
  ]
}

# execution-2
# Sample input of the second sub-workflow execution
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":3},
    {"Key":4}
  ]
}

# execution-3
# Sample input of the third sub-workflow execution
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":5}
  ]
}
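The BatchInput behavior above, where the extra input is resolved once and attached to every batch, can be sketched as follows; the helper name and the restriction to `$Input.<field>` expressions are assumptions for illustration.

```python
def attach_batch_input(batches, state_input, batch_input_spec):
    """Sketch of BatchInput: resolve each "<name>.$": "$Input.<field>" entry
    against the state input once, then attach the result to every batch."""
    resolved = {key[:-2]: state_input[expr[len("$Input."):]]
                for key, expr in batch_input_spec.items()}
    return [{"BatchInput": resolved, **batch} for batch in batches]

state_input = {"Key": "value"}
batches = attach_batch_input([{"Items": [{"Key": 1}, {"Key": 2}]}],
                             state_input, {"InputKey.$": "$Input.Key"})
```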

ItemConstructor

ItemConstructor constructs items and allows you to use $Item to reference original inputs. The following snippet provides an example on how to use ItemConstructor.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 200
      BatchInput:
        InputKey.$: $Input.Key
    ItemConstructor:
      ConstructedKey.$: $Item.Key
      InputKey.$: $Input.Key
    End: true

The following snippet shows the input executed by the state machine:

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

The following snippet shows the sample inputs of sub-workflow executions.

# execution-1
# Sample input of the first sub-workflow execution
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 1
    },
    {
      "InputKey": "value",
      "ConstructedKey": 2
    },
    {
      "InputKey": "value",
      "ConstructedKey": 3
    }
  ]
}

# execution-2
# Sample input of the second sub-workflow execution
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 4
    },
    {
      "InputKey": "value",
      "ConstructedKey": 5
    }
  ]
}

ResultWriter

ResultWriter writes the information about sub-workflow executions to the specified OSS bucket. The following table describes the field in ResultWriter.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| Parameters | Parameters | Yes | The request parameters. | See the Parameters section of this topic. |

Parameters

The request parameters. The following table describes the fields in Parameters.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| Bucket | string | Yes | The name of the bucket to which the results are written. | example-bucket |
| Prefix | string | Yes | The prefix of the names of the objects to which the results are written. | example-prefix |

The following snippet provides an example on how to use ResultWriter.

Note

Inputs and outputs of workflow states cannot exceed the specified sizes. For Map states that contain multiple items, the output may exceed the limit. In this case, we recommend that you configure ResultWriter to store the outputs of the Map states in OSS buckets.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    ItemConstructor:
      Key.$: $Item.Key
      FailedValue.$: $Input.FailedValue
    ToleratedFailurePercentage: 30
    Processor:
      StartAt: Choice
      States:
        - Type: Choice
          Name: Choice
          Branches:
            - Condition: $Input.Key > $Input.FailedValue
              Next: Fail
          Default: Succeed
        - Type: Succeed
          Name: Succeed
          End: true
        - Type: Fail
          Name: Fail
          Code: MockError
          End: true
    ResultWriter:
      Parameters:
        Bucket: example-bucket
        Prefix: example-prefix/
    End: true

The following snippet shows the input executed by the state machine:

{
  "FailedValue": 4,
  "Items": [
    {
      "Key": 1
    },
    {
      "Key": 2
    },
    {
      "Key": 3
    },
    {
      "Key": 4
    },
    {
      "Key": 5
    }
  ]
}

The following snippets provide the content of three JSON files.

# The following snippet provides the content of the manifest.json file whose storage path is example-prefix/map-run-name/manifest.json.
{
    "DestinationBucket": "example-bucket",
    "MapRunName": "map-run-name",
    "ResultFiles": {
        "FAILED": [
            {
                "ObjectName": "example-prefix/map-run-name/FAILED_0.json",
                "Size": 262
            }
        ],
        "SUCCEED": [
            {
                "ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
                "Size": 1057
            }
        ]
    }
}

# The following snippet provides the content of the FAILED_0.json file whose storage path is example-prefix/map-run-name/FAILED_0.json.
[
    {
        "ExecutionName": "execution-name-5",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":5}",
        "Output": "{\"ErrorCode\":\"MockError\"}",
        "Status": "Failed",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

# The following snippet provides the content of the SUCCEED_0.json file whose storage path is example-prefix/map-run-name/SUCCEED_0.json.
[
    {
        "ExecutionName": "execution-name-1",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":1}",
        "Output": "{\"FailedValue\":4,\"Key\":1}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-2",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":2}",
        "Output": "{\"FailedValue\":4,\"Key\":2}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-3",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":3}",
        "Output": "{\"FailedValue\":4,\"Key\":3}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-4",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":4}",
        "Output": "{\"FailedValue\":4,\"Key\":4}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]
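Because the manifest.json file indexes the result files, a consumer can locate failed executions without scanning every result object. A minimal sketch, assuming a manifest already parsed from the structure shown above (downloading the objects from OSS is out of scope):

```python
import json

def result_objects(manifest, status):
    """List the OSS object names that hold results with the given status
    ("FAILED" or "SUCCEED"), based on the manifest.json structure above."""
    return [entry["ObjectName"]
            for entry in manifest.get("ResultFiles", {}).get(status, [])]

manifest = json.loads("""
{
  "DestinationBucket": "example-bucket",
  "MapRunName": "map-run-name",
  "ResultFiles": {
    "FAILED": [{"ObjectName": "example-prefix/map-run-name/FAILED_0.json", "Size": 262}],
    "SUCCEED": [{"ObjectName": "example-prefix/map-run-name/SUCCEED_0.json", "Size": 1057}]
  }
}
""")
failed = result_objects(manifest, "FAILED")
```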

MaxItems

MaxItems specifies the maximum number of items that can be executed by the Map state machine. For example, an OSS bucket contains 10,000 objects. If you set MaxItems to 1,000, the Map state machine loads only 1,000 objects from the OSS bucket.

MaxConcurrency

MaxConcurrency specifies the maximum number of concurrent sub-workflow executions. For example, if the Map state machine is configured to execute 10,000 items and you set MaxConcurrency to 100, the Map state machine simultaneously executes at most 100 sub-workflows.
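The effect of MaxConcurrency is analogous to running sub-workflows through a bounded worker pool. The following is a local analogy only, not the service implementation; run_sub_workflow is a placeholder.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENCY = 100  # illustrative value from the example above

def run_sub_workflow(item):
    # Placeholder for a sub-workflow execution; returns its input wrapped.
    return {"done": item}

items = list(range(1000))
# At most MAX_CONCURRENCY sub-workflows are "in flight" at any moment;
# Executor.map preserves the input order in its results.
with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as pool:
    results = list(pool.map(run_sub_workflow, items))
```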

ToleratedFailurePercentage

ToleratedFailurePercentage specifies the percentage of failed items that can be tolerated in an operation. For example, you have 10,000 items and set ToleratedFailurePercentage to 10. In this case, the Map state machine can tolerate a maximum of 1,000 failed item executions.

ToleratedFailureCount

ToleratedFailureCount specifies the number of failed items that can be tolerated in an operation. For example, you have 10,000 items and set ToleratedFailureCount to 10. In this case, the Map state machine can tolerate a maximum of 10 failed item executions.
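Both tolerance policies reduce to thresholds on the failed-item count. The following sketches that decision rule, under the assumption that the MapRun fails once either configured threshold is exceeded; the function name is illustrative.

```python
def map_run_fails(total_items, failed_items,
                  tolerated_failure_count=None,
                  tolerated_failure_percentage=None):
    """Return True once the failed-item count exceeds either tolerance
    threshold. Thresholds left as None are not enforced."""
    if tolerated_failure_count is not None and failed_items > tolerated_failure_count:
        return True
    if tolerated_failure_percentage is not None:
        return failed_items > total_items * tolerated_failure_percentage / 100
    return False
```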

Example

The following workflow defines a Map state in distributed mode. The state reads its input from an upstream state and extracts the iteration items by using $Input.Items. Each item is processed by a sub-workflow execution in express mode.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    End: true

The following snippet shows the input executed by the state machine:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"}
  ]
}

In this example, the Map state machine generates three sub-workflow executions. The following snippet shows the definitions of the sub-workflow executions:

Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
  - Type: Pass
    Name: Pass
    End: true

The following snippet shows the sample inputs of the sub-workflow executions.

# execution-1
# Sample input of the sub-workflow execution for the first item
{"key_1":"value_1"}

# execution-2
# Sample input of the sub-workflow execution for the second item
{"key_2":"value_2"}

# execution-3
# Sample input of the sub-workflow execution for the third item
{"key_3":"value_3"}

After you run the preceding workflow, the Map state outputs an Items array in which each element is the output of one sub-workflow execution.

{
    "Items": [
        {
            "key_1": "value_1"
        },
        {
            "key_2": "value_2"
        },
        {
            "key_3": "value_3"
        }
    ]
}
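The whole example condenses into a few lines: extract the array named by ItemsPath, run each item through the Pass processor (which returns its input unchanged), and collect the outputs into the Items array shown above. A minimal sketch of that behavior:

```python
def run_map_state(state_input, items_path="$Input.Items"):
    """End-to-end sketch of the example: extract items, run each through the
    Pass processor (identity), and collect the outputs."""
    items = state_input[items_path[len("$Input."):]]
    outputs = [dict(item) for item in items]  # Pass state: output equals input
    return {"Items": outputs}

result = run_map_state({"Items": [{"key_1": "value_1"},
                                  {"key_2": "value_2"},
                                  {"key_3": "value_3"}]})
```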