Setelah membuat katalog MySQL, Anda dapat mengakses tabel dari instance MySQL di konsol pengembangan Realtime Compute for Apache Flink. Tabel tersebut juga dapat digunakan dalam penyebaran SQL Flink. Topik ini menjelaskan cara membuat dan menggunakan katalog MySQL.
Informasi latar belakang
Saat menggunakan katalog MySQL, perhatikan poin-poin berikut:
Anda dapat langsung mengakses tabel dari instance MySQL tanpa perlu mengeksekusi pernyataan DDL untuk mendaftarkan tabel MySQL. Ini meningkatkan efisiensi dan akurasi pengembangan data.
Tabel dari katalog MySQL dapat digunakan sebagai tabel sumber Change Data Capture (CDC) MySQL, tabel sink MySQL, dan tabel dimensi MySQL dalam penyebaran SQL Realtime Compute for Apache Flink.
Katalog dari ApsaraDB RDS untuk MySQL, PolarDB untuk MySQL, dan database MySQL yang dikelola sendiri didukung.
Tabel logis berbasis sharding dapat diakses secara langsung.
Anda dapat menggunakan pernyataan CREATE DATABASE AS dan CREATE TABLE AS untuk menyinkronkan data lengkap dari database, data gabungan dari tabel terpecah di database terpecah, serta perubahan dalam skema tabel berdasarkan sumber data MySQL.
Batasan
Instance MySQL dan Realtime Compute for Apache Flink harus berada di virtual private cloud (VPC) yang sama. Jika instance MySQL tidak berada di VPC yang sama dengan Realtime Compute for Apache Flink atau Anda ingin mengakses instance MySQL dan Realtime Compute for Apache Flink melalui Internet, Anda harus membuat koneksi jaringan. Untuk informasi lebih lanjut, lihat FAQ tentang konektivitas jaringan.
Konfigurasi katalog tidak dapat dimodifikasi setelah dibuat. Untuk memodifikasi katalog, Anda harus menghapusnya dan membuat yang baru.
Anda hanya dapat menanyakan data di database dan tabel yang ada. Anda tidak dapat membuat database atau tabel menggunakan Realtime Compute for Apache Flink.
Jika tabel dari katalog MySQL digunakan sebagai tabel sumber CDC MySQL, Anda hanya dapat membaca data dari tabel sumber dalam mode streaming. Pembacaan data dalam mode batch tidak didukung.
CatatanJika tabel dari katalog MySQL digunakan sebagai tabel sumber CDC MySQL, Anda harus mengaktifkan binary logging pada ApsaraDB RDS untuk MySQL, PolarDB untuk MySQL, atau database MySQL yang dikelola sendiri. Untuk informasi lebih lanjut, lihat Konfigurasikan database MySQL.
Tabel yang dibuat menggunakan sintaks spesifik PolarDB dalam pernyataan CREATE TABLE tidak dapat diidentifikasi.
Sebagai contoh, jika sebuah tabel dibuat menggunakan sintaks
PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`)dalam pernyataan CREATE TABLE, katalog MySQL tidak dapat mengidentifikasi tabel tersebut.Tampilan tidak dapat digunakan sebagai tabel dalam katalog MySQL yang dibuat di Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.7 atau versi lebih baru.
Hanya MySQL 5.7 dan MySQL 8.0.x yang didukung.
Buat katalog MySQL
Anda dapat membuat katalog MySQL melalui UI atau dengan mengeksekusi pernyataan SQL.
UI (direkomendasikan)
Pergi ke halaman Katalog.
Masuk ke Konsol Realtime Compute for Apache Flink. Temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.
Di panel navigasi di sebelah kiri, klik Manajemen Data.
Di halaman Daftar Katalog, klik Create Catalog. Di tab Katalog Bawaan dari kotak dialog Buat Katalog, klik MySQL dan klik Next.
Di langkah Konfigurasi Katalog, konfigurasikan parameter.
PentingSetelah katalog dibuat, konfigurasi parameter berikut tidak dapat dimodifikasi. Untuk memodifikasi katalog, Anda harus menghapusnya dan membuat yang baru.

Parameter
Deskripsi
Diperlukan
catalogname
Nama katalog MySQL kustom.
Ya
hostname
Alamat IP atau nama host yang digunakan untuk mengakses database MySQL.
CatatanJika instance MySQL tidak berada di VPC yang sama dengan Realtime Compute for Apache Flink atau Anda ingin mengakses instance MySQL dan Realtime Compute for Apache Flink melalui Internet, Anda harus membuat koneksi jaringan. Untuk informasi lebih lanjut, lihat FAQ tentang konektivitas jaringan.
Ya
port
Nomor port database MySQL. Nilai default: 3306.
Tidak
default-database
Nama database MySQL default.
Ya
username
Nama pengguna yang digunakan untuk mengakses database MySQL.
Ya
password
Kata sandi yang digunakan untuk mengakses database MySQL.
Untuk mencegah risiko yang disebabkan oleh pasangan AccessKey dalam teks biasa, kami sarankan Anda menggunakan variabel untuk menentukan parameter ini. Pada gambar sampel, variabel bernama mysqlpw digunakan. Untuk informasi lebih lanjut, lihat Buat variabel.
Ya
Klik Confirm.
Di panel Catalogs di sebelah kiri halaman Daftar Katalog, lihat katalog yang telah Anda buat.
SQL
Pergi ke halaman Skrip.
Masuk ke Konsol Realtime Compute for Apache Flink. Temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.
Di panel navigasi di sebelah kiri, pilih .
Klik
dan pilih New Script. Di kotak dialog, masukkan nama skrip, pilih lokasi, lalu klik Save.Salin dan tempel kode berikut:
CREATE CATALOG YourCatalogName WITH( 'type' = 'mysql', 'hostname' = 'rm-bp1gcn0q0j0******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'usertest', 'password' = '${secret_values.mysqlpw}', 'default-database' = 'flinktest', 'catalog.table.metadata-columns'='table_name' );Opsi
Deskripsi
Diperlukan
YourCatalogName
Nama katalog MySQL kustom.
Ya
type
Tipe katalog. Atur nilainya menjadi mysql.
Ya
hostname
Alamat IP atau nama host yang digunakan untuk mengakses database MySQL.
CatatanUntuk mengakses MySQL lintas VPC atau melalui Internet, Anda harus membuat koneksi jaringan. Untuk informasi lebih lanjut, lihat FAQ tentang konektivitas jaringan.
Ya
port
Nomor port database MySQL. Nilai default: 3306.
Tidak
default-database
Nama database MySQL default.
Ya
username
Nama pengguna yang digunakan untuk mengakses database MySQL.
Ya
password
Kata sandi yang digunakan untuk mengakses database MySQL.
Untuk mencegah risiko yang disebabkan oleh pasangan AccessKey dalam teks biasa, kami sarankan Anda menggunakan variabel untuk menentukan opsi ini. Kode sampel menggunakan variabel bernama
mysqlpw. Untuk informasi lebih lanjut, lihat Buat variabel.Ya
property-version
Versi opsi katalog. Nilai valid: 0 dan 1. Sementara 0 adalah nilai default, 1 direkomendasikan.
Opsi yang tersedia dan nilai default dari opsi tersebut dapat bervariasi berdasarkan versi opsi katalog. Untuk informasi lebih lanjut, lihat deskripsi opsi.
CatatanHanya VVR 8.0.6 atau lebih baru yang mendukung opsi ini.
Di VVR 11.1 atau lebih baru, nilai default adalah
1.
Tidak
catalog.table.metadata-columns
Kolom metadata dalam tabel sumber CDC MySQL yang ingin Anda tambahkan ke skema tabel saat Anda menanyakan tabel. Secara default, tidak ada kolom metadata yang ditambahkan.
Pisahkan beberapa kolom metadata dengan titik koma (;). Contoh:
op_ts;table_name;database_name.CatatanHanya VVR 6.0.5 atau lebih baru yang mendukung opsi ini.
Jika Anda menentukan opsi ini, kolom metadata yang ditentukan akan ditambahkan ke skema tabel yang dikembalikan. Kolom metadata hanya tersedia di tabel sumber CDC MySQL. Oleh karena itu, tabel yang dibuat di katalog MySQL dapat digunakan hanya sebagai tabel sumber dan tidak dapat digunakan sebagai tabel sink atau tabel dimensi.
Tidak
catalog.table.treat-tinyint1-as-boolean
Menentukan apakah akan memetakan tipe data TINYINT(1) dan BOOLEAN dari MySQL ke tipe data BOOLEAN dari Flink saat Realtime Compute for Apache Flink mendapatkan skema tabel. Nilai valid:
true: Tipe data TINYINT(1) dan BOOLEAN dari MySQL dipetakan ke tipe data BOOLEAN dari Flink.
false: Tipe data TINYINT(1) dan BOOLEAN dari MySQL dipetakan ke tipe data TINYINT dari Flink.
Nilai default:
Jika Anda mengatur opsi property-version ke 0, nilai default opsi ini adalah
true.Jika Anda mengatur opsi property-version ke 1, nilai default opsi ini adalah
false.
CatatanHanya VVR 8.0.4 atau lebih baru yang mendukung opsi ini.
Kami sarankan Anda menggunakan bidang tipe data TINYINT(1) dari MySQL untuk menyimpan hanya nilai 0 dan 1. Anda harus memilih tipe data yang sesuai dengan kebutuhan bisnis Anda untuk pemetaan. Untuk informasi lebih lanjut, lihat Pemetaan tipe data.
Tidak
Pilih kode untuk pembuatan katalog dan klik Run di sebelah kiri kode.
Pesan
Pernyataan berikut telah berhasil dieksekusi!menunjukkan bahwa katalog telah dibuat.
Lihat dan hapus katalog MySQL
UI (direkomendasikan)
Di halaman Catalogs, klik katalog yang telah dibuat untuk melihat parameter Name dan Type katalog di bagian Catalog List.
Lihat katalog: Klik View di kolom Actions katalog yang ingin Anda kelola dan lihat database dan tabel di katalog.
Bidang komentar tidak ditampilkan dalam detail skema tabel.
Hapus katalog: Klik Delete di kolom Actions katalog yang ingin Anda hapus.
Operasi penghapusan hanya menghapus katalog yang Anda buat dan tidak menghapus tabel di layanan terkait. Setelah katalog dihapus, penyebaran yang sedang berjalan yang menggunakan tabel di katalog tidak terpengaruh secara negatif. Jika penyebaran diredploy atau dimulai ulang, kesalahan akan terjadi, menunjukkan bahwa tabel tidak dapat ditemukan. Berhati-hatilah saat melakukan operasi penghapusan.
SQL
Di editor kode tab Skrip di halaman Skrip, masukkan kode berikut:
-- Lihat informasi skema tabel di katalog. Bidang komentar tidak ditampilkan. DESCRIBE `<catalogname>`.`<dbname>`.`<tablename>`; -- Hapus katalog. DROP CATALOG `<catalogname>`;CatatanOperasi penghapusan hanya menghapus katalog yang Anda buat dan tidak menghapus tabel di layanan terkait. Setelah katalog dihapus, penyebaran yang sedang berjalan yang menggunakan tabel di katalog tidak terpengaruh. Jika penyebaran diredploy atau dimulai ulang, kesalahan akan terjadi menunjukkan bahwa tabel tidak dapat ditemukan. Berhati-hatilah saat melakukan operasi penghapusan.
Klik kanan pernyataan dan pilih Run dari menu pintasan.

Gunakan katalog MySQL
Baca data dari tabel sumber MySQL
INSERT INTO `<othersinktable>`
SELECT ...
FROM `<mysqlcatalog>`.`<dbname>`.`<tablename>` /*+ OPTIONS('server-id' = '6000-6008') */;Jika Anda menggunakan tabel dari katalog MySQL sebagai tabel sumber CDC MySQL, kami sarankan Anda menggunakan petunjuk SQL untuk mengatur parameter server-id ke nilai unik untuk setiap penyebaran. Jika Anda ingin menjalankan beberapa penyebaran untuk membaca data dari tabel sumber secara bersamaan, Anda harus mengatur parameter server-id ke rentang nilai. Jumlah nilai server-id dalam rentang nilai harus lebih besar dari atau sama dengan paralelisme penyebaran.
Baca data dari tabel logika MySQL berbasis sharding
Katalog MySQL memungkinkan Anda mengonfigurasi database dan tabel di database terpecah sebagai tabel logis dalam pernyataan kueri menggunakan ekspresi reguler dan mengeksekusi pernyataan kueri untuk membaca data dari tabel logis tersebut.
Sebagai contoh, jika database MySQL yang terpecah berisi tabel seperti user01, user02, dan user99, di shard database dari db01 hingga db10 dan semua skema tabel kompatibel satu sama lain, Anda dapat mengakses semua tabel di shard database menggunakan ekspresi reguler berikut:
SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;Hasil kueri berisi dua bidang sistem tambahan _db_name (STRING) dan _table_name (STRING). Kedua bidang ini dan kunci utama dari tabel asli digunakan sebagai kunci utama gabungan baru dari tabel logis untuk memastikan bahwa kunci utama gabungan unik. Jika kunci utama dari tabel user01 hingga user99 adalah id, maka kunci utama gabungan dari tabel logis bernama user adalah (_db_name, _table_name, id).
Katalog MySQL memungkinkan Anda menggunakan ekspresi reguler untuk mencocokkan beberapa tabel yang akan disinkronkan. Dengan cara ini, Anda dapat menggabungkan dan menyinkronkan data di beberapa tabel dari database terpecah. Untuk informasi lebih lanjut, lihat Konsolidasi dan Sinkronisasi Shard Tabel dan Database.
Eksekusi pernyataan CREATE TABLE AS dan CREATE DATABASE AS untuk menyinkronkan perubahan data dan skema MySQL secara real-time
Saat menyinkronkan data, Anda harus menggunakan penyimpanan data upstream dan downstream yang didukung oleh pernyataan CREATE TABLE AS dan CREATE DATABASE AS. Sebagai contoh, saat konektor MongoDB digunakan sebagai tabel sink, kesalahan ini akan dilaporkan: Pernyataan CREATE TABLE ... AS TABLE ... memerlukan katalog target ... mengimplementasikan antarmuka org.apache.flink.table.catalog.CatalogTableProvider.
Pernyataan CREATE TABLE AS dapat menyinkronkan data dari satu tabel, menyinkronkan perubahan skema tabel, menggabungkan dan menyinkronkan data dari beberapa tabel di database terpecah, serta menambahkan kolom komputasi kustom. Anda juga dapat menambahkan pernyataan CREATE TABLE AS dalam penyebaran sinkronisasi data. Untuk informasi lebih lanjut, lihat CREATE TABLE AS (CTAS). Pernyataan CREATE DATABASE AS dapat menyinkronkan skema tabel dan data dari seluruh database secara real-time. Pernyataan ini juga dapat menyinkronkan perubahan skema tabel. Untuk informasi lebih lanjut, lihat CREATE DATABASE AS (CDAS).
-- Sinkronisasi tabel tunggal: Sinkronkan perubahan skema dan perubahan data tabel secara real-time.
CREATE TABLE IF NOT EXISTS `<targetcatalog>`.`<targetdbname>`.`<targettablename>`
WITH (...)
AS TABLE `<mysqlcatalog>`.`<dbname>`.`<tablename>`
/*+ OPTIONS('server-id'='6000-6018') */;
-- Sinkronisasi database: Sinkronkan perubahan skema dan perubahan data database secara real-time.
CREATE DATABASE `<targetcatalog>`.`<targetdbname>` WITH (...)
AS DATABASE `<mysqlcatalog>`.`<dbname>` INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='6000-6018') */;Sebagai contoh, Anda dapat menyinkronkan data dari database MySQL ke Hologres. Untuk informasi lebih lanjut, lihat Gunakan Katalog Hologres.
USE CATALOG holocatalog; -- Tentukan katalog.
CREATE TABLE IF NOT EXISTS holotable -- Tentukan nama tabel tempat data disinkronkan. Jika Anda tidak menentukan tingkat database, tabel akan disinkronkan secara otomatis ke database default dari katalog.
WITH ('jdbcWriteBatchSize' = '1024') -- Secara opsional konfigurasikan opsi konektor untuk tabel sink.
AS TABLE mysqlcatalog.dbmysql.mysqltable
/*+ OPTIONS('server-id'='8001-8004') */; -- Konfigurasikan opsi tambahan untuk tabel sumber CDC MySQL.Baca data dari tabel dimensi MySQL
INSERT INTO `<othersinktable>`
SELECT ...
FROM `<othersourcetable>` AS e
JOIN `<mysqlcatalog>`.`<dbname>`.`<tablename>` FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;Tulis data ke tabel MySQL
INSERT INTO `<mysqlcatalog>`.`<dbname>`.`<tablename>`
SELECT ...
FROM `<othersourcetable>`