Fitur ekstrak, transformasi, dan muat (ETL) dalam Data Transmission Service (DTS) memungkinkan Anda memproses data streaming secara real-time melalui antarmuka DAG visual berbasis drag-and-drop—tanpa perlu menulis kode. Tutorial ini menjelaskan contoh lengkap: menggabungkan data transaksi real-time dengan data referensi produk, menyaring pesanan yang melebihi ambang batas harga tertentu, lalu menuliskan hasilnya ke database tujuan.
Kasus penggunaan
Manajemen data multi-sumber terpusat: Konsolidasikan data dari beberapa wilayah atau sumber heterogen ke satu database secara real-time.
Pelaporan real-time: Bangun pipeline pelaporan yang mencerminkan aktivitas bisnis langsung berdasarkan dimensi seperti produk, pelanggan, dan waktu.
Komputasi real-time: Bersihkan data streaming untuk mengekstrak nilai fitur dan tag guna mendukung kasus penggunaan seperti profil pengguna, kontrol risiko, dan sistem rekomendasi.
Cara kerja
Contoh ini menggabungkan dua sumber data:
Sebuah stream table — tabel
test_orders, yang menerima event transaksi real-time (ID pesanan, ID pelanggan, ID produk, total harga, dan tanggal pesanan).Sebuah dimension table — tabel
product, yang menyimpan data referensi produk yang relatif statis (ID produk, nama produk, dan harga satuan).
Tugas ETL menggabungkan kedua tabel tersebut, menyaring pesanan dengan total_price > 3000.00, lalu menuliskan baris yang sesuai ke tabel tujuan secara real-time.

Stream table diperbarui terus-menerus seiring kedatangan event. Dimension table menyimpan data referensi yang jarang berubah dan digunakan untuk memperkaya data stream, mirip dengan lookup table.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Akses ke DTS console
Instans ApsaraDB RDS for MySQL sebagai sumber (berisi stream table dan dimension table)
Instans ApsaraDB RDS for MySQL sebagai tujuan
Baca bagian Prasyarat dan Peringatan pada topik "Configure an ETL task in DAG mode" sebelum melanjutkan.
Siapkan tabel sumber dan tujuan
Buat tabel-tabel berikut di instans ApsaraDB RDS for MySQL Anda.
Data transaksi real-time
CREATE TABLE test_orders (
order_id BIGINT NOT NULL COMMENT 'Order ID',
user_id BIGINT NOT NULL COMMENT 'User ID',
product_id BIGINT NOT NULL COMMENT 'Product ID',
total_price DECIMAL(15,2) NOT NULL COMMENT 'Total price',
order_date TIMESTAMP NOT NULL COMMENT 'Order date',
PRIMARY KEY (order_id)
);Data dimensi bisnis
CREATE TABLE product (
product_id BIGINT NOT NULL COMMENT 'Product ID',
product_name VARCHAR(20) COMMENT 'Product name',
product_price DECIMAL(15,2) NOT NULL COMMENT 'Unit price'
);Tabel tujuan
CREATE TABLE test_orders_new (
order_id BIGINT NOT NULL COMMENT 'Order ID',
user_id BIGINT NOT NULL COMMENT 'User ID',
product_id BIGINT NOT NULL COMMENT 'Product ID',
total_price DECIMAL(15,2) NOT NULL COMMENT 'Total price',
order_date TIMESTAMP NOT NULL COMMENT 'Order date',
product_id_2 BIGINT NOT NULL COMMENT 'Product ID (from product table)',
product_name VARCHAR(20) COMMENT 'Product name',
product_price DECIMAL(15,2) NOT NULL COMMENT 'Unit price',
PRIMARY KEY (order_id)
);Tabel tujuan menggunakanproduct_id_2untuk menyimpan ID produk dari tabel dimensiproduct. Hal ini menghindari konflik nama kolom denganproduct_iddari stream tabletest_orders. Saat dua tabel yang digabung memiliki nama kolom yang sama, ubah salah satu nama kolom tersebut dalam skema tabel tujuan sebelum mengonfigurasi tugas ETL.
Konfigurasikan tugas ETL
Langkah-langkah berikut mengonfigurasi aliran data yang menggabungkan stream table dengan dimension table, menyaring hasilnya, lalu menuliskannya ke tujuan.

Langkah 1: Konfigurasikan database sumber
Masuk ke DTS console.DTS console
Di panel navigasi kiri, klik ETL.
Di pojok kiri atas halaman Streaming ETL, klik
. Di kotak dialog Create Data Flow, masukkan nama di bidang Data Flow Name dan atur Development Method ke DAG.Klik OK.
Konfigurasikan stream table:
Dari panel kiri, seret node Input/Dimension Table MySQL ke kanvas.
Klik Input/Dimension Table MySQL-1 di kanvas.
Di tab Node Settings, konfigurasikan parameter berikut:
Parameter Deskripsi Data Source Name DTS menghasilkan nama secara otomatis. Masukkan nama deskriptif agar mudah diidentifikasi. Region Pilih wilayah database sumber. Wilayah yang didukung: Tiongkok (Hangzhou), Tiongkok (Shanghai), Tiongkok (Qingdao), Tiongkok (Beijing), Tiongkok (Zhangjiakou), Tiongkok (Shenzhen), Tiongkok (Guangzhou), dan Tiongkok (Hong Kong). Instances Pilih instans database sumber. Untuk membuat instans baru, klik Create Instance. Lihat Databases supported by DMS untuk detail selengkapnya. Node Type Pilih Stream Table. Stream table menerima pembaruan berkelanjutan dan dapat digabungkan dengan dimension table. Convert Format Menentukan cara tabel dinamis dikodekan saat ditulis kembali ke stream. Upsert Stream: Operasi INSERT dan UPDATE dikodekan sebagai pesan upsert; operasi DELETE sebagai pesan delete. Memerlukan kunci unik (kunci komposit didukung). Append-Only Stream: Hanya operasi INSERT yang disertakan dalam stream output. Select Databases and Tables Pilih database dan tabel yang akan ditransformasi. Di tab Output Fields, pilih kolom yang akan disertakan.
Pada Time Attribute
Parameter Deskripsi Event Time Watermark Pilih bidang timestamp yang merepresentasikan kapan setiap event dihasilkan (misalnya, order_date).Latency of Event Time Watermark Masukkan penundaan maksimum yang dapat diterima untuk event yang datang tidak berurutan. ETL menunggu selama durasi ini untuk event yang terlambat sebelum membuangnya. Misalnya, jika data yang dihasilkan pada pukul 09.59 belum tiba pada pukul 10.00 ditambah latency yang dikonfigurasi, data tersebut akan dibuang. Processing Time Masukkan nama kolom. ETL menyimpan waktu pemrosesan server dalam kolom ini. Gunakan processing time untuk temporal join yang selalu mengambil versi terbaru dari tabel dimensi.
Jika ikon
tidak lagi muncul di sisi kanan node, stream table telah dikonfigurasi.Konfigurasikan dimension table:
Dari panel kiri, seret node Input/Dimension Table MySQL lainnya ke kanvas.
Klik Input/Dimension Table MySQL-2 di kanvas.
Di tab Node Settings, konfigurasikan parameter berikut:
Parameter Deskripsi Data Source Name DTS menghasilkan nama secara otomatis. Masukkan nama deskriptif agar mudah diidentifikasi. Region Pilih wilayah database sumber. Instances Pilih instans database sumber, atau klik Create Instance. Node Type Pilih Dimension Table. Dimension table menyimpan data referensi yang jarang berubah dan digunakan untuk memperkaya data stream menjadi wide table. Select Databases and Tables Pilih database dan tabel yang akan ditransformasi. Di tab Output Fields, pilih kolom yang akan disertakan.
Jika ikon
tidak lagi muncul di sisi kanan node, dimension table telah dikonfigurasi.
Langkah 2: Konfigurasikan komponen Table Join
Dari bagian Transform di panel kiri, seret node JOIN ke kanvas.
Hubungkan node stream table ke Table Join-1: arahkan kursor ke node stream table, klik lingkaran kosong di tepi kanannya, lalu seret garis koneksi ke Table Join-1. Ulangi langkah ini untuk node dimension table.
Klik Table Join-1 di kanvas untuk membuka pengaturannya.
Di tab Node Settings, konfigurasikan parameter berikut:
Bagian Parameter Deskripsi Conversion Name Enter Transformation Name DTS menghasilkan nama secara otomatis. Masukkan nama deskriptif. JOIN Settings Left Table in JOIN Clause Pilih stream table sebagai tabel utama (kiri). Temporal Join Time Attribute Menentukan cara versi stream table dicocokkan dengan dimension table. Jika tidak diatur, join reguler akan dilakukan. Based on Event Time Watermark: mencocokkan versi dimension table pada saat event dihasilkan (gunakan dengan tabel versi). Based on Processing Time: selalu menggabungkan dengan versi terbaru dimension table (gunakan dengan tabel standar). Pada contoh ini, pilih Based on Processing Time. Select JOIN Operation Pilih jenis join. Pada contoh ini, pilih Inner Join. Inner Join: hanya mengembalikan baris yang memiliki nilai yang cocok di kedua tabel. Left Join: mengembalikan semua baris dari stream table, ditambah baris yang cocok dari dimension table jika tersedia. Right Join: mengembalikan semua baris dari dimension table, ditambah baris yang cocok dari stream table jika tersedia. JOIN Condition + Add Condition Klik + Add Condition lalu pilih bidang join. Bidang di sebelah kiri tanda =milik tabel kiri; bidang di sebelah kanan milik tabel kanan.Di tab Output Fields, pilih kolom yang akan disertakan dalam wide table.
Jika ikon
tidak lagi muncul di sisi kanan komponen Table Join-1, join telah dikonfigurasi.
Langkah 3: Konfigurasikan komponen Table Record Filter
Dari bagian Transform di panel kiri, seret node Table Record Filter ke kanvas.
Hubungkan Table Join-1 ke Table Record Filter-1: arahkan kursor ke Table Join-1, klik lingkaran kosong di tepi kanannya, lalu seret garis koneksi ke Table Record Filter-1.
Klik Table Record Filter-1 di kanvas.
Di tab Node Settings, masukkan nama di bidang Conversion Name.
Di bidang WHERE Condition, tentukan kondisi filter menggunakan salah satu metode berikut:
Ketik kondisi secara langsung. Misalnya, masukkan
total_price>3000.00untuk hanya melewatkan pesanan dengan total harga di atas 3.000.Klik bidang di bagian Input Fields dan operator di bagian Operator untuk membangun kondisi secara visual.
Jika ikon
tidak lagi muncul di sisi kanan komponen, filter telah dikonfigurasi.
Langkah 4: Konfigurasikan database tujuan
Dari bagian Output di panel kiri, seret node MySQL ke kanvas.
Hubungkan Table Record Filter-1 ke Output MySQL-1: arahkan kursor ke Table Record Filter-1, klik lingkaran kosong di tepi kanannya, lalu seret garis koneksi ke Output MySQL-1.
Klik Output MySQL-1 di kanvas.
Di tab Node Settings, konfigurasikan parameter berikut:
Parameter Deskripsi Data Source Name DTS menghasilkan nama secara otomatis. Masukkan nama deskriptif. Region Pilih wilayah database tujuan. Wilayah yang didukung: Tiongkok (Hangzhou), Tiongkok (Shanghai), Tiongkok (Qingdao), Tiongkok (Beijing), Tiongkok (Zhangjiakou), Tiongkok (Shenzhen), Tiongkok (Guangzhou), dan Tiongkok (Hong Kong). Instances Pilih instans database tujuan, atau klik Create Instance. Table Mapping Di bagian Select Destination Table, klik tabel tujuan. Di tab Output Fields, pilih kolom yang akan dituliskan ke tabel tujuan.
Jika ikon
tidak lagi muncul di sisi kanan komponen, database tujuan telah dikonfigurasi.
Langkah 5: Pemeriksaan Awal dan mulai tugas
Klik Generate Flink SQL Validation. DTS menghasilkan pernyataan SQL Flink dan menjalankan pemeriksaan validasi.
Setelah validasi selesai, klik View ETL Validation Details untuk meninjau SQL yang dihasilkan dan hasil pemeriksaan. Klik Close setelah selesai.
Jika validasi gagal, perbaiki masalah yang tercantum dalam hasil lalu hasilkan ulang SQL Flink.
Klik Next: Save Task Settings and Precheck. Tugas hanya dapat dimulai setelah Pemeriksaan Awal berhasil. Jika ada item yang gagal, klik View Details di samping item yang gagal, perbaiki masalahnya, lalu jalankan Pemeriksaan Awal lagi.
Setelah Pemeriksaan Awal berhasil, klik Next: Purchase Instance.
Di halaman Purchase Instance, atur parameter Instance Class dan Compute Units (CUs). Baca dan pilih Data Transmission Service (Pay-as-you-go) Service Terms dan Service Terms for Public Preview.
Selama periode pratinjau publik, setiap pengguna dapat membuat hingga dua instans ETL tanpa biaya.
Klik Buy and Start untuk menjalankan tugas ETL.
Hasil
Setelah tugas dimulai, DTS secara terus-menerus membaca event perubahan dari test_orders, menggabungkannya dengan dimension table product, menerapkan filter total_price > 3000.00, lalu menuliskan baris yang sesuai ke test_orders_new secara real-time.
Gambar 1. Tabel data transaksi real-time: test_orders
Gambar 2. Tabel tujuan: test_orders_new
What's next
Configure an ETL task in DAG mode — referensi parameter lengkap untuk semua pengaturan tugas ETL
Databases supported by DMS — jenis database sumber dan tujuan yang didukung