配置MySQL Catalog后,您就可以在Flink全托管控制台直接访问MySQL实例中的表。本文为您介绍如何在Flink全托管模式下配置、查看及删除MySQL Catalog。
背景信息
MySQL Catalog具有以下功能特点:
- 直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。
- MySQL Catalog提供的表可以直接作为Flink SQL作业中的MySQL CDC源表、MySQL结果表和MySQL维表。
- 支持RDS MySQL、PolarDB MySQL或自建MySQL。
- 支持直接访问分库分表逻辑表。
- 支持配合CDAS和CTAS语法完成基于MySQL数据源的整库同步、分库分表合并同步、表结构变更同步。
本文将从以下方面为您介绍如何管理MySQL Catalog:
使用限制
- 仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持配置MySQL Catalog。
- 不支持修改Catalog DDL。
- 仅支持查询数据库和表,不支持创建数据库和表。
- 作为源表仅支持流读、不支持批读,支持作为维表和结果表。
- MySQL仅支持5.7和8.0.x版本。
说明 如果MySQL Catalog提供的表被作为MySQL CDC源表时,则需要在RDS MySQL、PolarDB MySQL或者自建MySQL上开启Binlog等配置,详情请参见配置MySQL。
配置MySQL Catalog
支持UI与SQL命令两种方式配置MySQL Catalog,推荐使用UI方式配置MySQL Catalog。
UI方式
SQL命令
查看MySQL Catalog
MySQL Catalog配置成功后,您可以通过以下步骤查看MySQL元数据。
使用MySQL Catalog
- 从MySQL源表中读取数据。
INSERT INTO ${other_sink_table} SELECT ... FROM `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */;
说明 如果将MySQL Catalog作为MySQL CDC源表,建议使用SQL Hints来为作业指定不同的 server-id。如果源表需要多并发读取,server-id还需要配置成范围格式,范围中的server-id个数需要大于等于并发度。 - 读取MySQL分库分表逻辑表。
MySQL Catalog支持使用正则表达式,将库名和表名作为逻辑表名,来读取分库分表的数据。例如,有一个分库分表的MySQL数据库,包括user01、user02和user99等多个表,分散在db01~db10等数据库中,且所有表的Schema都相互兼容,则可以通过如下正则表达式的库名表名来访问到所有user的分库分表。
SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;
分库分表的逻辑表会返回额外的_db_name (STRING) 和_table_name (STRING)两个系统字段,且这两个字段与原分表的主键会作为逻辑表的新联合主键以保证主键的唯一性。如果user01~user99的主键均为id,则user逻辑表的联合主键为(_db_name, _table_name, id)。MySQL Catalog支持结合正则表达式读取分库分表数据,具体示例请参见CREATE TABLE AS(CTAS)语句。
- 使用CTAS和CDAS实时同步MySQL数据变更和结构变更。
USE CATALOG `${target_catalog}`; -- 单表同步,实时同步表级别的表结构变更和数据变更。 CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH (...) AS TABLE `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */; -- 整库同步,实时同步整库级别的表结构变更和数据变更。 CREATE DATABASE `${target_db_name}` WITH (...) AS DATABASE `${mysql_catalog}`.`${db_name}` INCLUDING ALL TABLES /*+ OPTIONS('server-id'='6000-6018') */;
- 从MySQL维表中读取数据。
INSERT INTO ${other_sink_table} SELECT ... FROM ${other_source_table} AS e JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;
- 写入结果数据至MySQL表中。
INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}` SELECT ... FROM ${other_source_table}
删除MySQL Catalog
支持UI与SQL命令两种方式删除MySQL Catalog,推荐使用UI方式删除MySQL Catalog。