Topik ini menjelaskan cara menggunakan Flink Change Data Capture (CDC) untuk menyinkronkan data dari instance ApsaraDB RDS for MySQL ke instance E-MapReduce (EMR) Serverless StarRocks.
Prasyarat
- Cluster Dataflow telah dibuat. Untuk informasi lebih lanjut, lihat Buat Cluster.
- Instance StarRocks telah dibuat. Untuk informasi lebih lanjut, lihat Buat Instance.
- Instance ApsaraDB RDS for MySQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat Instance ApsaraDB RDS for MySQL.
Batasan
- Cluster Dataflow, instance StarRocks, dan instance ApsaraDB RDS for MySQL harus diterapkan di dalam virtual private cloud (VPC) yang sama.
- Cluster Dataflow dan instance StarRocks harus dapat diakses melalui Internet.
- Versi MySQL dari instance ApsaraDB RDS for MySQL harus 5.7 atau lebih baru.
Prosedur
Langkah 1: Siapkan data uji
- Buat database uji dan akun uji. Untuk informasi tentang cara membuat akun dan database, lihat Buat Akun dan Database. Setelah membuat database uji dan akun uji, berikan izin baca dan tulis kepada akun tersebut.Catatan Dalam contoh ini, database bernama test_cdc dibuat.
- Gunakan akun uji untuk masuk ke instance ApsaraDB RDS for MySQL. Untuk informasi lebih lanjut, lihat Gunakan DMS untuk Masuk ke Instance ApsaraDB RDS for MySQL.
- Eksekusi pernyataan berikut untuk membuat tabel:
CREATE TABLE test_cdc.`t_user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` tinyint(4) DEFAULT NULL, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Langkah 2: Konfigurasikan alat sinkronisasi dan jalankan pekerjaan Flink
- Masuk ke cluster Dataflow dalam mode SSH. Untuk informasi lebih lanjut, lihat Masuk ke Cluster.
- Unduh paket Flink CDC dan Flink StarRocks connectors. Catatan Mengunduh paket mungkin memerlukan waktu yang lama.
- Salin paket yang diunduh ke direktori /opt/apps/FLINK/flink-current/lib dari cluster Dataflow.
cp flink-* /opt/apps/FLINK/flink-current/lib/ - Jalankan perintah berikut untuk memulai cluster Dataflow. Penting Contoh dalam topik ini disediakan hanya untuk tujuan pengujian. Untuk menjalankan pekerjaan Flink di lingkungan produksi, gunakan YARN atau Kubernetes untuk mengirimkan pekerjaan. Untuk informasi lebih lanjut, lihat Apache Hadoop YARN dan Native Kubernetes.
/opt/apps/FLINK/flink-current/bin/start-cluster.sh - Unduh alat yang diperlukan dan modifikasi file konfigurasi.
- Unduh paket Alat Migrasi StarRocks dan unggah paket ke direktori /root dari cluster Dataflow.
- Jalankan perintah berikut untuk mengekstrak paket smt.tar.gz:
tar -zxvf smt.tar.gz && cd smt - Jalankan perintah berikut untuk memodifikasi file config_prod.conf.
Tabel berikut menjelaskan parameter. Anda dapat memodifikasi parameter berdasarkan kebutuhan bisnis Anda.Parameter Deskripsi host Titik akhir internal dari instance ApsaraDB RDS for MySQL. Anda dapat menyalin titik akhir internal dari halaman Koneksi Database instance ApsaraDB RDS for MySQL di konsol ApsaraDB RDS. Contoh: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com.
port Atur nilai menjadi 3306. user Nama pengguna akun yang dibuat untuk instance ApsaraDB RDS for MySQL. Atur parameter ini ke nama pengguna akun yang dibuat di Langkah 1: Siapkan data uji.
password Kata sandi akun yang dibuat untuk instance ApsaraDB RDS for MySQL. Atur parameter ini ke kata sandi akun yang dibuat di Langkah 1: Siapkan data uji.
be_num Jumlah node backend (BE) dalam instance StarRocks. Jika instance memiliki spesifikasi minimum, atur parameter ini menjadi 1. database Ekspresi reguler yang digunakan untuk mencocokkan database dari mana Anda ingin menyinkronkan data. Contoh: ^test.*$.table Ekspresi reguler yang digunakan untuk mencocokkan tabel dari mana Anda ingin menyinkronkan data. Contoh: ^.*$.flink.starrocks.jdbc-url URL Java Database Connectivity (JDBC) yang digunakan untuk terhubung ke StarRocks dan menjalankan kueri di StarRocks. Contoh: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030.fe-c-9b354c83e891****-internal.starrocks.aliyuncs.comadalah titik akhir internal node frontend (FE) dalam instance StarRocks.Catatan Untuk informasi tentang cara mendapatkan titik akhir internal node FE dalam instance StarRocks, lihat Lihat daftar instance dan detail instance.flink.starrocks.load-url Titik akhir internal dan port HTTP node FE dalam instance StarRocks. Konfigurasikan parameter ini dalam format Titik akhir internal node FE dalam instance StarRocks: 8030.Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030.Catatan Untuk informasi tentang cara mendapatkan titik akhir internal node FE dalam instance StarRocks, lihat Lihat daftar instance dan detail instance.flink.starrocks.username Nama pengguna yang digunakan untuk terhubung ke instance StarRocks. flink.starrocks.password Kata sandi yang digunakan untuk terhubung ke instance StarRocks. Catatan Secara default, parameter ini dibiarkan kosong.Kode sampel berikut menunjukkan konfigurasi parameter tertentu dalam file konfigurasi. Untuk informasi lebih lanjut, lihat Referensi.flink.starrocks.jdbc-url=jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030 flink.starrocks.load-url=fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030 flink.starrocks.username=admin flink.starrocks.password=1qaz!QAZ
- Jalankan perintah berikut untuk menghasilkan pernyataan CREATE TABLE di direktori result:
./starrocks-migrate-toolAnda dapat menjalankan perintah
ls resultuntuk melihat isi direktori result.Output berikut dikembalikan:flink-create.1.sql flink-create.all.sql starrocks-create.1.sql starrocks-create.all.sql starrocks-external-create.1.sql starrocks-external-create.all.sqlCatatan- Dalam contoh ini, file konfigurasi hanya mendefinisikan table-rule.1. File .1.sql sesuai dengan table-rule.1. Jika file konfigurasi Anda melibatkan table-rule.2, Anda dapat menggunakan file .all.sql yang berisi semua aturan.
- File dengan akhiran
external-createdigunakan untuk membuat tabel eksternal untuk sumber data. Jika Anda tidak ingin menyinkronkan tabel dimensi kecil dalam skenario tertentu, Anda dapat membuat tabel eksternal dan menanyakan data dalam tabel dimensi menggunakan tabel eksternal. Dalam hal ini, Anda dapat menggunakan file dengan akhiran external-create untuk menghasilkan tabel eksternal yang sesuai. Dalam contoh ini, file-file sebelumnya tidak digunakan.
- Jalankan perintah berikut untuk membuat tabel StarRocks:
mysql -h<Titik akhir internal node FE dalam instance StarRocks> -P9030 -uroot -p < result/starrocks-create.1.sqlCatatan Jika Anda tidak menentukan kata sandi yang digunakan untuk terhubung ke instance StarRocks saat Anda memodifikasi file config_prod.conf, Anda dapat menekan tombol Enter. - Jalankan perintah berikut untuk memulai pekerjaan Flink:
/opt/apps/FLINK/flink-current/bin/sql-client.sh -f result/flink-create.1.sql
Langkah 3: Verifikasi hasil sinkronisasi data
Kueri data
- Hubungkan ke instance StarRocks. Untuk informasi lebih lanjut, lihat Gunakan Klien MySQL untuk Terhubung ke Instance StarRocks.
- Jalankan perintah berikut untuk menanyakan informasi database:
show databases;Output berikut dikembalikan:+--------------------+ | Database | +--------------------+ | _statistics_ | | information_schema | | test_cdc | +--------------------+ 3 rows in set (0.00 sec) - Di CLI instance StarRocks, eksekusi perintah dan pernyataan berikut untuk menanyakan informasi tabel:
use test_cdc; select * from t_user;Dalam contoh ini, tabel t_user kosong, dan tidak ada data yang dikembalikan.
Kueri data yang dimasukkan
- Di tab SQL Console database ApsaraDB RDS for MySQL, eksekusi pernyataan berikut untuk memasukkan data:
INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.0",30,NOW(),NOW()); INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.1",31,NOW(),NOW()); INSERT INTO test_cdc.t_user(`name`,age,create_time,update_time) VALUES("aliyun.com.2",32,NOW(),NOW()); - Di CLI instance StarRocks, eksekusi perintah dan pernyataan berikut untuk menanyakan informasi tabel:
select * from t_user;Jika output berikut dikembalikan, data telah dimasukkan.+------+--------------+------+---------------------+---------------------+ | id | name | age | create_time | update_time | +------+--------------+------+---------------------+---------------------+ | 4 | aliyun.com.0 | 30 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 | | 5 | aliyun.com.1 | 31 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 | | 6 | aliyun.com.2 | 32 | 2022-03-10 13:22:42 | 2022-03-10 13:22:42 | +------+--------------+------+---------------------+---------------------+ 3 rows in set (0.00 sec)
Sinkronkan data yang diperbarui
- Di tab SQL Console database ApsaraDB RDS for MySQL, eksekusi pernyataan berikut untuk memperbarui data tertentu:
UPDATE test_cdc.t_user SET age=35 where name="aliyun.com.0"; - Di CLI instance StarRocks, eksekusi perintah dan pernyataan berikut untuk menanyakan informasi tabel:
select * from t_user where name = "aliyun.com.0";Jika output berikut dikembalikan, data yang diperbarui telah disinkronkan.+------+--------------+------+---------------------+---------------------+ | id | name | age | create_time | update_time | +------+--------------+------+---------------------+---------------------+ | 4 | aliyun.com.0 | 35 | 2022-03-10 13:22:41 | 2022-03-10 13:22:41 | +------+--------------+------+---------------------+---------------------+ 1 row in set (0.01 sec)
Sinkronkan data yang dihapus
- Di tab SQL Console database ApsaraDB RDS for MySQL, eksekusi pernyataan berikut untuk menghapus data tertentu:
DELETE FROM test_cdc.t_user where 1=1; - Di CLI instance StarRocks, eksekusi perintah dan pernyataan berikut untuk menanyakan informasi tabel:
select * from t_user;Jika output berikut dikembalikan, data yang dihapus telah disinkronkan.Empty set (0.01 sec)
Referensi
- Topik ini disediakan hanya untuk tujuan pengujian. Untuk pekerjaan Flink di lingkungan produksi, gunakan Ververica Platform (VVP) dari Realtime Compute for Apache Flink untuk mengonfigurasi pekerjaan Flink, atau gunakan YARN atau Kubernetes untuk mengirimkan pekerjaan.
Untuk informasi lebih lanjut, lihat Apache Hadoop YARN dan Native Kubernetes.
- Jika Anda mengeksekusi pernyataan
ALTER TABLEuntuk memodifikasi tabel dalam instance ApsaraDB RDS for MySQL, Anda harus secara manual menyinkronkan perubahan skema MySQL ke StarRocks.Jika Anda membuat tabel dalam instance ApsaraDB RDS for MySQL, Anda harus menjalankan ulang alat migrasi StarRocks untuk menyinkronkan data dari tabel tersebut. - Konfigurasi sampel file konfigurasi untuk alat migrasi StarRocks:
[db] host = rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com port = 3306 user = *** password = *** # currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse` type = mysql # # only takes effect on `type == hive`. # # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap # authentication = kerberos [other] # number of backends in StarRocks be_num = 1 # `decimal_v3` didukung sejak StarRocks-1.8.1 use_decimal_v3 = false # direktori untuk menyimpan DDL SQL yang dikonversi output_dir = ./result # !!!`database` `table` `schema` peka huruf besar/kecil di `oracle`!!! [table-rule.1] # pola untuk mencocokkan database untuk pengaturan properti # !!! database harus berupa `nama instance (atau pdb) lengkap` dan bukan regex ketika berhadapan dengan `oracle db` !!! database = ^test.*$ # pola untuk mencocokkan tabel untuk pengaturan properti table = ^.*$ # `schema` hanya berlaku pada `postgresql` dan `oracle` dan `sqlserver` schema = ^.*$ ############################################ ### konfigurasi tabel starrocks ############################################ # # atur kolom sebagai partition_key # partition_key = p_key # # timpa partisi yang dihasilkan otomatis # partitions = START ("2021-01-02") END ("2021-01-04") EVERY (INTERVAL 1 day) # # hanya berlaku pada tabel tanpa primary key atau indeks unik # duplicate_keys=k1,k2 # # timpa kunci distribusi yang dihasilkan otomatis # distributed_by=k1,k2 # # timpa jumlah bucket distribusi yang dihasilkan otomatis # bucket_num=32 # # properties.xxxxx: properti yang digunakan untuk membuat tabel # properties.in_memory = false ############################################ ### konfigurasi sink flink ### JANGAN atur `connector`, `table-name`, `database-name`, karena mereka dihasilkan otomatis ############################################ flink.starrocks.jdbc-url=jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030 flink.starrocks.load-url=fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030 flink.starrocks.username=admin flink.starrocks.password=1qaz!QAZ flink.starrocks.sink.max-retries=10 flink.starrocks.sink.buffer-flush.interval-ms=15000 flink.starrocks.sink.properties.format=json flink.starrocks.sink.properties.strip_outer_array=true # # digunakan untuk mengatur server-id untuk pekerjaan mysql-cdc alih-alih menggunakan server-id acak # flink.cdc.server-id = 5000 ############################################ ### konfigurasi plugin flink-cdc untuk `postgresql` ############################################ # # untuk `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming # # lihat https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html # # dan https://debezium.io/documentation/reference/postgres-plugins.html # flink.cdc.decoding.plugin.name = decoderbufs