After you configure a MySQL catalog, you can access the tables of a MySQL instance in the console of fully managed Flink. This topic describes how to configure, view, and delete a MySQL catalog in the console of fully managed Flink.
Background information
- You can directly access a table of a MySQL instance without the need to execute DDL statements to register the MySQL table. This improves the efficiency and accuracy of data development.
- Tables of MySQL catalogs can be used as MySQL Change Data Capture (CDC) source tables, MySQL result tables, and MySQL dimension tables in Flink SQL jobs.
- Catalogs of ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases are supported.
- Sharding-based logical tables can be directly accessed.
- You can use the CREATE DATABASE AS and CREATE TABLE AS statements to synchronize full data of a database, the merged data of sharded tables in a sharded database, and changes in table schemas based on MySQL data sources.
Limits
- Only the Flink compute engine of vvr-4.0.11-flink-1.13 or later supports MySQL catalogs.
- You cannot modify the DDL statements that are related to catalogs.
- You can only query data in databases and tables. You cannot create databases or tables.
- If a table of a MySQL catalog is used as a MySQL CDC source table, you can read data from the source table only in streaming mode. You cannot read data from the source table in batch mode. If a table of a MySQL catalog is used as a dimension table or result table, both streaming processing and batch processing are supported.
- Only MySQL 5.7 and MySQL 8.0.X are supported.
Configure a MySQL catalog
You can configure a MySQL catalog on the UI or by executing an SQL statement. We recommend that you configure a MySQL catalog on the UI.
Configure a MySQL catalog on the UI
Configure a MySQL catalog by executing an SQL statement
View the metadata of a MySQL catalog
After you configure a MySQL catalog, you can perform the following steps to view the metadata of the MySQL catalog.
Use a MySQL catalog
- Read data from a MySQL CDC source table.
INSERT INTO ${other_sink_table} SELECT ... FROM `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */;
Note If you want to use a table of a MySQL catalog as the MySQL CDC source table, we recommend that you use SQL hints to set the server-id parameter to a unique value for each job. If you want to run multiple jobs to read data from the source table at the same time, you must set the server-id parameter to a value range. The number of server-id values in the value range must be greater than or equal to the parallelism of jobs. - Read data from the sharding-based MySQL logic tables.
MySQL catalogs allow you to configure the databases and tables in a sharded database as a logical table in a query statement by using regular expressions and execute the query statement to read data from the logical table. For example, if a sharded MySQL database has multiple tables, such as user01, user02, and user99, in database shards from db01 to db10 and the schemas of all the tables are compatible with each other, you can access all the tables in the database shards by using the following regular expression:
SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;
The query result contains two additional system fields _db_name (STRING) and _table_name (STRING). The two fields and the primary key of the original tables are used as the new joint primary key of the logical table to ensure that the joint primary key is unique. If the primary keys of the tables from user01 to user99 are id, the joint primary key of the logical table named user is (_db_name, _table_name, id). For more information about how to read data from sharded tables in a sharded database by using regular expressions, see CREATE TABLE AS statement.
- Execute the CREATE TABLE AS and CREATE DATABASE AS statements to synchronize MySQL
data changes and schema changes in real time.
USE CATALOG `${target_catalog}`; -- Single-table synchronization: Synchronize schema changes and data changes of tables in real time. CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH (...) AS TABLE `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */; -- Database synchronization: Synchronize schema changes and data changes of the database in real time. CREATE DATABASE `${target_db_name}` WITH (...) AS DATABASE `${mysql_catalog}`.`${db_name}` INCLUDING ALL TABLES /*+ OPTIONS('server-id'='6000-6018') */;
For more examples, see CREATE TABLE AS statement or CREATE DATABASE AS statement.
- Read data from a MySQL dimension table.
INSERT INTO ${other_sink_table} SELECT ... FROM ${other_source_table} AS e JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;
- Write result data to a MySQL table.
INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}` SELECT ... FROM ${other_source_table}
Delete a MySQL catalog
You can delete a MySQL catalog on the UI or by using an SQL statement. We recommend that you delete a MySQL catalog on the UI.