All Products
Search
Document Center

CloudFlow:Mode terdistribusi

Last Updated:Mar 11, 2026

Mode terdistribusi menjalankan setiap iterasi state Map sebagai eksekusi sub-workflow independen, memungkinkan pemrosesan paralel berkonkurensi tinggi yang cocok untuk pemrosesan data besar dan komputasi paralel.

Gunakan mode terdistribusi jika satu state Map tidak mampu menangani beban kerja Anda secara sekuensial. Mode ini mendistribusikan pekerjaan ke seluruh eksekusi sub-workflow konkuren—masing-masing dengan siklus hidup sendiri—dan dapat memproses data dari input inline atau bucket Object Storage Service (OSS) secara paralel.

Konsep utama

TermDefinition
Distributed modeMode pemrosesan state Map yang memproses jumlah besar data secara konkuren melalui eksekusi sub-workflow. Objek dalam bucket OSS dapat digunakan sebagai sumber data.
Sub-workflow executionSatu iterasi dari state Map terdistribusi. Definisi berasal dari bidang Processor, dan setiap sub-workflow menerima satu item (atau satu batch) sebagai input. Sub-workflow dapat berjalan dalam mode standar atau mode express. Untuk detailnya, lihat Standard mode and express mode.
Fault tolerance policyKetika eksekusi sub-workflow gagal, state Map berakhir dengan kegagalan. Untuk state Map yang berisi multiple item, Anda dapat mengonfigurasi kebijakan toleransi kesalahan agar state Map tetap memproses item yang tersisa meskipun eksekusi sub-workflow individual gagal.

Cara kerja

  1. State Map menerima input dan mengekstrak array item menggunakan ItemsPath (atau membaca item dari sumber data OSS menggunakan ItemReader).

  2. Jika ItemConstructor dikonfigurasi, setiap item ditransformasikan.

  3. Jika ItemBatcher dikonfigurasi, item dikelompokkan menjadi batch.

  4. Setiap item atau batch diteruskan ke eksekusi sub-workflow yang didefinisikan oleh Processor, berjalan dalam mode yang ditentukan oleh ProcessorConfig.

  5. Setelah semua eksekusi sub-workflow selesai, state Map mengumpulkan hasilnya ke dalam array JSON (atau menuliskannya ke OSS menggunakan ResultWriter).

Penting

Ketika ItemsPath, ItemConstructor, dan ItemBatcher semuanya dikonfigurasi, mereka dijalankan dalam urutan berikut: ItemsPath, lalu ItemConstructor, lalu ItemBatcher.

Contoh dasar

Workflow berikut mendefinisikan state Map terdistribusi yang membaca item dari $Input.Items dan menjalankan setiap item melalui state Pass dalam mode Express:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_CSV
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.csv
    End: true

Input state machine:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"}
  ]
}

Ini membuat tiga eksekusi sub-workflow, masing-masing menjalankan definisi berikut:

Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
  - Type: Pass
    Name: Pass
    End: true

Input sub-workflow:

// Sub-workflow 1
{"key_1":"value_1"}

// Sub-workflow 2
{"key_2":"value_2"}

// Sub-workflow 3
{"key_3":"value_3"}

Setelah semua sub-workflow selesai, output state Map adalah array JSON yang berisi semua output sub-workflow:

{
    "Items": [
        {
            "key_1": "value_1"
        },
        {
            "key_2": "value_2"
        },
        {
            "key_3": "value_3"
        }
    ]
}

Referensi bidang

Semua bidang yang tersedia dalam state Map mode terdistribusi:

FieldTypeRequiredDescriptionExample
NamestringYesNama state.my-state-name
DescriptionstringNoDeskripsi status.describe it here
TypestringYesJenis state.Map
InputConstructormap[string]anyNoKonstruktor input.Lihat Inputs and outputs.
ItemsPathstringYesEkspresi untuk mengekstrak array dari input.Lihat ItemsPath.
ItemBatcherItemBatcherNoMenggabungkan multiple item menjadi satu batch sebagai input sub-workflow.Lihat ItemBatcher.
ItemReaderItemReaderNoMembaca data dari bucket OSS.Lihat ItemReader.
ItemConstructorItemConstructorNoMentransformasi item menggunakan $Item untuk mereferensikan nilai input asli.Lihat ItemConstructor.
ResultWriterResultWriterNoMenulis hasil sub-workflow ke bucket OSS tertentu.Lihat ResultWriter.
MaxConcuccencyintNoJumlah maksimum eksekusi sub-workflow konkuren.40
MaxItemsMaxItemsNoJumlah maksimum item yang diproses oleh state Map.Lihat MaxItems.
ToleratedFailurePercentageToleratedFailurePercentageNoPersentase kegagalan eksekusi sub-workflow yang dapat ditoleransi.Lihat ToleratedFailurePercentage.
ToleratedFailureCountToleratedFailureCountNoJumlah kegagalan eksekusi sub-workflow yang dapat ditoleransi.Lihat ToleratedFailureCount.
ProcessorProcessorYesProsesor Map yang mendefinisikan sub-workflow.Lihat Processor.
ProcessorConfigProcessorConfigYesKonfigurasi prosesor.Lihat ProcessorConfig.
OutputConstructormap[string]anyNoKonstruktor output.Lihat OutputConstructor.
NextstringNoState berikutnya yang dijalankan setelah state ini selesai. Biarkan kosong jika End bernilai true.my-next-state
EndboolNoApakah state ini merupakan state terminal dalam cakupan saat ini.true
RetryRetryNoKebijakan retry error.Lihat Error handling.
CatchCatchNoKebijakan penanganan error.Lihat Error handling.

Detail bidang

ItemsPath

Mengekstrak array dari input state Map. Setiap elemen dalam array menjadi input untuk satu eksekusi sub-workflow. Data direferensikan menggunakan variabel $Context atau $Input:

$Input.FieldA

Processor

Mendefinisikan sub-workflow yang memproses setiap item atau batch.

FieldTypeRequiredDescriptionExample
StatesarrayYesState dalam sub-workflow.Lihat contoh di bawah.
StartAtstringYesState pertama yang dijalankan.my start task
Processor:
   StartAt: Pass1
   States:
     - Type: Pass
       Name: Pass1
       End: true

ProcessorConfig

Menentukan mode eksekusi untuk sub-workflow.

FieldTypeRequiredDescriptionExample
ExecutionModestringYesMode eksekusi untuk sub-workflow.Express

ItemReader

Membaca data input dari bucket OSS, mendukung ukuran input yang lebih besar.

FieldTypeRequiredDescriptionExample
SourceTypestringYesJenis sumber data. Nilai valid: OSS_CSV, OSS_JSON_LIST, OSS_OBJECTS, OSS_INVENTORY_FILES.OSS_CSV
SourceParametersSourceParametersNoParameter lokasi sumber data.Lihat SourceParameters.
ReaderConfigReaderConfigNoKonfigurasi Reader.Lihat ReaderConfig.

SourceParameters

FieldTypeRequiredDescriptionExample
BucketstringNoNama bucket OSS.example-bucket
ObjectNamestringNoNama objek.object_name_1
PrefixstringNoFilter awalan nama objek. Jika dibiarkan kosong, semua objek yang cocok akan dikembalikan.example-prefix

ReaderConfig

FieldTypeRequiredDescriptionExample
CSVHeaders[]stringNoHeader kolom pada baris pertama file CSV.ColA,ColB,ColC

Baca file CSV dari OSS

Baca data dari file CSV yang disimpan dalam bucket OSS. Dalam contoh ini, example-object.csv disimpan di example-bucket:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_CSV
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.csv
    End: true

Konten file CSV:

ColA,ColB,ColC
col_a_1,col_b_1,col_c_1

Setiap baris menjadi input sub-workflow:

{
  "ColA": "col_a_1",
  "ColB": "col_b_1",
  "ColC": "col_c_1"
}

Baca array JSON dari OSS

Penting

File JSON harus berisi array JSON.

Baca array JSON dari example-object.json di example-bucket:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_JSON_LIST
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.json
    End: true

Konten file JSON:

[
  {
    "key_1": "value_1"
  }
]

Setiap elemen menjadi input sub-workflow:

{
  "key_1": "value_1"
}

Baca objek dari OSS

Daftar objek dengan awalan tertentu dan teruskan metadata-nya sebagai input sub-workflow. Contoh ini membaca objek dengan awalan example-prefix dari example-bucket:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_OBJECTS
      SourceParameters:
        Bucket: example-bucket
        Prefix: example-prefix
    End: true

Dengan struktur bucket berikut:

example-bucket
   └── example-prefix/object_1

Metadata setiap objek menjadi input sub-workflow:

{
  "XMLName": {
    "Space": "",
    "Local": "Contents"
  },
  "Key": "example-prefix/object_1",
  "Type": "Normal",
  "Size": 268435,
  "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
  "Owner": {
    "XMLName": {
      "Space": "",
      "Local": ""
    },
    "ID": "",
    "DisplayName": ""
  },
  "LastModified": "2024-01-01T01:01:01Z",
  "StorageClass": "Standard",
  "RestoreInfo": ""
}

Baca file inventaris OSS

Baca item dari manifes inventaris OSS. Contoh ini membaca inventory/2024-01-01T01-01Z/manifest.json dari example-bucket:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_INVENTORY_FILES
      SourceParameters:
        Bucket: example-bucket
        ObjectName: inventory/2024-01-01T01-01Z/manifest.json
    ItemConstructor:
      Key.$: $Item.Key
    End: true

Konten file inventaris:

"example-bucket","object_name_1"
"example-bucket","object_name_2"

Input sub-workflow pertama:

{
  "Bucket": "example-bucket",
  "Key": "object_name_1"
}

ItemBatcher

Mengelompokkan beberapa item menjadi batch. Setiap batch menjadi input untuk satu eksekusi sub-workflow.

FieldTypeRequiredDescriptionExample
MaxItemsPerBatchintNoJumlah maksimum item per batch.Lihat contoh.
MaxInputBytesPerBatchintNoJumlah maksimum byte per batch. Satuan: byte.Lihat contoh.
BatchInputmap[string]anyNoData tambahan yang disertakan dalam input setiap batch.Lihat contoh.

Batch berdasarkan jumlah item

Tetapkan MaxItemsPerBatch untuk mengontrol jumlah item dalam setiap batch:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxItemsPerBatch: 2
    End: true

Input state machine:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"},
    {"key_4":"value_4"},
    {"key_5":"value_5"}
  ]
}

Ini menghasilkan tiga eksekusi sub-workflow:

// Sub-workflow 1
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"}
  ]
}

// Sub-workflow 2
{
  "Items": [
    {"key_1":"value_3"},
    {"key_2":"value_4"}
  ]
}

// Sub-workflow 3
{
  "Items": [
    {"key_1":"value_5"}
  ]
}

Batch berdasarkan ukuran byte

Tetapkan MaxInputBytesPerBatch untuk membatasi ukuran setiap batch dalam byte.

Penting
  • ItemBatcher menambahkan kunci metadata ke setiap batch. Ukuran total batch mencakup kunci tambahan ini.

  • Satuan untuk MaxInputBytesPerBatch adalah byte.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 50
    End: true

Input state machine:

{
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

Ini menghasilkan tiga eksekusi sub-workflow:

// Sub-workflow 1
{
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

// Sub-workflow 2
{
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

// Sub-workflow 3
{
  "Items":[
    {"Key":5}
  ]
}

Tambahkan data bersama ke batch

Gunakan BatchInput untuk menyertakan data tambahan dari input state machine ke setiap batch:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 70
      BatchInput:
        InputKey.$: $Input.Key
    End: true

Input state machine:

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

Setiap sub-workflow menerima data BatchInput dan item yang dibatch:

// Sub-workflow 1
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

// Sub-workflow 2
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

// Sub-workflow 3
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":5}
  ]
}

ItemConstructor

Mentransformasi setiap item sebelum diteruskan ke eksekusi sub-workflow. Gunakan $Item untuk mereferensikan item asli dan $Input untuk mereferensikan input state machine:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 200
      BatchInput:
        InputKey.$: $Input.Key
    ItemConstructor:
      ConstructedKey.$: $Item.Key
      InputKey.$: $Input.Key
    End: true

Masukan mesin status:

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

Setiap item ditransformasi oleh ItemConstructor sebelum dibatch:

// Sub-workflow 1
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 1
    },
    {
      "InputKey": "value",
      "ConstructedKey": 2
    },
    {
      "InputKey": "value",
      "ConstructedKey": 3
    }
  ]
}

// Sub-workflow 2
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 4
    },
    {
      "InputKey": "value",
      "ConstructedKey": 5
    }
  ]
}

ResultWriter

Menulis hasil sub-workflow ke bucket OSS. Konfigurasikan ResultWriter ketika output gabungan dari semua eksekusi sub-workflow berpotensi melebihi batas ukuran output state.

FieldTypeRequiredDescriptionExample
ParametersParametersYesTujuan OSS untuk hasil.Lihat di bawah.

Parameters

FieldTypeRequiredDescriptionExample
BucketstringYesBucket OSS tujuan.example-bucket
PrefixstringYesAwalan nama objek untuk file hasil.example-prefix/
Catatan

Input dan output state memiliki batas ukuran. Untuk state Map dengan banyak item, output gabungan dapat melebihi batas ini. Konfigurasikan ResultWriter untuk menyimpan hasil di OSS sebagai gantinya.

Contoh berikut menggabungkan ResultWriter dengan toleransi kesalahan. State Map mentoleransi kegagalan hingga 30% dan menulis semua hasil (sukses maupun gagal) ke OSS:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    ItemConstructor:
      Key.$: $Item.Key
      FailedValue.$: $Input.FailedValue
    ToleratedFailurePercentage: 30
    Processor:
      StartAt: Choice
      States:
        - Type: Choice
          Name: Choice
          Branches:
            - Condition: $Input.Key > $Input.FailedValue
              Next: Fail
          Default: Succeed
        - Type: Succeed
          Name: Succeed
          End: true
        - Type: Fail
          Name: Fail
          Code: MockError
          End: true
    ResultWriter:
      Parameters:
        Bucket: example-bucket
        Prefix: example-prefix/
    End: true

Input state machine:

{
  "FailedValue": 4,
  "Items": [
    {"Key": 1},
    {"Key": 2},
    {"Key": 3},
    {"Key": 4},
    {"Key": 5}
  ]
}

Struktur file hasil

ResultWriter menghasilkan tiga jenis file di lokasi OSS yang ditentukan:

manifest.json (example-prefix/map-run-name/manifest.json):

{
    "DestinationBucket": "example-bucket",
    "MapRunName": "map-run-name",
    "ResultFiles": {
        "FAILED": [
            {
                "ObjectName": "example-prefix/map-run-name/FAILED_0.json",
                "Size": 262
            }
        ],
        "SUCCEED": [
            {
                "ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
                "Size": 1057
            }
        ]
    }
}

FAILED_0.json (example-prefix/map-run-name/FAILED_0.json):

[
    {
        "ExecutionName": "execution-name-5",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":5}",
        "Output": "{\"ErrorCode\":\"MockError\"}",
        "Status": "Failed",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

SUCCEED_0.json (example-prefix/map-run-name/SUCCEED_0.json):

[
    {
        "ExecutionName": "execution-name-1",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":1}",
        "Output": "{\"FailedValue\":4,\"Key\":1}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-2",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":2}",
        "Output": "{\"FailedValue\":4,\"Key\":2}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-3",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":3}",
        "Output": "{\"FailedValue\":4,\"Key\":3}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-4",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":4}",
        "Output": "{\"FailedValue\":4,\"Key\":4}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

MaxItems

Membatasi jumlah item yang diproses oleh state Map. Jika sumber data berisi lebih banyak item daripada MaxItems, hanya MaxItems item pertama yang diproses.

Sebagai contoh, jika bucket OSS berisi 10.000 objek dan MaxItems diatur ke 1.000, state Map hanya memproses 1.000 objek.

MaxConcurrency

Membatasi jumlah eksekusi sub-workflow yang berjalan secara simultan. Sebagai contoh, jika state Map memproses 10.000 item dan MaxConcurrency diatur ke 100, paling banyak 100 sub-workflow berjalan pada waktu yang sama.

ToleratedFailurePercentage

Menetapkan persentase eksekusi sub-workflow yang boleh gagal sebelum state Map itu sendiri gagal. Sebagai contoh, dengan 10.000 item dan ToleratedFailurePercentage diatur ke 10, state Map mentoleransi hingga 1.000 kegagalan.

ToleratedFailureCount

Menetapkan jumlah eksekusi sub-workflow yang boleh gagal sebelum state Map itu sendiri gagal. Sebagai contoh, dengan 10.000 item dan ToleratedFailureCount diatur ke 10, state Map mentoleransi hingga 10 kegagalan.

Batas kuota

Tabel berikut mencantumkan batas kuota default untuk mode terdistribusi. Untuk meminta kuota yang lebih tinggi, submit a ticket.

Quota nameDescriptionDefault value
MaxOpenMapRunJumlah maksimum state Map terdistribusi yang berjalan secara simultan per akun per wilayah10
MaxConcurrencyJumlah maksimum eksekusi sub-workflow konkuren per MapRun300
MaxItemsJumlah maksimum item per MapRun10.000

Izin yang diperlukan

Menjalankan state Map dalam mode terdistribusi memerlukan izin OSS berikut. Berikan aksi ini dalam kebijakan role. Untuk detailnya, lihat Create a policy.

{
    "Version": "1",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "oss:HeadObject",
                "oss:GetObjectMeta",
                "oss:GetObject",
                "oss:PutObject",
                "oss:ListObjectsV2",
                "oss:ListObjects",
                "oss:InitiateMultipartUpload",
                "oss:UploadPart",
                "oss:CompleteMultipartUpload",
                "oss:AbortMultipartUpload",
                "oss:ListMultipartUploads",
                "oss:ListParts"
            ],
            "Resource": "*"
        }
    ]
}