全部产品
Search
文档中心

E-MapReduce:Gunakan layanan Flink dalam kluster Dataflow untuk menyinkronkan data dari MySQL ke StarRocks menggunakan pernyataan CTAS

更新时间:Oct 18, 2025

Topik ini menjelaskan cara menggunakan layanan Flink dalam kluster EMR Dataflow untuk menyinkronkan data dari MySQL ke EMR Serverless StarRocks menggunakan pernyataan CREATE TABLE AS (CTAS).

Informasi latar belakang

Anda dapat menggunakan pernyataan CTAS atau CREATE DATABASE AS (CDAS) untuk menyinkronkan data dari MySQL ke EMR Serverless StarRocks. Pernyataan CTAS menyinkronkan skema dan data dari satu tabel, sedangkan pernyataan CDAS menyinkronkan skema dan data dari seluruh database atau beberapa tabel dalam database yang sama. Topik ini berfokus pada penggunaan pernyataan CTAS. Untuk informasi lebih lanjut tentang CDAS, lihat Pengenalan CDAS.

Dengan menggunakan pernyataan CTAS, Anda dapat secara otomatis membuat tabel di StarRocks dengan skema yang identik dengan tabel di MySQL serta menyinkronkan data. Selain itu, perubahan skema tabel sumber disinkronkan secara real-time ke tabel tujuan, meningkatkan efisiensi pembuatan tabel di penyimpanan tujuan dan pemeliharaan perubahan skema.

Saat mengeksekusi pernyataan CTAS, Flink melakukan operasi berikut:

  1. Memeriksa apakah tabel tujuan ada di penyimpanan tujuan.

    • Jika tabel tujuan tidak ada, tabel tersebut dibuat di penyimpanan tujuan berdasarkan katalog tempat tabel tersebut termasuk. Tabel tujuan memiliki skema yang sama dengan tabel sumber.

    • Jika tabel tujuan sudah ada, Flink melewati langkah pembuatan tabel. Namun, jika skema tabel tujuan berbeda dari skema tabel sumber, pesan kesalahan akan dikembalikan.

  2. Menjalankan pekerjaan sinkronisasi data. Flink menyinkronkan data dan perubahan skema tabel sumber ke tabel tujuan.

Kebijakan sinkronisasi perubahan skema menggunakan pernyataan CTAS untuk menyinkronkan data secara real-time dan perubahan skema tabel sumber ke tabel tujuan.

Perubahan skema mencakup pembuatan tabel dan modifikasi skema setelah tabel dibuat.

  • Perubahan skema berikut didukung:

    • Menambahkan kolom nullable: Pernyataan secara otomatis menambahkan kolom terkait ke akhir skema tabel tujuan dan menyinkronkan data ke kolom yang ditambahkan.

    • Menghapus kolom nullable: Pernyataan secara otomatis mengisi kolom nullable tabel tujuan dengan nilai null alih-alih menghapus kolom dari tabel.

    • Mengganti nama kolom: Pernyataan menambahkan kolom yang diganti namanya ke akhir tabel tujuan dan mengisi kolom sebelum penggantian nama dengan nilai null.

      Sebagai contoh, jika nama kolom col_a di tabel sumber diubah menjadi col_b, kolom col_b ditambahkan ke akhir tabel tujuan dan kolom col_a secara otomatis diisi dengan nilai null.

  • Perubahan skema berikut tidak didukung:

    • Perubahan tipe data.

      Sebagai contoh, data dalam kolom diubah dari tipe VARCHAR menjadi tipe BIGINT, atau properti kolom diubah dari NOT NULL menjadi NULLABLE.

    • Perubahan kendala, seperti kunci utama atau indeks.

    • Penambahan atau penghapusan kolom non-nullable.

Catatan
  • Jika skema tabel sumber mengalami salah satu perubahan di atas, Anda harus menghapus tabel tujuan dan memulai ulang pekerjaan yang mengeksekusi pernyataan CTAS. Dengan cara ini, tabel tujuan dibuat kembali dan data historis disinkronkan ulang ke tabel tujuan.
  • Pernyataan CTAS tidak mengidentifikasi jenis pernyataan DDL, tetapi membandingkan perbedaan skema antara dua catatan data sebelum dan sesudah skema diubah. Oleh karena itu, jika Anda menghapus kolom dan kemudian menambahkan kolom lagi tanpa adanya perubahan data antara dua pernyataan DDL, pernyataan CTAS menganggap bahwa tidak ada perubahan skema yang terjadi. Demikian pula, pernyataan CTAS tidak memicu sinkronisasi perubahan skema bahkan jika Anda menambahkan kolom ke tabel sumber. Pernyataan hanya mengidentifikasi perubahan skema ketika data berubah di tabel sumber. Dalam hal ini, pernyataan menyinkronkan perubahan skema ke tabel tujuan.
  • Untuk informasi lebih lanjut tentang tipe bidang yang didukung oleh pernyataan CTAS, lihat Muat data secara kontinu dari Apache Flink®.

Prasyarat

Catatan

Topik ini menggunakan MySQL 5.7 dan kluster Dataflow EMR-3.42.0 sebagai contoh.

Batasan

  • Kluster Dataflow, instans StarRocks, dan instans ApsaraDB RDS for MySQL harus diterapkan di virtual private cloud (VPC) yang sama.

  • Kluster Dataflow dan instans StarRocks harus dapat diakses melalui Internet.

  • Versi mesin instans ApsaraDB RDS for MySQL harus 5.7 atau lebih baru.

  • Kluster Dataflow harus versi EMR-3.42.0 atau lebih baru atau EMR-5.8.0 atau lebih baru.

Langkah 1: Persiapkan data uji

  1. Buat database uji dan akun uji. Untuk informasi lebih lanjut, lihat Langkah 1: Buat instans ApsaraDB RDS for MySQL dan konfigurasikan database.

    Setelah membuat database uji dan akun, berikan izin baca dan tulis kepada akun tersebut.

    Catatan

    Dalam topik ini, nama database adalah test_cdc dan nama akun adalah emr_test.

  2. Hubungkan ke instans ApsaraDB RDS for MySQL menggunakan akun uji. Untuk informasi lebih lanjut, lihat Langkah 2: Hubungkan ke instans ApsaraDB RDS for MySQL.

  3. Jalankan perintah berikut untuk membuat tabel:

    use test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
  4. Masuk dan hubungkan ke instans EMR Serverless StarRocks. Untuk informasi lebih lanjut, lihat Hubungkan ke instans StarRocks menggunakan klien MySQL.

  5. Jalankan perintah berikut untuk membuat database test_cdc, buat pengguna super administrator bernama test (dengan kata sandi contoh 1qaz!QAZ), atau buat pengguna biasa bernama test dan berikan izin pada database kepada pengguna tersebut. Untuk informasi lebih lanjut, lihat Kelola pengguna.

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED by '1qaz!QAZ';
    GRANT ALL on test_cdc to test;

Langkah 2: Unggah konektor kustom

Unggah konektor kustom untuk koneksi Flink, StarRocks, dan ApsaraDB RDS for MySQL.

  1. Masuk ke kluster Dataflow menggunakan SSH. Untuk informasi lebih lanjut, lihat Masuk ke kluster.

  2. Unduh flink-connector-starrocks-1.2.2_flink-1.13_2.11.jar dan ververica-connector-mysql-1.13-vvr-4.0.12-1-20220330.065158-3-jar-with-dependencies.jar, lalu unggah mereka ke direktori /opt/apps/FLINK/flink-current/lib kluster Dataflow.

Langkah 3: Eksekusi pernyataan CTAS

  1. Kirim pekerjaan dalam mode sesi.

    1. Masuk ke kluster Dataflow menggunakan SSH. Untuk informasi lebih lanjut, lihat Masuk ke kluster.

    2. Jalankan perintah berikut untuk masuk ke direktori /opt/apps/FLINK/flink-current:

      cd /opt/apps/FLINK/flink-current
    3. Jalankan perintah berikut untuk memulai sesi YARN:

      ./bin/yarn-session.sh --detached

      Jika perintah berhasil dieksekusi, application_XXXX_YY dikembalikan dalam output. Ini adalah sessionId yang Anda perlukan untuk masuk ke klien SQL.sessionid

    4. Jalankan perintah berikut untuk membuka klien SQL:

      ./bin/sql-client.sh -s <application_XXXX_YY>
      Catatan

      Ganti <application_XXXX_YY> dengan sessionId yang Anda peroleh di langkah sebelumnya.

  2. Buat katalog untuk MySQL dan StarRocks.

    CREATE CATALOG sr WITH (
      'type' = 'starrocks',
      'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'username' = 'test',
      'password' = '1qaz!QAZ',
      'dbname' = 'test_cdc'
    );
    
    CREATE CATALOG mysql WITH (
      'type' = 'mysql',
      'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = '123456',
      'default-database' = 'test_cdc'
    );

    Tabel berikut menjelaskan parameter. Anda dapat memodifikasi parameter berdasarkan kebutuhan bisnis Anda.

    Tabel 1. Parameter Katalog StarRocks

    Parameter

    Deskripsi

    type

    Tipe katalog. Atur nilainya menjadi starrocks.

    endpoint

    Titik akhir internal dan port query node FE. Formatnya adalah Titik akhir internal node FE instans EMR Serverless StarRocks:9030. Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030.

    Catatan

    Untuk informasi tentang cara mendapatkan titik akhir internal node FE instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detailnya.

    username

    Nama pengguna yang digunakan untuk mengakses database StarRocks.

    Masukkan nama pengguna yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test digunakan.

    password

    Kata sandi yang digunakan untuk mengakses database StarRocks.

    Masukkan kata sandi yang Anda atur untuk akun di Langkah 1: Persiapkan data uji. Dalam contoh ini, 1qaz!QAZ digunakan.

    dbname

    Nama database StarRocks.

    Masukkan nama database yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.

    Tabel 2. Parameter Katalog MySQL

    Parameter

    Deskripsi

    type

    Tipe katalog. Atur nilainya menjadi mysql.

    hostname

    Titik akhir internal instans ApsaraDB RDS for MySQL.

    Anda dapat menyalin titik akhir internal pada halaman Koneksi Database instans ApsaraDB RDS for MySQL di konsol ApsaraDB RDS. Contoh: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.

    port

    Nomor port database MySQL. Nilai default: 3306.

    username

    Nama pengguna yang digunakan untuk mengakses database MySQL.

    Masukkan nama pengguna akun yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, emr_test digunakan.

    password

    Kata sandi yang digunakan untuk mengakses database MySQL.

    Masukkan kata sandi akun yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, 123456 digunakan.

    default-database

    Nama database MySQL default.

    Masukkan nama database yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.

  3. Eksekusi pernyataan CTAS di katalog StarRocks.

    Anda dapat menggunakan salah satu dari tiga metode berikut untuk mengeksekusi pernyataan CTAS:

    • Semantik setidaknya sekali: Gunakan parameter sink.buffer-flush.interval-ms untuk mengonfigurasi interval penulisan data ke StarRocks. Keuntungannya adalah interval penulisan pendek dan lebih sedikit memori yang digunakan.

      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
    • Semantik exactly-once: Tentukan interval di mana checkpoint dijadwalkan secara berkala. Keuntungannya adalah data tidak hilang atau terduplikasi saat terjadi kesalahan. Kerugiannya adalah interval checkpoint menentukan kapan data dapat terlihat. Untuk informasi selengkapnya, lihat Checkpointing.

      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.semantic' = 'exactly-once',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS ( 'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • Mode sederhana: Keuntungannya adalah Anda tidak perlu memperhatikan bidang dalam tabel di database MySQL saat membuat tabel. Skema tabel yang ingin Anda buat sama dengan skema tabel di database MySQL. Mode ini mudah digunakan untuk pengembang. Kerugiannya adalah Anda tidak dapat membuat partisi. Untuk tabel yang perlu dipartisi, Anda harus membuat partisi dalam mode normal.

      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'='buckets 8',
      'starrocks.create.table.mode'='simple',
       'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr_test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      

      Tabel 3. Parameter WITH

      Parameter

      Diperlukan

      Deskripsi

      starrocks.create.table.properties

      Ya

      Definisi lainnya kecuali definisi bidang dalam pernyataan yang digunakan untuk membuat tabel di database StarRocks, seperti engine, key, dan buckets dalam kode contoh.

      database-name

      Ya

      Nama database StarRocks.

      Dalam contoh ini, test_cdc digunakan.

      jdbc-url

      Ya

      URL JDBC yang digunakan untuk terhubung ke StarRocks dan menjalankan kueri di StarRocks.

      Contoh: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com adalah titik akhir internal node FE instans EMR Serverless StarRocks.

      Catatan

      Untuk informasi tentang cara mendapatkan titik akhir internal node FE instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detailnya.

      load-url

      Ya

      Titik akhir internal dan port query node FE. Formatnya adalah Titik akhir internal node FE instans EMR Serverless StarRocks:8030.

      Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030.

      Catatan

      Untuk informasi tentang cara mendapatkan titik akhir internal node FE instans EMR Serverless StarRocks, lihat Lihat daftar instans dan detailnya.

      sink.semantic

      Tidak

      Semantik yang digunakan untuk mengeksekusi pernyataan. Atur parameter ini menjadi exactly-once untuk memastikan konsistensi data. Nilai default: at-least-once.

      starrocks.create.table.mode

      Tidak

      Nilai valid:

      • normal (default): Anda harus menentukan konfigurasi lengkap seperti engine, key, dan buckets dalam parameter starrocks.create.table.properties, seperti yang ditunjukkan dalam contoh.

      • simple: Secara default, parameter engine diatur ke olap, parameter key diatur ke primary key. Primary key sama dengan primary key di tabel MySQL. Parameter distributed by hash dikonfigurasikan untuk semua primary key, dan tidak ada partisi. Anda harus menentukan buckets dalam parameter starrocks.create.table.properties. Anda juga dapat menentukan parameter opsional seperti properties.

      sink.properties.row_delimiter

      Tidak

      Pemisah baris kustom.

      sink.properties.column_separator

      Tidak

      Pemisah kolom kustom.

      Catatan
      • Jika Anda menggunakan versi lebih lama dari Flink vvr-6.0.5-flink-1.15, tambahkan 'sink.use.new-apiapi'='false', ke klausa WITH.

      • Untuk informasi lebih lanjut tentang konfigurasi lainnya, lihat Muat data secara kontinu dari Apache Flink.

      Tabel 4. Parameter OPTIONS

      Parameter

      Deskripsi

      connector

      Tipe konektor. Atur nilainya menjadi mysql-cdc.

      hostname

      Titik akhir internal instans ApsaraDB RDS for MySQL.

      Anda dapat menyalin titik akhir internal pada halaman Koneksi Database instans ApsaraDB RDS for MySQL di konsol ApsaraDB RDS. Contoh: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.

      port

      Nomor port database MySQL. Nilai default: 3306.

      username

      Nama pengguna yang digunakan untuk mengakses database MySQL.

      Masukkan nama pengguna akun yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, emr_test digunakan.

      password

      Kata sandi yang digunakan untuk mengakses database MySQL.

      Masukkan kata sandi akun yang Anda buat di Langkah 1: Persiapkan data uji.

      table-name

      Nama tabel di database StarRocks.

      Masukkan nama tabel yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, runoob_tbl digunakan.

      database-name

      Nama database MySQL default.

      Masukkan nama database yang Anda buat di Langkah 1: Persiapkan data uji. Dalam contoh ini, test_cdc digunakan.

Langkah 4: Lihat hasil sinkronisasi data

Catatan

Jika checkpointing diaktifkan, waktu tunggu maksimum sekitar interval checkpoint.

Kueri data

  1. Masuk dan hubungkan ke instans EMR Serverless StarRocks. Untuk informasi lebih lanjut, lihat Hubungkan ke instans StarRocks menggunakan klien MySQL.

  2. Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:

    use test_cdc;
    select * from runoob_tbl1;

    Keluaran berikut dikembalikan, menunjukkan bahwa data telah disinkronkan dari instans ApsaraDB RDS for MySQL ke kluster StarRocks.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Kueri data yang dimasukkan

  1. Jalankan perintah berikut di jendela database ApsaraDB RDS for MySQL untuk memasukkan data:

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1);
  2. Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:

    select * from runoob_tbl1;

    Keluaran berikut dikembalikan, menunjukkan bahwa data telah dimasukkan.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Sinkronkan data yang diperbarui

  1. Jalankan perintah berikut di jendela database ApsaraDB RDS for MySQL untuk memperbarui data tertentu:

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18;
  2. Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:

    select * from runoob_tbl1;

    Keluaran berikut dikembalikan, menunjukkan bahwa data yang diperbarui telah disinkronkan.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Sinkronkan data yang dihapus

  1. Jalankan perintah berikut di jendela database ApsaraDB RDS for MySQL untuk menghapus data tertentu:

    DELETE FROM runoob_tbl WHERE runoob_id = 1;
  2. Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:

    select * from runoob_tbl1;

    Keluaran berikut dikembalikan, menunjukkan bahwa data yang dihapus telah disinkronkan.

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

Tambahkan kolom nullable

  1. Jalankan perintah berikut di jendela database ApsaraDB RDS for MySQL untuk menambahkan kolom nullable:

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. Jalankan perintah berikut untuk memasukkan data:

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`)  values(1,'second','tom2','2022-06-23',1,2)
  3. Jalankan perintah berikut di jendela koneksi StarRocks untuk melihat data tabel:

    select * from runoob_tbl1;

    Keluaran berikut dikembalikan, menunjukkan bahwa skema telah berubah dan kolom nullable telah ditambahkan.

    +-----------+--------------+---------------+-----------------+---------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_co2 |
    +-----------+--------------+---------------+-----------------+---------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |       2 |
    |        18 | new          | tom           | 2022-06-22      |       3 |    NULL |
    +-----------+--------------+---------------+-----------------+---------+---------+

Pengenalan CDAS

Pernyataan CREATE DATABASE AS (CDAS) adalah gula sintaksis dari pernyataan CTAS. Dengan menggunakan pernyataan CDAS, Anda dapat menyinkronkan seluruh database dari MySQL, yang berarti pekerjaan Flink dibuat. Tabel sumber adalah database di MySQL, dan tabel tujuan adalah beberapa tabel di StarRocks. Anda juga dapat menggunakan sintaks including table untuk memilih hanya beberapa tabel dalam database untuk operasi CDAS.

Mirip dengan eksekusi pernyataan CTAS, Anda harus membuat katalog di database MySQL dan database StarRocks sebelum mengeksekusi pernyataan CDAS. Contoh pernyataan:

CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'username'='test',
'password' = '1qaz!QAZ',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
 as DATABASEmysql.test_cdc including table
 'tabl1','tbl2','tbl3'   /*+ OPTIONS (   'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'test',
  'password' = '123456',
  'database-name' = 'test_cdc' )*/;