All Products
Search
Document Center

DataWorks:Node for-each

Last Updated:Feb 27, 2026

Dalam alur kerja pemrosesan data, gunakan node for-each untuk mengeksekusi subtask yang sama terhadap setiap item dalam daftar, seperti daftar nama file atau partisi. Node ini secara otomatis melakukan iterasi terhadap set hasil dari node hulu (biasanya node assignment) dan mengeksekusi isi loop-nya untuk setiap elemen, sehingga menghilangkan kebutuhan membuat tugas secara manual untuk setiap item dan memungkinkan alur kerja yang lebih dinamis serta otomatis.

Kasus penggunaan

Node for-each ideal untuk eksekusi berparameter di mana logika analisis atau pemrosesan yang sama dijalankan pada data dari unit bisnis, lini produk, atau item konfigurasi yang berbeda. Misalnya, jika perusahaan Anda memiliki beberapa lini produk dan Anda perlu menghasilkan laporan harian terpisah untuk masing-masing, logika pemrosesannya identik, tetapi datanya berbeda.

Seperti halnya loop for dalam bahasa pemrograman, node for-each secara otomatis melakukan iterasi terhadap daftar item (seperti nama tabel, nama partisi, atau nama file) dan mengeksekusi sub-alur kerja yang telah ditentukan untuk setiap item, sehingga secara signifikan meningkatkan otomatisasi dan fleksibilitas alur kerja Anda.

Catatan penggunaan

  • Edition: DataWorks Edisi Standar atau lebih tinggi.

  • Izin: Anda harus memiliki peran Development atau Workspace Manager di ruang kerja DataWorks Anda. Untuk informasi selengkapnya, lihat Tambahkan anggota ke ruang kerja.

Cara kerja

Node for-each bertindak sebagai kontainer yang membungkus sub-alur kerja yang dapat dikustomisasi, yang dikenal sebagai isi loop. Cara kerjanya sebagai berikut:

image
  1. Data masukan: Node for-each bergantung pada node assignment atau node pemberi nilai lainnya di hulu (misalnya, node EMR Hive). Node ini mengambil set hasil dalam format array dengan mengikat parameter loopDataArray.

  2. Eksekusi loop: Setelah node dimulai, node tersebut melakukan iterasi melalui setiap elemen set hasil. Untuk setiap elemen, node tersebut mengeksekusi sepenuhnya isi loop sekali (dari start hingga end).

    Catatan

    Node start dan end tidak dapat diedit. Keduanya hanya menandai awal dan akhir isi loop.

  3. Pengiriman data: Dalam setiap iterasi, node dalam isi loop dapat mengakses nilai elemen saat ini melalui variabel bawaan, seperti ${dag.foreach.current}.

Variabel bawaan

Penting

Variabel dalam format ${...} menggunakan sintaks templat khusus DataWorks. DataWorks mengurai parameter ini secara langsung dan menggantinya secara statis.

Di dalam isi loop for-each, gunakan variabel bawaan berikut untuk mengakses status dan data loop:

Variabel bawaan

Deskripsi

Analogi dengan loop for

${dag.loopDataArray}

Mengambil seluruh set hasil dari node assignment hulu.

Pertimbangkan loop for berikut:

for(int i=0;i<data.length;i++) {
   print(data[i]);
}
  • ${dag.loopDataArray} setara dengan data.

  • ${dag.foreach.current} setara dengan data[i].

  • ${dag.offset} setara dengan i.

  • ${dag.loopTimes} setara dengan i+1.

${dag.foreach.current}

Mendapatkan item data yang sedang diproses dalam loop saat ini.

${dag.offset}

Mendapatkan offset loop saat ini, dimulai dari 0.

${dag.loopTimes}

Mendapatkan nomor iterasi loop saat ini, dimulai dari 1.

Jika output hulu berupa array dua dimensi (seperti hasil kueri SQL), Anda juga dapat menggunakan sintaks berikut untuk pengambilan nilai yang tepat:

Variabel lain

Deskripsi

${dag.foreach.current}

Mendapatkan baris data saat ini (array satu dimensi) sebagai string, dengan elemen dipisahkan oleh koma (,).

${dag.foreach.current[n]}

Mendapatkan elemen ke-n dari baris data saat ini (array satu dimensi).

${dag.loopDataArray[i][j]}

Mendapatkan nilai dari baris ke-i dan kolom ke-j dari seluruh set hasil.

Node for-each saat ini tidak mendukung loop bersarang. Sintaks ini hanya untuk pengambilan nilai.

Batasan

  • Mode eksekusi: Loop mendukung eksekusi serial maupun eksekusi paralel. Pilih eksekusi paralel ketika iterasi tidak saling bergantung.

  • Batas loop: Secara default, jumlah maksimum iterasi adalah 128. Nilai ini dapat ditingkatkan hingga maksimum 1.024.

  • Batasan debugging: Anda tidak dapat menjalankan node for-each secara langsung di DataStudio. Anda harus deploy tugas ke Operation Center dan mengujinya dengan menjalankan data backfill.

  • Batasan eksekusi: Node for-each tidak dapat dijalankan secara individual. Ini mencakup Pengujian asap, data backfill, dan eksekusi manual.

  • Alur kendali dalam isi loop: Saat menggunakan node branch dalam isi loop, pastikan semua cabang menyatu ke satu node merge sebelum terhubung ke node end. Hal ini menjamin integritas logis isi loop.

  • Batasan rerun: Setelah node di-deploy untuk penjadwalan, rerun otomatis saat gagal akan dilanjutkan dari titik kegagalan. Namun, rerun manual akan memulai ulang seluruh node for-each dari awal.

Prosedur

Prosedur ini memandu Anda melalui konfigurasi tugas for-each lengkap. Kami menggunakan node assignment sebagai node hulu dan node Shell dalam isi loop untuk mencetak hasilnya.

  1. Persiapkan data hulu (konfigurasikan node assignment)

    Buat node penetapan dan konfigurasikan outputnya agar menyediakan set hasil untuk iterasi node for-each berikutnya.

    1. Dalam alur kerja, buat node assignment (misalnya, assign) dan letakkan di hulu node for-each.

    2. Klik ganda node assignment, lalu pilih bahasa. Misalnya, gunakan Python 2 untuk menghasilkan array dengan empat elemen:

      Node penetapan kemudian menghasilkan output [10,20,30,40] ke node hilir. Node tersebut secara otomatis memisahkan baris terakhir dari output berdasarkan koma untuk membuat array.
      print "10,20,30,40"
    3. Node assignment secara otomatis menghasilkan parameter output bernama outputs untuk merepresentasikan set hasilnya.

    4. Save node assignment.

  2. Konfigurasikan node for-each untuk mengonsumsi data

    Konfigurasikan node for-each untuk menerima data hulu dan menggunakannya dalam isi loop-nya.

    1. Klik ganda node for-each untuk masuk ke kanvas internalnya.

    2. Pada panel Scheduling settings di sebelah kanan, temukan parameter loopDataArray di bawah Scheduling parameters lalu klik Bind.

      image

    3. Pada kotak dialog yang muncul, atur Value Source ke parameter outputs dari node assignment hulu (assign). Tindakan ini secara otomatis membuat dependensi.

    4. Dalam isi loop for-each, klik Create Inner Node lalu pilih node Shell.

      Dalam skenario nyata, Anda dapat mengonfigurasi jenis node apa pun.
    5. Klik ganda node Shell baru dan gunakan variabel bawaan dalam kode untuk mengakses serta mencetak informasi loop:

      #!/bin/bash
      # Gunakan ${dag.loopTimes} untuk mendapatkan nomor loop saat ini
      echo "Current loop number is: ${dag.loopTimes}"
      
      # Gunakan ${dag.foreach.current} untuk mendapatkan item data saat ini
      echo "Current item is: ${dag.foreach.current}"
    6. (Opsional) Pada panel Scheduling settings di sebelah kanan, konfigurasikan properti di bawah Scheduling strategy.

      • Maximum number of cycles: Default-nya 128 dan dapat ditingkatkan hingga maksimum 1.024.

        Penting

        Parameter ini menentukan jumlah maksimum eksekusi isi loop. Jika Anda memproses data dalam jumlah besar, pastikan nilai ini cukup tinggi untuk mencakup semua item.

      • Execution mode: Pilih opsi. Untuk contoh ini, pilih Serial.

        • Serial: Menjalankan iterasi secara berurutan.

        • Parallel: Menjalankan iterasi secara konkuren untuk meningkatkan efisiensi task. Saat dikonfigurasi untuk eksekusi paralel, kegagalan satu Batch tidak memengaruhi Batch lainnya, dan penjadwalan berlanjut hingga semua Batch selesai. Konkurensi default adalah 5, dengan maksimum 20.

          image

    7. Save node Shell.

  3. Deploy, run, and verify

    Kirimkan alur kerja ke Operation Center untuk dieksekusi dan verifikasi hasil dari node for-each.

    1. Kembali ke kanvas alur kerja utama dan klik tombol Deploy pada bilah alat untuk menerapkan seluruh alur kerja.

    2. Buka Operation Center > Task O&M > Periodic Task, lalu lakukan Pengujian asap pada alur kerja target.

      Penting

      Jangan menjalankan Pengujian asap hanya pada node for-each. Karena node for-each bergantung pada output dari node assignment hulu, Anda harus memulai pengujian dari assignment node untuk memastikan node tersebut menerima data masukan yang diperlukan.

    3. Setelah test instance berhasil dijalankan, temukan instans node for-each dalam daftar. Kemudian, buka instans tersebut, klik kanan, lalu pilih View Inner Nodes.

      image

    4. Pada tampilan node internal, periksa instans Shell node yang dihasilkan oleh setiap iterasi. Buka runtime log salah satu instans untuk melihat output dari iterasi tersebut dan verifikasi bahwa output-nya benar.

      image

Kasus penggunaan: Menangani berbagai format data

Skenario 1: Memproses array satu dimensi (output Shell/Python)

  • keluaran node penetapan: 2025-11-01, 2025-11-02, 2025-11-03

  • Jumlah iterasi: 3

  • Pada iterasi kedua:

    • Nilai ${dag.foreach.current} adalah 2025-11-02.

    • Nilai ${dag.loopTimes} adalah 2.

Skenario 2: Memproses array dua dimensi (output SQL)

  • output assignment node (MaxCompute SQL):

    +-----+----------+
    | id  | city     |
    +-----+----------+
    | 101 | beijing  |
    | 102 | shanghai |
    +-----+----------+
  • Jumlah iterasi: 2

  • Pada iterasi kedua:

    • Nilai ${dag.foreach.current} adalah 102,shanghai.

    • Nilai ${dag.loopTimes} adalah 2.

    • Nilai ${dag.foreach.current[0]} adalah 102.

    • Nilai ${dag.foreach.current[1]} adalah shanghai.

Kasus penggunaan: Proses batch data dari tabel partisi untuk beberapa lini bisnis

Contoh ini menunjukkan cara menggunakan assignment node dan for-each node untuk memproses data perilaku pengguna secara batch dari beberapa lini bisnis.

Latar belakang

Asumsikan Anda seorang developer data di perusahaan internet berskala besar. Anda bertanggung jawab memproses data dari tiga lini bisnis inti: e-commerce, keuangan, dan logistik. Lini bisnis tambahan mungkin akan ditambahkan di masa depan. Anda perlu menjalankan logika agregasi yang sama setiap hari terhadap log perilaku pengguna dari ketiga lini bisnis tersebut. Logika ini menghitung PV untuk setiap pengguna dan menyimpan hasilnya dalam satu tabel agregat terpadu.

  • Tabel sumber hulu (lapisan DWD):

    • dwd_user_behavior_ecom_d: Tabel perilaku pengguna e-commerce.

    • dwd_user_behavior_finance_d: Tabel perilaku pengguna keuangan.

    • dwd_user_behavior_logistics_d: Tabel perilaku pengguna logistik.

    • dwd_user_behavior_${line-of-business}_d: Tabel perilaku pengguna untuk lini bisnis potensial lainnya.

    • Tabel-tabel ini memiliki skema yang sama dan dipartisi berdasarkan hari (dt).

  • Tabel tujuan hilir (lapisan DWS):

    • dws_user_summary_d: Tabel agregat pengguna.

    • Tabel ini dipartisi berdasarkan lini bisnis (biz_line) dan hari (dt). Tabel ini digunakan untuk menyimpan hasil agregasi dari seluruh lini bisnis.

Membuat tugas terpisah untuk setiap lini bisnis mahal dalam hal maintenance dan rentan terhadap error. Jika Anda menggunakan for-each node, Anda hanya perlu memelihara satu set logika pemrosesan. Sistem secara otomatis melakukan traversal terhadap semua lini bisnis untuk menyelesaikan perhitungan.

Persiapan data

Pertama, buat tabel contoh dan masukkan data uji. Contoh ini menggunakan stempel waktu data 20251010.

  1. Kaitkan resource komputasi MaxCompute dengan ruang kerja.

  2. Buka Data Studio dan buat node MaxCompute SQL.

  3. Buat tabel sumber: Tambahkan kode berikut ke node MaxCompute SQL, pilih, lalu jalankan.

    -- Tabel perilaku pengguna e-commerce
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Jenis perilaku',
        event_time  BIGINT COMMENT 'Stempel waktu UNIX tingkat milidetik saat event terjadi'
    ) 
    COMMENT 'Detail log perilaku pengguna e-commerce'
    PARTITIONED BY (dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES
    ('user001', 'click',        1760004060000), -- 2025-10-10 10:01:00.000
    ('user002', 'browse',       1760004150000), -- 2025-10-10 10:02:30.000
    ('user001', 'add_to_cart',  1760004300000); -- 2025-10-10 10:05:00.000
    -- Verifikasi bahwa tabel perilaku pengguna e-commerce telah dibuat.
    SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010';
    
    -- Tabel perilaku pengguna keuangan
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Jenis perilaku',
        event_time  BIGINT COMMENT 'Stempel waktu UNIX tingkat milidetik saat event terjadi'
    ) 
    COMMENT 'Detail log perilaku pengguna keuangan'
    PARTITIONED BY (dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES
    ('user003', 'open_app',      1760020200000), -- 2025-10-10 14:30:00.000
    ('user003', 'transfer',      1760020215000), -- 2025-10-10 14:30:15.000
    ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000
    ('user004', 'open_app',      1760020300000); -- 2025-10-10 14:31:40.000
    -- Verifikasi bahwa tabel perilaku pengguna keuangan telah dibuat.
    SELECT * FROM dwd_user_behavior_finance_d where dt='20251010';
    
    -- Tabel perilaku pengguna logistik
    CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d (
        user_id     STRING COMMENT 'User ID',
        action_type STRING COMMENT 'Jenis perilaku',
        event_time  BIGINT COMMENT 'Stempel waktu UNIX tingkat milidetik saat event terjadi'
    ) 
    COMMENT 'Detail log perilaku pengguna logistik'
    PARTITIONED BY (dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd');
    
    INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES
    ('user001', 'check_status',    1760032800000), -- 2025-10-10 18:00:00.000
    ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000
    
    -- Verifikasi bahwa tabel perilaku pengguna logistik telah dibuat.
    SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';
  4. Buat tabel tujuan: Tambahkan kode berikut ke node MaxCompute SQL, pilih, lalu jalankan.

    CREATE TABLE IF NOT EXISTS dws_user_summary_d (
        user_id     STRING COMMENT 'User ID',
        pv          BIGINT COMMENT 'Popularitas harian'
    ) 
    COMMENT 'Tabel agregat popularitas pengguna harian'
    PARTITIONED BY (
        dt           STRING COMMENT 'Partisi tanggal dalam format yyyymmdd',
        biz_line     STRING COMMENT 'Partisi lini bisnis, seperti ecom, finance, logistics'
    );
    Penting

    Jika ruang kerja menggunakan mode standar, Anda harus mendeploy node ini ke lingkungan produksi dan melakukan pengisian ulang data.

Implementasi alur kerja

  1. Buat alur kerja. Di panel Scheduling di sebelah kanan, atur parameter penjadwalan bizdate ke hari sebelumnya $[yyyymmdd-1].

    image

  2. Di dalam alur kerja, buat assignment node bernama get_biz_list. Tulis kode berikut dalam MaxCompute SQL. Node ini menghasilkan daftar lini bisnis yang akan diproses.

    -- Output semua lini bisnis yang akan diproses
    SELECT 'ecom' AS biz_line
    UNION ALL
    SELECT 'finance' AS biz_line
    UNION ALL
    SELECT 'logistics' AS biz_line;
  3. Konfigurasi for-each node

    • Kembali ke kanvas alur kerja dan buat for-each node hilir untuk assignment node tersebut.

    • Buka halaman pengaturan for-each node. Di tab Scheduling di sebelah kanan, di bawah Scheduling Parameters > Script Parameters, atur nilai parameter loopDataArray ke output dari node get_biz_list.

      image

    • Di dalam badan loop for-each node, klik Create Internal Node. Buat node MaxCompute SQL dan tulis logika pemrosesan untuk badan loop tersebut.

      Catatan
      • Skrip ini dikendalikan oleh for-each node dan dijalankan sekali untuk setiap lini bisnis.

      • Variabel bawaan ${dag.foreach.current} secara dinamis diganti dengan nama lini bisnis saat ini pada waktu proses. Nilai iterasi yang diharapkan adalah ecom, finance, dan logistics.

      SET odps.sql.allow.dynamic.partition=true;
      
      INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line)
      SELECT
          user_id,
          COUNT(*) AS pv,
          '${dag.foreach.current}' AS biz_line
      FROM
          dwd_user_behavior_${dag.foreach.current}_d
      WHERE
          dt = '${bizdate}'
      GROUP BY
          user_id;
  4. Tambahkan node verifikasi

    Kembali ke kanvas alur kerja. Untuk for-each node, klik Create Descendant Node. Buat node MaxCompute SQL dan tambahkan kode berikut.

    SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;

Deploy dan jalankan

Deploy alur kerja ke lingkungan produksi. Di Pusat Operasi, navigasikan ke Auto Triggered Node O&M > Auto Triggered Nodes, temukan alur kerja target, jalankan pengujian, dan pilih '20251010' sebagai stempel waktu data.

Setelah eksekusi selesai, lihat log eksekusi di instans pengujian. Output yang diharapkan dari node terakhir adalah:

user_id

pv

dt

biz_line

user001

2

20251010

ecom

user002

1

20251010

ecom

user003

3

20251010

finance

user004

1

20251010

finance

user001

1

20251010

logistics

user005

1

20251010

logistics

Keunggulan

  • Ekstensibilitas tinggi: Jika lini bisnis baru ditambahkan, Anda hanya perlu menambahkan satu baris kode SQL di assignment node. Anda tidak perlu mengubah logika pemrosesan.

  • Maintenance mudah: Semua lini bisnis menggunakan logika pemrosesan yang sama. Perubahan di satu tempat langsung berlaku untuk semuanya.

FAQ

  • T: Mengapa saya tidak bisa menjalankan node for-each secara langsung di DataStudio untuk pengujian?

    J: Ini merupakan keterbatasan desain. Node tersebut memerlukan lingkungan penjadwalan yang lengkap untuk menyelesaikan node context-nya dan dependensinya; oleh karena itu, Anda tidak dapat melakukan debug secara langsung di DataStudio. Anda harus mendeploy task ke Operation Center dan mengujinya dengan menggunakan data backfill atau eksekusi terjadwal.

  • T: Mengapa Pengujian asap pada node for-each sendiri gagal atau tidak melakukan apa-apa?

    J: Data loop untuk node for-each berasal dari parameter input loopDataArray, yang harus bound ke parameter outputs dari assignment node hulu. Jika Anda menjalankan node for-each sendirian, proses tersebut akan gagal atau dilewati karena tidak dapat mengambil result set input.

  • T: Mengapa loop saya hanya berjalan sekali? 

    J: Hal ini biasanya terjadi karena sistem mengurai output dari assignment node hulu sebagai satu elemen tunggal. Periksa output Anda:

    • 1. Apakah berupa string tunggal tanpa pembatas apa pun?

    • 2. Jika Anda ingin melakukan iterasi pada beberapa item, pastikan item-item tersebut dipisahkan dengan koma (,). Misalnya, 'item1,item2,item3' akan melakukan loop tiga kali, sedangkan 'item1 item2 item3' hanya akan melakukan loop sekali.