全部产品
Search
文档中心

E-MapReduce:Gunakan konektor Flink CDC untuk menyinkronkan data dari MySQL ke StarRocks

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menggunakan Flink Change Data Capture (CDC) untuk menyinkronkan data dari instance ApsaraDB RDS for MySQL ke instance E-MapReduce (EMR) Serverless StarRocks.

Prasyarat

Catatan Dalam contoh ini, versi MySQL dari instance ApsaraDB RDS for MySQL adalah 5.7, dan cluster Dataflow adalah EMR V3.39.1.

Batasan

  • Cluster Dataflow, instance StarRocks, dan instance ApsaraDB RDS for MySQL harus diterapkan di dalam virtual private cloud (VPC) yang sama.
  • Cluster Dataflow dan instance StarRocks harus dapat diakses melalui Internet.
  • Versi MySQL dari instance ApsaraDB RDS for MySQL harus 5.7 atau lebih baru.

Prosedur

  1. Langkah 1: Siapkan Data Uji
  2. Langkah 2: Konfigurasikan Alat Sinkronisasi dan Jalankan Pekerjaan Flink
  3. Langkah 3: Verifikasi Hasil Sinkronisasi Data

Langkah 1: Siapkan data uji

  1. Buat database uji dan akun uji. Untuk informasi tentang cara membuat akun dan database, lihat Buat Akun dan Database.
    Setelah membuat database uji dan akun uji, berikan izin baca dan tulis kepada akun tersebut.
    Catatan Dalam contoh ini, database bernama test_cdc dibuat.
  2. Gunakan akun uji untuk masuk ke instance ApsaraDB RDS for MySQL. Untuk informasi lebih lanjut, lihat Gunakan DMS untuk Masuk ke Instance ApsaraDB RDS for MySQL.
  3. Eksekusi pernyataan berikut untuk membuat tabel:
    CREATE TABLE test_cdc.`t_user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      `age` tinyint(4) DEFAULT NULL,
      `create_time` datetime DEFAULT NULL,
      `update_time` datetime DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Langkah 2: Konfigurasikan alat sinkronisasi dan jalankan pekerjaan Flink

  1. Masuk ke cluster Dataflow dalam mode SSH. Untuk informasi lebih lanjut, lihat Masuk ke Cluster.
  2. Unduh paket Flink CDC dan Flink StarRocks connectors.
    Catatan Mengunduh paket mungkin memerlukan waktu yang lama.
  3. Salin paket yang diunduh ke direktori /opt/apps/FLINK/flink-current/lib dari cluster Dataflow.
    cp flink-* /opt/apps/FLINK/flink-current/lib/
  4. Jalankan perintah berikut untuk memulai cluster Dataflow.
    Penting Contoh dalam topik ini disediakan hanya untuk tujuan pengujian. Untuk menjalankan pekerjaan Flink di lingkungan produksi, gunakan YARN atau Kubernetes untuk mengirimkan pekerjaan. Untuk informasi lebih lanjut, lihat Apache Hadoop YARN dan Native Kubernetes.
    /opt/apps/FLINK/flink-current/bin/start-cluster.sh
  5. Unduh alat yang diperlukan dan modifikasi file konfigurasi.
    1. Unduh paket Alat Migrasi StarRocks dan unggah paket ke direktori /root dari cluster Dataflow.
    2. Jalankan perintah berikut untuk mengekstrak paket smt.tar.gz:
      tar -zxvf smt.tar.gz && cd smt
    3. Jalankan perintah berikut untuk memodifikasi file config_prod.conf.
      Tabel berikut menjelaskan parameter. Anda dapat memodifikasi parameter berdasarkan kebutuhan bisnis Anda.
      ParameterDeskripsi
      hostTitik akhir internal dari instance ApsaraDB RDS for MySQL.

      Anda dapat menyalin titik akhir internal dari halaman Koneksi Database instance ApsaraDB RDS for MySQL di konsol ApsaraDB RDS. Contoh: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.

      portAtur nilai menjadi 3306.
      userNama pengguna akun yang dibuat untuk instance ApsaraDB RDS for MySQL.

      Atur parameter ini ke nama pengguna akun yang dibuat di Langkah 1: Siapkan data uji.

      passwordKata sandi akun yang dibuat untuk instance ApsaraDB RDS for MySQL.

      Atur parameter ini ke kata sandi akun yang dibuat di Langkah 1: Siapkan data uji.

      be_numJumlah node backend (BE) dalam instance StarRocks. Jika instance memiliki spesifikasi minimum, atur parameter ini menjadi 1.
      databaseEkspresi reguler yang digunakan untuk mencocokkan database dari mana Anda ingin menyinkronkan data. Contoh: ^test.*$.
      tableEkspresi reguler yang digunakan untuk mencocokkan tabel dari mana Anda ingin menyinkronkan data. Contoh: ^.*$.
      flink.starrocks.jdbc-urlURL Java Database Connectivity (JDBC) yang digunakan untuk terhubung ke StarRocks dan menjalankan kueri di StarRocks.
      Contoh: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com adalah titik akhir internal node frontend (FE) dalam instance StarRocks.
      Catatan Untuk informasi tentang cara mendapatkan titik akhir internal node FE dalam instance StarRocks, lihat Lihat daftar instance dan detail instance.
      flink.starrocks.load-urlTitik akhir internal dan port HTTP node FE dalam instance StarRocks. Konfigurasikan parameter ini dalam format Titik akhir internal node FE dalam instance StarRocks: 8030.
      Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030.
      Catatan Untuk informasi tentang cara mendapatkan titik akhir internal node FE dalam instance StarRocks, lihat Lihat daftar instance dan detail instance.
      flink.starrocks.usernameNama pengguna yang digunakan untuk terhubung ke instance StarRocks.
      flink.starrocks.passwordKata sandi yang digunakan untuk terhubung ke instance StarRocks.
      Catatan Secara default, parameter ini dibiarkan kosong.
      Kode sampel berikut menunjukkan konfigurasi parameter tertentu dalam file konfigurasi. Untuk informasi lebih lanjut, lihat Referensi.
      flink.starrocks.jdbc-url=jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030
      flink.starrocks.load-url=fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030
      flink.starrocks.username=admin
      flink.starrocks.password=1qaz!QAZ
  6. Jalankan perintah berikut untuk menghasilkan pernyataan CREATE TABLE di direktori result:
    ./starrocks-migrate-tool

    Anda dapat menjalankan perintah ls result untuk melihat isi direktori result.

    Output berikut dikembalikan:
    flink-create.1.sql  flink-create.all.sql  starrocks-create.1.sql  starrocks-create.all.sql  starrocks-external-create.1.sql  starrocks-external-create.all.sql
    Catatan
    • Dalam contoh ini, file konfigurasi hanya mendefinisikan table-rule.1. File .1.sql sesuai dengan table-rule.1. Jika file konfigurasi Anda melibatkan table-rule.2, Anda dapat menggunakan file .all.sql yang berisi semua aturan.
    • File dengan akhiran external-create digunakan untuk membuat tabel eksternal untuk sumber data. Jika Anda tidak ingin menyinkronkan tabel dimensi kecil dalam skenario tertentu, Anda dapat membuat tabel eksternal dan menanyakan data dalam tabel dimensi menggunakan tabel eksternal. Dalam hal ini, Anda dapat menggunakan file dengan akhiran external-create untuk menghasilkan tabel eksternal yang sesuai. Dalam contoh ini, file-file sebelumnya tidak digunakan.
  7. Jalankan perintah berikut untuk membuat tabel StarRocks:
    mysql -h<Titik akhir internal node FE dalam instance StarRocks> -P9030 -uroot -p < result/starrocks-create.1.sql
    Catatan Jika Anda tidak menentukan kata sandi yang digunakan untuk terhubung ke instance StarRocks saat Anda memodifikasi file config_prod.conf, Anda dapat menekan tombol Enter.
  8. Jalankan perintah berikut untuk memulai pekerjaan Flink:
    /opt/apps/FLINK/flink-current/bin/sql-client.sh -f result/flink-create.1.sql

Langkah 3: Verifikasi hasil sinkronisasi data

Kueri data

  1. Hubungkan ke instance StarRocks. Untuk informasi lebih lanjut, lihat Gunakan Klien MySQL untuk Terhubung ke Instance StarRocks.
  2. Jalankan perintah berikut untuk menanyakan informasi database:
    show databases;
    Output berikut dikembalikan:
    +--------------------+
    | Database           |
    +--------------------+
    | _statistics_       |
    | information_schema |
    | test_cdc           |
    +--------------------+
    3 rows in set (0.00 sec)
  3. Di CLI instance StarRocks, eksekusi perintah dan pernyataan berikut untuk menanyakan informasi tabel:
    use test_cdc;
    select * from t_user;

    Dalam contoh ini, tabel t_user kosong, dan tidak ada data yang dikembalikan.

Kueri data yang dimasukkan

  1. Di tab SQL Console database ApsaraDB RDS for MySQL, eksekusi pernyataan berikut untuk memasukkan data:
    INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.0",30,NOW(),NOW());
    INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.1",31,NOW(),NOW());
    INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.2",32,NOW(),NOW());
  2. Di CLI instance StarRocks, eksekusi perintah dan pernyataan berikut untuk menanyakan informasi tabel:
    select * from t_user;
    Jika output berikut dikembalikan, data telah dimasukkan.
    +------+--------------+------+---------------------+---------------------+
    | id   | name         | age  | create_time         | update_time         |
    +------+--------------+------+---------------------+---------------------+
    | 4    | aliyun.com.0 |   30 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 |
    | 5    | aliyun.com.1 |   31 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 |
    | 6    | aliyun.com.2 |   32 | 2022-03-10 13:22:42 | 2022-03-10 13:22:42 |
    +------+--------------+------+---------------------+---------------------+
    3 rows in set (0.00 sec)

Sinkronkan data yang diperbarui

  1. Di tab SQL Console database ApsaraDB RDS for MySQL, eksekusi pernyataan berikut untuk memperbarui data tertentu:
    UPDATE test_cdc.t_user SET age=35 where name="aliyun.com.0";
  2. Di CLI instance StarRocks, eksekusi perintah dan pernyataan berikut untuk menanyakan informasi tabel:
    select * from t_user where name = "aliyun.com.0";
    Jika output berikut dikembalikan, data yang diperbarui telah disinkronkan.
    +------+--------------+------+---------------------+---------------------+
    | id   | name         | age  | create_time         | update_time         |
    +------+--------------+------+---------------------+---------------------+
    | 4    | aliyun.com.0 |   35 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 |
    +------+--------------+------+---------------------+---------------------+
    1 row in set (0.01 sec)

Sinkronkan data yang dihapus

  1. Di tab SQL Console database ApsaraDB RDS for MySQL, eksekusi pernyataan berikut untuk menghapus data tertentu:
    DELETE FROM test_cdc.t_user where 1=1;
  2. Di CLI instance StarRocks, eksekusi perintah dan pernyataan berikut untuk menanyakan informasi tabel:
    select * from t_user;
    Jika output berikut dikembalikan, data yang dihapus telah disinkronkan.
    Empty set (0.01 sec)

Referensi

  • Topik ini disediakan hanya untuk tujuan pengujian. Untuk pekerjaan Flink di lingkungan produksi, gunakan Ververica Platform (VVP) dari Realtime Compute for Apache Flink untuk mengonfigurasi pekerjaan Flink, atau gunakan YARN atau Kubernetes untuk mengirimkan pekerjaan.

    Untuk informasi lebih lanjut, lihat Apache Hadoop YARN dan Native Kubernetes.

  • Jika Anda mengeksekusi pernyataan ALTER TABLE untuk memodifikasi tabel dalam instance ApsaraDB RDS for MySQL, Anda harus secara manual menyinkronkan perubahan skema MySQL ke StarRocks. Jika Anda membuat tabel dalam instance ApsaraDB RDS for MySQL, Anda harus menjalankan ulang alat migrasi StarRocks untuk menyinkronkan data dari tabel tersebut.
  • Konfigurasi sampel file konfigurasi untuk alat migrasi StarRocks:
    [db]
    host = rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com
    port = 3306
    user = ***
    password = ***
    # currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`
    type = mysql
    # # only takes effect on `type == hive`.
    # # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
    # authentication = kerberos
    
    [other]
    # number of backends in StarRocks
    be_num = 1
    # `decimal_v3` didukung sejak StarRocks-1.8.1  
    use_decimal_v3 = false  
    # direktori untuk menyimpan DDL SQL yang dikonversi  
    output_dir = ./result  
    
    
    # !!!`database` `table` `schema` peka huruf besar/kecil di `oracle`!!!  
    [table-rule.1]  
    # pola untuk mencocokkan database untuk pengaturan properti  
    # !!!  database harus berupa `nama instance (atau pdb) lengkap` dan bukan regex ketika berhadapan dengan `oracle db` !!!  
    database = ^test.*$  
    # pola untuk mencocokkan tabel untuk pengaturan properti  
    table = ^.*$  
    # `schema` hanya berlaku pada `postgresql` dan `oracle` dan `sqlserver`  
    schema = ^.*$  
    
    ############################################  
    ### konfigurasi tabel starrocks  
    ############################################  
    # # atur kolom sebagai partition_key  
    # partition_key = p_key  
    # # timpa partisi yang dihasilkan otomatis  
    # partitions = START ("2021-01-02") END ("2021-01-04") EVERY (INTERVAL 1 day)  
    # # hanya berlaku pada tabel tanpa primary key atau indeks unik  
    # duplicate_keys=k1,k2  
    # # timpa kunci distribusi yang dihasilkan otomatis  
    # distributed_by=k1,k2  
    # # timpa jumlah bucket distribusi yang dihasilkan otomatis  
    # bucket_num=32  
    # # properties.xxxxx: properti yang digunakan untuk membuat tabel  
    # properties.in_memory = false  
    
    ############################################  
    ### konfigurasi sink flink  
    ### JANGAN atur `connector`, `table-name`, `database-name`, karena mereka dihasilkan otomatis  
    ############################################  
    flink.starrocks.jdbc-url=jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030  
    flink.starrocks.load-url=fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030  
    flink.starrocks.username=admin  
    flink.starrocks.password=1qaz!QAZ  
    flink.starrocks.sink.max-retries=10  
    flink.starrocks.sink.buffer-flush.interval-ms=15000  
    flink.starrocks.sink.properties.format=json  
    flink.starrocks.sink.properties.strip_outer_array=true  
    # # digunakan untuk mengatur server-id untuk pekerjaan mysql-cdc alih-alih menggunakan server-id acak  
    # flink.cdc.server-id = 5000  
    ############################################  
    ### konfigurasi plugin flink-cdc untuk `postgresql`  
    ############################################  
    # # untuk `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming  
    # # lihat https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html  
    # # dan https://debezium.io/documentation/reference/postgres-plugins.html  
    # flink.cdc.decoding.plugin.name = decoderbufs