Topik ini menjelaskan cara menggunakan layanan Flink dalam kluster EMR Dataflow untuk menyinkronkan data dari MySQL ke EMR Serverless StarRocks menggunakan pernyataan CREATE TABLE AS (CTAS).
Informasi latar belakang
Anda dapat menggunakan pernyataan CTAS atau CREATE DATABASE AS (CDAS) untuk menyinkronkan data dari MySQL ke EMR Serverless StarRocks. Pernyataan CTAS menyinkronkan skema dan data dari satu tabel, sedangkan pernyataan CDAS menyinkronkan skema dan data dari seluruh database atau beberapa tabel dalam database yang sama. Topik ini berfokus pada penggunaan pernyataan CTAS. Untuk informasi lebih lanjut tentang CDAS, lihat Pengenalan CDAS.
Dengan menggunakan pernyataan CTAS, Anda dapat secara otomatis membuat tabel di StarRocks dengan skema yang identik dengan tabel di MySQL serta menyinkronkan data. Selain itu, perubahan skema tabel sumber disinkronkan secara real-time ke tabel tujuan, meningkatkan efisiensi pembuatan tabel di penyimpanan tujuan dan pemeliharaan perubahan skema.
Saat mengeksekusi pernyataan CTAS, Flink melakukan operasi berikut:
Memeriksa apakah tabel tujuan ada di penyimpanan tujuan.
Jika tabel tujuan tidak ada, tabel tersebut dibuat di penyimpanan tujuan berdasarkan katalog tempat tabel tersebut termasuk. Tabel tujuan memiliki skema yang sama dengan tabel sumber.
Jika tabel tujuan sudah ada, Flink melewati langkah pembuatan tabel. Namun, jika skema tabel tujuan berbeda dari skema tabel sumber, pesan kesalahan akan dikembalikan.
Menjalankan pekerjaan sinkronisasi data. Flink menyinkronkan data dan perubahan skema tabel sumber ke tabel tujuan.
Kebijakan sinkronisasi perubahan skema menggunakan pernyataan CTAS untuk menyinkronkan data secara real-time dan perubahan skema tabel sumber ke tabel tujuan.
Perubahan skema mencakup pembuatan tabel dan modifikasi skema setelah tabel dibuat.
Perubahan skema berikut didukung:
Menambahkan kolom nullable: Pernyataan secara otomatis menambahkan kolom terkait ke akhir skema tabel tujuan dan menyinkronkan data ke kolom yang ditambahkan.
Menghapus kolom nullable: Pernyataan secara otomatis mengisi kolom nullable tabel tujuan dengan nilai null alih-alih menghapus kolom dari tabel.
Mengganti nama kolom: Pernyataan menambahkan kolom yang diganti namanya ke akhir tabel tujuan dan mengisi kolom sebelum penggantian nama dengan nilai null.
Sebagai contoh, jika nama kolom col_a di tabel sumber diubah menjadi col_b, kolom col_b ditambahkan ke akhir tabel tujuan dan kolom col_a secara otomatis diisi dengan nilai null.
Perubahan skema berikut tidak didukung:
Perubahan tipe data.
Sebagai contoh, data dalam kolom diubah dari tipe VARCHAR menjadi tipe BIGINT, atau properti kolom diubah dari NOT NULL menjadi NULLABLE.
Perubahan kendala, seperti kunci utama atau indeks.
Penambahan atau penghapusan kolom non-nullable.
- Jika skema tabel sumber mengalami salah satu perubahan di atas, Anda harus menghapus tabel tujuan dan memulai ulang pekerjaan yang mengeksekusi pernyataan CTAS. Dengan cara ini, tabel tujuan dibuat kembali dan data historis disinkronkan ulang ke tabel tujuan.
- Pernyataan CTAS tidak mengidentifikasi jenis pernyataan DDL, tetapi membandingkan perbedaan skema antara dua catatan data sebelum dan sesudah skema diubah. Oleh karena itu, jika Anda menghapus kolom dan kemudian menambahkan kolom lagi tanpa adanya perubahan data antara dua pernyataan DDL, pernyataan CTAS menganggap bahwa tidak ada perubahan skema yang terjadi. Demikian pula, pernyataan CTAS tidak memicu sinkronisasi perubahan skema bahkan jika Anda menambahkan kolom ke tabel sumber. Pernyataan hanya mengidentifikasi perubahan skema ketika data berubah di tabel sumber. Dalam hal ini, pernyataan menyinkronkan perubahan skema ke tabel tujuan.
- Untuk informasi lebih lanjut tentang tipe bidang yang didukung oleh pernyataan CTAS, lihat Muat data secara kontinu dari Apache Flink®.
Prasyarat
Kluster Dataflow dibuat di konsol baru, dan layanan Flink dipilih. Untuk informasi lebih lanjut, lihat Buat kluster.
Instans EMR Serverless StarRocks dibuat. Untuk informasi lebih lanjut, lihat Buat instans.
Instans ApsaraDB RDS for MySQL dibuat. Untuk informasi lebih lanjut, lihat Langkah 1: Buat instans ApsaraDB RDS for MySQL dan konfigurasikan database.
Topik ini menggunakan MySQL 5.7 dan kluster Dataflow EMR-3.42.0 sebagai contoh.
Batasan
Kluster Dataflow, instans StarRocks, dan instans ApsaraDB RDS for MySQL harus diterapkan di virtual private cloud (VPC) yang sama.
Kluster Dataflow dan instans StarRocks harus dapat diakses melalui Internet.
Versi mesin instans ApsaraDB RDS for MySQL harus 5.7 atau lebih baru.
Kluster Dataflow harus versi EMR-3.42.0 atau lebih baru atau EMR-5.8.0 atau lebih baru.
Langkah 1: Persiapkan data uji
Buat database uji dan akun uji. Untuk informasi lebih lanjut, lihat Langkah 1: Buat instans ApsaraDB RDS for MySQL dan konfigurasikan database.
Setelah membuat database uji dan akun, berikan izin baca dan tulis kepada akun tersebut.
CatatanDalam topik ini, nama database adalah test_cdc dan nama akun adalah emr_test.
Hubungkan ke instans ApsaraDB RDS for MySQL menggunakan akun uji. Untuk informasi lebih lanjut, lihat Langkah 2: Hubungkan ke instans ApsaraDB RDS for MySQL.
Jalankan perintah berikut untuk membuat tabel:
use test_cdc; CREATE TABLE IF NOT EXISTS `runoob_tbl`( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` int DEFAULT NULL, PRIMARY KEY ( `runoob_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)Masuk dan hubungkan ke instans EMR Serverless StarRocks. Untuk informasi lebih lanjut, lihat Hubungkan ke instans StarRocks menggunakan klien MySQL.
Jalankan perintah berikut untuk membuat database test_cdc, buat pengguna super administrator bernama test (dengan kata sandi contoh 1qaz!QAZ), atau buat pengguna biasa bernama test dan berikan izin pada database kepada pengguna tersebut. Untuk informasi lebih lanjut, lihat Kelola pengguna.
CREATE DATABASE test_cdc; CREATE USER 'test' IDENTIFIED by '1qaz!QAZ'; GRANT ALL on test_cdc to test;
Langkah 2: Unggah konektor kustom
Unggah konektor kustom untuk koneksi Flink, StarRocks, dan ApsaraDB RDS for MySQL.
Masuk ke kluster Dataflow menggunakan SSH. Untuk informasi lebih lanjut, lihat Masuk ke kluster.
Unduh flink-connector-starrocks-1.2.2_flink-1.13_2.11.jar dan ververica-connector-mysql-1.13-vvr-4.0.12-1-20220330.065158-3-jar-with-dependencies.jar, lalu unggah mereka ke direktori /opt/apps/FLINK/flink-current/lib kluster Dataflow.
Langkah 3: Eksekusi pernyataan CTAS
Kirim pekerjaan dalam mode sesi.
Masuk ke kluster Dataflow menggunakan SSH. Untuk informasi lebih lanjut, lihat Masuk ke kluster.
Jalankan perintah berikut untuk masuk ke direktori /opt/apps/FLINK/flink-current:
cd /opt/apps/FLINK/flink-currentJalankan perintah berikut untuk memulai sesi YARN:
./bin/yarn-session.sh --detachedJika perintah berhasil dieksekusi,
application_XXXX_YYdikembalikan dalam output. Ini adalah sessionId yang Anda perlukan untuk masuk ke klien SQL.
Jalankan perintah berikut untuk membuka klien SQL:
./bin/sql-client.sh -s <application_XXXX_YY>CatatanGanti
<application_XXXX_YY>dengan sessionId yang Anda peroleh di langkah sebelumnya.
Buat katalog untuk MySQL dan StarRocks.
CREATE CATALOG sr WITH ( 'type' = 'starrocks', 'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'username' = 'test', 'password' = '1qaz!QAZ', 'dbname' = 'test_cdc' ); CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = '123456', 'default-database' = 'test_cdc' );Tabel berikut menjelaskan parameter. Anda dapat memodifikasi parameter berdasarkan kebutuhan bisnis Anda.
Tabel 1. Parameter Katalog StarRocks
Parameter
Deskripsi
type
Tipe katalog. Atur nilainya menjadi starrocks.
endpoint
Titik akhir internal dan port query node FE. Formatnya adalah
Titik akhir internal node FE instans EMR Serverless StarRocks:9030. Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030.CatatanUntuk informasi tentang cara mendapatkan titik akhir internal node FE instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detailnya.
username
Nama pengguna yang digunakan untuk mengakses database StarRocks.
Masukkan nama pengguna yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test digunakan.
password
Kata sandi yang digunakan untuk mengakses database StarRocks.
Masukkan kata sandi yang Anda atur untuk akun di Langkah 1: Persiapkan data uji. Dalam contoh ini, 1qaz!QAZ digunakan.
dbname
Nama database StarRocks.
Masukkan nama database yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.
Tabel 2. Parameter Katalog MySQL
Parameter
Deskripsi
type
Tipe katalog. Atur nilainya menjadi mysql.
hostname
Titik akhir internal instans ApsaraDB RDS for MySQL.
Anda dapat menyalin titik akhir internal pada halaman Koneksi Database instans ApsaraDB RDS for MySQL di konsol ApsaraDB RDS. Contoh: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.
port
Nomor port database MySQL. Nilai default: 3306.
username
Nama pengguna yang digunakan untuk mengakses database MySQL.
Masukkan nama pengguna akun yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, emr_test digunakan.
password
Kata sandi yang digunakan untuk mengakses database MySQL.
Masukkan kata sandi akun yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, 123456 digunakan.
default-database
Nama database MySQL default.
Masukkan nama database yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.
Eksekusi pernyataan CTAS di katalog StarRocks.
Anda dapat menggunakan salah satu dari tiga metode berikut untuk mengeksekusi pernyataan CTAS:
Semantik setidaknya sekali: Gunakan parameter sink.buffer-flush.interval-ms untuk mengonfigurasi interval penulisan data ke StarRocks. Keuntungannya adalah interval penulisan pendek dan lebih sedikit memori yang digunakan.
use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;Semantik exactly-once: Tentukan interval di mana checkpoint dijadwalkan secara berkala. Keuntungannya adalah data tidak hilang atau terduplikasi saat terjadi kesalahan. Kerugiannya adalah interval checkpoint menentukan kapan data dapat terlihat. Untuk informasi selengkapnya, lihat Checkpointing.
set 'execution.checkpointing.interval' = '1 min'; set 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; set 'execution.checkpointing.timeout' = '10 min'; use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '1qaz!QAZ', 'sink.semantic' = 'exactly-once', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;Mode sederhana: Keuntungannya adalah Anda tidak perlu memperhatikan bidang dalam tabel di database MySQL saat membuat tabel. Skema tabel yang ingin Anda buat sama dengan skema tabel di database MySQL. Mode ini mudah digunakan untuk pengembang. Kerugiannya adalah Anda tidak dapat membuat partisi. Untuk tabel yang perlu dipartisi, Anda harus membuat partisi dalam mode normal.
use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'='buckets 8', 'starrocks.create.table.mode'='simple', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;Tabel 3. Parameter WITH
Parameter
Diperlukan
Deskripsi
starrocks.create.table.properties
Ya
Definisi lainnya kecuali definisi bidang dalam pernyataan yang digunakan untuk membuat tabel di database StarRocks, seperti engine, key, dan buckets dalam kode contoh.
database-name
Ya
Nama database StarRocks.
Dalam contoh ini, test_cdc digunakan.
jdbc-url
Ya
URL JDBC yang digunakan untuk terhubung ke StarRocks dan menjalankan kueri di StarRocks.
Contoh: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030.
fe-c-9b354c83e891****-internal.starrocks.aliyuncs.comadalah titik akhir internal node FE instans EMR Serverless StarRocks.CatatanUntuk informasi tentang cara mendapatkan titik akhir internal node FE instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detailnya.
load-url
Ya
Titik akhir internal dan port query node FE. Formatnya adalah
Titik akhir internal node FE instans EMR Serverless StarRocks:8030.Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030.
CatatanUntuk informasi tentang cara mendapatkan titik akhir internal node FE instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detailnya.
sink.semantic
Tidak
Semantik yang digunakan untuk mengeksekusi pernyataan. Atur parameter ini menjadi exactly-once untuk memastikan konsistensi data. Nilai default: at-least-once.
starrocks.create.table.mode
Tidak
Nilai valid:
normal (default): Anda harus menentukan konfigurasi lengkap seperti engine, key, dan buckets dalam parameter starrocks.create.table.properties, seperti yang ditunjukkan dalam contoh.
simple: Secara default, parameter engine diatur ke olap, parameter key diatur ke primary key. Primary key sama dengan primary key di tabel MySQL. Parameter distributed by hash dikonfigurasikan untuk semua primary key, dan tidak ada partisi. Anda harus menentukan buckets dalam parameter starrocks.create.table.properties. Anda juga dapat menentukan parameter opsional seperti properties.
sink.properties.row_delimiter
Tidak
Pemisah baris kustom.
sink.properties.column_separator
Tidak
Pemisah kolom kustom.
CatatanJika Anda menggunakan versi lebih lama dari Flink vvr-6.0.5-flink-1.15, tambahkan
'sink.use.new-apiapi'='false',ke klausa WITH.Untuk informasi lebih lanjut tentang konfigurasi lainnya, lihat Muat data secara kontinu dari Apache Flink.
Tabel 4. Parameter OPTIONS
Parameter
Deskripsi
connector
Tipe konektor. Atur nilainya menjadi mysql-cdc.
hostname
Titik akhir internal instans ApsaraDB RDS for MySQL.
Anda dapat menyalin titik akhir internal pada halaman Koneksi Database instans ApsaraDB RDS for MySQL di konsol ApsaraDB RDS. Contoh: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.
port
Nomor port database MySQL. Nilai default: 3306.
username
Nama pengguna yang digunakan untuk mengakses database MySQL.
Masukkan nama pengguna akun yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, emr_test digunakan.
password
Kata sandi yang digunakan untuk mengakses database MySQL.
Masukkan kata sandi akun yang Anda buat di Langkah 1: Persiapkan data uji.
table-name
Nama tabel di database StarRocks.
Masukkan nama tabel yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, runoob_tbl digunakan.
database-name
Nama database MySQL default.
Masukkan nama database yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.
Langkah 4: Lihat hasil sinkronisasi data
Jika checkpointing diaktifkan, waktu tunggu maksimum sekitar interval checkpoint.
Kueri data
Masuk dan hubungkan ke instans EMR Serverless StarRocks. Untuk informasi lebih lanjut, lihat Hubungkan ke instans StarRocks menggunakan klien MySQL.
Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:
use test_cdc; select * from runoob_tbl1;Keluaran berikut dikembalikan, menunjukkan bahwa data telah disinkronkan dari instans ApsaraDB RDS for MySQL ke kluster StarRocks.
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Kueri data yang dimasukkan
Jalankan perintah berikut di jendela database ApsaraDB RDS for MySQL untuk memasukkan data:
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1);Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:
select * from runoob_tbl1;Keluaran berikut dikembalikan, menunjukkan bahwa data telah dimasukkan.
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Sinkronkan data yang diperbarui
Jalankan perintah berikut di jendela database ApsaraDB RDS for MySQL untuk memperbarui data tertentu:
update runoob_tbl set runoob_title= 'new' where runoob_id = 18;Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:
select * from runoob_tbl1;Keluaran berikut dikembalikan, menunjukkan bahwa data yang diperbarui telah disinkronkan.
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Sinkronkan data yang dihapus
Jalankan perintah berikut di jendela database ApsaraDB RDS for MySQL untuk menghapus data tertentu:
DELETE FROM runoob_tbl WHERE runoob_id = 1;Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:
select * from runoob_tbl1;Keluaran berikut dikembalikan, menunjukkan bahwa data yang dihapus telah disinkronkan.
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Tambahkan kolom nullable
Jalankan perintah berikut di jendela database ApsaraDB RDS for MySQL untuk menambahkan kolom nullable:
alter table `runoob_tbl` add COLUMN `add_col2` INT;Jalankan perintah berikut untuk memasukkan data:
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2)Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:
select * from runoob_tbl1;Keluaran berikut dikembalikan, menunjukkan bahwa skema telah berubah dan kolom nullable telah ditambahkan.
+-----------+--------------+---------------+-----------------+---------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_co2 | +-----------+--------------+---------------+-----------------+---------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | 2 | | 18 | new | tom | 2022-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+---------+
Pengenalan CDAS
Pernyataan CREATE DATABASE AS (CDAS) adalah gula sintaksis dari pernyataan CTAS. Dengan menggunakan pernyataan CDAS, Anda dapat menyinkronkan seluruh database dari MySQL, yang berarti pekerjaan Flink dibuat. Tabel sumber adalah database di MySQL, dan tabel tujuan adalah beberapa tabel di StarRocks. Anda juga dapat menggunakan sintaks including table untuk memilih hanya beberapa tabel dalam database untuk operasi CDAS.
Mirip dengan eksekusi pernyataan CTAS, Anda harus membuat katalog di database MySQL dan database StarRocks sebelum mengeksekusi pernyataan CDAS. Contoh pernyataan:
CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'username'='test',
'password' = '1qaz!QAZ',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
as DATABASEmysql.test_cdc including table
'tabl1','tbl2','tbl3' /*+ OPTIONS ( 'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;