Mode terdistribusi menjalankan setiap iterasi state Map sebagai eksekusi sub-workflow independen, memungkinkan pemrosesan paralel berkonkurensi tinggi yang cocok untuk pemrosesan data besar dan komputasi paralel.
Gunakan mode terdistribusi jika satu state Map tidak mampu menangani beban kerja Anda secara sekuensial. Mode ini mendistribusikan pekerjaan ke seluruh eksekusi sub-workflow konkuren—masing-masing dengan siklus hidup sendiri—dan dapat memproses data dari input inline atau bucket Object Storage Service (OSS) secara paralel.
Konsep utama
| Term | Definition |
|---|---|
| Distributed mode | Mode pemrosesan state Map yang memproses jumlah besar data secara konkuren melalui eksekusi sub-workflow. Objek dalam bucket OSS dapat digunakan sebagai sumber data. |
| Sub-workflow execution | Satu iterasi dari state Map terdistribusi. Definisi berasal dari bidang Processor, dan setiap sub-workflow menerima satu item (atau satu batch) sebagai input. Sub-workflow dapat berjalan dalam mode standar atau mode express. Untuk detailnya, lihat Standard mode and express mode. |
| Fault tolerance policy | Ketika eksekusi sub-workflow gagal, state Map berakhir dengan kegagalan. Untuk state Map yang berisi multiple item, Anda dapat mengonfigurasi kebijakan toleransi kesalahan agar state Map tetap memproses item yang tersisa meskipun eksekusi sub-workflow individual gagal. |
Cara kerja
State Map menerima input dan mengekstrak array item menggunakan
ItemsPath(atau membaca item dari sumber data OSS menggunakanItemReader).Jika
ItemConstructordikonfigurasi, setiap item ditransformasikan.Jika
ItemBatcherdikonfigurasi, item dikelompokkan menjadi batch.Setiap item atau batch diteruskan ke eksekusi sub-workflow yang didefinisikan oleh
Processor, berjalan dalam mode yang ditentukan olehProcessorConfig.Setelah semua eksekusi sub-workflow selesai, state Map mengumpulkan hasilnya ke dalam array JSON (atau menuliskannya ke OSS menggunakan
ResultWriter).
Ketika ItemsPath, ItemConstructor, dan ItemBatcher semuanya dikonfigurasi, mereka dijalankan dalam urutan berikut: ItemsPath, lalu ItemConstructor, lalu ItemBatcher.
Contoh dasar
Workflow berikut mendefinisikan state Map terdistribusi yang membaca item dari $Input.Items dan menjalankan setiap item melalui state Pass dalam mode Express:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_CSV
SourceParameters:
Bucket: example-bucket
ObjectName: example-object.csv
End: trueInput state machine:
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"}
]
}Ini membuat tiga eksekusi sub-workflow, masing-masing menjalankan definisi berikut:
Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: trueInput sub-workflow:
// Sub-workflow 1
{"key_1":"value_1"}
// Sub-workflow 2
{"key_2":"value_2"}
// Sub-workflow 3
{"key_3":"value_3"}Setelah semua sub-workflow selesai, output state Map adalah array JSON yang berisi semua output sub-workflow:
{
"Items": [
{
"key_1": "value_1"
},
{
"key_2": "value_2"
},
{
"key_3": "value_3"
}
]
}Referensi bidang
Semua bidang yang tersedia dalam state Map mode terdistribusi:
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| Name | string | Yes | Nama state. | my-state-name |
| Description | string | No | Deskripsi status. | describe it here |
| Type | string | Yes | Jenis state. | Map |
| InputConstructor | map[string]any | No | Konstruktor input. | Lihat Inputs and outputs. |
| ItemsPath | string | Yes | Ekspresi untuk mengekstrak array dari input. | Lihat ItemsPath. |
| ItemBatcher | ItemBatcher | No | Menggabungkan multiple item menjadi satu batch sebagai input sub-workflow. | Lihat ItemBatcher. |
| ItemReader | ItemReader | No | Membaca data dari bucket OSS. | Lihat ItemReader. |
| ItemConstructor | ItemConstructor | No | Mentransformasi item menggunakan $Item untuk mereferensikan nilai input asli. | Lihat ItemConstructor. |
| ResultWriter | ResultWriter | No | Menulis hasil sub-workflow ke bucket OSS tertentu. | Lihat ResultWriter. |
| MaxConcuccency | int | No | Jumlah maksimum eksekusi sub-workflow konkuren. | 40 |
| MaxItems | MaxItems | No | Jumlah maksimum item yang diproses oleh state Map. | Lihat MaxItems. |
| ToleratedFailurePercentage | ToleratedFailurePercentage | No | Persentase kegagalan eksekusi sub-workflow yang dapat ditoleransi. | Lihat ToleratedFailurePercentage. |
| ToleratedFailureCount | ToleratedFailureCount | No | Jumlah kegagalan eksekusi sub-workflow yang dapat ditoleransi. | Lihat ToleratedFailureCount. |
| Processor | Processor | Yes | Prosesor Map yang mendefinisikan sub-workflow. | Lihat Processor. |
| ProcessorConfig | ProcessorConfig | Yes | Konfigurasi prosesor. | Lihat ProcessorConfig. |
| OutputConstructor | map[string]any | No | Konstruktor output. | Lihat OutputConstructor. |
| Next | string | No | State berikutnya yang dijalankan setelah state ini selesai. Biarkan kosong jika End bernilai true. | my-next-state |
| End | bool | No | Apakah state ini merupakan state terminal dalam cakupan saat ini. | true |
| Retry | Retry | No | Kebijakan retry error. | Lihat Error handling. |
| Catch | Catch | No | Kebijakan penanganan error. | Lihat Error handling. |
Detail bidang
ItemsPath
Mengekstrak array dari input state Map. Setiap elemen dalam array menjadi input untuk satu eksekusi sub-workflow. Data direferensikan menggunakan variabel $Context atau $Input:
$Input.FieldAProcessor
Mendefinisikan sub-workflow yang memproses setiap item atau batch.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| States | array | Yes | State dalam sub-workflow. | Lihat contoh di bawah. |
| StartAt | string | Yes | State pertama yang dijalankan. | my start task |
Processor:
StartAt: Pass1
States:
- Type: Pass
Name: Pass1
End: trueProcessorConfig
Menentukan mode eksekusi untuk sub-workflow.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| ExecutionMode | string | Yes | Mode eksekusi untuk sub-workflow. | Express |
ItemReader
Membaca data input dari bucket OSS, mendukung ukuran input yang lebih besar.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| SourceType | string | Yes | Jenis sumber data. Nilai valid: OSS_CSV, OSS_JSON_LIST, OSS_OBJECTS, OSS_INVENTORY_FILES. | OSS_CSV |
| SourceParameters | SourceParameters | No | Parameter lokasi sumber data. | Lihat SourceParameters. |
| ReaderConfig | ReaderConfig | No | Konfigurasi Reader. | Lihat ReaderConfig. |
SourceParameters
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| Bucket | string | No | Nama bucket OSS. | example-bucket |
| ObjectName | string | No | Nama objek. | object_name_1 |
| Prefix | string | No | Filter awalan nama objek. Jika dibiarkan kosong, semua objek yang cocok akan dikembalikan. | example-prefix |
ReaderConfig
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| CSVHeaders | []string | No | Header kolom pada baris pertama file CSV. | ColA,ColB,ColC |
Baca file CSV dari OSS
Baca data dari file CSV yang disimpan dalam bucket OSS. Dalam contoh ini, example-object.csv disimpan di example-bucket:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_CSV
SourceParameters:
Bucket: example-bucket
ObjectName: example-object.csv
End: trueKonten file CSV:
ColA,ColB,ColC
col_a_1,col_b_1,col_c_1Setiap baris menjadi input sub-workflow:
{
"ColA": "col_a_1",
"ColB": "col_b_1",
"ColC": "col_c_1"
}Baca array JSON dari OSS
File JSON harus berisi array JSON.
Baca array JSON dari example-object.json di example-bucket:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_JSON_LIST
SourceParameters:
Bucket: example-bucket
ObjectName: example-object.json
End: trueKonten file JSON:
[
{
"key_1": "value_1"
}
]Setiap elemen menjadi input sub-workflow:
{
"key_1": "value_1"
}Baca objek dari OSS
Daftar objek dengan awalan tertentu dan teruskan metadata-nya sebagai input sub-workflow. Contoh ini membaca objek dengan awalan example-prefix dari example-bucket:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_OBJECTS
SourceParameters:
Bucket: example-bucket
Prefix: example-prefix
End: trueDengan struktur bucket berikut:
example-bucket
└── example-prefix/object_1Metadata setiap objek menjadi input sub-workflow:
{
"XMLName": {
"Space": "",
"Local": "Contents"
},
"Key": "example-prefix/object_1",
"Type": "Normal",
"Size": 268435,
"ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
"Owner": {
"XMLName": {
"Space": "",
"Local": ""
},
"ID": "",
"DisplayName": ""
},
"LastModified": "2024-01-01T01:01:01Z",
"StorageClass": "Standard",
"RestoreInfo": ""
}Baca file inventaris OSS
Baca item dari manifes inventaris OSS. Contoh ini membaca inventory/2024-01-01T01-01Z/manifest.json dari example-bucket:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemReader:
SourceType: OSS_INVENTORY_FILES
SourceParameters:
Bucket: example-bucket
ObjectName: inventory/2024-01-01T01-01Z/manifest.json
ItemConstructor:
Key.$: $Item.Key
End: trueKonten file inventaris:
"example-bucket","object_name_1"
"example-bucket","object_name_2"Input sub-workflow pertama:
{
"Bucket": "example-bucket",
"Key": "object_name_1"
}ItemBatcher
Mengelompokkan beberapa item menjadi batch. Setiap batch menjadi input untuk satu eksekusi sub-workflow.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| MaxItemsPerBatch | int | No | Jumlah maksimum item per batch. | Lihat contoh. |
| MaxInputBytesPerBatch | int | No | Jumlah maksimum byte per batch. Satuan: byte. | Lihat contoh. |
| BatchInput | map[string]any | No | Data tambahan yang disertakan dalam input setiap batch. | Lihat contoh. |
Batch berdasarkan jumlah item
Tetapkan MaxItemsPerBatch untuk mengontrol jumlah item dalam setiap batch:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxItemsPerBatch: 2
End: trueInput state machine:
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"},
{"key_4":"value_4"},
{"key_5":"value_5"}
]
}Ini menghasilkan tiga eksekusi sub-workflow:
// Sub-workflow 1
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"}
]
}
// Sub-workflow 2
{
"Items": [
{"key_1":"value_3"},
{"key_2":"value_4"}
]
}
// Sub-workflow 3
{
"Items": [
{"key_1":"value_5"}
]
}Batch berdasarkan ukuran byte
Tetapkan MaxInputBytesPerBatch untuk membatasi ukuran setiap batch dalam byte.
ItemBatcher menambahkan kunci metadata ke setiap batch. Ukuran total batch mencakup kunci tambahan ini.
Satuan untuk
MaxInputBytesPerBatchadalah byte.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 50
End: trueInput state machine:
{
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}Ini menghasilkan tiga eksekusi sub-workflow:
// Sub-workflow 1
{
"Items":[
{"Key":1},
{"key":2}
]
}
// Sub-workflow 2
{
"Items":[
{"Key":3},
{"key":4}
]
}
// Sub-workflow 3
{
"Items":[
{"Key":5}
]
}Tambahkan data bersama ke batch
Gunakan BatchInput untuk menyertakan data tambahan dari input state machine ke setiap batch:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 70
BatchInput:
InputKey.$: $Input.Key
End: trueInput state machine:
{
"Key":"value",
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}Setiap sub-workflow menerima data BatchInput dan item yang dibatch:
// Sub-workflow 1
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":1},
{"key":2}
]
}
// Sub-workflow 2
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":3},
{"key":4}
]
}
// Sub-workflow 3
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":5}
]
}ItemConstructor
Mentransformasi setiap item sebelum diteruskan ke eksekusi sub-workflow. Gunakan $Item untuk mereferensikan item asli dan $Input untuk mereferensikan input state machine:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 200
BatchInput:
InputKey.$: $Input.Key
ItemConstructor:
ConstructedKey.$: $Item.Key
InputKey.$: $Input.Key
End: trueMasukan mesin status:
{
"Key":"value",
"Items":[
{"Key":1},
{"Key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}Setiap item ditransformasi oleh ItemConstructor sebelum dibatch:
// Sub-workflow 1
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 1
},
{
"InputKey": "value",
"ConstructedKey": 2
},
{
"InputKey": "value",
"ConstructedKey": 3
}
]
}
// Sub-workflow 2
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 4
},
{
"InputKey": "value",
"ConstructedKey": 5
}
]
}ResultWriter
Menulis hasil sub-workflow ke bucket OSS. Konfigurasikan ResultWriter ketika output gabungan dari semua eksekusi sub-workflow berpotensi melebihi batas ukuran output state.
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| Parameters | Parameters | Yes | Tujuan OSS untuk hasil. | Lihat di bawah. |
Parameters
| Field | Type | Required | Description | Example |
|---|---|---|---|---|
| Bucket | string | Yes | Bucket OSS tujuan. | example-bucket |
| Prefix | string | Yes | Awalan nama objek untuk file hasil. | example-prefix/ |
Input dan output state memiliki batas ukuran. Untuk state Map dengan banyak item, output gabungan dapat melebihi batas ini. Konfigurasikan ResultWriter untuk menyimpan hasil di OSS sebagai gantinya.
Contoh berikut menggabungkan ResultWriter dengan toleransi kesalahan. State Map mentoleransi kegagalan hingga 30% dan menulis semua hasil (sukses maupun gagal) ke OSS:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
ItemConstructor:
Key.$: $Item.Key
FailedValue.$: $Input.FailedValue
ToleratedFailurePercentage: 30
Processor:
StartAt: Choice
States:
- Type: Choice
Name: Choice
Branches:
- Condition: $Input.Key > $Input.FailedValue
Next: Fail
Default: Succeed
- Type: Succeed
Name: Succeed
End: true
- Type: Fail
Name: Fail
Code: MockError
End: true
ResultWriter:
Parameters:
Bucket: example-bucket
Prefix: example-prefix/
End: trueInput state machine:
{
"FailedValue": 4,
"Items": [
{"Key": 1},
{"Key": 2},
{"Key": 3},
{"Key": 4},
{"Key": 5}
]
}Struktur file hasil
ResultWriter menghasilkan tiga jenis file di lokasi OSS yang ditentukan:
manifest.json (example-prefix/map-run-name/manifest.json):
{
"DestinationBucket": "example-bucket",
"MapRunName": "map-run-name",
"ResultFiles": {
"FAILED": [
{
"ObjectName": "example-prefix/map-run-name/FAILED_0.json",
"Size": 262
}
],
"SUCCEED": [
{
"ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
"Size": 1057
}
]
}
}FAILED_0.json (example-prefix/map-run-name/FAILED_0.json):
[
{
"ExecutionName": "execution-name-5",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":5}",
"Output": "{\"ErrorCode\":\"MockError\"}",
"Status": "Failed",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]SUCCEED_0.json (example-prefix/map-run-name/SUCCEED_0.json):
[
{
"ExecutionName": "execution-name-1",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":1}",
"Output": "{\"FailedValue\":4,\"Key\":1}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-2",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":2}",
"Output": "{\"FailedValue\":4,\"Key\":2}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-3",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":3}",
"Output": "{\"FailedValue\":4,\"Key\":3}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-4",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":4}",
"Output": "{\"FailedValue\":4,\"Key\":4}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]MaxItems
Membatasi jumlah item yang diproses oleh state Map. Jika sumber data berisi lebih banyak item daripada MaxItems, hanya MaxItems item pertama yang diproses.
Sebagai contoh, jika bucket OSS berisi 10.000 objek dan MaxItems diatur ke 1.000, state Map hanya memproses 1.000 objek.
MaxConcurrency
Membatasi jumlah eksekusi sub-workflow yang berjalan secara simultan. Sebagai contoh, jika state Map memproses 10.000 item dan MaxConcurrency diatur ke 100, paling banyak 100 sub-workflow berjalan pada waktu yang sama.
ToleratedFailurePercentage
Menetapkan persentase eksekusi sub-workflow yang boleh gagal sebelum state Map itu sendiri gagal. Sebagai contoh, dengan 10.000 item dan ToleratedFailurePercentage diatur ke 10, state Map mentoleransi hingga 1.000 kegagalan.
ToleratedFailureCount
Menetapkan jumlah eksekusi sub-workflow yang boleh gagal sebelum state Map itu sendiri gagal. Sebagai contoh, dengan 10.000 item dan ToleratedFailureCount diatur ke 10, state Map mentoleransi hingga 10 kegagalan.
Batas kuota
Tabel berikut mencantumkan batas kuota default untuk mode terdistribusi. Untuk meminta kuota yang lebih tinggi, submit a ticket.
| Quota name | Description | Default value |
|---|---|---|
| MaxOpenMapRun | Jumlah maksimum state Map terdistribusi yang berjalan secara simultan per akun per wilayah | 10 |
| MaxConcurrency | Jumlah maksimum eksekusi sub-workflow konkuren per MapRun | 300 |
| MaxItems | Jumlah maksimum item per MapRun | 10.000 |
Izin yang diperlukan
Menjalankan state Map dalam mode terdistribusi memerlukan izin OSS berikut. Berikan aksi ini dalam kebijakan role. Untuk detailnya, lihat Create a policy.
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"oss:HeadObject",
"oss:GetObjectMeta",
"oss:GetObject",
"oss:PutObject",
"oss:ListObjectsV2",
"oss:ListObjects",
"oss:InitiateMultipartUpload",
"oss:UploadPart",
"oss:CompleteMultipartUpload",
"oss:AbortMultipartUpload",
"oss:ListMultipartUploads",
"oss:ListParts"
],
"Resource": "*"
}
]
}