Pernyataan CREATE TABLE AS (CTAS) secara otomatis membuat tabel yang sesuai di StarRocks dan terus-menerus menyinkronkan data serta perubahan skema dari sumber MySQL. Pernyataan CREATE DATABASE AS (CDAS) memperluas fungsionalitas ini ke seluruh database. Topik ini menjelaskan cara menggunakan kedua pernyataan tersebut di Realtime Compute for Apache Flink untuk memindahkan data pemrosesan transaksi (TP) dari ApsaraDB RDS for MySQL ke kluster StarRocks E-MapReduce (EMR) guna pemrosesan analitis (AP).
Ikhtisar arsitektur
| Komponen | Peran |
|---|---|
| ApsaraDB RDS for MySQL | Sumber — asal event change data capture (CDC) dan perubahan skema |
| Realtime Compute for Apache Flink | Mesin pemrosesan — membaca CDC MySQL dan menulis ke StarRocks |
| Kluster EMR StarRocks | Sink — tujuan analitis untuk data yang disinkronkan |
Cara kerja
Saat menjalankan pernyataan CTAS, Flink melakukan dua operasi secara berurutan:
Pemeriksaan pembuatan tabel — Flink memeriksa apakah tabel tujuan sudah ada di StarRocks.
Jika belum ada, Flink membuat tabel tersebut dengan skema yang sama seperti sumber berdasarkan katalog tujuan.
Jika sudah ada, Flink melewatkan pembuatan. Jika skema yang ada berbeda dari skema sumber, akan dikembalikan error.
Penyinkronan data — Flink memulai pekerjaan berkelanjutan yang menyinkronkan data dan perubahan skema dari tabel sumber ke tabel tujuan.
Perilaku perubahan skema
CTAS menggunakan kebijakan tetap untuk menyebarkan perubahan skema. Tabel berikut merangkum perubahan yang didukung dan tidak didukung secara otomatis.
Perubahan skema yang didukung
| Jenis perubahan | Perilaku di StarRocks |
|---|---|
| Menambahkan kolom nullable | Kolom ditambahkan di akhir tabel tujuan; data masuk mengisi kolom tersebut |
| Menghapus kolom nullable | Kolom tetap ada di StarRocks tetapi diisi dengan nilai NULL |
| Mengganti nama kolom | Kolom dengan nama baru ditambahkan di akhir; kolom dengan nama lama diisi dengan nilai NULL. Misalnya, mengganti nama col_a menjadi col_b akan menambahkan col_b di akhir dan mengatur col_a menjadi NULL. |
Perubahan skema yang tidak didukung
Jika salah satu perubahan berikut terjadi pada tabel sumber, hapus tabel tujuan di StarRocks dan mulai ulang pekerjaan CTAS. Flink akan membuat ulang tabel tersebut dan menyinkronkan kembali semua data historis.
Mengubah tipe data kolom (misalnya,
VARCHARmenjadiBIGINT, atauNOT NULLmenjadiNULLABLE)Mengubah constraint seperti kunci primer atau indeks
Menambah atau menghapus kolom non-nullable
Menyesuaikan panjang field dalam pernyataan DDL (Data Definition Language)
CTAS mendeteksi perubahan skema dengan membandingkan skema dari catatan data berturut-turut — bukan dengan mengurai jenis pernyataan DDL. Akibatnya:
Jika sebuah kolom dihapus lalu ditambahkan kembali tanpa perubahan data di antaranya, CTAS menganggap hal ini sebagai tidak ada perubahan skema.
Perubahan skema hanya disebarkan ketika data baru tiba di tabel sumber setelah perubahan tersebut.
Untuk pemetaan tipe field, lihat Continuously load data from Apache Flink.
Saat CTAS menggabungkan beberapa tabel MySQL, Flink secara otomatis menambahkan kolom_db_namedan_table_nameke tabel tujuan untuk melacak sumbernya. Tentukan urutan kolom Anda sendiri mulai dari kolom ketiga.
Prasyarat
Sebelum memulai, pastikan Anda telah:
Memiliki ruang kerja Flink yang sepenuhnya dikelola dan berjalan dengan vvr-6.0.3-flink-1.15 atau lebih baru. Lihat Aktifkan Flink yang sepenuhnya dikelola dan Memulai penerapan Flink SQL.
Memiliki kluster EMR StarRocks. Lihat Buat kluster StarRocks.
Memiliki instans ApsaraDB RDS for MySQL yang menjalankan MySQL 5.7 atau lebih baru. Lihat Buat instans ApsaraDB RDS for MySQL.
Contoh dalam topik ini menggunakan MySQL 5.7, EMR 3.39.1, dan Flink yang sepenuhnya dikelola vvr-6.0.3-flink-1.15.
Batasan
Ruang kerja Flink, kluster StarRocks, dan instans ApsaraDB RDS for MySQL harus berada dalam virtual private cloud (VPC) yang sama.
Versi mesin ApsaraDB RDS for MySQL harus 5.7 atau lebih baru.
Akses Internet harus diaktifkan untuk kluster StarRocks.
Flink yang sepenuhnya dikelola harus vvr-6.0.3-flink-1.15 atau lebih baru.
Langkah 1: Siapkan data uji
Buat database dan akun pada instans ApsaraDB RDS for MySQL. Lihat Buat database dan akun untuk instans ApsaraDB RDS for MySQL. Berikan izin baca dan tulis kepada akun uji.
Topik ini menggunakan database bernama
test_cdcdan akun bernamatest.Login ke instans ApsaraDB RDS for MySQL menggunakan akun uji. Lihat Gunakan DMS untuk login ke instans ApsaraDB RDS for MySQL.
Buat tabel uji dan masukkan satu baris:
USE test_cdc; CREATE TABLE IF NOT EXISTS `runoob_tbl` ( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` INT DEFAULT NULL, PRIMARY KEY (`runoob_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`) VALUES (18, 'first', 'tom', '2022-06-22', 3);Login ke kluster StarRocks melalui SSH. Lihat Login ke kluster.
Hubungkan ke StarRocks:
mysql -h127.0.0.1 -P 9030 -urootBuat pengguna dan berikan izin yang diperlukan untuk tutorial ini:
CREATE DATABASE test_cdc; CREATE USER 'test' IDENTIFIED BY '123456'; GRANT CREATE TABLE ON DATABASE test_cdc TO test;
Langkah 2: Buat katalog di editor SQL Flink
Pada halaman Draft Editor di konsol Flink yang sepenuhnya dikelola, buat satu katalog untuk MySQL dan satu untuk StarRocks. Lihat Memulai penerapan Flink SQL.
Nilai parameter di bawah ini bersifat contoh. Sesuaikan dengan lingkungan Anda.
Katalog MySQL
CREATE CATALOG mysql WITH (
'type' = 'mysql',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'default-database' = 'test_cdc'
);| Parameter | Deskripsi |
|---|---|
type | Jenis katalog. Atur ke mysql. |
hostname | Titik akhir internal instans ApsaraDB RDS for MySQL. Salin dari halaman Database Connection di konsol ApsaraDB RDS (misalnya, rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com). |
port | Port database MySQL. Default: 3306. |
username | Username yang dibuat di Langkah 1: Siapkan data uji. Dalam contoh ini: test. |
password | Password untuk username yang dibuat di Langkah 1: Siapkan data uji. |
default-database | Nama database yang dibuat di Langkah 1: Siapkan data uji. Dalam contoh ini: test_cdc. |
Katalog StarRocks
CREATE CATALOG sr WITH (
'type' = 'starrocks',
'endpoint' = '172.16.**.**:9030',
'username' = 'test',
'password' = '123456',
'dbname' = 'test_cdc'
);| Parameter | Deskripsi |
|---|---|
type | Jenis katalog. Atur ke starrocks. |
endpoint | Alamat IP dan port antarmuka depan StarRocks (misalnya, 172.16..:9030). |
username | Username yang dibuat di Langkah 1: Siapkan data uji. Dalam contoh ini: test. |
password | Password untuk username yang dibuat di Langkah 1: Siapkan data uji. |
dbname | Nama database StarRocks. Dalam contoh ini: test_cdc. |
Langkah 3: Tulis dan publikasikan penerapan CTAS
Pada halaman Draft Editor, tulis pernyataan CTAS. Tersedia tiga mode pengiriman — pilih berdasarkan kebutuhan konsistensi Anda.
Semantik at-least-once (direkomendasikan untuk skenario latensi rendah)
Data ditulis pada interval flush yang dapat dikonfigurasi. Penggunaan memori lebih rendah, tetapi penulisan duplikat mungkin terjadi saat terjadi kegagalan.
/* Semantik at-least-once */
USE CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl_sr WITH (
'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
'database-name' = 'test_cdc',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'table-name' = 'runoob_tbl_sr',
'username' = 'test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl'
) */;Semantik exactly-once (direkomendasikan untuk skenario kritis data)
Tidak ada kehilangan data atau duplikasi saat terjadi kegagalan. Visibilitas data bergantung pada interval checkpoint.
/* Semantik exactly-once */
SET 'execution.checkpointing.interval' = '1 min';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout' = '10 min';
USE CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl WITH (
'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
'database-name' = 'test_cdc',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'table-name' = 'runoob_tbl',
'username' = 'test',
'password' = '123456',
'sink.semantic' = 'exactly-once',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl'
) */;Untuk opsi konfigurasi checkpoint, lihat Checkpointing.
Mode simple (direkomendasikan untuk penyiapan cepat)
Flink menurunkan definisi tabel dari skema MySQL — tidak perlu menentukan engine, kunci, atau distribusi secara manual. Tabel terpartisi tidak didukung dalam mode simple; buat partisi menggunakan mode normal sebagai gantinya.
/* Mode simple */
USE CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl1 WITH (
'starrocks.create.table.properties' = 'buckets 8',
'starrocks.create.table.mode' = 'simple',
'database-name' = 'test_cdc',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'table-name' = 'runoob_tbl_sr',
'username' = 'test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl'
) */;Parameter klausa WITH
| Parameter | Wajib | Deskripsi |
|---|---|---|
starrocks.create.table.properties | Ya | Definisi sufiks untuk pernyataan CREATE TABLE StarRocks, tidak termasuk definisi field — misalnya, engine, key, dan buckets. |
database-name | Ya | Nama database StarRocks. |
jdbc-url | Ya | URL Java Database Connectivity (JDBC) untuk kueri StarRocks — misalnya, jdbc:mysql://172.16..:9030, di mana 172.16.. adalah alamat IP internal kluster StarRocks. |
load-url | Ya | Alamat IP internal dan port HTTP antarmuka depan StarRocks. Pilih port berdasarkan versi kluster EMR Anda: 18030 untuk EMR V5.9.0 atau lebih baru (versi minor) dan EMR V3.43.0 atau lebih baru (versi minor); 8030 untuk EMR V5.8.0, EMR V3.42.0, atau lebih lama. Lihat Access the UI and ports. |
sink.semantic | Tidak | Semantik pengiriman: at-least-once (default) atau exactly-once. |
starrocks.create.table.mode | Tidak | normal (default): tentukan engine, key, dan buckets di starrocks.create.table.properties. simple: Flink mengatur engine=olap, menggunakan kunci primer MySQL, dan mendistribusikan berdasarkan hash pada semua kolom kunci primer. Hanya buckets yang diperlukan di starrocks.create.table.properties. Partisi tidak dibuat. |
Jika versi Flink Anda lebih lama dari vvr-6.0.5-flink-1.15, tambahkan 'sink.use.new-apiapi' = 'false' ke klausa WITH. Untuk parameter sink tambahan, lihat Continuously load data from Apache Flink.Parameter klausa OPTIONS
Klausa OPTIONS mengonfigurasi sumber CDC MySQL.
| Parameter | Deskripsi |
|---|---|
connector | Jenis konektor. Atur ke mysql-cdc. |
hostname | Titik akhir internal instans ApsaraDB RDS for MySQL (misalnya, rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com). |
port | Port MySQL. Default: 3306. |
username | Username untuk akses ApsaraDB RDS for MySQL. Gunakan akun yang dibuat di Langkah 1: Siapkan data uji. |
password | Password untuk akses ApsaraDB RDS for MySQL. |
table-name | Nama tabel sumber. Dalam contoh ini: runoob_tbl. |
database-name | Nama database sumber. Dalam contoh ini: test_cdc. |
Publikasikan dan mulai penerapan
Pada tab Advanced di halaman Draft Editor, atur Engine Version ke
vvr-6.0.3-flink-1.15atau lebih baru.Di pojok kanan atas halaman Draft Editor, klik Publish.
Pada halaman Deployments, temukan penerapan baru dan klik Start di kolom Actions.
Langkah 4: Verifikasi penyinkronan
Setelah pekerjaan dimulai, jalankan skenario berikut untuk memastikan bahwa perubahan data dan skema keduanya disebarkan ke StarRocks secara real time.
Hubungkan ke StarRocks sebelum menjalankan kueri verifikasi:
mysql -h127.0.0.1 -P 9030 -uroot
USE test_cdc;Verifikasi data awal
Kueri tabel StarRocks untuk memastikan baris seed telah disinkronkan:
SELECT * FROM runoob_tbl1;Output yang diharapkan:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 18 | first | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+Verifikasi INSERT
Pada tab SQL Console instans ApsaraDB RDS for MySQL, masukkan satu baris:
INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1);Kueri StarRocks untuk memastikan kedua baris muncul:
SELECT * FROM runoob_tbl1;Output yang diharapkan:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 1 | second | tom2 | 2022-06-23 | 1 |
| 18 | new | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+Verifikasi UPDATE
Pada SQL Console MySQL, perbarui satu baris:
UPDATE runoob_tbl SET runoob_title = 'new' WHERE runoob_id = 18;Kueri StarRocks untuk memastikan perubahannya:
SELECT * FROM runoob_tbl1;Output yang diharapkan:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 1 | second | tom2 | 2022-06-23 | 1 |
| 18 | new | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+Verifikasi DELETE
Pada SQL Console MySQL, hapus satu baris:
DELETE FROM runoob_tbl WHERE runoob_id = 1;Kueri StarRocks untuk memastikan baris tersebut dihapus:
SELECT * FROM runoob_tbl1;Output yang diharapkan:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 18 | new | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+Verifikasi perubahan skema: tambahkan kolom nullable
Pada SQL Console MySQL, tambahkan kolom nullable dan masukkan satu baris dengan nilai pada kolom tersebut:
ALTER TABLE `runoob_tbl` ADD COLUMN `add_col2` INT;
INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`, `add_col2`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1, 2);Kueri StarRocks untuk memastikan kolom baru muncul dan baris yang ada menampilkan NULL:
SELECT * FROM runoob_tbl1;Output yang diharapkan:
+-----------+--------------+---------------+-----------------+---------+----------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 |
+-----------+--------------+---------------+-----------------+---------+----------+
| 18 | new | tom | 2022-06-22 | 3 | NULL |
| 1 | second | tom2 | 2022-06-23 | 1 | 2 |
+-----------+--------------+---------------+-----------------+---------+----------+Kolom add_col2 secara otomatis ditambahkan ke StarRocks ketika baris data pertama yang memuatnya tiba.
CDAS: menyinkronkan seluruh database
Pernyataan CREATE DATABASE AS (CDAS) merupakan syntactic sugar untuk CTAS — ia membuat satu penerapan Flink yang menyinkronkan semua tabel yang dipilih dari database MySQL ke StarRocks sekaligus. Gunakan klausa INCLUDING TABLE untuk memilih tabel tertentu berdasarkan nama.
Buat katalog MySQL dan StarRocks terlebih dahulu (sama seperti Langkah 2), lalu jalankan:
CREATE DATABASE IF NOT EXISTS sr_db WITH (
'starrocks.create.table.properties' = 'buckets 8',
'starrocks.create.table.mode' = 'simple',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'username' = 'test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS DATABASE mysql.test_cdc INCLUDING TABLE 'tbl1', 'tbl2', 'tbl3'
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc'
) */;Klausa INCLUDING TABLE menerima daftar nama tabel yang dipisahkan koma. Hilangkan klausa ini untuk menyinkronkan semua tabel di database sumber.
Langkah selanjutnya
Continuously load data from Apache Flink — referensi parameter lengkap untuk konektor StarRocks Flink
Checkpointing — konfigurasikan interval dan timeout checkpoint untuk semantik exactly-once
Access the UI and ports — konfirmasi port
load-urlyang benar untuk versi kluster EMR Anda