Dalam alur kerja pemrosesan data, gunakan node for-each untuk mengeksekusi subtask yang sama terhadap setiap item dalam daftar, seperti daftar nama file atau partisi. Node ini secara otomatis melakukan iterasi terhadap set hasil dari node hulu (biasanya node assignment) dan mengeksekusi isi loop-nya untuk setiap elemen, sehingga menghilangkan kebutuhan membuat tugas secara manual untuk setiap item dan memungkinkan alur kerja yang lebih dinamis serta otomatis.
Kasus penggunaan
Node for-each ideal untuk eksekusi berparameter di mana logika analisis atau pemrosesan yang sama dijalankan pada data dari unit bisnis, lini produk, atau item konfigurasi yang berbeda. Misalnya, jika perusahaan Anda memiliki beberapa lini produk dan Anda perlu menghasilkan laporan harian terpisah untuk masing-masing, logika pemrosesannya identik, tetapi datanya berbeda.
Seperti halnya loop for dalam bahasa pemrograman, node for-each secara otomatis melakukan iterasi terhadap daftar item (seperti nama tabel, nama partisi, atau nama file) dan mengeksekusi sub-alur kerja yang telah ditentukan untuk setiap item, sehingga secara signifikan meningkatkan otomatisasi dan fleksibilitas alur kerja Anda.
Catatan penggunaan
Edition: DataWorks Edisi Standar atau lebih tinggi.
Izin: Anda harus memiliki peran Development atau Workspace Manager di ruang kerja DataWorks Anda. Untuk informasi selengkapnya, lihat Tambahkan anggota ke ruang kerja.
Cara kerja
Node for-each bertindak sebagai kontainer yang membungkus sub-alur kerja yang dapat dikustomisasi, yang dikenal sebagai isi loop. Cara kerjanya sebagai berikut:
Data masukan: Node for-each bergantung pada node assignment atau node pemberi nilai lainnya di hulu (misalnya, node EMR Hive). Node ini mengambil set hasil dalam format array dengan mengikat parameter
loopDataArray.Eksekusi loop: Setelah node dimulai, node tersebut melakukan iterasi melalui setiap elemen set hasil. Untuk setiap elemen, node tersebut mengeksekusi sepenuhnya isi loop sekali (dari
starthinggaend).CatatanNode start dan end tidak dapat diedit. Keduanya hanya menandai awal dan akhir isi loop.
Pengiriman data: Dalam setiap iterasi, node dalam isi loop dapat mengakses nilai elemen saat ini melalui variabel bawaan, seperti
${dag.foreach.current}.
Variabel bawaan
Variabel dalam format ${...} menggunakan sintaks templat khusus DataWorks. DataWorks mengurai parameter ini secara langsung dan menggantinya secara statis.
Di dalam isi loop for-each, gunakan variabel bawaan berikut untuk mengakses status dan data loop:
Variabel bawaan | Deskripsi | Analogi dengan loop for |
| Mengambil seluruh set hasil dari node assignment hulu. | Pertimbangkan loop for berikut:
|
| Mendapatkan item data yang sedang diproses dalam loop saat ini. | |
| Mendapatkan offset loop saat ini, dimulai dari 0. | |
| Mendapatkan nomor iterasi loop saat ini, dimulai dari 1. |
Jika output hulu berupa array dua dimensi (seperti hasil kueri SQL), Anda juga dapat menggunakan sintaks berikut untuk pengambilan nilai yang tepat:
Variabel lain | Deskripsi |
| Mendapatkan baris data saat ini (array satu dimensi) sebagai string, dengan elemen dipisahkan oleh koma ( |
| Mendapatkan elemen ke- |
| Mendapatkan nilai dari baris ke- Node for-each saat ini tidak mendukung loop bersarang. Sintaks ini hanya untuk pengambilan nilai. |
Batasan
Mode eksekusi: Loop mendukung eksekusi serial maupun eksekusi paralel. Pilih eksekusi paralel ketika iterasi tidak saling bergantung.
Batas loop: Secara default, jumlah maksimum iterasi adalah 128. Nilai ini dapat ditingkatkan hingga maksimum 1.024.
Batasan debugging: Anda tidak dapat menjalankan node for-each secara langsung di DataStudio. Anda harus deploy tugas ke Operation Center dan mengujinya dengan menjalankan data backfill.
Batasan eksekusi: Node for-each tidak dapat dijalankan secara individual. Ini mencakup Pengujian asap, data backfill, dan eksekusi manual.
Alur kendali dalam isi loop: Saat menggunakan node branch dalam isi loop, pastikan semua cabang menyatu ke satu node merge sebelum terhubung ke node
end. Hal ini menjamin integritas logis isi loop.Batasan rerun: Setelah node di-deploy untuk penjadwalan, rerun otomatis saat gagal akan dilanjutkan dari titik kegagalan. Namun, rerun manual akan memulai ulang seluruh node for-each dari awal.
Prosedur
Prosedur ini memandu Anda melalui konfigurasi tugas for-each lengkap. Kami menggunakan node assignment sebagai node hulu dan node Shell dalam isi loop untuk mencetak hasilnya.
Persiapkan data hulu (konfigurasikan node assignment)
Buat node penetapan dan konfigurasikan outputnya agar menyediakan set hasil untuk iterasi node for-each berikutnya.
Dalam alur kerja, buat node assignment (misalnya,
assign) dan letakkan di hulu node for-each.Klik ganda node assignment, lalu pilih bahasa. Misalnya, gunakan
Python 2untuk menghasilkan array dengan empat elemen:Node penetapan kemudian menghasilkan output [10,20,30,40] ke node hilir. Node tersebut secara otomatis memisahkan baris terakhir dari output berdasarkan koma untuk membuat array.
print "10,20,30,40"Node assignment secara otomatis menghasilkan parameter output bernama
outputsuntuk merepresentasikan set hasilnya.Save node assignment.
Konfigurasikan node for-each untuk mengonsumsi data
Konfigurasikan node for-each untuk menerima data hulu dan menggunakannya dalam isi loop-nya.
Klik ganda node for-each untuk masuk ke kanvas internalnya.
Pada panel Scheduling settings di sebelah kanan, temukan parameter
loopDataArraydi bawah Scheduling parameters lalu klik Bind.
Pada kotak dialog yang muncul, atur Value Source ke parameter
outputsdari node assignment hulu (assign). Tindakan ini secara otomatis membuat dependensi.Dalam isi loop for-each, klik Create Inner Node lalu pilih node
Shell.Dalam skenario nyata, Anda dapat mengonfigurasi jenis node apa pun.
Klik ganda node Shell baru dan gunakan variabel bawaan dalam kode untuk mengakses serta mencetak informasi loop:
#!/bin/bash # Gunakan ${dag.loopTimes} untuk mendapatkan nomor loop saat ini echo "Current loop number is: ${dag.loopTimes}" # Gunakan ${dag.foreach.current} untuk mendapatkan item data saat ini echo "Current item is: ${dag.foreach.current}"(Opsional) Pada panel Scheduling settings di sebelah kanan, konfigurasikan properti di bawah Scheduling strategy.
Maximum number of cycles: Default-nya 128 dan dapat ditingkatkan hingga maksimum 1.024.
PentingParameter ini menentukan jumlah maksimum eksekusi isi loop. Jika Anda memproses data dalam jumlah besar, pastikan nilai ini cukup tinggi untuk mencakup semua item.
Execution mode: Pilih opsi. Untuk contoh ini, pilih Serial.
Serial: Menjalankan iterasi secara berurutan.
Parallel: Menjalankan iterasi secara konkuren untuk meningkatkan efisiensi task. Saat dikonfigurasi untuk eksekusi paralel, kegagalan satu Batch tidak memengaruhi Batch lainnya, dan penjadwalan berlanjut hingga semua Batch selesai. Konkurensi default adalah 5, dengan maksimum 20.

Save node Shell.
Deploy, run, and verify
Kirimkan alur kerja ke Operation Center untuk dieksekusi dan verifikasi hasil dari node for-each.
Kembali ke kanvas alur kerja utama dan klik tombol Deploy pada bilah alat untuk menerapkan seluruh alur kerja.
Buka , lalu lakukan Pengujian asap pada alur kerja target.
PentingJangan menjalankan Pengujian asap hanya pada node for-each. Karena node for-each bergantung pada output dari node assignment hulu, Anda harus memulai pengujian dari assignment node untuk memastikan node tersebut menerima data masukan yang diperlukan.
Setelah test instance berhasil dijalankan, temukan instans node for-each dalam daftar. Kemudian, buka instans tersebut, klik kanan, lalu pilih View Inner Nodes.

Pada tampilan node internal, periksa instans Shell node yang dihasilkan oleh setiap iterasi. Buka runtime log salah satu instans untuk melihat output dari iterasi tersebut dan verifikasi bahwa output-nya benar.

Kasus penggunaan: Menangani berbagai format data
Skenario 1: Memproses array satu dimensi (output Shell/Python)
keluaran node penetapan: 2025-11-01, 2025-11-02, 2025-11-03
Jumlah iterasi: 3
Pada iterasi kedua:
Nilai
${dag.foreach.current}adalah2025-11-02.Nilai
${dag.loopTimes}adalah2.
Skenario 2: Memproses array dua dimensi (output SQL)
output assignment node (MaxCompute SQL):
+-----+----------+ | id | city | +-----+----------+ | 101 | beijing | | 102 | shanghai | +-----+----------+Jumlah iterasi: 2
Pada iterasi kedua:
Nilai
${dag.foreach.current}adalah102,shanghai.Nilai
${dag.loopTimes}adalah2.Nilai
${dag.foreach.current[0]}adalah102.Nilai
${dag.foreach.current[1]}adalahshanghai.
Kasus penggunaan: Proses batch data dari tabel partisi untuk beberapa lini bisnis
Contoh ini menunjukkan cara menggunakan assignment node dan for-each node untuk memproses data perilaku pengguna secara batch dari beberapa lini bisnis.
Latar belakang
Asumsikan Anda seorang developer data di perusahaan internet berskala besar. Anda bertanggung jawab memproses data dari tiga lini bisnis inti: e-commerce, keuangan, dan logistik. Lini bisnis tambahan mungkin akan ditambahkan di masa depan. Anda perlu menjalankan logika agregasi yang sama setiap hari terhadap log perilaku pengguna dari ketiga lini bisnis tersebut. Logika ini menghitung PV untuk setiap pengguna dan menyimpan hasilnya dalam satu tabel agregat terpadu.
Tabel sumber hulu (lapisan DWD):
dwd_user_behavior_ecom_d: Tabel perilaku pengguna e-commerce.dwd_user_behavior_finance_d: Tabel perilaku pengguna keuangan.dwd_user_behavior_logistics_d: Tabel perilaku pengguna logistik.dwd_user_behavior_${line-of-business}_d: Tabel perilaku pengguna untuk lini bisnis potensial lainnya.Tabel-tabel ini memiliki skema yang sama dan dipartisi berdasarkan hari (
dt).
Tabel tujuan hilir (lapisan DWS):
dws_user_summary_d: Tabel agregat pengguna.Tabel ini dipartisi berdasarkan lini bisnis (
biz_line) dan hari (dt). Tabel ini digunakan untuk menyimpan hasil agregasi dari seluruh lini bisnis.
Membuat tugas terpisah untuk setiap lini bisnis mahal dalam hal maintenance dan rentan terhadap error. Jika Anda menggunakan for-each node, Anda hanya perlu memelihara satu set logika pemrosesan. Sistem secara otomatis melakukan traversal terhadap semua lini bisnis untuk menyelesaikan perhitungan.
Persiapan data
Pertama, buat tabel contoh dan masukkan data uji. Contoh ini menggunakan stempel waktu data 20251010.
Kaitkan resource komputasi MaxCompute dengan ruang kerja.
Buka Data Studio dan buat node MaxCompute SQL.
Buat tabel sumber: Tambahkan kode berikut ke node MaxCompute SQL, pilih, lalu jalankan.
-- Tabel perilaku pengguna e-commerce CREATE TABLE IF NOT EXISTS dwd_user_behavior_ecom_d ( user_id STRING COMMENT 'User ID', action_type STRING COMMENT 'Jenis perilaku', event_time BIGINT COMMENT 'Stempel waktu UNIX tingkat milidetik saat event terjadi' ) COMMENT 'Detail log perilaku pengguna e-commerce' PARTITIONED BY (dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd'); INSERT OVERWRITE TABLE dwd_user_behavior_ecom_d PARTITION (dt='20251010') VALUES ('user001', 'click', 1760004060000), -- 2025-10-10 10:01:00.000 ('user002', 'browse', 1760004150000), -- 2025-10-10 10:02:30.000 ('user001', 'add_to_cart', 1760004300000); -- 2025-10-10 10:05:00.000 -- Verifikasi bahwa tabel perilaku pengguna e-commerce telah dibuat. SELECT * FROM dwd_user_behavior_ecom_d where dt='20251010'; -- Tabel perilaku pengguna keuangan CREATE TABLE IF NOT EXISTS dwd_user_behavior_finance_d ( user_id STRING COMMENT 'User ID', action_type STRING COMMENT 'Jenis perilaku', event_time BIGINT COMMENT 'Stempel waktu UNIX tingkat milidetik saat event terjadi' ) COMMENT 'Detail log perilaku pengguna keuangan' PARTITIONED BY (dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd'); INSERT OVERWRITE TABLE dwd_user_behavior_finance_d PARTITION (dt='20251010') VALUES ('user003', 'open_app', 1760020200000), -- 2025-10-10 14:30:00.000 ('user003', 'transfer', 1760020215000), -- 2025-10-10 14:30:15.000 ('user003', 'check_balance', 1760020245000), -- 2025-10-10 14:30:45.000 ('user004', 'open_app', 1760020300000); -- 2025-10-10 14:31:40.000 -- Verifikasi bahwa tabel perilaku pengguna keuangan telah dibuat. SELECT * FROM dwd_user_behavior_finance_d where dt='20251010'; -- Tabel perilaku pengguna logistik CREATE TABLE IF NOT EXISTS dwd_user_behavior_logistics_d ( user_id STRING COMMENT 'User ID', action_type STRING COMMENT 'Jenis perilaku', event_time BIGINT COMMENT 'Stempel waktu UNIX tingkat milidetik saat event terjadi' ) COMMENT 'Detail log perilaku pengguna logistik' PARTITIONED BY (dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd'); INSERT OVERWRITE TABLE dwd_user_behavior_logistics_d PARTITION (dt='20251010') VALUES ('user001', 'check_status', 1760032800000), -- 2025-10-10 18:00:00.000 ('user005', 'schedule_pickup', 1760032920000); -- 2025-10-10 18:02:00.000 -- Verifikasi bahwa tabel perilaku pengguna logistik telah dibuat. SELECT * FROM dwd_user_behavior_logistics_d where dt='20251010';Buat tabel tujuan: Tambahkan kode berikut ke node MaxCompute SQL, pilih, lalu jalankan.
CREATE TABLE IF NOT EXISTS dws_user_summary_d ( user_id STRING COMMENT 'User ID', pv BIGINT COMMENT 'Popularitas harian' ) COMMENT 'Tabel agregat popularitas pengguna harian' PARTITIONED BY ( dt STRING COMMENT 'Partisi tanggal dalam format yyyymmdd', biz_line STRING COMMENT 'Partisi lini bisnis, seperti ecom, finance, logistics' );PentingJika ruang kerja menggunakan mode standar, Anda harus mendeploy node ini ke lingkungan produksi dan melakukan pengisian ulang data.
Implementasi alur kerja
Buat alur kerja. Di panel Scheduling di sebelah kanan, atur parameter penjadwalan bizdate ke hari sebelumnya
$[yyyymmdd-1].
Di dalam alur kerja, buat assignment node bernama
get_biz_list. Tulis kode berikut dalam MaxCompute SQL. Node ini menghasilkan daftar lini bisnis yang akan diproses.-- Output semua lini bisnis yang akan diproses SELECT 'ecom' AS biz_line UNION ALL SELECT 'finance' AS biz_line UNION ALL SELECT 'logistics' AS biz_line;Konfigurasi for-each node
Kembali ke kanvas alur kerja dan buat for-each node hilir untuk assignment node tersebut.
Buka halaman pengaturan for-each node. Di tab Scheduling di sebelah kanan, di bawah , atur nilai parameter loopDataArray ke output dari node get_biz_list.

Di dalam badan loop for-each node, klik Create Internal Node. Buat node MaxCompute SQL dan tulis logika pemrosesan untuk badan loop tersebut.
CatatanSkrip ini dikendalikan oleh for-each node dan dijalankan sekali untuk setiap lini bisnis.
Variabel bawaan
${dag.foreach.current}secara dinamis diganti dengan nama lini bisnis saat ini pada waktu proses. Nilai iterasi yang diharapkan adalahecom,finance, danlogistics.
SET odps.sql.allow.dynamic.partition=true; INSERT OVERWRITE TABLE dws_user_summary_d PARTITION (dt='${bizdate}', biz_line) SELECT user_id, COUNT(*) AS pv, '${dag.foreach.current}' AS biz_line FROM dwd_user_behavior_${dag.foreach.current}_d WHERE dt = '${bizdate}' GROUP BY user_id;
Tambahkan node verifikasi
Kembali ke kanvas alur kerja. Untuk for-each node, klik Create Descendant Node. Buat node MaxCompute SQL dan tambahkan kode berikut.
SELECT * FROM dws_user_summary_d WHERE dt='20251010' ORDER BY biz_line, user_id;
Deploy dan jalankan
Deploy alur kerja ke lingkungan produksi. Di Pusat Operasi, navigasikan ke , temukan alur kerja target, jalankan pengujian, dan pilih '20251010' sebagai stempel waktu data.
Setelah eksekusi selesai, lihat log eksekusi di instans pengujian. Output yang diharapkan dari node terakhir adalah:
user_id | pv | dt | biz_line |
user001 | 2 | 20251010 | ecom |
user002 | 1 | 20251010 | ecom |
user003 | 3 | 20251010 | finance |
user004 | 1 | 20251010 | finance |
user001 | 1 | 20251010 | logistics |
user005 | 1 | 20251010 | logistics |
Keunggulan
Ekstensibilitas tinggi: Jika lini bisnis baru ditambahkan, Anda hanya perlu menambahkan satu baris kode SQL di assignment node. Anda tidak perlu mengubah logika pemrosesan.
Maintenance mudah: Semua lini bisnis menggunakan logika pemrosesan yang sama. Perubahan di satu tempat langsung berlaku untuk semuanya.
FAQ
T: Mengapa saya tidak bisa menjalankan node for-each secara langsung di DataStudio untuk pengujian?
J: Ini merupakan keterbatasan desain. Node tersebut memerlukan lingkungan penjadwalan yang lengkap untuk menyelesaikan node context-nya dan dependensinya; oleh karena itu, Anda tidak dapat melakukan debug secara langsung di DataStudio. Anda harus mendeploy task ke Operation Center dan mengujinya dengan menggunakan data backfill atau eksekusi terjadwal.
T: Mengapa Pengujian asap pada node for-each sendiri gagal atau tidak melakukan apa-apa?
J: Data loop untuk node for-each berasal dari parameter input
loopDataArray, yang harus bound ke parameteroutputsdari assignment node hulu. Jika Anda menjalankan node for-each sendirian, proses tersebut akan gagal atau dilewati karena tidak dapat mengambil result set input.T: Mengapa loop saya hanya berjalan sekali?
J: Hal ini biasanya terjadi karena sistem mengurai output dari assignment node hulu sebagai satu elemen tunggal. Periksa output Anda:
1. Apakah berupa string tunggal tanpa pembatas apa pun?
2. Jika Anda ingin melakukan iterasi pada beberapa item, pastikan item-item tersebut dipisahkan dengan koma (
,). Misalnya,'item1,item2,item3'akan melakukan loop tiga kali, sedangkan'item1 item2 item3'hanya akan melakukan loop sekali.