All Products
Search
Document Center

Realtime Compute for Apache Flink:Kelola katalog MySQL

Last Updated:Nov 10, 2025

Setelah membuat katalog MySQL, Anda dapat mengakses tabel dari instance MySQL di konsol pengembangan Realtime Compute for Apache Flink. Tabel tersebut juga dapat digunakan dalam penyebaran SQL Flink. Topik ini menjelaskan cara membuat dan menggunakan katalog MySQL.

Informasi latar belakang

Saat menggunakan katalog MySQL, perhatikan poin-poin berikut:

  • Anda dapat langsung mengakses tabel dari instance MySQL tanpa perlu mengeksekusi pernyataan DDL untuk mendaftarkan tabel MySQL. Ini meningkatkan efisiensi dan akurasi pengembangan data.

  • Tabel dari katalog MySQL dapat digunakan sebagai tabel sumber Change Data Capture (CDC) MySQL, tabel sink MySQL, dan tabel dimensi MySQL dalam penyebaran SQL Realtime Compute for Apache Flink.

  • Katalog dari ApsaraDB RDS untuk MySQL, PolarDB untuk MySQL, dan database MySQL yang dikelola sendiri didukung.

  • Tabel logis berbasis sharding dapat diakses secara langsung.

  • Anda dapat menggunakan pernyataan CREATE DATABASE AS dan CREATE TABLE AS untuk menyinkronkan data lengkap dari database, data gabungan dari tabel terpecah di database terpecah, serta perubahan dalam skema tabel berdasarkan sumber data MySQL.

Batasan

  • Instance MySQL dan Realtime Compute for Apache Flink harus berada di virtual private cloud (VPC) yang sama. Jika instance MySQL tidak berada di VPC yang sama dengan Realtime Compute for Apache Flink atau Anda ingin mengakses instance MySQL dan Realtime Compute for Apache Flink melalui Internet, Anda harus membuat koneksi jaringan. Untuk informasi lebih lanjut, lihat FAQ tentang konektivitas jaringan.

  • Konfigurasi katalog tidak dapat dimodifikasi setelah dibuat. Untuk memodifikasi katalog, Anda harus menghapusnya dan membuat yang baru.

  • Anda hanya dapat menanyakan data di database dan tabel yang ada. Anda tidak dapat membuat database atau tabel menggunakan Realtime Compute for Apache Flink.

  • Jika tabel dari katalog MySQL digunakan sebagai tabel sumber CDC MySQL, Anda hanya dapat membaca data dari tabel sumber dalam mode streaming. Pembacaan data dalam mode batch tidak didukung.

    Catatan

    Jika tabel dari katalog MySQL digunakan sebagai tabel sumber CDC MySQL, Anda harus mengaktifkan binary logging pada ApsaraDB RDS untuk MySQL, PolarDB untuk MySQL, atau database MySQL yang dikelola sendiri. Untuk informasi lebih lanjut, lihat Konfigurasikan database MySQL.

  • Tabel yang dibuat menggunakan sintaks spesifik PolarDB dalam pernyataan CREATE TABLE tidak dapat diidentifikasi.

    Sebagai contoh, jika sebuah tabel dibuat menggunakan sintaks PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`) dalam pernyataan CREATE TABLE, katalog MySQL tidak dapat mengidentifikasi tabel tersebut.

  • Tampilan tidak dapat digunakan sebagai tabel dalam katalog MySQL yang dibuat di Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.7 atau versi lebih baru.

  • Hanya MySQL 5.7 dan MySQL 8.0.x yang didukung.

Buat katalog MySQL

Anda dapat membuat katalog MySQL melalui UI atau dengan mengeksekusi pernyataan SQL.

UI (direkomendasikan)

  1. Pergi ke halaman Katalog.

    1. Masuk ke Konsol Realtime Compute for Apache Flink. Temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.

    2. Di panel navigasi di sebelah kiri, klik Manajemen Data.

  2. Di halaman Daftar Katalog, klik Create Catalog. Di tab Katalog Bawaan dari kotak dialog Buat Katalog, klik MySQL dan klik Next.

  3. Di langkah Konfigurasi Katalog, konfigurasikan parameter.

    Penting

    Setelah katalog dibuat, konfigurasi parameter berikut tidak dapat dimodifikasi. Untuk memodifikasi katalog, Anda harus menghapusnya dan membuat yang baru.

    配置信息

    Parameter

    Deskripsi

    Diperlukan

    catalogname

    Nama katalog MySQL kustom.

    Ya

    hostname

    Alamat IP atau nama host yang digunakan untuk mengakses database MySQL.

    Catatan

    Jika instance MySQL tidak berada di VPC yang sama dengan Realtime Compute for Apache Flink atau Anda ingin mengakses instance MySQL dan Realtime Compute for Apache Flink melalui Internet, Anda harus membuat koneksi jaringan. Untuk informasi lebih lanjut, lihat FAQ tentang konektivitas jaringan.

    Ya

    port

    Nomor port database MySQL. Nilai default: 3306.

    Tidak

    default-database

    Nama database MySQL default.

    Ya

    username

    Nama pengguna yang digunakan untuk mengakses database MySQL.

    Ya

    password

    Kata sandi yang digunakan untuk mengakses database MySQL.

    Untuk mencegah risiko yang disebabkan oleh pasangan AccessKey dalam teks biasa, kami sarankan Anda menggunakan variabel untuk menentukan parameter ini. Pada gambar sampel, variabel bernama mysqlpw digunakan. Untuk informasi lebih lanjut, lihat Buat variabel.

    Ya

  4. Klik Confirm.

    Di panel Catalogs di sebelah kiri halaman Daftar Katalog, lihat katalog yang telah Anda buat.

SQL

  1. Pergi ke halaman Skrip.

    1. Masuk ke Konsol Realtime Compute for Apache Flink. Temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.

    2. Di panel navigasi di sebelah kiri, pilih Development > Scripts.

  2. Klik image.png dan pilih New Script. Di kotak dialog, masukkan nama skrip, pilih lokasi, lalu klik Save.

  3. Salin dan tempel kode berikut:

    CREATE CATALOG YourCatalogName WITH(
      'type' = 'mysql',
      'hostname' = 'rm-bp1gcn0q0j0******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'usertest',
      'password' = '${secret_values.mysqlpw}',
      'default-database' = 'flinktest',
      'catalog.table.metadata-columns'='table_name'
    );

    Opsi

    Deskripsi

    Diperlukan

    YourCatalogName

    Nama katalog MySQL kustom.

    Ya

    type

    Tipe katalog. Atur nilainya menjadi mysql.

    Ya

    hostname

    Alamat IP atau nama host yang digunakan untuk mengakses database MySQL.

    Catatan

    Untuk mengakses MySQL lintas VPC atau melalui Internet, Anda harus membuat koneksi jaringan. Untuk informasi lebih lanjut, lihat FAQ tentang konektivitas jaringan.

    Ya

    port

    Nomor port database MySQL. Nilai default: 3306.

    Tidak

    default-database

    Nama database MySQL default.

    Ya

    username

    Nama pengguna yang digunakan untuk mengakses database MySQL.

    Ya

    password

    Kata sandi yang digunakan untuk mengakses database MySQL.

    Untuk mencegah risiko yang disebabkan oleh pasangan AccessKey dalam teks biasa, kami sarankan Anda menggunakan variabel untuk menentukan opsi ini. Kode sampel menggunakan variabel bernama mysqlpw. Untuk informasi lebih lanjut, lihat Buat variabel.

    Ya

    property-version

    Versi opsi katalog. Nilai valid: 0 dan 1. Sementara 0 adalah nilai default, 1 direkomendasikan.

    Opsi yang tersedia dan nilai default dari opsi tersebut dapat bervariasi berdasarkan versi opsi katalog. Untuk informasi lebih lanjut, lihat deskripsi opsi.

    Catatan
    • Hanya VVR 8.0.6 atau lebih baru yang mendukung opsi ini.

    • Di VVR 11.1 atau lebih baru, nilai default adalah 1.

    Tidak

    catalog.table.metadata-columns

    Kolom metadata dalam tabel sumber CDC MySQL yang ingin Anda tambahkan ke skema tabel saat Anda menanyakan tabel. Secara default, tidak ada kolom metadata yang ditambahkan.

    Pisahkan beberapa kolom metadata dengan titik koma (;). Contoh: op_ts;table_name;database_name.

    Catatan
    • Hanya VVR 6.0.5 atau lebih baru yang mendukung opsi ini.

    • Jika Anda menentukan opsi ini, kolom metadata yang ditentukan akan ditambahkan ke skema tabel yang dikembalikan. Kolom metadata hanya tersedia di tabel sumber CDC MySQL. Oleh karena itu, tabel yang dibuat di katalog MySQL dapat digunakan hanya sebagai tabel sumber dan tidak dapat digunakan sebagai tabel sink atau tabel dimensi.

    Tidak

    catalog.table.treat-tinyint1-as-boolean

    Menentukan apakah akan memetakan tipe data TINYINT(1) dan BOOLEAN dari MySQL ke tipe data BOOLEAN dari Flink saat Realtime Compute for Apache Flink mendapatkan skema tabel. Nilai valid:

    • true: Tipe data TINYINT(1) dan BOOLEAN dari MySQL dipetakan ke tipe data BOOLEAN dari Flink.

    • false: Tipe data TINYINT(1) dan BOOLEAN dari MySQL dipetakan ke tipe data TINYINT dari Flink.

    Nilai default:

    • Jika Anda mengatur opsi property-version ke 0, nilai default opsi ini adalah true.

    • Jika Anda mengatur opsi property-version ke 1, nilai default opsi ini adalah false.

    Catatan
    • Hanya VVR 8.0.4 atau lebih baru yang mendukung opsi ini.

    • Kami sarankan Anda menggunakan bidang tipe data TINYINT(1) dari MySQL untuk menyimpan hanya nilai 0 dan 1. Anda harus memilih tipe data yang sesuai dengan kebutuhan bisnis Anda untuk pemetaan. Untuk informasi lebih lanjut, lihat Pemetaan tipe data.

    Tidak

  4. Pilih kode untuk pembuatan katalog dan klik Run di sebelah kiri kode.

    Pesan Pernyataan berikut telah berhasil dieksekusi! menunjukkan bahwa katalog telah dibuat.

    image..png

Lihat dan hapus katalog MySQL

UI (direkomendasikan)

Di halaman Catalogs, klik katalog yang telah dibuat untuk melihat parameter Name dan Type katalog di bagian Catalog List.

  • Lihat katalog: Klik View di kolom Actions katalog yang ingin Anda kelola dan lihat database dan tabel di katalog.

    Bidang komentar tidak ditampilkan dalam detail skema tabel.

  • Hapus katalog: Klik Delete di kolom Actions katalog yang ingin Anda hapus.

    Operasi penghapusan hanya menghapus katalog yang Anda buat dan tidak menghapus tabel di layanan terkait. Setelah katalog dihapus, penyebaran yang sedang berjalan yang menggunakan tabel di katalog tidak terpengaruh secara negatif. Jika penyebaran diredploy atau dimulai ulang, kesalahan akan terjadi, menunjukkan bahwa tabel tidak dapat ditemukan. Berhati-hatilah saat melakukan operasi penghapusan.

SQL

  1. Di editor kode tab Skrip di halaman Skrip, masukkan kode berikut:

    -- Lihat informasi skema tabel di katalog. Bidang komentar tidak ditampilkan.
    DESCRIBE `<catalogname>`.`<dbname>`.`<tablename>`;
    
    -- Hapus katalog.
    DROP CATALOG `<catalogname>`;
    Catatan

    Operasi penghapusan hanya menghapus katalog yang Anda buat dan tidak menghapus tabel di layanan terkait. Setelah katalog dihapus, penyebaran yang sedang berjalan yang menggunakan tabel di katalog tidak terpengaruh. Jika penyebaran diredploy atau dimulai ulang, kesalahan akan terjadi menunjukkan bahwa tabel tidak dapat ditemukan. Berhati-hatilah saat melakukan operasi penghapusan.

  2. Klik kanan pernyataan dan pilih Run dari menu pintasan.

    image

Gunakan katalog MySQL

Baca data dari tabel sumber MySQL

INSERT INTO `<othersinktable>`
SELECT ...
FROM `<mysqlcatalog>`.`<dbname>`.`<tablename>` /*+ OPTIONS('server-id' = '6000-6008') */;

    Jika Anda menggunakan tabel dari katalog MySQL sebagai tabel sumber CDC MySQL, kami sarankan Anda menggunakan petunjuk SQL untuk mengatur parameter server-id ke nilai unik untuk setiap penyebaran. Jika Anda ingin menjalankan beberapa penyebaran untuk membaca data dari tabel sumber secara bersamaan, Anda harus mengatur parameter server-id ke rentang nilai. Jumlah nilai server-id dalam rentang nilai harus lebih besar dari atau sama dengan paralelisme penyebaran.

    Baca data dari tabel logika MySQL berbasis sharding

    Katalog MySQL memungkinkan Anda mengonfigurasi database dan tabel di database terpecah sebagai tabel logis dalam pernyataan kueri menggunakan ekspresi reguler dan mengeksekusi pernyataan kueri untuk membaca data dari tabel logis tersebut.

    Sebagai contoh, jika database MySQL yang terpecah berisi tabel seperti user01, user02, dan user99, di shard database dari db01 hingga db10 dan semua skema tabel kompatibel satu sama lain, Anda dapat mengakses semua tabel di shard database menggunakan ekspresi reguler berikut:

    SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;

    Hasil kueri berisi dua bidang sistem tambahan _db_name (STRING) dan _table_name (STRING). Kedua bidang ini dan kunci utama dari tabel asli digunakan sebagai kunci utama gabungan baru dari tabel logis untuk memastikan bahwa kunci utama gabungan unik. Jika kunci utama dari tabel user01 hingga user99 adalah id, maka kunci utama gabungan dari tabel logis bernama user adalah (_db_name, _table_name, id).

    Katalog MySQL memungkinkan Anda menggunakan ekspresi reguler untuk mencocokkan beberapa tabel yang akan disinkronkan. Dengan cara ini, Anda dapat menggabungkan dan menyinkronkan data di beberapa tabel dari database terpecah. Untuk informasi lebih lanjut, lihat Konsolidasi dan Sinkronisasi Shard Tabel dan Database.

    Eksekusi pernyataan CREATE TABLE AS dan CREATE DATABASE AS untuk menyinkronkan perubahan data dan skema MySQL secara real-time

    Catatan

    Saat menyinkronkan data, Anda harus menggunakan penyimpanan data upstream dan downstream yang didukung oleh pernyataan CREATE TABLE AS dan CREATE DATABASE AS. Sebagai contoh, saat konektor MongoDB digunakan sebagai tabel sink, kesalahan ini akan dilaporkan: Pernyataan CREATE TABLE ... AS TABLE ... memerlukan katalog target ... mengimplementasikan antarmuka org.apache.flink.table.catalog.CatalogTableProvider.

    Pernyataan CREATE TABLE AS dapat menyinkronkan data dari satu tabel, menyinkronkan perubahan skema tabel, menggabungkan dan menyinkronkan data dari beberapa tabel di database terpecah, serta menambahkan kolom komputasi kustom. Anda juga dapat menambahkan pernyataan CREATE TABLE AS dalam penyebaran sinkronisasi data. Untuk informasi lebih lanjut, lihat CREATE TABLE AS (CTAS). Pernyataan CREATE DATABASE AS dapat menyinkronkan skema tabel dan data dari seluruh database secara real-time. Pernyataan ini juga dapat menyinkronkan perubahan skema tabel. Untuk informasi lebih lanjut, lihat CREATE DATABASE AS (CDAS).

    -- Sinkronisasi tabel tunggal: Sinkronkan perubahan skema dan perubahan data tabel secara real-time.
    CREATE TABLE IF NOT EXISTS `<targetcatalog>`.`<targetdbname>`.`<targettablename>`
    WITH (...)
    AS TABLE `<mysqlcatalog>`.`<dbname>`.`<tablename>`
    /*+ OPTIONS('server-id'='6000-6018') */;
    
    -- Sinkronisasi database: Sinkronkan perubahan skema dan perubahan data database secara real-time.
    CREATE DATABASE `<targetcatalog>`.`<targetdbname>` WITH (...)
    AS DATABASE `<mysqlcatalog>`.`<dbname>` INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='6000-6018') */;

    Sebagai contoh, Anda dapat menyinkronkan data dari database MySQL ke Hologres. Untuk informasi lebih lanjut, lihat Gunakan Katalog Hologres.

    USE CATALOG holocatalog; -- Tentukan katalog.
    
    CREATE TABLE IF NOT EXISTS holotable -- Tentukan nama tabel tempat data disinkronkan. Jika Anda tidak menentukan tingkat database, tabel akan disinkronkan secara otomatis ke database default dari katalog.
    WITH ('jdbcWriteBatchSize' = '1024')   -- Secara opsional konfigurasikan opsi konektor untuk tabel sink.
    AS TABLE mysqlcatalog.dbmysql.mysqltable   
    /*+ OPTIONS('server-id'='8001-8004') */; -- Konfigurasikan opsi tambahan untuk tabel sumber CDC MySQL.

    Baca data dari tabel dimensi MySQL

    INSERT INTO `<othersinktable>`
    SELECT ...
    FROM `<othersourcetable>` AS e
    JOIN `<mysqlcatalog>`.`<dbname>`.`<tablename>` FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;

    Tulis data ke tabel MySQL

    INSERT INTO `<mysqlcatalog>`.`<dbname>`.`<tablename>`
    SELECT ...
    FROM `<othersourcetable>`