Realtime Compute for Apache Flink menggunakan Flink Change Data Capture (CDC) untuk mengingesti data dari sumber ke tujuan. Anda dapat mengembangkan pekerjaan YAML untuk menyinkronkan data. Topik ini menjelaskan cara membuat pekerjaan ingesti data Flink CDC guna menyinkronkan seluruh data dari database MySQL ke StarRocks.
Prasyarat
Ruang kerja Flink telah dibuat. Untuk informasi selengkapnya, lihat Aktifkan Realtime Compute for Apache Flink.
Penyimpanan hulu dan hilir
Instans RDS MySQL telah dibuat. Untuk informasi selengkapnya, lihat Buat instans RDS MySQL.
Instans StarRocks telah dibuat. Untuk informasi selengkapnya, lihat Prosedur.
CatatanInstans RDS MySQL dan StarRocks harus berada dalam virtual private cloud (VPC) yang sama dengan ruang kerja Flink. Jika tidak, Anda harus membuat koneksi jaringan dan mengonfigurasi daftar putih alamat IP untuk instans RDS MySQL. Untuk informasi selengkapnya, lihat Bagaimana cara mengakses layanan lain lintas VPC?, Bagaimana cara mengakses Internet?, dan Bagaimana cara mengonfigurasi daftar putih?.
Informasi latar belakang
Asumsikan bahwa sebuah instans MySQL memiliki database bernama order_dw_mysql. Database ini berisi tiga tabel bisnis: orders, orders_pay, dan product_catalog. Untuk mengembangkan pekerjaan ingesti data Flink CDC yang menyinkronkan tabel-tabel tersebut beserta datanya ke database order_dw_sr di StarRocks, ikuti langkah-langkah berikut:
Langkah 1: Siapkan data uji RDS MySQL
Buat database dan akun.
Pada instans RDS MySQL, buat database bernama order_dw_mysql dan akun standar yang memiliki izin baca dan tulis pada database tersebut. Untuk informasi selengkapnya, lihat Buat database dan akun dan Kelola database.
Masuk ke instans RDS MySQL menggunakan DMS.
Untuk informasi selengkapnya, lihat Masuk ke instans RDS MySQL menggunakan DMS.
Pada SQL Console, masukkan perintah berikut dan klik Execute untuk membuat tiga tabel bisnis dan memasukkan data.
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- Siapkan data INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
Langkah 2: Kembangkan pekerjaan ingesti data Flink CDC
Masuk ke Konsol Realtime Compute for Apache Flink.
Klik Console untuk menavigasi ke ruang kerja yang diperlukan.
Pada panel navigasi di sebelah kiri, pilih .
Klik ikon
, klik New from Template, pilih MySQL to StarRocks Data Synchronization, lalu klik Next.
Masukkan Job Name, tentukan Storage Location, pilih Engine Version, lalu klik OK.
Konfigurasikan kode pekerjaan YAML.
Kode berikut memberikan contoh cara menyinkronkan semua tabel dari database order_dw_mysql di MySQL ke database order_dw_sr di StarRocks.
source: type: mysql hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: order_dw_mysql.\.* server-id: 8601-8604 # (Opsional) Sinkronkan data dari tabel yang baru dibuat selama fase inkremental. scan.binlog.newly-added-table.enabled: true # (Opsional) Sinkronkan komentar tabel dan bidang. include-comments.enabled: true # (Opsional) Utamakan distribusi chunk tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager. scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan. scan.only.deserialize.captured.tables.changelog.enabled: true sink: type: starrocks name: StarRocks Sink jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030 load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030 username: ${secret_values.starrocksusername} password: ${secret_values.starrockspassword} table.create.properties.replication_num: 1 sink.buffer-flush.interval-ms: 5000 # Flush data setiap 5 detik. route: - source-table: order_dw_mysql.\.* sink-table: order_dw_sr.<> replace-symbol: <> description: route all tables in source_db to sink_db pipeline: name: Sync MySQL Database to StarRocksTabel berikut menjelaskan informasi konfigurasi yang diperlukan untuk contoh ini. Untuk informasi selengkapnya tentang parameter ingesti data, lihat MySQL dan StarRocks.
CatatanPekerjaan YAML hanya mendukung variabel proyek. Anda dapat menggunakan variabel untuk menghindari pengeksposan password teks biasa dan informasi sensitif lainnya. Untuk informasi selengkapnya, lihat Manajemen Variabel.
Category
Parameter
Description
Nilai contoh
source
hostname
Alamat IP atau hostname database MySQL.
Kami menyarankan Anda menggunakan alamat VPC.
rm-bp1rk934iidc3****.mysql.rds.aliyuncs.comport
Nomor port layanan database MySQL.
3306
username
Username dan password untuk layanan database MySQL. Gunakan informasi akun yang Anda buat di Langkah 1: Siapkan data uji RDS MySQL.
${secret_values.mysqlusername}password
${secret_values.mysqlpassword}tables
Nama tabel MySQL. Ekspresi reguler didukung untuk membaca data dari beberapa tabel.
Topik ini menyinkronkan semua tabel dan data dalam database order_dw_mysql.
order_dw_mysql.\.*
server-id
ID numerik untuk client database.
5405-5415
sink
jdbc-url
URL Java Database Connectivity (JDBC).
Tentukan alamat IP dan port kueri frontend (FE) dalam format
jdbc:mysql://ip:port.Pada tab Instance Details di Konsol E-MapReduce, Anda dapat melihat Internal Network Address dan Query Port FE dari instans target.
jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030load-url
URL layanan HTTP yang digunakan untuk menghubungkan ke node FE.
Anda dapat melihat Internal Endpoint dan HTTP Port FE dari instans target pada tab Instance Details di Konsol E-MapReduce.
fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030username
Username dan password untuk koneksi StarRocks.
Gunakan username dan password yang Anda tentukan saat membuat instans StarRocks.
CatatanContoh ini menggunakan variabel untuk menghindari pengeksposan password teks biasa dan informasi sensitif lainnya. Untuk informasi selengkapnya, lihat Manajemen Variabel.
${secret_values.starrocksusername}password
${secret_values.starrockspassword}sink.buffer-flush.interval-ms
Interval refresh buffer internal.
Karena volume data dalam contoh ini kecil, interval pendek (5 detik) ditetapkan agar hasil dapat diamati lebih cepat.
5000
route
source-table
Menentukan tabel leluhur yang akan dirutekan.
Anda dapat menggunakan ekspresi reguler untuk mencocokkan beberapa tabel. Misalnya,
order_dw_mysql.\.*merutekan semua tabel dalam databaseorder_dw_mysql.order_dw_mysql.\.*
sink-table
Menentukan tujuan routing data.
Anda dapat menggunakan simbol dari
replace-symbolsebagai placeholder untuk setiap nama tabel leluhur guna menerapkan perutean banyak-ke-banyak.Untuk informasi selengkapnya tentang aturan routing, lihat Modul Route.
order_dw_sr.<>
replace-symbol
String yang merepresentasikan nama tabel leluhur saat Anda menggunakan fitur pencocokan pola.
<>
Klik Deploy.
Langkah 3: Jalankan pekerjaan ingesti data Flink CDC
Pada halaman Data Ingestion, klik Deploy. Pada kotak dialog yang muncul, klik OK.
Pada halaman , temukan pekerjaan YAML target dan klik Start pada kolom Actions.
Klik Start.
Dalam contoh ini, Stateless Start dipilih. Untuk informasi selengkapnya tentang pengaturan parameter, lihat Jalankan pekerjaan. Setelah pekerjaan dimulai, Anda dapat memantau informasi waktu proses dan statusnya pada halaman Job O&M.
Step 4: View the synchronization result in StarRocks
Setelah pekerjaan YAML memasuki status Running, Anda dapat melihat hasil sinkronisasi data di StarRocks.
Hubungkan ke instans StarRocks menggunakan EMR StarRocks Manager.
Pada panel navigasi di sebelah kiri, klik SQL Editor. Pada tab Databases, klik ikon
.Database bernama order_dw_sr muncul di bawah default_catalog.
Pada tab Query List, klik +File untuk membuat query script. Kemudian, masukkan pernyataan SQL berikut dan klik Run.
SELECT * FROM default_catalog.order_dw_sr.orders order by order_id; SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id; SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;Lihat hasil sinkronisasi di bawah perintah.
Anda dapat melihat bahwa tabel dan data dari database MySQL kini tersedia di StarRocks.

Referensi
Untuk informasi selengkapnya tentang cara mengembangkan pekerjaan ingesti data Flink CDC, lihat Kembangkan pekerjaan ingesti data Flink CDC (Public Preview).
Untuk informasi selengkapnya tentang modul source, sink, transform, dan route dalam pekerjaan ingesti data Flink CDC, lihat Referensi pengembangan untuk pekerjaan ingesti data Flink CDC.