全部产品
Search
文档中心

E-MapReduce:Gunakan pernyataan CTAS dan CDAS Realtime Compute for Apache Flink untuk menyinkronkan data dari instans ApsaraDB RDS for MySQL ke kluster StarRocks

更新时间:Mar 26, 2026

Pernyataan CREATE TABLE AS (CTAS) secara otomatis membuat tabel yang sesuai di StarRocks dan terus-menerus menyinkronkan data serta perubahan skema dari sumber MySQL. Pernyataan CREATE DATABASE AS (CDAS) memperluas fungsionalitas ini ke seluruh database. Topik ini menjelaskan cara menggunakan kedua pernyataan tersebut di Realtime Compute for Apache Flink untuk memindahkan data pemrosesan transaksi (TP) dari ApsaraDB RDS for MySQL ke kluster StarRocks E-MapReduce (EMR) guna pemrosesan analitis (AP).

Ikhtisar arsitektur

KomponenPeran
ApsaraDB RDS for MySQLSumber — asal event change data capture (CDC) dan perubahan skema
Realtime Compute for Apache FlinkMesin pemrosesan — membaca CDC MySQL dan menulis ke StarRocks
Kluster EMR StarRocksSink — tujuan analitis untuk data yang disinkronkan

Cara kerja

Saat menjalankan pernyataan CTAS, Flink melakukan dua operasi secara berurutan:

  1. Pemeriksaan pembuatan tabel — Flink memeriksa apakah tabel tujuan sudah ada di StarRocks.

    • Jika belum ada, Flink membuat tabel tersebut dengan skema yang sama seperti sumber berdasarkan katalog tujuan.

    • Jika sudah ada, Flink melewatkan pembuatan. Jika skema yang ada berbeda dari skema sumber, akan dikembalikan error.

  2. Penyinkronan data — Flink memulai pekerjaan berkelanjutan yang menyinkronkan data dan perubahan skema dari tabel sumber ke tabel tujuan.

Perilaku perubahan skema

CTAS menggunakan kebijakan tetap untuk menyebarkan perubahan skema. Tabel berikut merangkum perubahan yang didukung dan tidak didukung secara otomatis.

Perubahan skema yang didukung

Jenis perubahanPerilaku di StarRocks
Menambahkan kolom nullableKolom ditambahkan di akhir tabel tujuan; data masuk mengisi kolom tersebut
Menghapus kolom nullableKolom tetap ada di StarRocks tetapi diisi dengan nilai NULL
Mengganti nama kolomKolom dengan nama baru ditambahkan di akhir; kolom dengan nama lama diisi dengan nilai NULL. Misalnya, mengganti nama col_a menjadi col_b akan menambahkan col_b di akhir dan mengatur col_a menjadi NULL.

Perubahan skema yang tidak didukung

Jika salah satu perubahan berikut terjadi pada tabel sumber, hapus tabel tujuan di StarRocks dan mulai ulang pekerjaan CTAS. Flink akan membuat ulang tabel tersebut dan menyinkronkan kembali semua data historis.

  • Mengubah tipe data kolom (misalnya, VARCHAR menjadi BIGINT, atau NOT NULL menjadi NULLABLE)

  • Mengubah constraint seperti kunci primer atau indeks

  • Menambah atau menghapus kolom non-nullable

  • Menyesuaikan panjang field dalam pernyataan DDL (Data Definition Language)

CTAS mendeteksi perubahan skema dengan membandingkan skema dari catatan data berturut-turut — bukan dengan mengurai jenis pernyataan DDL. Akibatnya:
Jika sebuah kolom dihapus lalu ditambahkan kembali tanpa perubahan data di antaranya, CTAS menganggap hal ini sebagai tidak ada perubahan skema.
Perubahan skema hanya disebarkan ketika data baru tiba di tabel sumber setelah perubahan tersebut.
Untuk pemetaan tipe field, lihat Continuously load data from Apache Flink.
Saat CTAS menggabungkan beberapa tabel MySQL, Flink secara otomatis menambahkan kolom _db_name dan _table_name ke tabel tujuan untuk melacak sumbernya. Tentukan urutan kolom Anda sendiri mulai dari kolom ketiga.

Prasyarat

Sebelum memulai, pastikan Anda telah:

Contoh dalam topik ini menggunakan MySQL 5.7, EMR 3.39.1, dan Flink yang sepenuhnya dikelola vvr-6.0.3-flink-1.15.

Batasan

  • Ruang kerja Flink, kluster StarRocks, dan instans ApsaraDB RDS for MySQL harus berada dalam virtual private cloud (VPC) yang sama.

  • Versi mesin ApsaraDB RDS for MySQL harus 5.7 atau lebih baru.

  • Akses Internet harus diaktifkan untuk kluster StarRocks.

  • Flink yang sepenuhnya dikelola harus vvr-6.0.3-flink-1.15 atau lebih baru.

Langkah 1: Siapkan data uji

  1. Buat database dan akun pada instans ApsaraDB RDS for MySQL. Lihat Buat database dan akun untuk instans ApsaraDB RDS for MySQL. Berikan izin baca dan tulis kepada akun uji.

    Topik ini menggunakan database bernama test_cdc dan akun bernama test.
  2. Login ke instans ApsaraDB RDS for MySQL menggunakan akun uji. Lihat Gunakan DMS untuk login ke instans ApsaraDB RDS for MySQL.

  3. Buat tabel uji dan masukkan satu baris:

    USE test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl` (
      `runoob_id`      INT UNSIGNED AUTO_INCREMENT,
      `runoob_title`   VARCHAR(100) NOT NULL,
      `runoob_author`  VARCHAR(40)  NOT NULL,
      `submission_date` DATE,
      `add_col`        INT DEFAULT NULL,
      PRIMARY KEY (`runoob_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    INSERT INTO test_cdc.`runoob_tbl`
      (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
    VALUES (18, 'first', 'tom', '2022-06-22', 3);
  4. Login ke kluster StarRocks melalui SSH. Lihat Login ke kluster.

  5. Hubungkan ke StarRocks:

    mysql -h127.0.0.1 -P 9030 -uroot
  6. Buat pengguna dan berikan izin yang diperlukan untuk tutorial ini:

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED BY '123456';
    GRANT CREATE TABLE ON DATABASE test_cdc TO test;

Langkah 2: Buat katalog di editor SQL Flink

Pada halaman Draft Editor di konsol Flink yang sepenuhnya dikelola, buat satu katalog untuk MySQL dan satu untuk StarRocks. Lihat Memulai penerapan Flink SQL.

Nilai parameter di bawah ini bersifat contoh. Sesuaikan dengan lingkungan Anda.

Katalog MySQL

CREATE CATALOG mysql WITH (
  'type'             = 'mysql',
  'hostname'         = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'             = '3306',
  'username'         = 'test',
  'password'         = '123456',
  'default-database' = 'test_cdc'
);
ParameterDeskripsi
typeJenis katalog. Atur ke mysql.
hostnameTitik akhir internal instans ApsaraDB RDS for MySQL. Salin dari halaman Database Connection di konsol ApsaraDB RDS (misalnya, rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com).
portPort database MySQL. Default: 3306.
usernameUsername yang dibuat di Langkah 1: Siapkan data uji. Dalam contoh ini: test.
passwordPassword untuk username yang dibuat di Langkah 1: Siapkan data uji.
default-databaseNama database yang dibuat di Langkah 1: Siapkan data uji. Dalam contoh ini: test_cdc.

Katalog StarRocks

CREATE CATALOG sr WITH (
  'type'     = 'starrocks',
  'endpoint' = '172.16.**.**:9030',
  'username' = 'test',
  'password' = '123456',
  'dbname'   = 'test_cdc'
);
ParameterDeskripsi
typeJenis katalog. Atur ke starrocks.
endpointAlamat IP dan port antarmuka depan StarRocks (misalnya, 172.16..:9030).
usernameUsername yang dibuat di Langkah 1: Siapkan data uji. Dalam contoh ini: test.
passwordPassword untuk username yang dibuat di Langkah 1: Siapkan data uji.
dbnameNama database StarRocks. Dalam contoh ini: test_cdc.

Langkah 3: Tulis dan publikasikan penerapan CTAS

Pada halaman Draft Editor, tulis pernyataan CTAS. Tersedia tiga mode pengiriman — pilih berdasarkan kebutuhan konsistensi Anda.

Semantik at-least-once (direkomendasikan untuk skenario latensi rendah)

Data ditulis pada interval flush yang dapat dikonfigurasi. Penggunaan memori lebih rendah, tetapi penulisan duplikat mungkin terjadi saat terjadi kegagalan.

/* Semantik at-least-once */

USE CATALOG sr;

CREATE TABLE IF NOT EXISTS runoob_tbl_sr WITH (
  'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
  'database-name'                     = 'test_cdc',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'table-name'                        = 'runoob_tbl_sr',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.buffer-flush.interval-ms'     = '5000',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc',
  'table-name'    = 'runoob_tbl'
) */;

Semantik exactly-once (direkomendasikan untuk skenario kritis data)

Tidak ada kehilangan data atau duplikasi saat terjadi kegagalan. Visibilitas data bergantung pada interval checkpoint.

/* Semantik exactly-once */

SET 'execution.checkpointing.interval' = '1 min';
SET 'execution.checkpointing.mode'     = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout'  = '10 min';

USE CATALOG sr;

CREATE TABLE IF NOT EXISTS runoob_tbl WITH (
  'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
  'database-name'                     = 'test_cdc',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'table-name'                        = 'runoob_tbl',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.semantic'                     = 'exactly-once',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc',
  'table-name'    = 'runoob_tbl'
) */;

Untuk opsi konfigurasi checkpoint, lihat Checkpointing.

Mode simple (direkomendasikan untuk penyiapan cepat)

Flink menurunkan definisi tabel dari skema MySQL — tidak perlu menentukan engine, kunci, atau distribusi secara manual. Tabel terpartisi tidak didukung dalam mode simple; buat partisi menggunakan mode normal sebagai gantinya.

/* Mode simple */

USE CATALOG sr;

CREATE TABLE IF NOT EXISTS runoob_tbl1 WITH (
  'starrocks.create.table.properties' = 'buckets 8',
  'starrocks.create.table.mode'       = 'simple',
  'database-name'                     = 'test_cdc',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'table-name'                        = 'runoob_tbl_sr',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.buffer-flush.interval-ms'     = '5000',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc',
  'table-name'    = 'runoob_tbl'
) */;

Parameter klausa WITH

ParameterWajibDeskripsi
starrocks.create.table.propertiesYaDefinisi sufiks untuk pernyataan CREATE TABLE StarRocks, tidak termasuk definisi field — misalnya, engine, key, dan buckets.
database-nameYaNama database StarRocks.
jdbc-urlYaURL Java Database Connectivity (JDBC) untuk kueri StarRocks — misalnya, jdbc:mysql://172.16..:9030, di mana 172.16.. adalah alamat IP internal kluster StarRocks.
load-urlYaAlamat IP internal dan port HTTP antarmuka depan StarRocks. Pilih port berdasarkan versi kluster EMR Anda: 18030 untuk EMR V5.9.0 atau lebih baru (versi minor) dan EMR V3.43.0 atau lebih baru (versi minor); 8030 untuk EMR V5.8.0, EMR V3.42.0, atau lebih lama. Lihat Access the UI and ports.
sink.semanticTidakSemantik pengiriman: at-least-once (default) atau exactly-once.
starrocks.create.table.modeTidaknormal (default): tentukan engine, key, dan buckets di starrocks.create.table.properties. simple: Flink mengatur engine=olap, menggunakan kunci primer MySQL, dan mendistribusikan berdasarkan hash pada semua kolom kunci primer. Hanya buckets yang diperlukan di starrocks.create.table.properties. Partisi tidak dibuat.
Jika versi Flink Anda lebih lama dari vvr-6.0.5-flink-1.15, tambahkan 'sink.use.new-apiapi' = 'false' ke klausa WITH. Untuk parameter sink tambahan, lihat Continuously load data from Apache Flink.

Parameter klausa OPTIONS

Klausa OPTIONS mengonfigurasi sumber CDC MySQL.

ParameterDeskripsi
connectorJenis konektor. Atur ke mysql-cdc.
hostnameTitik akhir internal instans ApsaraDB RDS for MySQL (misalnya, rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com).
portPort MySQL. Default: 3306.
usernameUsername untuk akses ApsaraDB RDS for MySQL. Gunakan akun yang dibuat di Langkah 1: Siapkan data uji.
passwordPassword untuk akses ApsaraDB RDS for MySQL.
table-nameNama tabel sumber. Dalam contoh ini: runoob_tbl.
database-nameNama database sumber. Dalam contoh ini: test_cdc.

Publikasikan dan mulai penerapan

  1. Pada tab Advanced di halaman Draft Editor, atur Engine Version ke vvr-6.0.3-flink-1.15 atau lebih baru.

  2. Di pojok kanan atas halaman Draft Editor, klik Publish.

  3. Pada halaman Deployments, temukan penerapan baru dan klik Start di kolom Actions.

Langkah 4: Verifikasi penyinkronan

Setelah pekerjaan dimulai, jalankan skenario berikut untuk memastikan bahwa perubahan data dan skema keduanya disebarkan ke StarRocks secara real time.

Hubungkan ke StarRocks sebelum menjalankan kueri verifikasi:

mysql -h127.0.0.1 -P 9030 -uroot
USE test_cdc;

Verifikasi data awal

Kueri tabel StarRocks untuk memastikan baris seed telah disinkronkan:

SELECT * FROM runoob_tbl1;

Output yang diharapkan:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | first        | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

Verifikasi INSERT

Pada tab SQL Console instans ApsaraDB RDS for MySQL, masukkan satu baris:

INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1);

Kueri StarRocks untuk memastikan kedua baris muncul:

SELECT * FROM runoob_tbl1;

Output yang diharapkan:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2022-06-23      |       1 |
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

Verifikasi UPDATE

Pada SQL Console MySQL, perbarui satu baris:

UPDATE runoob_tbl SET runoob_title = 'new' WHERE runoob_id = 18;

Kueri StarRocks untuk memastikan perubahannya:

SELECT * FROM runoob_tbl1;

Output yang diharapkan:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2022-06-23      |       1 |
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

Verifikasi DELETE

Pada SQL Console MySQL, hapus satu baris:

DELETE FROM runoob_tbl WHERE runoob_id = 1;

Kueri StarRocks untuk memastikan baris tersebut dihapus:

SELECT * FROM runoob_tbl1;

Output yang diharapkan:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

Verifikasi perubahan skema: tambahkan kolom nullable

Pada SQL Console MySQL, tambahkan kolom nullable dan masukkan satu baris dengan nilai pada kolom tersebut:

ALTER TABLE `runoob_tbl` ADD COLUMN `add_col2` INT;

INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`, `add_col2`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1, 2);

Kueri StarRocks untuk memastikan kolom baru muncul dan baris yang ada menampilkan NULL:

SELECT * FROM runoob_tbl1;

Output yang diharapkan:

+-----------+--------------+---------------+-----------------+---------+----------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 |
+-----------+--------------+---------------+-----------------+---------+----------+
|        18 | new          | tom           | 2022-06-22      |       3 |     NULL |
|         1 | second       | tom2          | 2022-06-23      |       1 |        2 |
+-----------+--------------+---------------+-----------------+---------+----------+

Kolom add_col2 secara otomatis ditambahkan ke StarRocks ketika baris data pertama yang memuatnya tiba.

CDAS: menyinkronkan seluruh database

Pernyataan CREATE DATABASE AS (CDAS) merupakan syntactic sugar untuk CTAS — ia membuat satu penerapan Flink yang menyinkronkan semua tabel yang dipilih dari database MySQL ke StarRocks sekaligus. Gunakan klausa INCLUDING TABLE untuk memilih tabel tertentu berdasarkan nama.

Buat katalog MySQL dan StarRocks terlebih dahulu (sama seperti Langkah 2), lalu jalankan:

CREATE DATABASE IF NOT EXISTS sr_db WITH (
  'starrocks.create.table.properties' = 'buckets 8',
  'starrocks.create.table.mode'       = 'simple',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.buffer-flush.interval-ms'     = '5000',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS DATABASE mysql.test_cdc INCLUDING TABLE 'tbl1', 'tbl2', 'tbl3'
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc'
) */;

Klausa INCLUDING TABLE menerima daftar nama tabel yang dipisahkan koma. Hilangkan klausa ini untuk menyinkronkan semua tabel di database sumber.

Langkah selanjutnya