Fitur ini akan segera tidak tersedia dan hanya gratis untuk pengguna tertentu yang telah mengaktifkan fitur ini. Untuk mengonfigurasi ETL ke depannya, gunakan instans sinkronisasi data atau migrasi sebagai gantinya. Lihat Konfigurasikan ETL dalam tugas migrasi atau sinkronisasi data.
Saat Anda perlu mentransformasi data secara real-time saat mengalir dari database sumber ke tujuan—misalnya melakukan join dua tabel, memperkaya catatan dengan data lookup, atau menyaring baris—ETL streaming DTS memungkinkan Anda mendefinisikan logika tersebut dalam SQL. Mode Flink SQL menyediakan ekspresivitas SQL penuh, termasuk pernyataan yang tidak tersedia dalam mode DAG, tanpa memerlukan pengetahuan tentang detail internal pemrosesan aliran.
Topik ini menjelaskan cara mengonfigurasi tugas ETL streaming dalam mode Flink SQL.
Sebelum memulai
Sebelum mengonfigurasi tugas, pastikan bahwa:
Tugas ETL berada di salah satu wilayah berikut: Tiongkok (Hangzhou), Tiongkok (Shanghai), Tiongkok (Qingdao), Tiongkok (Beijing), Tiongkok (Zhangjiakou), Tiongkok (Shenzhen), Tiongkok (Guangzhou), atau Tiongkok (Hong Kong).
Database sumber merupakan salah satu tipe berikut: MySQL, PolarDB for MySQL, Oracle, PostgreSQL, iSeries DB2 (AS/400), Db2 for LUW, PolarDB-X 1.0, PolarDB for PostgreSQL, MariaDB, PolarDB for Oracle, SQL Server, atau PolarDB-X 2.0.
Database tujuan merupakan salah satu tipe berikut: MySQL, PolarDB for MySQL, Oracle, AnalyticDB for MySQL V3.0, PolarDB for PostgreSQL, PostgreSQL, Db2 for LUW, iSeries DB2 (AS/400), AnalyticDB for PostgreSQL, SQL Server, MariaDB, PolarDB-X 1.0, PolarDB for Oracle, atau Tablestore.
Database sumber dan tujuan berada di wilayah yang sama dan dalam Akun Alibaba Cloud yang sama.
Semua tabel aliran termasuk dalam instans yang sama.
Semua nama database dan nama tabel bersifat unik.
Skema target telah dibuat di database tujuan. Fitur ETL tidak mendukung migrasi skema, sehingga Anda harus membuat tabel tujuan secara manual sebelum memulai tugas. Misalnya, untuk melakukan join Tabel A (Field 1, Field 2, Field 3) dan Tabel B (Field 2, Field 3, Field 4) menjadi hasil yang berisi Field 2 dan Field 3, buat terlebih dahulu Tabel C dengan bidang-bidang tersebut di database tujuan.
Instans sumber dan tujuan telah didaftarkan di Data Management (DMS). Lihat Instance Management.
Tugas ETL hanya mendukung data inkremental. Sinkronisasi data penuh tidak didukung.
Konsep utama
Jenis tabel
| Type | Role | Description |
|---|---|---|
| Stream table | Source | Diperbarui secara real time. Dapat di-join dengan tabel dimensi untuk memperkaya data. |
| Dimension table | Source (lookup) | Statis atau berubah perlahan. Digunakan untuk memperkaya data streaming guna analisis. |
| Output (sink) | Destination | Tabel tempat data hasil transformasi ditulis. |
Jenis aliran
Saat Anda mendefinisikan tabel aliran dengan CREATE TABLE, atur streamType untuk mengontrol bagaimana perubahan pada tabel dinamis dikodekan saat ditulis ke tujuan:
| Value | Supported operations | When to use | Notes |
|---|---|---|---|
append | INSERT only | Data hanya disisipkan, tidak pernah diperbarui atau dihapus | Tujuan hanya menerima baris baru. |
upsert | INSERT, UPDATE, DELETE | Data dapat disisipkan, diperbarui, atau dihapus | Memerlukan kunci unik (bisa komposit). INSERT dan UPDATE dikodekan sebagai pesan upsert; DELETE sebagai pesan delete. |
Konfigurasikan tugas ETL streaming dalam mode Flink SQL
Konfigurasi terdiri dari lima langkah:
Buat aliran data dan pilih mode Flink SQL
Tambahkan database sumber dan tujuan
Tulis pernyataan Flink SQL
Validasi dan publikasikan
Beli instans dan mulai tugas
Langkah 1: Buat aliran data
Masuk ke Konsol DTS.Konsol DTS
Di panel navigasi sebelah kiri, klik ETL.
Klik
. Di kotak dialog Create Data Flow, masukkan nama di bidang Data Flow Name dan pilih FlinkSQL sebagai Development Method.Klik OK.
Langkah 2: Tambahkan database sumber dan tujuan
Di halaman Streaming ETL, konfigurasikan database sumber dan tujuan di bagian Data Flow Information.
| Parameter | Description |
|---|---|
| Region | Wilayah tempat database berada. |
| Type | Peran entri database ini. Pilih Stream Table untuk sumber real-time, Dimension Table untuk tabel lookup statis, atau Output untuk tujuan. |
| Database type | Jenis database sumber atau tujuan. |
| Instance | Nama atau ID instans. Instans harus terdaftar di Data Management (DMS). |
| Database | Database yang berisi tabel yang ingin Anda transformasikan. |
| Physical table | Tabel sumber atau tujuan. |
| Alias of physical table | Nama yang mudah dibaca untuk tabel tersebut. Alias ini dirujuk dalam pernyataan Flink SQL Anda untuk menghubungkan setiap deklarasi CREATE TABLE dengan tabel fisik yang dipilih di sini. |
Langkah 3: Tulis pernyataan Flink SQL
Di editor skrip pada halaman Streaming ETL, tulis pernyataan SQL untuk mendefinisikan logika ETL Anda.
Setiap pernyataan SQL harus diakhiri dengan titik koma (;).
Skrip Flink SQL lengkap memerlukan tiga jenis pernyataan:
| Statement | Purpose |
|---|---|
CREATE TABLE | Mendefinisikan tabel sumber dan tujuan, termasuk parameter ETL-nya dalam klausa WITH. |
CREATE VIEW | Menggambarkan logika transformasi data, seperti JOIN antara tabel aliran dan tabel dimensi. |
INSERT INTO | Menulis data hasil transformasi dari view ke tabel tujuan. |
Parameter dalam klausa WITH
Setiap pernyataan CREATE TABLE menggunakan klausa WITH untuk mengonfigurasi perilaku ETL tabel tersebut:
| Parameter | Applies to | Description |
|---|---|---|
streamType | Hanya untuk stream table | Cara perubahan dikodekan saat menulis ke tujuan. Nilai yang valid: append, upsert. Lihat Jenis aliran. |
alias | Semua jenis tabel | Harus persis sesuai dengan nilai Alias of physical table yang ditetapkan di Langkah 2. Nilai ini menghubungkan deklarasi CREATE TABLE dengan tabel fisik yang Anda pilih. |
vertexType | Semua jenis tabel | Peran tabel tersebut. Nilai yang valid: stream (stream table), lookup (dimension table), sink (tabel tujuan). |
Contoh: join stream table dengan dimension table
Skrip berikut melakukan join stream table test_orders dengan dimension table product dan memasukkan hasilnya ke tabel tujuan test_orders_new.
CREATE TABLE `etltest_test_orders` (
`order_id` BIGINT,
`user_id` BIGINT,
`product_id` BIGINT,
`total_price` DECIMAL(15,2),
`order_date` TIMESTAMP(6),
`dts_etl_schema_db_table` STRING,
`dts_etl_db_log_time` BIGINT,
`pt` AS PROCTIME(),
WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
) WITH (
'streamType'= 'append',
'alias'= 'test_orders',
'vertexType'= 'stream'
);
CREATE TABLE `etltest_product` (
`product_id` BIGINT,
`product_name` STRING,
`product_price` DECIMAL(15,2)
) WITH (
'alias'= 'product',
'vertexType'= 'lookup'
);
CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
SELECT
`etltest_test_orders`.`order_id` AS `order_id`,
`etltest_test_orders`.`user_id` AS `user_id`,
`etltest_test_orders`.`product_id` AS `product_id`,
`etltest_test_orders`.`total_price` AS `total_price`,
`etltest_test_orders`.`order_date` AS `order_date`,
`etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
`etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
`etltest_product`.`product_id` AS `product_id_0001011101`,
`etltest_product`.`product_name` AS `product_name`,
`etltest_product`.`product_price` AS `product_price`
FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id
;
CREATE TABLE `test_orders_new` (
`order_id` BIGINT,
`user_id` BIGINT,
`product_id` BIGINT,
`total_price` DECIMAL(15,2),
`order_date` TIMESTAMP(6),
`product_name` STRING,
`product_price` DECIMAL(15,2)
) WITH (
'alias'= 'test_orders_new',
'vertexType'= 'sink'
);
INSERT INTO `test_orders_new` (
`order_id`,
`user_id`,
`product_id`,
`total_price`,
`order_date`,
`product_name`,
`product_price`
)
SELECT
`etltest_test_orders_JOIN_etltest_product`.`order_id`,
`etltest_test_orders_JOIN_etltest_product`.`user_id`,
`etltest_test_orders_JOIN_etltest_product`.`product_id`,
`etltest_test_orders_JOIN_etltest_product`.`total_price`,
`etltest_test_orders_JOIN_etltest_product`.`order_date`,
`etltest_test_orders_JOIN_etltest_product`.`product_name`,
`etltest_test_orders_JOIN_etltest_product`.`product_price`
FROM `etltest_test_orders_JOIN_etltest_product`;Langkah 4: Validasi dan publikasikan
Klik Generate Flink SQL Validation untuk memvalidasi pernyataan SQL Anda.
Jika validasi berhasil, klik
untuk meninjau detailnya.Jika validasi gagal, klik
untuk melihat detail error, perbaiki pernyataan SQL, lalu jalankan validasi lagi.
CatatanMengklik Publish juga memicu validasi SQL dan menjalankan Pemeriksaan Awal dalam satu langkah.
Setelah validasi berhasil, klik Publish untuk menjalankan Pemeriksaan Awal.
Tunggu hingga tingkat keberhasilan Pemeriksaan Awal mencapai 100%.
CatatanJika Pemeriksaan Awal gagal, klik View Details di samping setiap item yang gagal, selesaikan masalahnya, lalu jalankan Pemeriksaan Awal lagi.
Langkah 5: Beli instans dan mulai tugas
Di halaman Purchase Instance, pilih Instance Class dan konfirmasi Compute Units (CUs). Nilai CU tetap 2 selama periode pratinjau publik.
Baca dan centang kotak untuk menyetujui Data Transmission Service (Pay-as-you-go) Service Terms dan Service Terms for Public Preview.
Klik Buy and Start untuk memulai tugas ETL.
Selama pratinjau publik, setiap pengguna dapat membuat dua instans ETL secara gratis.