When a Map state is executed in a distributed environment, it runs in distributed mode. Distributed mode improves the efficiency and capacity of data processing and is suitable for big data processing and parallel computing scenarios.
Concepts
Distributed mode: A mode in which Object Storage Service (OSS) data is processed in Map states. In distributed mode, a large amount of data can be concurrently processed. Objects in OSS buckets can be used as the source data.
Sub-workflow execution: Executing a Map state in distributed mode can be considered as executing a set of sub-workflows. The definition of each sub-workflow is the state transition definition of the Map processor, and the input of each sub-workflow is one item of the Map state. The sub-workflows of a MapRun can be executed in standard mode or express mode. For more information, see Standard mode and express mode.
Fault tolerance policy: By default, when a sub-workflow execution fails, the Map state ends in failure. For Map states that contain multiple items, you can configure fault tolerance policies to allow the Map state to continue executing even if some sub-workflow executions fail.
The following table describes the fields involved in distributed mode.
Field | Type | Required | Description | Example |
Name | string | Yes | The name of the state. | my-state-name |
Description | string | No | The description of the state. | describe it here |
Type | string | Yes | The type of the state. | Map |
InputConstructor | map[string]any | No | The input constructor. | See Inputs and outputs. |
ItemsPath | string | Yes | The expression that is used to extract an array from the input. | See the ItemsPath section of this topic. |
ItemBatcher | ItemBatcher | No | Combines multiple items into a batch, which is used as the input of a sub-workflow execution. | See the ItemBatcher section of this topic. |
ItemReader | ItemReader | No | Reads data from OSS buckets. | See the ItemReader section of this topic. |
ItemConstructor | ItemConstructor | No | Constructs items and allows you to use $Item to reference the original items. | See the ItemConstructor section of this topic. |
ResultWriter | ResultWriter | No | Writes the results of sub-workflow executions to a specified OSS bucket. | See the ResultWriter section of this topic. |
MaxConcurrency | int | No | Specifies the maximum number of concurrent sub-workflow executions. | 40 |
MaxItems | MaxItems | No | Specifies the maximum number of items that can be executed by the Map state machine. | See the MaxItems section of this topic. |
ToleratedFailurePercentage | ToleratedFailurePercentage | No | Specifies the percentage of failed sub-workflow executions that can be tolerated. | See the ToleratedFailurePercentage section of this topic. |
ToleratedFailureCount | ToleratedFailureCount | No | Specifies the number of failed sub-workflow executions that can be tolerated. | See the ToleratedFailureCount section of this topic. |
Processor | Processor | Yes | The Map processor. | See the Processor section of this topic. |
ProcessorConfig | ProcessorConfig | Yes | The processor configurations. | See the ProcessorConfig section of this topic. |
OutputConstructor | map[string]any | No | The output constructor. | See the OutputConstructor section of the "Inputs and outputs" topic. |
Next | string | No | The next state that is executed after the current state is complete. If the value of the End field is true, leave this field empty. | my-next-state |
End | bool | No | Specifies whether the state is the terminal state of the current scope. | true |
Retry | Retry | No | Defines the error retry policy. | See Error handling. |
Catch | Catch | No | Defines the error catch policy. | See Error handling. |
Usage limits
Quota limits
The following table describes the quota limits of distributed mode. If the default quota cannot meet your business requirements, you can submit a ticket to request a larger quota.
Quota name | Description | Default value |
MaxOpenMapRun | The maximum number of distributed Map states that can be simultaneously executed within a single account in each region. | 10 |
MaxConcurrency | The maximum number of sub-workflow executions that can run concurrently within a single MapRun task. | 300 |
MaxItems | The maximum number of items that can be read within a single MapRun task. | 10000 |
Role limits
To execute Map states in distributed mode, you must have the following permissions. For more information about how to create a role policy, see Create a policy.
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"oss:HeadObject",
"oss:GetObjectMeta",
"oss:GetObject",
"oss:PutObject",
"oss:ListObjectsV2",
"oss:ListObjects",
"oss:InitiateMultipartUpload",
"oss:UploadPart",
"oss:CompleteMultipartUpload",
"oss:AbortMultipartUpload",
"oss:ListMultipartUploads",
"oss:ListParts"
],
"Resource": "*"
}
]
}
Key fields in distributed mode
If ItemsPath, ItemConstructor, and ItemBatcher are all configured, they take effect in the following order: ItemsPath, ItemConstructor, and then ItemBatcher.
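For example, given the following illustrative input, the three fields take effect in that order. This is a minimal sketch; the ItemConstructor and ItemBatcher settings shown in the comments are assumptions chosen for illustration.
# Input of the Map state (illustrative)
{"Items": [{"Key": 1}, {"Key": 2}]}
# 1. ItemsPath ($Input.Items) extracts the array of items:
[{"Key": 1}, {"Key": 2}]
# 2. ItemConstructor (ConstructedKey.$: $Item.Key) reshapes each item:
[{"ConstructedKey": 1}, {"ConstructedKey": 2}]
# 3. ItemBatcher (MaxItemsPerBatch: 2) groups the constructed items into a batch:
{"Items": [{"ConstructedKey": 1}, {"ConstructedKey": 2}]}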
ItemsPath
The expression that is used to extract an array from the input. If evaluating the ItemsPath expression returns a JSON array, the Map state can be executed. Each element in the array is passed to the Map processor for processing. You can use the $Context and $Input variables to extract arrays, as shown in the following example:
$Input.FieldA
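For example, assume the state receives the following input (an illustrative example). The expression $Input.FieldA then yields a two-element JSON array, and each element becomes the input of one sub-workflow execution.
# Illustrative state input for the $Input.FieldA expression
{
  "FieldA": [
    {"key_1": "value_1"},
    {"key_2": "value_2"}
  ]
}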
Processor
The Map processor. The following table describes the fields in Processor.
Field | Type | Required | Description | Example |
States | array | Yes | The array of states that are contained in the flow. | |
StartAt | string | Yes | The state from which the flow starts to be executed. | my start task |
ProcessorConfig
The processor configurations. The following table describes the fields in ProcessorConfig.
Field | Type | Required | Description | Example |
ExecutionMode | string | Yes | The execution mode of the sub-workflows. Valid values: Express and Standard. | Express |
ItemReader
Distributed mode supports reading larger inputs from OSS buckets. The following table describes the fields in ItemReader.
Field | Type | Required | Description | Example |
SourceType | string | Yes | The type of the data source. Valid values: OSS_CSV, OSS_JSON_LIST, OSS_OBJECTS, and OSS_INVENTORY_FILES. | OSS_CSV |
SourceParameters | SourceParameters | No | The parameters of the data source. | See the SourceParameters section of this topic. |
ReaderConfig | ReaderConfig | No | The reader configurations. | See the ReaderConfig section of this topic. |
SourceParameters
The parameters of the data source. The following table describes the fields in SourceParameters.
Field | Type | Required | Description | Example |
Bucket | string | No | The name of the bucket in which the object resides. | example-bucket |
ObjectName | string | No | The name of the object. | object_name_1 |
Prefix | string | No | The prefix that the names of the objects you want to read must contain. If you leave this field empty, all objects in the bucket are read. This field is left empty by default. | example-prefix |
ReaderConfig
The reader configurations. The following table describes the fields in ReaderConfig.
Field | Type | Required | Description | Example |
CSVHeaders | []string | No | The column titles or field names contained in the first row of the CSV file. | ColA,ColB,ColC |
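The following snippet is a minimal sketch of combining ReaderConfig with an OSS_CSV ItemReader. It assumes the standard YAML list form for the []string CSVHeaders field and reuses the illustrative bucket and object names from this topic.
# Illustrative sketch; bucket, object, and column names are placeholders
ItemReader:
  SourceType: OSS_CSV
  SourceParameters:
    Bucket: example-bucket
    ObjectName: example-object.csv
  ReaderConfig:
    CSVHeaders:
      - ColA
      - ColB
      - ColC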
To read data from OSS buckets, use one of the following methods:
Read data from the CSV files stored in OSS buckets
You can read data from CSV files by using Map states. For example, you have a CSV file named example-object.csv that is stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read data from the CSV file:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_CSV
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.csv
    End: true
The following snippet shows the content of the example-object.csv file:
ColA,ColB,ColC
col_a_1,col_b_1,col_c_1
The following snippet shows the sample input of the sub-workflow execution.
{
  "ColA": "col_a_1",
  "ColB": "col_b_1",
  "ColC": "col_c_1"
}
Read data from the JSON files stored in OSS buckets
Important: JSON files stored in OSS buckets must contain JSON arrays.
You can read JSON arrays from a specified OSS bucket. For example, you have a JSON file named example-object.json that is stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read the JSON array from the OSS bucket:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_JSON_LIST
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.json
    End: true
The following snippet shows the content of the example-object.json file:
[
  {
    "key_1": "value_1"
  }
]
The following snippet shows the sample input of the sub-workflow execution.
{
  "key_1": "value_1"
}
Read objects stored in OSS buckets
You can read objects from a specified OSS bucket. For example, you have a list of objects whose names are prefixed with example-prefix and that are stored in an OSS bucket named example-bucket. The following snippet provides a sample definition of the state machine.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_OBJECTS
      SourceParameters:
        Bucket: example-bucket
        Prefix: example-prefix
    End: true
The following snippet shows that the example-prefix/object_1 object is stored in the example-bucket bucket:
example-bucket
├── example-prefix/object_1
The following snippet shows the sample input of a sub-workflow execution.
{
  "XMLName": {
    "Space": "",
    "Local": "Contents"
  },
  "Key": "example-prefix/object_1",
  "Type": "Normal",
  "Size": 268435,
  "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
  "Owner": {
    "XMLName": {
      "Space": "",
      "Local": ""
    },
    "ID": "",
    "DisplayName": ""
  },
  "LastModified": "2024-01-01T01:01:01Z",
  "StorageClass": "Standard",
  "RestoreInfo": ""
}
Read inventories from OSS buckets
You can read OSS inventory files by using Map states. For example, you have an inventory file named manifest.json that is stored in an OSS bucket named example-bucket. The path to the inventory file is inventory/2024-01-01T01-01Z/manifest.json. The following snippet provides a sample definition of the state machine.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_INVENTORY_FILES
      SourceParameters:
        Bucket: example-bucket
        ObjectName: inventory/2024-01-01T01-01Z/manifest.json
    ItemConstructor:
      Key.$: $Item.Key
    End: true
The following snippet shows the content of the inventory file:
"example-bucket","object_name_1" "example-bucket","object_name_2"
The following snippet shows the sample input of the first sub-workflow execution.
{ "Bucket": "example-bucket", "Key": "object_name_1" }
ItemBatcher
ItemBatcher combines multiple items into a batch, which is used as the input of a sub-workflow execution. The following table describes the fields in ItemBatcher.
Field | Type | Required | Description | Example |
MaxItemsPerBatch | int | No | The maximum number of items that can be processed in a single batch. | See the MaxItemsPerBatch section of this topic. |
MaxInputBytesPerBatch | int | No | The maximum size, in bytes, of the data that can be processed in a single batch. | See the MaxInputBytesPerBatch section of this topic. |
BatchInput | map[string]any | No | Extra input that is added to each batch in addition to the items. | See the BatchInput section of this topic. |
MaxItemsPerBatch
MaxItemsPerBatch specifies the maximum number of items that can be processed in a batch. The following snippet provides an example on how to use MaxItemsPerBatch.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxItemsPerBatch: 2
End: true
The following snippet shows the input that is passed to the state machine:
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"},
{"key_4":"value_4"},
{"key_5":"value_5"}
]
}
The following snippet shows the sample inputs of sub-workflow executions.
# execution-1
# Sample input of the first sub-workflow execution
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"}
  ]
}
# execution-2
# Sample input of the second sub-workflow execution
{
  "Items": [
    {"key_3":"value_3"},
    {"key_4":"value_4"}
  ]
}
# execution-3
# Sample input of the third sub-workflow execution
{
  "Items": [
    {"key_5":"value_5"}
  ]
}
MaxInputBytesPerBatch
MaxInputBytesPerBatch specifies the maximum size, in bytes, of the data that can be processed in a single batch. The following snippet provides an example on how to use MaxInputBytesPerBatch.
When ItemBatcher assembles a batch, it adds extra keys and inputs. Therefore, the total size of a batch includes the size of these additional keys and inputs.
The unit of MaxInputBytesPerBatch is bytes.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 50
End: true
The following snippet shows the input that is passed to the state machine:
{
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}
The following snippet shows the sample inputs of sub-workflow executions.
# execution-1
# Sample input of the first sub-workflow execution
{
  "Items":[
    {"Key":1},
    {"Key":2}
  ]
}
# execution-2
# Sample input of the second sub-workflow execution
{
  "Items":[
    {"Key":3},
    {"Key":4}
  ]
}
# execution-3
# Sample input of the third sub-workflow execution
{
  "Items":[
    {"Key":5}
  ]
}
BatchInput
BatchInput provides a fixed input that is added to each batch, allowing you to pass extra inputs in addition to the items. The following snippet provides an example on how to use BatchInput.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 70
BatchInput:
InputKey.$: $Input.Key
End: true
The following snippet shows the input that is passed to the state machine:
{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}
The following snippet shows the sample inputs of sub-workflow executions.
# execution-1
# Sample input of the first sub-workflow execution
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":1},
    {"Key":2}
  ]
}
# execution-2
# Sample input of the second sub-workflow execution
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":3},
    {"Key":4}
  ]
}
# execution-3
# Sample input of the third sub-workflow execution
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":5}
  ]
}
ItemConstructor
ItemConstructor constructs items and allows you to use $Item to reference each original item and $Input to reference the state input. The following snippet provides an example on how to use ItemConstructor.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 200
BatchInput:
InputKey.$: $Input.Key
ItemConstructor:
ConstructedKey.$: $Item.Key
InputKey.$: $Input.Key
End: true
The following snippet shows the input that is passed to the state machine:
{
"Key":"value",
"Items":[
{"Key":1},
{"Key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
The following snippet shows the sample inputs of sub-workflow executions.
# execution-1
# Sample input of the first sub-workflow execution
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 1
},
{
"InputKey": "value",
"ConstructedKey": 2
},
{
"InputKey": "value",
"ConstructedKey": 3
}
]
}
# execution-2
# Sample input of the second sub-workflow execution
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 4
},
{
"InputKey": "value",
"ConstructedKey": 5
}
]
}
ResultWriter
ResultWriter writes the information about sub-workflow executions to the specified OSS bucket. The following table describes the field in ResultWriter.
Field | Type | Required | Description | Example |
Parameters | string | Yes | The request parameters. | See the Parameters section of this topic. |
Parameters
The request parameters. The following table describes the fields in Parameters.
Field | Type | Required | Description | Example |
Bucket | string | Yes | The name of the bucket in which the object resides. | example-bucket |
Prefix | string | Yes | The prefix of the names of the objects to which the results are written. | example-prefix |
The following snippet provides an example on how to use ResultWriter.
The inputs and outputs of workflow states cannot exceed their size limits. For Map states that contain a large number of items, the output may exceed the limit. In this case, we recommend that you configure ResultWriter to store the outputs of the Map state in an OSS bucket.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
ItemConstructor:
Key.$: $Item.Key
FailedValue.$: $Input.FailedValue
ToleratedFailurePercentage: 30
Processor:
StartAt: Choice
States:
- Type: Choice
Name: Choice
Branches:
- Condition: $Input.Key > $Input.FailedValue
Next: Fail
Default: Succeed
- Type: Succeed
Name: Succeed
End: true
- Type: Fail
Name: Fail
Code: MockError
End: true
ResultWriter:
Parameters:
Bucket: example-bucket
Prefix: example-prefix/
End: true
The following snippet shows the input that is passed to the state machine:
{
"FailedValue": 4,
"Items": [
{
"Key": 1
},
{
"Key": 2
},
{
"Key": 3
},
{
"Key": 4
},
{
"Key": 5
}
]
}
After the execution is complete, ResultWriter writes the results to the specified OSS path. The following snippets show the content of the three generated JSON files.
# The following snippet provides the content of the manifest.json file whose storage path is example-prefix/map-run-name/manifest.json.
{
"DestinationBucket": "example-bucket",
"MapRunName": "map-run-name",
"ResultFiles": {
"FAILED": [
{
"ObjectName": "example-prefix/map-run-name/FAILED_0.json",
"Size": 262
}
],
"SUCCEED": [
{
"ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
"Size": 1057
}
]
}
}
# The following snippet provides the content of the FAILED_0.json file whose storage path is example-prefix/map-run-name/FAILED_0.json.
[
{
"ExecutionName": "execution-name-5",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":5}",
"Output": "{\"ErrorCode\":\"MockError\"}",
"Status": "Failed",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
# The following snippet provides the content of the SUCCEED_0.json file whose storage path is example-prefix/map-run-name/SUCCEED_0.json.
[
{
"ExecutionName": "execution-name-1",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":1}",
"Output": "{\"FailedValue\":4,\"Key\":1}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-2",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":2}",
"Output": "{\"FailedValue\":4,\"Key\":2}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-3",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":3}",
"Output": "{\"FailedValue\":4,\"Key\":3}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-4",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":4}",
"Output": "{\"FailedValue\":4,\"Key\":4}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
MaxItems
MaxItems specifies the maximum number of items that can be executed by the Map state machine. For example, if an OSS bucket contains 10,000 objects and you set MaxItems to 1,000, the Map state machine loads only 1,000 objects from the OSS bucket.
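The following snippet is a minimal sketch of this scenario. It assumes that MaxItems can be set as a scalar field of the Map state, in the same position as the other top-level fields in the field table; the bucket name and value are illustrative.
# Illustrative sketch; the exact shape of the MaxItems structure is an assumption
- Type: Map
  Name: Map
  MaxItems: 1000
  ProcessorConfig:
    ExecutionMode: Express
  Processor:
    StartAt: Pass
    States:
      - Type: Pass
        Name: Pass
        End: true
  ItemReader:
    SourceType: OSS_OBJECTS
    SourceParameters:
      Bucket: example-bucket
      Prefix: example-prefix
  End: true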
MaxConcurrency
MaxConcurrency specifies the maximum number of concurrent sub-workflow executions. For example, if the Map state machine is configured to execute 10,000 items and you set MaxConcurrency to 100, the Map state machine executes at most 100 sub-workflows simultaneously.
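The following snippet is a minimal sketch based on the field table, where MaxConcurrency is an integer field of the Map state. It limits the Map state to 100 concurrent sub-workflow executions; the value and the processor are illustrative.
# Illustrative sketch; values are placeholders
- Type: Map
  Name: Map
  MaxConcurrency: 100
  ItemsPath: $Input.Items
  ProcessorConfig:
    ExecutionMode: Express
  Processor:
    StartAt: Pass
    States:
      - Type: Pass
        Name: Pass
        End: true
  End: true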
ToleratedFailurePercentage
ToleratedFailurePercentage specifies the percentage of failed items that can be tolerated. For example, if you have 10,000 items and set ToleratedFailurePercentage to 10, the Map state machine can tolerate a maximum of 1,000 failed item executions.
ToleratedFailureCount
ToleratedFailureCount specifies the number of failed items that can be tolerated. For example, if you have 10,000 items and set ToleratedFailureCount to 10, the Map state machine can tolerate a maximum of 10 failed item executions.
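The following snippet is a minimal sketch; the placement of ToleratedFailureCount mirrors the ToleratedFailurePercentage field used in the ResultWriter example of this topic. It tolerates at most 10 failed sub-workflow executions before the Map state fails.
# Illustrative sketch; values are placeholders
- Type: Map
  Name: Map
  ItemsPath: $Input.Items
  ToleratedFailureCount: 10
  ProcessorConfig:
    ExecutionMode: Express
  Processor:
    StartAt: Pass
    States:
      - Type: Pass
        Name: Pass
        End: true
  End: true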
Example
The following workflow defines a Map state in distributed mode. The Map state extracts iteration items from its input by using $Input.Items, and each item is processed by a sub-workflow that is executed in express mode.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
End: true
The following snippet shows the input that is passed to the state machine:
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"}
]
}
In this example, the Map state machine generates three sub-workflow executions. The following snippet shows the definition shared by the sub-workflow executions:
Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
The following snippet shows the sample inputs of the sub-workflow executions.
# execution-1
# Sample input of the sub-workflow execution corresponding to the first Map state
{"key_1":"value_1"}
# execution-2
# Sample input of the sub-workflow execution corresponding to the second Map state
{"key_2":"value_2"}
# execution-3
# Sample input of the sub-workflow execution corresponding to the third Map state
{"key_3":"value_3"}
After the execution is complete, the output of the Map state machine contains a JSON array. Each element in the array is the output of one sub-workflow execution.
{
"Items": [
{
"key_1": "value_1"
},
{
"key_2": "value_2"
},
{
"key_3": "value_3"
}
]
}