All Products
Search
Document Center

Data Transmission Service:Konfigurasikan tugas ETL dalam mode Flink SQL

Last Updated:Mar 28, 2026
Penting

Fitur ini akan segera tidak tersedia dan hanya gratis untuk pengguna tertentu yang telah mengaktifkan fitur ini. Untuk mengonfigurasi ETL ke depannya, gunakan instans sinkronisasi data atau migrasi sebagai gantinya. Lihat Konfigurasikan ETL dalam tugas migrasi atau sinkronisasi data.

Saat Anda perlu mentransformasi data secara real-time saat mengalir dari database sumber ke tujuan—misalnya melakukan join dua tabel, memperkaya catatan dengan data lookup, atau menyaring baris—ETL streaming DTS memungkinkan Anda mendefinisikan logika tersebut dalam SQL. Mode Flink SQL menyediakan ekspresivitas SQL penuh, termasuk pernyataan yang tidak tersedia dalam mode DAG, tanpa memerlukan pengetahuan tentang detail internal pemrosesan aliran.

Topik ini menjelaskan cara mengonfigurasi tugas ETL streaming dalam mode Flink SQL.

Sebelum memulai

Sebelum mengonfigurasi tugas, pastikan bahwa:

  • Tugas ETL berada di salah satu wilayah berikut: Tiongkok (Hangzhou), Tiongkok (Shanghai), Tiongkok (Qingdao), Tiongkok (Beijing), Tiongkok (Zhangjiakou), Tiongkok (Shenzhen), Tiongkok (Guangzhou), atau Tiongkok (Hong Kong).

  • Database sumber merupakan salah satu tipe berikut: MySQL, PolarDB for MySQL, Oracle, PostgreSQL, iSeries DB2 (AS/400), Db2 for LUW, PolarDB-X 1.0, PolarDB for PostgreSQL, MariaDB, PolarDB for Oracle, SQL Server, atau PolarDB-X 2.0.

  • Database tujuan merupakan salah satu tipe berikut: MySQL, PolarDB for MySQL, Oracle, AnalyticDB for MySQL V3.0, PolarDB for PostgreSQL, PostgreSQL, Db2 for LUW, iSeries DB2 (AS/400), AnalyticDB for PostgreSQL, SQL Server, MariaDB, PolarDB-X 1.0, PolarDB for Oracle, atau Tablestore.

  • Database sumber dan tujuan berada di wilayah yang sama dan dalam Akun Alibaba Cloud yang sama.

  • Semua tabel aliran termasuk dalam instans yang sama.

  • Semua nama database dan nama tabel bersifat unik.

  • Skema target telah dibuat di database tujuan. Fitur ETL tidak mendukung migrasi skema, sehingga Anda harus membuat tabel tujuan secara manual sebelum memulai tugas. Misalnya, untuk melakukan join Tabel A (Field 1, Field 2, Field 3) dan Tabel B (Field 2, Field 3, Field 4) menjadi hasil yang berisi Field 2 dan Field 3, buat terlebih dahulu Tabel C dengan bidang-bidang tersebut di database tujuan.

  • Instans sumber dan tujuan telah didaftarkan di Data Management (DMS). Lihat Instance Management.

  • Tugas ETL hanya mendukung data inkremental. Sinkronisasi data penuh tidak didukung.

Konsep utama

Jenis tabel

TypeRoleDescription
Stream tableSourceDiperbarui secara real time. Dapat di-join dengan tabel dimensi untuk memperkaya data.
Dimension tableSource (lookup)Statis atau berubah perlahan. Digunakan untuk memperkaya data streaming guna analisis.
Output (sink)DestinationTabel tempat data hasil transformasi ditulis.

Jenis aliran

Saat Anda mendefinisikan tabel aliran dengan CREATE TABLE, atur streamType untuk mengontrol bagaimana perubahan pada tabel dinamis dikodekan saat ditulis ke tujuan:

ValueSupported operationsWhen to useNotes
appendINSERT onlyData hanya disisipkan, tidak pernah diperbarui atau dihapusTujuan hanya menerima baris baru.
upsertINSERT, UPDATE, DELETEData dapat disisipkan, diperbarui, atau dihapusMemerlukan kunci unik (bisa komposit). INSERT dan UPDATE dikodekan sebagai pesan upsert; DELETE sebagai pesan delete.

Konfigurasikan tugas ETL streaming dalam mode Flink SQL

Konfigurasi terdiri dari lima langkah:

  1. Buat aliran data dan pilih mode Flink SQL

  2. Tambahkan database sumber dan tujuan

  3. Tulis pernyataan Flink SQL

  4. Validasi dan publikasikan

  5. Beli instans dan mulai tugas

Langkah 1: Buat aliran data

  1. Masuk ke Konsol DTS.Konsol DTS

  2. Di panel navigasi sebelah kiri, klik ETL.

  3. Klik 新增数据流. Di kotak dialog Create Data Flow, masukkan nama di bidang Data Flow Name dan pilih FlinkSQL sebagai Development Method.

  4. Klik OK.

Langkah 2: Tambahkan database sumber dan tujuan

Di halaman Streaming ETL, konfigurasikan database sumber dan tujuan di bagian Data Flow Information.

ParameterDescription
RegionWilayah tempat database berada.
TypePeran entri database ini. Pilih Stream Table untuk sumber real-time, Dimension Table untuk tabel lookup statis, atau Output untuk tujuan.
Database typeJenis database sumber atau tujuan.
InstanceNama atau ID instans. Instans harus terdaftar di Data Management (DMS).
DatabaseDatabase yang berisi tabel yang ingin Anda transformasikan.
Physical tableTabel sumber atau tujuan.
Alias of physical tableNama yang mudah dibaca untuk tabel tersebut. Alias ini dirujuk dalam pernyataan Flink SQL Anda untuk menghubungkan setiap deklarasi CREATE TABLE dengan tabel fisik yang dipilih di sini.

Langkah 3: Tulis pernyataan Flink SQL

Di editor skrip pada halaman Streaming ETL, tulis pernyataan SQL untuk mendefinisikan logika ETL Anda.

Penting

Setiap pernyataan SQL harus diakhiri dengan titik koma (;).

Skrip Flink SQL lengkap memerlukan tiga jenis pernyataan:

StatementPurpose
CREATE TABLEMendefinisikan tabel sumber dan tujuan, termasuk parameter ETL-nya dalam klausa WITH.
CREATE VIEWMenggambarkan logika transformasi data, seperti JOIN antara tabel aliran dan tabel dimensi.
INSERT INTOMenulis data hasil transformasi dari view ke tabel tujuan.

Parameter dalam klausa WITH

Setiap pernyataan CREATE TABLE menggunakan klausa WITH untuk mengonfigurasi perilaku ETL tabel tersebut:

ParameterApplies toDescription
streamTypeHanya untuk stream tableCara perubahan dikodekan saat menulis ke tujuan. Nilai yang valid: append, upsert. Lihat Jenis aliran.
aliasSemua jenis tabelHarus persis sesuai dengan nilai Alias of physical table yang ditetapkan di Langkah 2. Nilai ini menghubungkan deklarasi CREATE TABLE dengan tabel fisik yang Anda pilih.
vertexTypeSemua jenis tabelPeran tabel tersebut. Nilai yang valid: stream (stream table), lookup (dimension table), sink (tabel tujuan).

Contoh: join stream table dengan dimension table

Skrip berikut melakukan join stream table test_orders dengan dimension table product dan memasukkan hasilnya ke tabel tujuan test_orders_new.

CREATE TABLE `etltest_test_orders` (
  `order_id` BIGINT,
  `user_id` BIGINT,
  `product_id` BIGINT,
  `total_price` DECIMAL(15,2),
  `order_date` TIMESTAMP(6),
  `dts_etl_schema_db_table` STRING,
  `dts_etl_db_log_time` BIGINT,
  `pt` AS PROCTIME(),
  WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
) WITH (
  'streamType'= 'append',
  'alias'= 'test_orders',
  'vertexType'= 'stream'
);
CREATE TABLE `etltest_product` (
  `product_id` BIGINT,
  `product_name` STRING,
  `product_price` DECIMAL(15,2)
) WITH (
  'alias'= 'product',
  'vertexType'= 'lookup'
);
CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
SELECT
  `etltest_test_orders`.`order_id` AS `order_id`,
  `etltest_test_orders`.`user_id` AS `user_id`,
  `etltest_test_orders`.`product_id` AS `product_id`,
  `etltest_test_orders`.`total_price` AS `total_price`,
  `etltest_test_orders`.`order_date` AS `order_date`,
  `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
  `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
  `etltest_product`.`product_id` AS `product_id_0001011101`,
  `etltest_product`.`product_name` AS `product_name`,
  `etltest_product`.`product_price` AS `product_price`
FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id
;
CREATE TABLE `test_orders_new` (
  `order_id` BIGINT,
  `user_id` BIGINT,
  `product_id` BIGINT,
  `total_price` DECIMAL(15,2),
  `order_date` TIMESTAMP(6),
  `product_name` STRING,
  `product_price` DECIMAL(15,2)
) WITH (
  'alias'= 'test_orders_new',
  'vertexType'= 'sink'
);
INSERT INTO `test_orders_new` (
  `order_id`,
  `user_id`,
  `product_id`,
  `total_price`,
  `order_date`,
  `product_name`,
  `product_price`
)
SELECT
  `etltest_test_orders_JOIN_etltest_product`.`order_id`,
  `etltest_test_orders_JOIN_etltest_product`.`user_id`,
  `etltest_test_orders_JOIN_etltest_product`.`product_id`,
  `etltest_test_orders_JOIN_etltest_product`.`total_price`,
  `etltest_test_orders_JOIN_etltest_product`.`order_date`,
  `etltest_test_orders_JOIN_etltest_product`.`product_name`,
  `etltest_test_orders_JOIN_etltest_product`.`product_price`
FROM `etltest_test_orders_JOIN_etltest_product`;

Langkah 4: Validasi dan publikasikan

  1. Klik Generate Flink SQL Validation untuk memvalidasi pernyataan SQL Anda.

    • Jika validasi berhasil, klik ETL校验成功 untuk meninjau detailnya.

    • Jika validasi gagal, klik ETL校验成功 untuk melihat detail error, perbaiki pernyataan SQL, lalu jalankan validasi lagi.

    Catatan

    Mengklik Publish juga memicu validasi SQL dan menjalankan Pemeriksaan Awal dalam satu langkah.

  2. Setelah validasi berhasil, klik Publish untuk menjalankan Pemeriksaan Awal.

  3. Tunggu hingga tingkat keberhasilan Pemeriksaan Awal mencapai 100%.

    Catatan

    Jika Pemeriksaan Awal gagal, klik View Details di samping setiap item yang gagal, selesaikan masalahnya, lalu jalankan Pemeriksaan Awal lagi.

Langkah 5: Beli instans dan mulai tugas

  1. Di halaman Purchase Instance, pilih Instance Class dan konfirmasi Compute Units (CUs). Nilai CU tetap 2 selama periode pratinjau publik.

  2. Baca dan centang kotak untuk menyetujui Data Transmission Service (Pay-as-you-go) Service Terms dan Service Terms for Public Preview.

  3. Klik Buy and Start untuk memulai tugas ETL.

Catatan

Selama pratinjau publik, setiap pengguna dapat membuat dua instans ETL secara gratis.

Topik terkait