全部产品
Search
文档中心

DataWorks:for-each node

更新时间:Dec 19, 2025

Gunakan for-each node untuk mengeksekusi subtask yang sama terhadap setiap item dalam daftar, seperti nama file atau partisi. Node ini melakukan iterasi terhadap set hasil dari node hulu (biasanya assignment node) dan menjalankan loop internal untuk setiap elemen, sehingga mengotomatiskan tugas berulang tanpa perlu membuat task secara manual untuk tiap item.

Kasus Penggunaan

For-each node memungkinkan eksekusi berparameter. Gunakan node ini untuk menerapkan logika pemrosesan identik pada unit bisnis, lini produk, atau item konfigurasi yang berbeda. Sebagai contoh, Anda dapat menghasilkan laporan harian untuk beberapa lini produk dengan logika yang sama tetapi target berbeda.

Seperti loop for dalam pemrograman, for-each node melakukan iterasi terhadap daftar—misalnya nama tabel, nama partisi, atau nama file—dan mengeksekusi alur kerja bawah (sub-workflow) yang telah ditentukan untuk setiap item dalam daftar tersebut. Pendekatan ini secara signifikan meningkatkan otomatisasi dan fleksibilitas alur kerja.

Catatan Penggunaan

  • Edition: DataWorks Edisi Standar atau lebih tinggi.

  • Izin: Anda harus memiliki peran Development atau Workspace Manager di ruang kerja DataWorks Anda. Untuk informasi selengkapnya, lihat Tambahkan anggota ke ruang kerja.

Cara Kerja

For-each node membungkus alur kerja bawah (loop body) yang dapat dikustomisasi. Cara kerjanya sebagai berikut:

image
  1. Input data: Node ini bergantung pada assignment node hulu (atau node kompatibel seperti EMR Hive). Node ini mengikat parameter loopDataArray ke array result set dari hulu.

  2. Eksekusi loop: Node ini melakukan iterasi secara berurutan terhadap result set. Untuk setiap elemen, node ini mengeksekusi seluruh loop body (dari Start hingga End).

    Catatan

    Node Start dan End tidak dapat diedit; keduanya hanya berfungsi sebagai penanda awal dan akhir loop body.

  3. Pengiriman data: Pada setiap iterasi, nilai elemen saat ini diteruskan ke node dalam loop body melalui built-in variables. Node bisnis internal menggunakan ${dag.foreach.current} untuk mengakses item data yang sedang diproses.

Variabel Bawaan

Penting

Variabel dalam bentuk ${...} merupakan sintaks templat khusus DataWorks. DataWorks mengurai parameter dan melakukan penggantian statis.

Gunakan variabel bawaan berikut dalam node di dalam loop body for-each untuk mengakses status dan data loop:

Variabel Bawaan

Deskripsi

Analogi dengan loop for

${dag.loopDataArray}

Mengembalikan seluruh result set dari assignment node hulu.

Ambil kode berikut sebagai contoh:

for(int i=0;i<data.length;i++) {
   print(data[i]);
}
  • ${dag.loopDataArray} berkorespondensi dengan data.

  • ${dag.foreach.current} berkorespondensi dengan data[i].

  • ${dag.offset} berkorespondensi dengan i.

  • ${dag.loopTimes} berkorespondensi dengan i+1.

${dag.foreach.current}

Mengembalikan item yang diproses pada iterasi saat ini.

${dag.offset}

Mengembalikan offset loop saat ini (berbasis 0).

${dag.loopTimes}

Mengembalikan jumlah loop saat ini (berbasis 1).

Jika output hulu berupa array 2D (seperti hasil kueri SQL), Anda dapat menggunakan metode berikut untuk mengambil nilai tertentu:

Variabel Lainnya

Deskripsi

${dag.foreach.current}

Mengambil string baris data saat ini (array 1D), dipisahkan oleh koma ,.

${dag.foreach.current[n]}

Mengambil item data ke-n dari baris data saat ini (array 1D).

${dag.loopDataArray[i][j]}

Mengambil data pada baris ke-i dan kolom ke-j dari seluruh result set.

For-each node tidak mendukung loop bersarang. Ini hanya untuk demonstrasi pengambilan nilai.

Catatan Umum

  • Mekanisme eksekusi: Mendukung mode Serial dan Parallel. Gunakan mode Parallel untuk iterasi yang independen.

  • Batas loop: Default-nya adalah 128 iterasi; dapat disesuaikan hingga 1.024.

  • Debugging: Anda tidak dapat menjalankan for-each node secara langsung di Data Studio. Deploy task tersebut dan gunakan Pengujian asap di Operation Center.

  • Batasan eksekusi: For-each node tidak mendukung eksekusi terisolasi, termasuk pengujian asap, pengisian ulang data (backfill), dan eksekusi manual.

  • Kontrol alur: Saat menggunakan branch node di dalam loop, satukan semua cabang ke merge node sebelum node End.

Prosedur

Untuk mengonfigurasi task for-each menggunakan assignment node hulu dan node Shell internal:

  1. Persiapkan data hulu

    Buat assignment node untuk menghasilkan result set.

    1. Dalam alur kerja, buat assignment node (misalnya, assign) dan letakkan di hulu for-each node.

    2. Klik ganda assignment node dan pilih bahasa, seperti Python 2. Gunakan Python 2 untuk menghasilkan array yang berisi empat elemen:

      Assignment node mengeluarkan [10,20,30,40] ke node hilir. Hal ini karena assignment node secara otomatis membagi output baris terakhir menjadi array menggunakan koma sebagai pembatas.
      print "10,20,30,40"
    3. Assignment node secara otomatis menghasilkan parameter output bernama outputs, yang merepresentasikan result set-nya.

    4. Save assignment node.

  2. Konfigurasikan for-each node untuk mengonsumsi data

    Konfigurasikan for-each node agar menerima data hulu dan menggunakannya dalam loop body-nya.

    1. Klik ganda for-each node untuk masuk ke kanvas orkestrasi internalnya.

    2. Di panel Scheduling di sebelah kanan, temukan parameter loopDataArray di bawah Scheduling Parameters dan klik Associate.

      image

    3. Pilih outputs dari assignment node hulu (assign) sebagai sumber nilai. Hal ini secara otomatis membuat dependensi.

    4. Dalam loop body for-each, klik Create Internal Node dan pilih Shell.

      Dalam skenario aktual, Anda dapat mengonfigurasi jenis node apa pun.
    5. Klik ganda node Shell baru dan gunakan variabel bawaan dalam kode untuk mengambil dan mencetak informasi loop:

      #!/bin/bash
      # Gunakan ${dag.loopTimes} untuk mendapatkan jumlah loop saat ini
      echo "Nomor loop saat ini adalah: ${dag.loopTimes}"
      
      # Gunakan ${dag.foreach.current} untuk mendapatkan data yang sedang diiterasi
      echo "Item saat ini adalah: ${dag.foreach.current}"
    6. (Opsional) Di panel Scheduling di sisi kanan, konfigurasikan properti terkait di bawah Scheduling Policies.

      • Maximum Number of Loops: Nilai default-nya adalah 128, dan dapat disesuaikan hingga 1.024. Parameter ini menentukan jumlah maksimum iterasi untuk loop body. Jika volume data hulu besar, pastikan parameter ini disesuaikan agar mencakup semua iterasi.

      • Execution policy: Pilih Serial di sini.

        • Serial: Berjalan secara berurutan berdasarkan jumlah loop.

        • Parallel: Memungkinkan eksekusi konkuren loop internal dalam for-each node, meningkatkan efisiensi task. Jika batch tertentu gagal dalam mode paralel, hal ini tidak memengaruhi eksekusi batch lainnya; penjadwal akan menyelesaikan semua batch. Konkurensi default-nya adalah 5, dengan nilai maksimum yang didukung sebesar 20.

          image

    7. Simpan node Shell.

  3. Deploy, Jalankan, dan Verifikasi

    Kirimkan alur kerja ke Operation Center untuk dieksekusi dan verifikasi hasil for-each node.

    1. Kembali ke kanvas alur kerja utama dan klik tombol Deploy di bilah alat untuk memublikasikan seluruh alur kerja.

    2. Buka Operation Center > Auto Triggered Node O&M > Auto Triggered Nodes, lalu lakukan pengujian asap pada alur kerja target.

      Penting

      Jangan lakukan pengujian asap pada for-each node secara individual. Karena for-each node bergantung pada output assignment node hulu, Anda harus memulai pengujian dari assignment node untuk memastikan integritas alur data.

    3. Setelah instans pengujian berhasil dijalankan, temukan instans for-each node dalam daftar instans, klik untuk membukanya, lalu klik kanan dan pilih View Internal Nodes.

      image

    4. Dalam tampilan node internal, periksa instans node Shell yang dihasilkan untuk setiap loop. Buka log eksekusi salah satu instans untuk melihat hasil output iterasi spesifik tersebut dan verifikasi apakah sesuai dengan ekspektasi.

      image

Kasus Penggunaan: Memproses Format Data yang Berbeda

Skenario 1: Memproses Array Satu Dimensi (Output Shell/Python)

  • Output assignment node: 2025-11-01,2025-11-02,2025-11-03

  • Jumlah iterasi: 3 kali.

  • Saat loop ke-2:

    • Nilai ${dag.foreach.current} adalah 2025-11-02.

    • Nilai ${dag.loopTimes} adalah 2.

Skenario 2: Memproses Array Dua Dimensi (Output SQL)

  • Keluaran node Assignment (MaxCompute SQL):

    +-----+----------+
    | id  | city     |
    +-----+----------+
    | 101 | beijing  |
    | 102 | shanghai |
    +-----+----------+
  • Jumlah iterasi: 2 kali.

  • Saat loop ke-2:

    • Nilai ${dag.foreach.current} adalah 102,shanghai.

    • Nilai ${dag.loopTimes} adalah 2.

    • Nilai ${dag.foreach.current[0]} adalah 102.

    • Nilai ${dag.foreach.current[1]} adalah shanghai.

Skenario: Memproses data secara batch dari tabel partisi untuk beberapa lini bisnis

Contoh ini menunjukkan cara menggunakan assignment node dan for-each node untuk memproses data perilaku pengguna secara batch dari beberapa lini bisnis. Pendekatan ini mengotomatiskan pemrosesan data dengan memungkinkan Anda menggunakan satu set logika untuk beberapa lini produk.

image

Latar Belakang

Asumsikan Anda seorang developer data di perusahaan internet besar. Anda bertanggung jawab untuk memproses data dari tiga lini bisnis inti: E-dagang (ecom), keuangan (finance), dan logistik (logistics). Lebih banyak lini bisnis mungkin akan ditambahkan di masa depan. Anda perlu menjalankan logika agregasi yang sama pada log perilaku pengguna dari ketiga lini bisnis ini setiap hari. Logika tersebut menghitung popularitas harian (PV) untuk setiap pengguna dan menyimpan hasilnya dalam tabel agregat terpadu.

  • Tabel sumber hulu (lapisan DWD):

    • dwd_user_behavior_ecom_d: Tabel perilaku pengguna E-dagang.

    • dwd_user_behavior_finance_d: Tabel perilaku pengguna keuangan.

    • dwd_user_behavior_logistics_d: Tabel perilaku pengguna logistik.

    • dwd_user_behavior_${line-of-business}_d: Tabel perilaku pengguna untuk lini bisnis potensial lainnya.

    • Tabel-tabel ini memiliki skema yang sama dan dipartisi berdasarkan hari (dt).

  • Tabel target hilir (lapisan DWS):

    • dws_user_summary_d: Tabel agregat pengguna.

    • Tabel ini dipartisi berdasarkan lini bisnis (biz_line) dan hari (dt). Tabel ini digunakan untuk menyimpan hasil agregasi dari semua lini bisnis.

Membuat task terpisah untuk setiap lini bisnis mahal dalam hal pemeliharaan dan rentan terhadap kesalahan. Jika Anda menggunakan for-each node, Anda hanya perlu memelihara satu set logika pemrosesan. Sistem secara otomatis melakukan traversal semua lini bisnis untuk menyelesaikan perhitungan.

Persiapan Data

Pertama, buat tabel contoh dan masukkan data uji. Contoh ini menggunakan stempel waktu data 20251010.

  1. Kaitkan resource komputasi MaxCompute ke ruang kerja.

  2. Buka DataStudio untuk melakukan pengembangan data dan buat node MaxCompute SQL.

  3. Buat tabel sumber (lapisan DWD): Tambahkan kode berikut ke node MaxCompute SQL, pilih, lalu jalankan.

    -- Tabel perilaku pengguna E-dagang
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Jenis perilaku',
        event_time  BIGINT COMMENT 'Stempel waktu UNIX level milidetik saat kejadian terjadi'
    ) 
    COMMENT 'Detail log perilaku pengguna E-dagang'
    PARTITIONED BY (dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- Verifikasi bahwa tabel perilaku pengguna E-dagang telah dibuat.
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- Tabel perilaku pengguna keuangan
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Jenis perilaku',
        event_time  BIGINT COMMENT 'Stempel waktu UNIX level milidetik saat kejadian terjadi'
    ) 
    COMMENT 'Detail log perilaku pengguna keuangan'
    PARTITIONED BY (dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- Verifikasi bahwa tabel perilaku pengguna keuangan telah dibuat.
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- Tabel perilaku pengguna logistik
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Jenis perilaku',
        event_time  BIGINT COMMENT 'Stempel waktu UNIX level milidetik saat kejadian terjadi'
    ) 
    COMMENT 'Detail log perilaku pengguna logistik'
    PARTITIONED BY (dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    
    -- Verifikasi bahwa tabel perilaku pengguna logistik telah dibuat.
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. Buat tabel target (lapisan DWS): Tambahkan kode berikut ke node MaxCompute SQL, pilih, lalu jalankan.

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT 'User ID',
        pv          BIGINT COMMENT 'Popularitas harian',
    ) 
    COMMENT 'Tabel agregat popularitas pengguna harian'
    PARTITIONED BY (
        dt           STRING COMMENT 'Partisi tanggal dalam format yyyymmdd',
        biz_line     STRING COMMENT 'Partisi lini bisnis, seperti ecom, finance, logistics'
    );
    Penting

    Jika ruang kerja menggunakan lingkungan standar, Anda harus memublikasikan node ini ke lingkungan produksi dan melakukan pengisian ulang data.

Implementasi Alur Kerja

  1. Buat alur kerja. Di panel Scheduling Parameters di sebelah kanan, atur parameter penjadwalan bizdate ke hari sebelumnya $[yyyymmdd-1].

    image

  2. Dalam alur kerja, buat assignment node bernama get_biz_list. Tulis kode berikut dalam MaxCompute SQL. Node ini mengeluarkan daftar lini bisnis yang akan diproses.

    -- Keluarkan semua lini bisnis yang akan diproses
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. Konfigurasikan for-each node

    • Kembali ke kanvas alur kerja dan buat node for-each hilir untuk assignment node get_biz_list.

    • Buka halaman pengaturan for-each node. Di tab Schedule di sebelah kanan, di bawah Scheduling Parameters > Script Parameters, atur nilai parameter loopDataArray ke outputs dari node get_biz_list.

      image

    • Dalam loop body for-each node, klik Create Inner Node. Buat node MaxCompute SQL dan tulis logika pemrosesan untuk loop body.

      Catatan
      • Skrip ini dijalankan oleh for-each node dan berjalan sekali untuk setiap lini bisnis.

      • Variabel bawaan ${dag.foreach.current} secara dinamis diganti dengan nama lini bisnis saat ini pada waktu proses. Nilai iterasi yang diharapkan adalah 'ecom', 'finance', dan 'logistics'.

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
  4. Tambahkan node verifikasi

    Kembali ke kanvas alur kerja. Untuk for-each node, klik Create Downstream Node. Buat node MaxCompute SQL dan tambahkan kode berikut.

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

Publikasikan dan Jalankan

Publikasikan alur kerja ke lingkungan produksi. Di Operation Center, navigasikan ke Auto Triggered Node O&M > Auto Triggered Nodes, temukan alur kerja target, jalankan pengujian asap, dan pilih '20251010' sebagai stempel waktu data.

Setelah eksekusi selesai, lihat log eksekusi dalam instans pengujian. Output yang diharapkan dari node terakhir adalah:

user_id

pv

dt

biz_line

user001

2

20251010

ecom

user002

1

20251010

ecom

user003

3

20251010

finance

user004

1

20251010

finance

user001

1

20251010

logistics

user005

1

20251010

logistics

Keunggulan Solusi Ini

  • Ekstensibilitas tinggi: Jika lini bisnis baru ditambahkan, Anda hanya perlu menambahkan satu baris kode SQL di assignment node. Anda tidak perlu mengubah logika pemrosesan.

  • Pemeliharaan mudah: Semua lini bisnis berbagi logika pemrosesan yang sama. Perubahan di satu tempat berlaku untuk semuanya.

FAQ

  • T: Mengapa saya tidak bisa menjalankan for-each node secara langsung di Data Studio untuk pengujian?

    J: Ini adalah keterbatasan desain. Node ini memerlukan lingkungan penjadwalan lengkap untuk menyelesaikan konteks dan dependensi node; oleh karena itu, debugging langsung di Data Studio tidak didukung. Anda harus memublikasikan task ke Operation Center dan mengujinya melalui pengisian ulang data atau penjadwalan periodik.

  • T: Mengapa pengujian asap pada for-each node secara individual gagal atau tidak melakukan apa-apa?

    J: Data looping untuk for-each node berasal dari parameter input loopDataArray, yang harus diikat ke parameter outputs dari assignment node hulu. Jika for-each node dijalankan secara individual, eksekusi akan dilewati atau gagal karena tidak dapat mengambil result set input.

  • T: Mengapa loop saya hanya dieksekusi sekali?

    J: Biasanya karena hasil output assignment node hulu diurai sebagai satu elemen tunggal. Periksa output Anda:

    • Apakah berupa string tunggal tanpa pembatas yang benar?

    • Jika Anda ingin melakukan iterasi pada beberapa item, pastikan item-item tersebut dipisahkan oleh koma standar (,).
      Misalnya, 'item1,item2,item3' akan melakukan loop tiga kali, sedangkan 'item1 item2 item3' hanya akan melakukan loop sekali.