全部产品
Search
文档中心

E-MapReduce:Gunakan Realtime Compute for Apache Flink untuk menyinkronkan data dari MySQL ke StarRocks menggunakan pernyataan CTAS

更新时间:Oct 30, 2025

Topik ini menjelaskan cara menggunakan Realtime Compute for Apache Flink untuk menyinkronkan data dari MySQL ke EMR Serverless StarRocks menggunakan pernyataan CREATE TABLE AS (CTAS).

Informasi latar belakang

Anda dapat menggunakan pernyataan CTAS atau CREATE DATABASE AS (CDAS) untuk menyinkronkan data dari MySQL ke EMR Serverless StarRocks. Pernyataan CTAS dapat menyinkronkan skema dan data dari satu tabel, sedangkan pernyataan CDAS dapat menyinkronkan skema dan data dari seluruh database atau beberapa tabel dalam database yang sama. Topik ini berfokus pada penggunaan pernyataan CTAS. Pernyataan CDAS digunakan dengan cara yang mirip dengan pernyataan CTAS. Untuk informasi lebih lanjut, lihat Pengenalan CDAS.

Dengan menggunakan pernyataan CREATE TABLE AS (CTAS), Anda dapat secara otomatis membuat tabel di StarRocks dengan skema yang sama seperti tabel di MySQL dan menyinkronkan data. Anda juga dapat menyinkronkan perubahan skema tabel sumber ke tabel tujuan secara real-time, meningkatkan efisiensi pembuatan tabel di penyimpanan tujuan dan pemeliharaan perubahan skema tabel sumber.

Saat Anda menjalankan pernyataan CTAS, Flink melakukan operasi berikut:

  1. Periksa apakah tabel tujuan ada di penyimpanan tujuan.

    • Jika tabel tujuan tidak ada, tabel tujuan yang sesuai dibuat di penyimpanan tujuan berdasarkan katalog tempat tabel tersebut milik. Tabel tujuan memiliki skema yang sama dengan tabel sumber.

    • Jika tabel tujuan sudah ada, Flink melewati langkah pembuatan tabel. Namun, jika skema tabel tujuan yang ada berbeda dari skema tabel sumber, pesan kesalahan akan dikembalikan.

  2. Komit dan jalankan pekerjaan sinkronisasi data. Flink menyinkronkan data dan perubahan skema tabel sumber ke tabel tujuan.

Kebijakan sinkronisasi perubahan skema menggunakan pernyataan CTAS untuk menyinkronkan data secara real-time dan perubahan skema tabel sumber ke tabel tujuan.

Perubahan skema mencakup pembuatan tabel dan perubahan skema setelah tabel dibuat.

  • Perubahan skema berikut didukung:

    • Tambah kolom nullable: Pernyataan secara otomatis menambahkan kolom terkait ke akhir skema tabel tujuan dan menyinkronkan data ke kolom yang ditambahkan.

    • Hapus kolom nullable: Pernyataan secara otomatis mengisi kolom nullable tabel tujuan dengan nilai null alih-alih menghapus kolom dari tabel.

    • Ganti nama kolom: Pernyataan menambahkan kolom yang diganti namanya ke akhir tabel tujuan dan mengisi kolom sebelum penggantian nama dengan nilai null.

      Sebagai contoh, jika nama kolom col_a di tabel sumber diubah menjadi col_b, kolom col_b ditambahkan ke akhir tabel tujuan dan kolom col_a secara otomatis diisi dengan nilai null.

  • Perubahan skema berikut tidak didukung:

    • Perubahan tipe data.

      Sebagai contoh, data di kolom diubah dari tipe VARCHAR menjadi tipe BIGINT, atau properti kolom diubah dari NOT NULL menjadi NULLABLE.

    • Perubahan kendala, seperti kunci utama atau indeks.

    • Penambahan atau penghapusan kolom non-nullable.

Catatan
  • Jika skema tabel sumber memiliki salah satu perubahan di atas, Anda harus menghapus tabel tujuan dan memulai ulang pekerjaan yang menjalankan pernyataan CTAS. Dengan cara ini, tabel tujuan dibuat kembali dan data historis disinkronkan ulang ke tabel tujuan.
  • Pernyataan CTAS tidak mengidentifikasi jenis pernyataan DDL, tetapi membandingkan perbedaan skema antara dua catatan data sebelum dan sesudah skema diubah. Oleh karena itu, jika Anda menghapus kolom dan kemudian menambahkan kolom lagi, dan tidak ada perubahan data antara dua pernyataan DDL yang digunakan untuk menghapus dan menambah kolom, pernyataan CTAS menganggap bahwa tidak ada perubahan skema yang terjadi. Demikian pula, pernyataan CTAS tidak memicu sinkronisasi perubahan skema bahkan jika Anda menambahkan kolom ke tabel sumber. Pernyataan hanya mengidentifikasi perubahan skema ketika data berubah di tabel sumber. Dalam hal ini, pernyataan menyinkronkan perubahan skema ke tabel tujuan.
  • Untuk informasi lebih lanjut tentang tipe bidang yang didukung oleh pernyataan CTAS, lihat Muat data secara kontinu dari Apache Flink®.

Prasyarat

Catatan

Topik ini menggunakan MySQL 5.7 dan versi Flink vvr-8.0.11-flink-1.17 sebagai contoh.

Batasan

  • Kluster Flink, instans EMR Serverless StarRocks, dan instans ApsaraDB RDS for MySQL harus berada di VPC yang sama.

  • Instans ApsaraDB RDS for MySQL harus versi 5.7 atau yang lebih baru.

Langkah 1: Persiapkan data uji

  1. Buat database uji dan akun. Untuk informasi lebih lanjut, lihat Buat Database dan Akun.

    Setelah Anda membuat database dan akun, berikan izin baca dan tulis kepada akun uji.

    Catatan

    Dalam topik ini, database diberi nama test_cdc, dan akun diberi nama test.

  2. Gunakan akun uji yang dibuat untuk terhubung ke instans MySQL. Untuk informasi lebih lanjut, lihat Gunakan DMS untuk Masuk ke Instans ApsaraDB RDS for MySQL.

  3. Di MySQL, jalankan perintah berikut untuk membuat tabel data.

    use test_cdc;
    -- Buat tabel
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- Masukkan data
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2025-06-22 17:13:44',3)
  4. Masuk dan terhubung ke instans EMR Serverless StarRocks. Untuk informasi lebih lanjut, lihat Gunakan Klien MySQL untuk Terhubung ke Instans StarRocks.

  5. Jalankan perintah berikut untuk membuat database test_cdc dan pengguna uji. Anda dapat membuat pengguna sebagai super administrator (contoh kata sandi: 1qaz!QAZ) atau sebagai pengguna biasa dan memberikan izin pengguna pada database dan tabelnya. Untuk informasi lebih lanjut, lihat Kelola Pengguna.

    -- Buat database
    CREATE DATABASE test_cdc;
    -- Buat pengguna
    CREATE USER 'test' IDENTIFIED by '1qaz!QAZ';
    -- Berikan izin database kepada pengguna
    GRANT ALL on test_cdc to test;
    -- Berikan izin tabel kepada pengguna
    GRANT ALL ON ALL TABLES IN DATABASE test_cdc to test;

Langkah 2: Buat katalog di Konsol Flink

Buat katalog MySQL dan StarRocks di halaman Data Management di Konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Data Management.

Catatan

Konfigurasi parameter hanya untuk referensi. Konfigurasikan parameter sesuai kebutuhan.

  • Katalog MySQL

    • Contoh Kode

      CREATE CATALOG mysql WITH (
        'type' = 'mysql',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr-test',
        'password' = '123456',
        'default-database' = 'test_cdc'
      );
    • Parameter

      Parameter

      Deskripsi

      type

      Tipe. Nilainya tetap mysql.

      hostname

      Titik akhir internal dari instans RDS. Anda dapat pergi ke halaman Koneksi Database dari instans RDS dan klik titik akhir internal untuk menyalinnya. Contoh: rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com.

      port

      Nomor port layanan database MySQL. Nilai default adalah 3306.

      username

      Nama pengguna untuk layanan database MySQL.

      Masukkan nama pengguna akun di Langkah 1: Persiapkan data uji. Dalam contoh ini, test digunakan.

      password

      Kata sandi untuk layanan database MySQL.

      Masukkan kata sandi akun di Langkah 1: Persiapkan data uji.

      default-database

      Nama database MySQL default.

      Masukkan nama database yang dibuat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.

  • Katalog StarRocks

    • Contoh Kode

      CREATE CATALOG sr  WITH (
        'type' = 'starrocks',
        'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
        'username' = 'test',
        'password' = '1qaz!QAZ',
        'dbname' = 'test_cdc'
      );
    • Parameter

      Parameter

      Deskripsi

      type

      Tipe. Nilainya tetap starrocks.

      endpoint

      Titik akhir internal dan port query node frontend (FE). Formatnya adalah Titik akhir internal node FE dari instans EMR Serverless StarRocks:9030.

      Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030.

      Catatan

      Untuk informasi tentang cara mendapatkan titik akhir internal node FE dari instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detail instans.

      username

      Nama pengguna untuk StarRocks.

      Masukkan nama pengguna akun di Langkah 1: Persiapkan data uji. Dalam contoh ini, test digunakan.

      password

      Kata sandi untuk layanan database StarRocks.

      Masukkan kata sandi akun di Langkah 1: Persiapkan data uji.

      dbname

      Nama database StarRocks.

      Masukkan nama database yang dibuat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.

Langkah 3: Buat dan terapkan pekerjaan

  1. Di halaman Data Development > ETL di Konsol Realtime Compute for Apache Flink, tulis pernyataan CTAS.

    Berikut ini tiga contoh yang diberikan:

    • Semantik setidaknya sekali: Gunakan item konfigurasi sink.buffer-flush.interval-ms untuk mengonfigurasi interval penulisan data ke StarRocks. Mode ini memberikan interval penulisan pendek dan penggunaan memori rendah.

      /*
            Semantik setidaknya sekali
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl;
                                      
    • Semantik tepat-sekali: Anda harus menentukan interval checkpoint. Keuntungannya adalah data tidak hilang atau duplikat saat terjadi kesalahan. Kerugiannya adalah interval checkpoint menentukan kapan data terlihat. Untuk informasi lebih lanjut, lihat Checkpointing.

      /*
            Semantik tepat-sekali.
      */
      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.semantic' = 'exactly-once',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl;
                                      
    • Mode Sederhana: Saat Anda membuat tabel, Anda tidak perlu menentukan bidang tabel sumber. Tabel dibuat berdasarkan skema tabel MySQL. Ini nyaman bagi pengembang. Namun, Anda tidak dapat membuat partisi. Untuk tabel yang memerlukan partisi, Anda harus membuatnya dalam mode normal.

      /*
            Dua contoh sebelumnya menggunakan mode normal. Contoh ini menggunakan mode sederhana.
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.mode'='simple',
       'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl;
                                      

    Tabel 1. Parameter WITH

    Parameter

    Diperlukan

    Deskripsi

    starrocks.create.table.properties

    Ya

    Definisi sufiks lain dalam pernyataan pembuatan tabel StarRocks, tidak termasuk definisi bidang. Contoh: engine, key, dan buckets.

    database-name

    Ya

    Nama database StarRocks.

    Dalam topik ini, nama database adalah test_cdc.

    jdbc-url

    Ya

    Digunakan untuk menjalankan query di StarRocks.

    Contoh: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. Dalam contoh ini, fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com adalah titik akhir internal node FE dari instans EMR Serverless StarRocks.

    Catatan

    Untuk informasi tentang cara mendapatkan alamat jaringan internal node FE dari instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detail.

    load-url

    Ya

    Titik akhir internal dan port query node FE. Formatnya adalah Titik akhir internal node FE dari instans EMR Serverless StarRocks:8030.

    Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030.

    Catatan

    Untuk informasi tentang cara mendapatkan alamat jaringan internal node FE dari instans EMR Serverless StarRocks, lihat Lihat Daftar Instans dan Detail.

    sink.semantic

    Tidak

    Atur nilai menjadi exactly-once untuk memastikan semantik tepat-sekali. Nilai default adalah at-least-once.

    starrocks.create.table.mode

    Tidak

    Nilai berikut didukung:

    • mode normal (default): Anda harus menentukan konfigurasi lengkap, seperti engine, key, dan buckets, dalam parameter starrocks.create.table.properties seperti yang ditunjukkan dalam contoh.

    • mode sederhana: Engine secara default adalah olap. Tipe kunci adalah primary key, dan primary key sama dengan primary key tabel MySQL. Distribusi default adalah by hash (semua primary key), tanpa partisi. Dalam parameter starrocks.create.table.properties, buckets diperlukan, dan properties opsional.

    sink.properties.row_delimiter

    Tidak

    Pemisah baris kustom.

    sink.properties.column_separator

    Tidak

    Pemisah kolom kustom.

    Catatan
    • Jika Anda menggunakan versi lebih awal dari Flink vvr-6.0.5-flink-1.15, Anda harus menambahkan 'sink.use.new-apiapi'='false', ke klausa WITH.

    • Untuk informasi lebih lanjut tentang konfigurasi lainnya, lihat Muat Data Secara Kontinu dari Apache Flink.

    Tabel 2. Parameter OPTIONS

    Parameter

    Deskripsi

    connector

    Tipe. Nilainya tetap mysql-cdc.

    hostname

    Titik akhir internal dari instans RDS.

    Anda dapat pergi ke halaman Koneksi Database dari instans RDS dan klik titik akhir internal untuk menyalinnya. Contoh: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.

    port

    Nomor port layanan database MySQL. Nilai default adalah 3306.

    username

    Nama pengguna untuk layanan database MySQL.

    Masukkan nama pengguna akun di Langkah 1: Persiapkan data uji. Dalam contoh ini, test digunakan.

    password

    Kata sandi untuk layanan database MySQL.

    Masukkan kata sandi akun dari Langkah 1: Persiapkan data uji.

    table-name

    Nama tabel di StarRocks.

    Masukkan nama tabel yang dibuat di Langkah 1: Persiapkan data uji. Dalam contoh ini, runoob_tbl digunakan.

    database-name

    Nama database MySQL default.

    Masukkan nama database yang dibuat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.

  2. Klik Deploy.

  3. Di halaman Terapkan Versi Baru, pilih target penyebaran dan klik OK.

  4. Di halaman O&M Pekerjaan, klik Start di kolom Aksi untuk pekerjaan yang ingin Anda mulai.

Catatan

Anda tidak dapat men-debug pernyataan CTAS di Konsol Realtime Compute for Apache Flink.

Langkah 4: Verifikasi hasil sinkronisasi data

Query data

  1. Masuk dan terhubung ke instans EMR Serverless StarRocks. Untuk informasi lebih lanjut, lihat Gunakan Klien MySQL untuk Terhubung ke Instans StarRocks.

  2. Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.

    use test_cdc;
    select * from runoob_tbl;

    Hasil berikut dikembalikan, yang menunjukkan bahwa data dari MySQL telah disinkronkan ke StarRocks.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2025-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Sinkronkan data yang dimasukkan

  1. Di jendela database RDS, jalankan perintah berikut untuk memasukkan data.

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1)
  2. Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.

    select * from runoob_tbl;

    Hasil berikut dikembalikan, yang menunjukkan bahwa data telah dimasukkan.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2025-06-23      |       1 |
    |        18 | first        | tom           | 2025-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Sinkronkan data yang diperbarui

  1. Di jendela database RDS, jalankan perintah berikut untuk memperbarui data yang ditentukan.

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18
  2. Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.

    select * from runoob_tbl;

    Hasil berikut dikembalikan, yang menunjukkan bahwa data telah diperbarui.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2025-06-23      |       1 |
    |        18 | new          | tom           | 2025-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Sinkronkan data yang dihapus

  1. Di jendela database RDS, jalankan perintah berikut untuk menghapus data yang ditentukan.

    DELETE FROM runoob_tbl WHERE runoob_id = 1
  2. Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.

    select * from runoob_tbl;

    Hasil berikut dikembalikan, yang menunjukkan bahwa data telah dihapus.

    +-----------+--------------+---------------+-----------------+---------+ 
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | 
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2025-06-22      |       3 | 
    +-----------+--------------+---------------+-----------------+---------+

Sinkronkan kolom nullable yang ditambahkan

  1. Di jendela database RDS, jalankan perintah berikut untuk menambahkan kolom nullable.

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. Jalankan perintah berikut untuk memasukkan data.

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`)  values(1,'second','tom2','2022-06-23',1,2)
  3. Di jendela koneksi StarRocks, jalankan perintah berikut untuk melihat data tabel.

    select * from runoob_tbl;

    Hasil berikut dikembalikan, yang menunjukkan bahwa skema telah berubah.

    +-----------+--------------+---------------+-----------------+---------+----------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 | 
    +-----------+--------------+---------------+-----------------+---------+----------+ 
    |        18 | new          | tom           | 2025-06-22      |       3 |     NULL |
    +-----------+--------------+---------------+-----------------+---------+----------+ 
    |         1 | second       | tom2          | 2025-06-23      |       1 |      2   |
    +-----------+--------------+---------------+-----------------+---------+----------+ 

Pengenalan CDAS

CDAS adalah gula sintaksis untuk CTAS. Pernyataan CDAS memungkinkan sinkronisasi seluruh database dari MySQL. Ini menghasilkan pekerjaan Flink di mana sumbernya adalah database di MySQL dan tujuannya adalah set tabel yang sesuai di StarRocks. Anda juga dapat menggunakan sintaks `including table` untuk memilih hanya sebagian subset tabel dari database untuk operasi CDAS.

Mirip dengan eksekusi CTAS, Anda harus membuat katalog MySQL dan StarRocks sebelum menjalankan pernyataan CDAS. Kode berikut memberikan contoh sintaksis.

CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
 as DATABASE mysql.test_cdc including table
 'tabl1','tbl2','tbl3';

Referensi

Realtime Compute for Apache Flink mendukung menyinkronkan data ke StarRocks tidak hanya menggunakan pernyataan CTAS tetapi juga file YAML ingest data. Untuk informasi lebih lanjut, lihat Ingest Data.