Topik ini menjelaskan cara menggunakan Realtime Compute for Apache Flink untuk menyinkronkan data dari MySQL ke EMR Serverless StarRocks menggunakan pernyataan CREATE TABLE AS (CTAS).
Informasi latar belakang
Anda dapat menggunakan pernyataan CTAS atau CREATE DATABASE AS (CDAS) untuk menyinkronkan data dari MySQL ke EMR Serverless StarRocks. Pernyataan CTAS dapat menyinkronkan skema dan data dari satu tabel, sedangkan pernyataan CDAS dapat menyinkronkan skema dan data dari seluruh database atau beberapa tabel dalam database yang sama. Topik ini berfokus pada penggunaan pernyataan CTAS. Pernyataan CDAS digunakan dengan cara yang mirip dengan pernyataan CTAS. Untuk informasi lebih lanjut, lihat Pengenalan CDAS.
Dengan menggunakan pernyataan CREATE TABLE AS (CTAS), Anda dapat secara otomatis membuat tabel di StarRocks dengan skema yang sama seperti tabel di MySQL dan menyinkronkan data. Anda juga dapat menyinkronkan perubahan skema tabel sumber ke tabel tujuan secara real-time, meningkatkan efisiensi pembuatan tabel di penyimpanan tujuan dan pemeliharaan perubahan skema tabel sumber.
Saat Anda menjalankan pernyataan CTAS, Flink melakukan operasi berikut:
Periksa apakah tabel tujuan ada di penyimpanan tujuan.
Jika tabel tujuan tidak ada, tabel tujuan yang sesuai dibuat di penyimpanan tujuan berdasarkan katalog tempat tabel tersebut milik. Tabel tujuan memiliki skema yang sama dengan tabel sumber.
Jika tabel tujuan sudah ada, Flink melewati langkah pembuatan tabel. Namun, jika skema tabel tujuan yang ada berbeda dari skema tabel sumber, pesan kesalahan akan dikembalikan.
Komit dan jalankan pekerjaan sinkronisasi data. Flink menyinkronkan data dan perubahan skema tabel sumber ke tabel tujuan.
Kebijakan sinkronisasi perubahan skema menggunakan pernyataan CTAS untuk menyinkronkan data secara real-time dan perubahan skema tabel sumber ke tabel tujuan.
Perubahan skema mencakup pembuatan tabel dan perubahan skema setelah tabel dibuat.
Perubahan skema berikut didukung:
Tambah kolom nullable: Pernyataan secara otomatis menambahkan kolom terkait ke akhir skema tabel tujuan dan menyinkronkan data ke kolom yang ditambahkan.
Hapus kolom nullable: Pernyataan secara otomatis mengisi kolom nullable tabel tujuan dengan nilai null alih-alih menghapus kolom dari tabel.
Ganti nama kolom: Pernyataan menambahkan kolom yang diganti namanya ke akhir tabel tujuan dan mengisi kolom sebelum penggantian nama dengan nilai null.
Sebagai contoh, jika nama kolom col_a di tabel sumber diubah menjadi col_b, kolom col_b ditambahkan ke akhir tabel tujuan dan kolom col_a secara otomatis diisi dengan nilai null.
Perubahan skema berikut tidak didukung:
Perubahan tipe data.
Sebagai contoh, data di kolom diubah dari tipe VARCHAR menjadi tipe BIGINT, atau properti kolom diubah dari NOT NULL menjadi NULLABLE.
Perubahan kendala, seperti kunci utama atau indeks.
Penambahan atau penghapusan kolom non-nullable.
- Jika skema tabel sumber memiliki salah satu perubahan di atas, Anda harus menghapus tabel tujuan dan memulai ulang pekerjaan yang menjalankan pernyataan CTAS. Dengan cara ini, tabel tujuan dibuat kembali dan data historis disinkronkan ulang ke tabel tujuan.
- Pernyataan CTAS tidak mengidentifikasi jenis pernyataan DDL, tetapi membandingkan perbedaan skema antara dua catatan data sebelum dan sesudah skema diubah. Oleh karena itu, jika Anda menghapus kolom dan kemudian menambahkan kolom lagi, dan tidak ada perubahan data antara dua pernyataan DDL yang digunakan untuk menghapus dan menambah kolom, pernyataan CTAS menganggap bahwa tidak ada perubahan skema yang terjadi. Demikian pula, pernyataan CTAS tidak memicu sinkronisasi perubahan skema bahkan jika Anda menambahkan kolom ke tabel sumber. Pernyataan hanya mengidentifikasi perubahan skema ketika data berubah di tabel sumber. Dalam hal ini, pernyataan menyinkronkan perubahan skema ke tabel tujuan.
- Untuk informasi lebih lanjut tentang tipe bidang yang didukung oleh pernyataan CTAS, lihat Muat data secara kontinu dari Apache Flink®.
Prasyarat
Flink yang sepenuhnya dikelola telah diaktifkan dan kluster Flink telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Flink yang sepenuhnya dikelola dan Mulai Cepat untuk Pekerjaan SQL Flink.
Anda telah membuat instans EMR Serverless StarRocks. Untuk informasi lebih lanjut, lihat Buat Instans.
Instans ApsaraDB RDS for MySQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat Instans ApsaraDB RDS for MySQL.
Topik ini menggunakan MySQL 5.7 dan versi Flink vvr-8.0.11-flink-1.17 sebagai contoh.
Batasan
Kluster Flink, instans EMR Serverless StarRocks, dan instans ApsaraDB RDS for MySQL harus berada di VPC yang sama.
Instans ApsaraDB RDS for MySQL harus versi 5.7 atau yang lebih baru.
Langkah 1: Persiapkan data uji
Buat database uji dan akun. Untuk informasi lebih lanjut, lihat Buat Database dan Akun.
Setelah Anda membuat database dan akun, berikan izin baca dan tulis kepada akun uji.
CatatanDalam topik ini, database diberi nama test_cdc, dan akun diberi nama test.
Gunakan akun uji yang dibuat untuk terhubung ke instans MySQL. Untuk informasi lebih lanjut, lihat Gunakan DMS untuk Masuk ke Instans ApsaraDB RDS for MySQL.
Di MySQL, jalankan perintah berikut untuk membuat tabel data.
use test_cdc; -- Buat tabel CREATE TABLE IF NOT EXISTS `runoob_tbl`( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` int DEFAULT NULL, PRIMARY KEY ( `runoob_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; -- Masukkan data INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2025-06-22 17:13:44',3)Masuk dan terhubung ke instans EMR Serverless StarRocks. Untuk informasi lebih lanjut, lihat Gunakan Klien MySQL untuk Terhubung ke Instans StarRocks.
Jalankan perintah berikut untuk membuat database test_cdc dan pengguna uji. Anda dapat membuat pengguna sebagai super administrator (contoh kata sandi: 1qaz!QAZ) atau sebagai pengguna biasa dan memberikan izin pengguna pada database dan tabelnya. Untuk informasi lebih lanjut, lihat Kelola Pengguna.
-- Buat database CREATE DATABASE test_cdc; -- Buat pengguna CREATE USER 'test' IDENTIFIED by '1qaz!QAZ'; -- Berikan izin database kepada pengguna GRANT ALL on test_cdc to test; -- Berikan izin tabel kepada pengguna GRANT ALL ON ALL TABLES IN DATABASE test_cdc to test;
Langkah 2: Buat katalog di Konsol Flink
Buat katalog MySQL dan StarRocks di halaman Data Management di Konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Data Management.
Konfigurasi parameter hanya untuk referensi. Konfigurasikan parameter sesuai kebutuhan.
Katalog MySQL
Contoh Kode
CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr-test', 'password' = '123456', 'default-database' = 'test_cdc' );Parameter
Parameter
Deskripsi
type
Tipe. Nilainya tetap mysql.
hostname
Titik akhir internal dari instans RDS. Anda dapat pergi ke halaman Koneksi Database dari instans RDS dan klik titik akhir internal untuk menyalinnya. Contoh: rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com.
port
Nomor port layanan database MySQL. Nilai default adalah 3306.
username
Nama pengguna untuk layanan database MySQL.
Masukkan nama pengguna akun di Langkah 1: Persiapkan data uji. Dalam contoh ini, test digunakan.
password
Kata sandi untuk layanan database MySQL.
Masukkan kata sandi akun di Langkah 1: Persiapkan data uji.
default-database
Nama database MySQL default.
Masukkan nama database yang dibuat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.
Katalog StarRocks
Contoh Kode
CREATE CATALOG sr WITH ( 'type' = 'starrocks', 'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'username' = 'test', 'password' = '1qaz!QAZ', 'dbname' = 'test_cdc' );Parameter
Parameter
Deskripsi
type
Tipe. Nilainya tetap starrocks.
endpoint
Titik akhir internal dan port query node frontend (FE). Formatnya adalah
Titik akhir internal node FE dari instans EMR Serverless StarRocks:9030.Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030.
CatatanUntuk informasi tentang cara mendapatkan titik akhir internal node FE dari instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detail instans.
username
Nama pengguna untuk StarRocks.
Masukkan nama pengguna akun di Langkah 1: Persiapkan data uji. Dalam contoh ini, test digunakan.
password
Kata sandi untuk layanan database StarRocks.
Masukkan kata sandi akun di Langkah 1: Persiapkan data uji.
dbname
Nama database StarRocks.
Masukkan nama database yang dibuat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.
Langkah 3: Buat dan terapkan pekerjaan
Di halaman di Konsol Realtime Compute for Apache Flink, tulis pernyataan CTAS.
Berikut ini tiga contoh yang diberikan:
Semantik setidaknya sekali: Gunakan item konfigurasi sink.buffer-flush.interval-ms untuk mengonfigurasi interval penulisan data ke StarRocks. Mode ini memberikan interval penulisan pendek dan penggunaan memori rendah.
/* Semantik setidaknya sekali */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl;Semantik tepat-sekali: Anda harus menentukan interval checkpoint. Keuntungannya adalah data tidak hilang atau duplikat saat terjadi kesalahan. Kerugiannya adalah interval checkpoint menentukan kapan data terlihat. Untuk informasi lebih lanjut, lihat Checkpointing.
/* Semantik tepat-sekali. */ set 'execution.checkpointing.interval' = '1 min'; set 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; set 'execution.checkpointing.timeout' = '10 min'; use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '1qaz!QAZ', 'sink.semantic' = 'exactly-once', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl;Mode Sederhana: Saat Anda membuat tabel, Anda tidak perlu menentukan bidang tabel sumber. Tabel dibuat berdasarkan skema tabel MySQL. Ini nyaman bagi pengembang. Namun, Anda tidak dapat membuat partisi. Untuk tabel yang memerlukan partisi, Anda harus membuatnya dalam mode normal.
/* Dua contoh sebelumnya menggunakan mode normal. Contoh ini menggunakan mode sederhana. */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl with ( 'starrocks.create.table.mode'='simple', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl;
Tabel 1. Parameter WITH
Parameter
Diperlukan
Deskripsi
starrocks.create.table.properties
Ya
Definisi sufiks lain dalam pernyataan pembuatan tabel StarRocks, tidak termasuk definisi bidang. Contoh: engine, key, dan buckets.
database-name
Ya
Nama database StarRocks.
Dalam topik ini, nama database adalah test_cdc.
jdbc-url
Ya
Digunakan untuk menjalankan query di StarRocks.
Contoh: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. Dalam contoh ini,
fe-c-9b354c83e891****-internal.starrocks.aliyuncs.comadalah titik akhir internal node FE dari instans EMR Serverless StarRocks.CatatanUntuk informasi tentang cara mendapatkan alamat jaringan internal node FE dari instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detail.
load-url
Ya
Titik akhir internal dan port query node FE. Formatnya adalah
Titik akhir internal node FE dari instans EMR Serverless StarRocks:8030.Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030.
CatatanUntuk informasi tentang cara mendapatkan alamat jaringan internal node FE dari instans EMR Serverless StarRocks, lihat Lihat Daftar Instans dan Detail.
sink.semantic
Tidak
Atur nilai menjadi exactly-once untuk memastikan semantik tepat-sekali. Nilai default adalah at-least-once.
starrocks.create.table.mode
Tidak
Nilai berikut didukung:
mode normal (default): Anda harus menentukan konfigurasi lengkap, seperti engine, key, dan buckets, dalam parameter starrocks.create.table.properties seperti yang ditunjukkan dalam contoh.
mode sederhana: Engine secara default adalah olap. Tipe kunci adalah primary key, dan primary key sama dengan primary key tabel MySQL. Distribusi default adalah by hash (semua primary key), tanpa partisi. Dalam parameter starrocks.create.table.properties, buckets diperlukan, dan properties opsional.
sink.properties.row_delimiter
Tidak
Pemisah baris kustom.
sink.properties.column_separator
Tidak
Pemisah kolom kustom.
CatatanJika Anda menggunakan versi lebih awal dari Flink vvr-6.0.5-flink-1.15, Anda harus menambahkan
'sink.use.new-apiapi'='false',ke klausa WITH.Untuk informasi lebih lanjut tentang konfigurasi lainnya, lihat Muat Data Secara Kontinu dari Apache Flink.
Tabel 2. Parameter OPTIONS
Parameter
Deskripsi
connector
Tipe. Nilainya tetap mysql-cdc.
hostname
Titik akhir internal dari instans RDS.
Anda dapat pergi ke halaman Koneksi Database dari instans RDS dan klik titik akhir internal untuk menyalinnya. Contoh: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.
port
Nomor port layanan database MySQL. Nilai default adalah 3306.
username
Nama pengguna untuk layanan database MySQL.
Masukkan nama pengguna akun di Langkah 1: Persiapkan data uji. Dalam contoh ini, test digunakan.
password
Kata sandi untuk layanan database MySQL.
Masukkan kata sandi akun dari Langkah 1: Persiapkan data uji.
table-name
Nama tabel di StarRocks.
Masukkan nama tabel yang dibuat di Langkah 1: Persiapkan data uji. Dalam contoh ini, runoob_tbl digunakan.
database-name
Nama database MySQL default.
Masukkan nama database yang dibuat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.
Klik Deploy.
Di halaman Terapkan Versi Baru, pilih target penyebaran dan klik OK.
Di halaman O&M Pekerjaan, klik Start di kolom Aksi untuk pekerjaan yang ingin Anda mulai.
Anda tidak dapat men-debug pernyataan CTAS di Konsol Realtime Compute for Apache Flink.
Langkah 4: Verifikasi hasil sinkronisasi data
Query data
Masuk dan terhubung ke instans EMR Serverless StarRocks. Untuk informasi lebih lanjut, lihat Gunakan Klien MySQL untuk Terhubung ke Instans StarRocks.
Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.
use test_cdc; select * from runoob_tbl;Hasil berikut dikembalikan, yang menunjukkan bahwa data dari MySQL telah disinkronkan ke StarRocks.
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2025-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Sinkronkan data yang dimasukkan
Di jendela database RDS, jalankan perintah berikut untuk memasukkan data.
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1)Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.
select * from runoob_tbl;Hasil berikut dikembalikan, yang menunjukkan bahwa data telah dimasukkan.
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2025-06-23 | 1 | | 18 | first | tom | 2025-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Sinkronkan data yang diperbarui
Di jendela database RDS, jalankan perintah berikut untuk memperbarui data yang ditentukan.
update runoob_tbl set runoob_title= 'new' where runoob_id = 18Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.
select * from runoob_tbl;Hasil berikut dikembalikan, yang menunjukkan bahwa data telah diperbarui.
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2025-06-23 | 1 | | 18 | new | tom | 2025-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Sinkronkan data yang dihapus
Di jendela database RDS, jalankan perintah berikut untuk menghapus data yang ditentukan.
DELETE FROM runoob_tbl WHERE runoob_id = 1Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.
select * from runoob_tbl;Hasil berikut dikembalikan, yang menunjukkan bahwa data telah dihapus.
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2025-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
Sinkronkan kolom nullable yang ditambahkan
Di jendela database RDS, jalankan perintah berikut untuk menambahkan kolom nullable.
alter table `runoob_tbl` add COLUMN `add_col2` INT;Jalankan perintah berikut untuk memasukkan data.
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2)Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.
select * from runoob_tbl;Hasil berikut dikembalikan, yang menunjukkan bahwa skema telah berubah.
+-----------+--------------+---------------+-----------------+---------+----------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 | +-----------+--------------+---------------+-----------------+---------+----------+ | 18 | new | tom | 2025-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+----------+ | 1 | second | tom2 | 2025-06-23 | 1 | 2 | +-----------+--------------+---------------+-----------------+---------+----------+
Pengenalan CDAS
CDAS adalah gula sintaksis untuk CTAS. Pernyataan CDAS memungkinkan sinkronisasi seluruh database dari MySQL. Ini menghasilkan pekerjaan Flink di mana sumbernya adalah database di MySQL dan tujuannya adalah set tabel yang sesuai di StarRocks. Anda juga dapat menggunakan sintaks `including table` untuk memilih hanya sebagian subset tabel dari database untuk operasi CDAS.
Mirip dengan eksekusi CTAS, Anda harus membuat katalog MySQL dan StarRocks sebelum menjalankan pernyataan CDAS. Kode berikut memberikan contoh sintaksis.
CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
as DATABASE mysql.test_cdc including table
'tabl1','tbl2','tbl3';Referensi
Realtime Compute for Apache Flink mendukung menyinkronkan data ke StarRocks tidak hanya menggunakan pernyataan CTAS tetapi juga file YAML ingest data. Untuk informasi lebih lanjut, lihat Ingest Data.