Realtime Compute for Apache Flink: Manage MySQL catalogs

Last Updated: Mar 26, 2026

A MySQL catalog lets you query tables in a MySQL instance directly from Realtime Compute for Apache Flink, without registering the tables by using DDL statements. After the catalog is created, its tables can be used as MySQL CDC (Change Data Capture) source tables, sink tables, and dimension tables in Flink SQL deployments.

Supported databases

ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases are supported. Only MySQL 5.7 and MySQL 8.0.x are supported.

Limitations

  • The MySQL instance and Realtime Compute for Apache Flink must be in the same virtual private cloud (VPC). To connect across VPCs or over the Internet, see FAQ about network connectivity.

  • Catalog configuration cannot be modified after creation. To change the configuration, drop the catalog and create a new one.

  • You can only query tables in existing databases. Creating databases or tables through Realtime Compute for Apache Flink is not supported.

  • MySQL CDC source tables can only be read in streaming mode, not batch mode.

  • Views cannot be used as tables in a MySQL catalog created with Ververica Runtime (VVR) 8.0.7 or later.

  • Tables created with PolarDB-specific syntax (for example, PARTITION BY KEY(...) PARTITIONS 16, UNIQUE KEY ... in a CREATE TABLE statement) are not recognized by the MySQL catalog.

Usage notes

  • Enable binary logging on ApsaraDB RDS for MySQL, PolarDB for MySQL, or your self-managed MySQL database before using a table as a MySQL CDC source table. For more information, see Configure a MySQL database.

  • Set a unique server-id for each deployment that reads from a MySQL CDC source table. When the source is read with a parallelism greater than 1, specify a range such as 6000-6008. The range must contain at least as many values as the deployment parallelism, and ranges used by concurrently running deployments must not overlap.

  • Store the database password as a variable instead of specifying it in plaintext. For more information, see Create a variable.

  • The comment field is not displayed in table schema details.

  • Dropping a catalog does not drop the underlying tables in MySQL. Running deployments continue unaffected, but redeploying or restarting a deployment that references a dropped catalog causes a "table not found" error.
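
As a quick way to check the binary logging prerequisite from the list above, statements like the following can be run on the source database. This is a minimal sketch: the values noted in the comments (ROW format, FULL row image) are the settings that MySQL CDC connectors usually require and are stated here as assumptions. Configure a MySQL database remains the authoritative reference.

```sql
-- Run these on the MySQL instance itself, not in Flink.
SHOW VARIABLES LIKE 'log_bin';           -- binary logging should be ON
SHOW VARIABLES LIKE 'binlog_format';     -- assumed requirement: ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- assumed requirement: FULL
```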

Create a MySQL catalog

Create a MySQL catalog from the console (recommended) or by running a CREATE CATALOG SQL statement.

Console (recommended)

  1. Go to the Catalogs page.

    1. Log on to the Realtime Compute for Apache Flink console. Find the workspace you want to manage and click Console in the Actions column.

    2. In the left-side navigation pane, click Data Management.

  2. On the Catalog List page, click Create Catalog. On the Built-in Catalog tab, click MySQL, then click Next.

  3. In the Configure Catalog step, fill in the parameters.

    Important

    Catalog configuration cannot be modified after creation. To change the configuration, drop the catalog and create a new one.

    | Parameter | Description | Required |
    |---|---|---|
    | catalogname | A name for the MySQL catalog. | Yes |
    | hostname | The IP address or hostname of the MySQL database. If the MySQL instance is in a different VPC or must be accessed over the Internet, establish network connections first. See FAQ about network connectivity. | Yes |
    | port | The port number of the MySQL database. Default: 3306. | No |
    | default-database | The name of the default MySQL database. | Yes |
    | username | The username for the MySQL database. | Yes |
    | password | The password for the MySQL database. Store it as a variable to avoid plaintext exposure. See Create a variable. | Yes |

  4. Click Confirm. The new catalog appears in the Catalogs pane on the left side of the Catalog List page.
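
For the SQL route mentioned at the start of this section, a statement along the following lines is a reasonable starting point. It is a sketch, not a verified template: the option keys mirror the console parameters listed above, while the 'type' = 'mysql' identifier and the ${secret_values.mysql_pw} variable reference are assumptions to check against the CREATE CATALOG reference and Create a variable for your VVR version.

```sql
CREATE CATALOG `<catalogname>` WITH (
  'type' = 'mysql',                          -- assumed catalog type identifier
  'hostname' = '<hostname>',
  'port' = '3306',                           -- optional; 3306 is the default
  'username' = '<username>',
  'password' = '${secret_values.mysql_pw}',  -- hypothetical variable name, avoids plaintext
  'default-database' = '<dbname>'
);
```

As with the console, the configuration cannot be changed afterwards, so it is worth double-checking the hostname and default database before running the statement.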

View and drop a MySQL catalog

Console (recommended)

On the Catalogs page, click the catalog name to view its Name and Type in the Catalog List section.

  • View databases and tables: Click View in the Actions column.

  • Drop the catalog: Click Delete in the Actions column.
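
The same operations can likely be performed with standard Flink SQL statements. The following is a minimal sketch that assumes these statements are available in your VVR version; the placeholders follow the naming used elsewhere on this page.

```sql
-- List the catalogs registered in the workspace.
SHOW CATALOGS;

-- List the tables of one MySQL database through the catalog.
SHOW TABLES FROM `<mysqlcatalog>`.`<dbname>`;

-- Inspect the schema that Flink derives for a MySQL table.
DESCRIBE `<mysqlcatalog>`.`<dbname>`.`<tablename>`;

-- Remove the catalog; the underlying MySQL tables are not dropped.
DROP CATALOG `<mysqlcatalog>`;
```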

Use a MySQL catalog

Read from a CDC source table

Use an SQL hint to set server-id for each deployment. If the source table is read in parallel, specify a range that contains at least as many values as the deployment parallelism, and do not reuse any of those values in other deployments that read from the same MySQL instance.

INSERT INTO `<othersinktable>`
SELECT ...
FROM `<mysqlcatalog>`.`<dbname>`.`<tablename>` /*+ OPTIONS('server-id' = '6000-6008') */;

Read from sharded tables

Query sharded tables as a single logical table using regular expressions in the database and table name segments. The query result includes two system fields, _db_name (STRING) and _table_name (STRING), which together with the original primary keys form the new joint primary key.

For example, to query tables user01 through user99 spread across database shards db01 through db10 as one logical table:

SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;

The joint primary key for this logical table is (_db_name, _table_name, id), where id is the original primary key.
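
To make the joint key concrete, the sketch below selects the two system fields alongside the original key. The catalog placeholder and the narrower patterns db[0-9]+ and user[0-9]+ are illustrative assumptions. They are intended to match only the numbered shards in the example instead of every name that starts with db or user.

```sql
SELECT
  _db_name,     -- database shard that the row came from
  _table_name,  -- table shard that the row came from
  id            -- original primary key
FROM `<mysqlcatalog>`.`db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='6000-6018') */;
```

Two rows with the same id from different shards remain distinct because _db_name or _table_name differs.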

To merge and synchronize data across multiple tables in a sharded database, see Consolidate and synchronize table and database shards.

Synchronize data with CTAS and CDAS

Use CREATE TABLE AS (CTAS) to synchronize a single table, including its schema changes, or to merge data from sharded tables. Use CREATE DATABASE AS (CDAS) to synchronize an entire database, including ongoing schema changes.

Note

CTAS and CDAS require supported upstream and downstream data stores. If the downstream connector does not implement CatalogTableProvider, an error is returned.

-- Single-table sync: propagates schema changes and data changes in real time
CREATE TABLE IF NOT EXISTS `<targetcatalog>`.`<targetdbname>`.`<targettablename>`
WITH (...)
AS TABLE `<mysqlcatalog>`.`<dbname>`.`<tablename>`
/*+ OPTIONS('server-id'='6000-6018') */;

-- Full-database sync: propagates schema changes and data changes in real time
CREATE DATABASE `<targetcatalog>`.`<targetdbname>` WITH (...)
AS DATABASE `<mysqlcatalog>`.`<dbname>` INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='6000-6018') */;

The following example synchronizes a MySQL table to Hologres:

USE CATALOG holocatalog; -- Set the target catalog

CREATE TABLE IF NOT EXISTS holotable   -- Target table name; defaults to the catalog's default database if no database is specified
WITH ('jdbcWriteBatchSize' = '1024')   -- Optional sink connector options
AS TABLE mysqlcatalog.dbmysql.mysqltable
/*+ OPTIONS('server-id'='8001-8004') */; -- Additional options for the MySQL CDC source table

For a complete walkthrough, see Use a Hologres catalog.

For more information, see CREATE TABLE AS (CTAS) for single-table and sharded-table sync, and CREATE DATABASE AS (CDAS) for full-database sync.

Read from a dimension table

INSERT INTO `<othersinktable>`
SELECT ...
FROM `<othersourcetable>` AS e
JOIN `<mysqlcatalog>`.`<dbname>`.`<tablename>` FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
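
The join above relies on e.proctime being a processing-time attribute of the source table. One way to set this up is sketched below, using Flink's built-in datagen and blackhole connectors as stand-ins for a real source and sink. The table names orders and enriched_orders and the column user_name are hypothetical, and the MySQL table is assumed to have id and user_name columns.

```sql
-- Stand-in source table with a processing-time attribute.
CREATE TEMPORARY TABLE orders (
  id BIGINT,
  amount DOUBLE,
  proctime AS PROCTIME()  -- processing-time attribute used by the lookup join
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Stand-in sink that discards the joined rows.
CREATE TEMPORARY TABLE enriched_orders (
  id BIGINT,
  amount DOUBLE,
  user_name STRING
) WITH (
  'connector' = 'blackhole'
);

-- Look up each order in the MySQL catalog table at processing time.
INSERT INTO enriched_orders
SELECT e.id, e.amount, w.user_name
FROM orders AS e
JOIN `<mysqlcatalog>`.`<dbname>`.`<tablename>` FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
```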

Write to a MySQL table

INSERT INTO `<mysqlcatalog>`.`<dbname>`.`<tablename>`
SELECT ...
FROM `<othersourcetable>`;