All Products
Search
Document Center

Data Transmission Service:Gunakan fitur ETL untuk menganalisis pesanan real-time

Last Updated:Mar 28, 2026

Fitur ekstrak, transformasi, dan muat (ETL) dalam Data Transmission Service (DTS) memungkinkan Anda memproses data streaming secara real-time melalui antarmuka DAG visual berbasis drag-and-drop—tanpa perlu menulis kode. Tutorial ini menjelaskan contoh lengkap: menggabungkan data transaksi real-time dengan data referensi produk, menyaring pesanan yang melebihi ambang batas harga tertentu, lalu menuliskan hasilnya ke database tujuan.

Kasus penggunaan

  • Manajemen data multi-sumber terpusat: Konsolidasikan data dari beberapa wilayah atau sumber heterogen ke satu database secara real-time.

  • Pelaporan real-time: Bangun pipeline pelaporan yang mencerminkan aktivitas bisnis langsung berdasarkan dimensi seperti produk, pelanggan, dan waktu.

  • Komputasi real-time: Bersihkan data streaming untuk mengekstrak nilai fitur dan tag guna mendukung kasus penggunaan seperti profil pengguna, kontrol risiko, dan sistem rekomendasi.

Cara kerja

Contoh ini menggabungkan dua sumber data:

  • Sebuah stream table — tabel test_orders, yang menerima event transaksi real-time (ID pesanan, ID pelanggan, ID produk, total harga, dan tanggal pesanan).

  • Sebuah dimension table — tabel product, yang menyimpan data referensi produk yang relatif statis (ID produk, nama produk, dan harga satuan).

Tugas ETL menggabungkan kedua tabel tersebut, menyaring pesanan dengan total_price > 3000.00, lalu menuliskan baris yang sesuai ke tabel tujuan secara real-time.

实现流程
Stream table diperbarui terus-menerus seiring kedatangan event. Dimension table menyimpan data referensi yang jarang berubah dan digunakan untuk memperkaya data stream, mirip dengan lookup table.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Akses ke DTS console

  • Instans ApsaraDB RDS for MySQL sebagai sumber (berisi stream table dan dimension table)

  • Instans ApsaraDB RDS for MySQL sebagai tujuan

Peringatan

Baca bagian Prasyarat dan Peringatan pada topik "Configure an ETL task in DAG mode" sebelum melanjutkan.

Siapkan tabel sumber dan tujuan

Buat tabel-tabel berikut di instans ApsaraDB RDS for MySQL Anda.

Data transaksi real-time

CREATE TABLE test_orders (
    order_id    BIGINT        NOT NULL COMMENT 'Order ID',
    user_id     BIGINT        NOT NULL COMMENT 'User ID',
    product_id  BIGINT        NOT NULL COMMENT 'Product ID',
    total_price DECIMAL(15,2) NOT NULL COMMENT 'Total price',
    order_date  TIMESTAMP     NOT NULL COMMENT 'Order date',
    PRIMARY KEY (order_id)
);

Data dimensi bisnis

CREATE TABLE product (
    product_id    BIGINT        NOT NULL COMMENT 'Product ID',
    product_name  VARCHAR(20)            COMMENT 'Product name',
    product_price DECIMAL(15,2) NOT NULL COMMENT 'Unit price'
);

Tabel tujuan

CREATE TABLE test_orders_new (
    order_id      BIGINT        NOT NULL COMMENT 'Order ID',
    user_id       BIGINT        NOT NULL COMMENT 'User ID',
    product_id    BIGINT        NOT NULL COMMENT 'Product ID',
    total_price   DECIMAL(15,2) NOT NULL COMMENT 'Total price',
    order_date    TIMESTAMP     NOT NULL COMMENT 'Order date',
    product_id_2  BIGINT        NOT NULL COMMENT 'Product ID (from product table)',
    product_name  VARCHAR(20)            COMMENT 'Product name',
    product_price DECIMAL(15,2) NOT NULL COMMENT 'Unit price',
    PRIMARY KEY (order_id)
);
Tabel tujuan menggunakan product_id_2 untuk menyimpan ID produk dari tabel dimensi product. Hal ini menghindari konflik nama kolom dengan product_id dari stream table test_orders. Saat dua tabel yang digabung memiliki nama kolom yang sama, ubah salah satu nama kolom tersebut dalam skema tabel tujuan sebelum mengonfigurasi tugas ETL.

Konfigurasikan tugas ETL

Langkah-langkah berikut mengonfigurasi aliran data yang menggabungkan stream table dengan dimension table, menyaring hasilnya, lalu menuliskannya ke tujuan.

配置流程概览

Langkah 1: Konfigurasikan database sumber

  1. Masuk ke DTS console.DTS console

  2. Di panel navigasi kiri, klik ETL.

  3. Di pojok kiri atas halaman Streaming ETL, klik 新增数据流. Di kotak dialog Create Data Flow, masukkan nama di bidang Data Flow Name dan atur Development Method ke DAG.

  4. Klik OK.

  5. Konfigurasikan stream table:

    1. Dari panel kiri, seret node Input/Dimension Table MySQL ke kanvas.

    2. Klik Input/Dimension Table MySQL-1 di kanvas.

    3. Di tab Node Settings, konfigurasikan parameter berikut:

      ParameterDeskripsi
      Data Source NameDTS menghasilkan nama secara otomatis. Masukkan nama deskriptif agar mudah diidentifikasi.
      RegionPilih wilayah database sumber. Wilayah yang didukung: Tiongkok (Hangzhou), Tiongkok (Shanghai), Tiongkok (Qingdao), Tiongkok (Beijing), Tiongkok (Zhangjiakou), Tiongkok (Shenzhen), Tiongkok (Guangzhou), dan Tiongkok (Hong Kong).
      InstancesPilih instans database sumber. Untuk membuat instans baru, klik Create Instance. Lihat Databases supported by DMS untuk detail selengkapnya.
      Node TypePilih Stream Table. Stream table menerima pembaruan berkelanjutan dan dapat digabungkan dengan dimension table.
      Convert FormatMenentukan cara tabel dinamis dikodekan saat ditulis kembali ke stream. Upsert Stream: Operasi INSERT dan UPDATE dikodekan sebagai pesan upsert; operasi DELETE sebagai pesan delete. Memerlukan kunci unik (kunci komposit didukung). Append-Only Stream: Hanya operasi INSERT yang disertakan dalam stream output.
      Select Databases and TablesPilih database dan tabel yang akan ditransformasi.
    4. Di tab Output Fields, pilih kolom yang akan disertakan.

    5. Pada Time Attribute

      ParameterDeskripsi
      Event Time WatermarkPilih bidang timestamp yang merepresentasikan kapan setiap event dihasilkan (misalnya, order_date).
      Latency of Event Time WatermarkMasukkan penundaan maksimum yang dapat diterima untuk event yang datang tidak berurutan. ETL menunggu selama durasi ini untuk event yang terlambat sebelum membuangnya. Misalnya, jika data yang dihasilkan pada pukul 09.59 belum tiba pada pukul 10.00 ditambah latency yang dikonfigurasi, data tersebut akan dibuang.
      Processing TimeMasukkan nama kolom. ETL menyimpan waktu pemrosesan server dalam kolom ini. Gunakan processing time untuk temporal join yang selalu mengambil versi terbaru dari tabel dimensi.
    Jika ikon 配置源库信息_感叹号 tidak lagi muncul di sisi kanan node, stream table telah dikonfigurasi.
  6. Konfigurasikan dimension table:

    1. Dari panel kiri, seret node Input/Dimension Table MySQL lainnya ke kanvas.

    2. Klik Input/Dimension Table MySQL-2 di kanvas.

    3. Di tab Node Settings, konfigurasikan parameter berikut:

      ParameterDeskripsi
      Data Source NameDTS menghasilkan nama secara otomatis. Masukkan nama deskriptif agar mudah diidentifikasi.
      RegionPilih wilayah database sumber.
      InstancesPilih instans database sumber, atau klik Create Instance.
      Node TypePilih Dimension Table. Dimension table menyimpan data referensi yang jarang berubah dan digunakan untuk memperkaya data stream menjadi wide table.
      Select Databases and TablesPilih database dan tabel yang akan ditransformasi.
    4. Di tab Output Fields, pilih kolom yang akan disertakan.

    Jika ikon 配置源库信息_感叹号 tidak lagi muncul di sisi kanan node, dimension table telah dikonfigurasi.

Langkah 2: Konfigurasikan komponen Table Join

  1. Dari bagian Transform di panel kiri, seret node JOIN ke kanvas.

  2. Hubungkan node stream table ke Table Join-1: arahkan kursor ke node stream table, klik lingkaran kosong di tepi kanannya, lalu seret garis koneksi ke Table Join-1. Ulangi langkah ini untuk node dimension table.

  3. Klik Table Join-1 di kanvas untuk membuka pengaturannya.

  4. Di tab Node Settings, konfigurasikan parameter berikut:

    BagianParameterDeskripsi
    Conversion NameEnter Transformation NameDTS menghasilkan nama secara otomatis. Masukkan nama deskriptif.
    JOIN SettingsLeft Table in JOIN ClausePilih stream table sebagai tabel utama (kiri).
    Temporal Join Time AttributeMenentukan cara versi stream table dicocokkan dengan dimension table. Jika tidak diatur, join reguler akan dilakukan. Based on Event Time Watermark: mencocokkan versi dimension table pada saat event dihasilkan (gunakan dengan tabel versi). Based on Processing Time: selalu menggabungkan dengan versi terbaru dimension table (gunakan dengan tabel standar). Pada contoh ini, pilih Based on Processing Time.
    Select JOIN OperationPilih jenis join. Pada contoh ini, pilih Inner Join. Inner Join: hanya mengembalikan baris yang memiliki nilai yang cocok di kedua tabel. Left Join: mengembalikan semua baris dari stream table, ditambah baris yang cocok dari dimension table jika tersedia. Right Join: mengembalikan semua baris dari dimension table, ditambah baris yang cocok dari stream table jika tersedia.
    JOIN Condition+ Add ConditionKlik + Add Condition lalu pilih bidang join. Bidang di sebelah kiri tanda = milik tabel kiri; bidang di sebelah kanan milik tabel kanan.
  5. Di tab Output Fields, pilih kolom yang akan disertakan dalam wide table.

    Jika ikon 配置源库信息_感叹号 tidak lagi muncul di sisi kanan komponen Table Join-1, join telah dikonfigurasi.

Langkah 3: Konfigurasikan komponen Table Record Filter

  1. Dari bagian Transform di panel kiri, seret node Table Record Filter ke kanvas.

  2. Hubungkan Table Join-1 ke Table Record Filter-1: arahkan kursor ke Table Join-1, klik lingkaran kosong di tepi kanannya, lalu seret garis koneksi ke Table Record Filter-1.

  3. Klik Table Record Filter-1 di kanvas.

  4. Di tab Node Settings, masukkan nama di bidang Conversion Name.

  5. Di bidang WHERE Condition, tentukan kondisi filter menggunakan salah satu metode berikut:

    • Ketik kondisi secara langsung. Misalnya, masukkan total_price>3000.00 untuk hanya melewatkan pesanan dengan total harga di atas 3.000.

    • Klik bidang di bagian Input Fields dan operator di bagian Operator untuk membangun kondisi secara visual.

    Jika ikon 配置源库信息_感叹号 tidak lagi muncul di sisi kanan komponen, filter telah dikonfigurasi.

Langkah 4: Konfigurasikan database tujuan

  1. Dari bagian Output di panel kiri, seret node MySQL ke kanvas.

  2. Hubungkan Table Record Filter-1 ke Output MySQL-1: arahkan kursor ke Table Record Filter-1, klik lingkaran kosong di tepi kanannya, lalu seret garis koneksi ke Output MySQL-1.

  3. Klik Output MySQL-1 di kanvas.

  4. Di tab Node Settings, konfigurasikan parameter berikut:

    ParameterDeskripsi
    Data Source NameDTS menghasilkan nama secara otomatis. Masukkan nama deskriptif.
    RegionPilih wilayah database tujuan. Wilayah yang didukung: Tiongkok (Hangzhou), Tiongkok (Shanghai), Tiongkok (Qingdao), Tiongkok (Beijing), Tiongkok (Zhangjiakou), Tiongkok (Shenzhen), Tiongkok (Guangzhou), dan Tiongkok (Hong Kong).
    InstancesPilih instans database tujuan, atau klik Create Instance.
    Table MappingDi bagian Select Destination Table, klik tabel tujuan.
  5. Di tab Output Fields, pilih kolom yang akan dituliskan ke tabel tujuan.

    Jika ikon 配置源库信息_感叹号 tidak lagi muncul di sisi kanan komponen, database tujuan telah dikonfigurasi.

Langkah 5: Pemeriksaan Awal dan mulai tugas

  1. Klik Generate Flink SQL Validation. DTS menghasilkan pernyataan SQL Flink dan menjalankan pemeriksaan validasi.

  2. Setelah validasi selesai, klik View ETL Validation Details untuk meninjau SQL yang dihasilkan dan hasil pemeriksaan. Klik Close setelah selesai.

    Jika validasi gagal, perbaiki masalah yang tercantum dalam hasil lalu hasilkan ulang SQL Flink.
  3. Klik Next: Save Task Settings and Precheck. Tugas hanya dapat dimulai setelah Pemeriksaan Awal berhasil. Jika ada item yang gagal, klik View Details di samping item yang gagal, perbaiki masalahnya, lalu jalankan Pemeriksaan Awal lagi.

  4. Setelah Pemeriksaan Awal berhasil, klik Next: Purchase Instance.

  5. Di halaman Purchase Instance, atur parameter Instance Class dan Compute Units (CUs). Baca dan pilih Data Transmission Service (Pay-as-you-go) Service Terms dan Service Terms for Public Preview.

    Selama periode pratinjau publik, setiap pengguna dapat membuat hingga dua instans ETL tanpa biaya.
  6. Klik Buy and Start untuk menjalankan tugas ETL.

Hasil

Setelah tugas dimulai, DTS secara terus-menerus membaca event perubahan dari test_orders, menggabungkannya dengan dimension table product, menerapkan filter total_price > 3000.00, lalu menuliskan baris yang sesuai ke test_orders_new secara real-time.

Gambar 1. Tabel data transaksi real-time: test_orders目标表test_orders_new

Gambar 2. Tabel tujuan: test_orders_new业务数据表test_orders

What's next