配置MySQL Catalog后,您就可以在Flink全托管控制台直接访问MySQL实例中的表。本文为您介绍如何在Flink全托管模式下配置、查看及删除MySQL Catalog。

背景信息

MySQL Catalog具有以下功能特点:
  • 直接访问MySQL实例中的表,无需通过DDL语句手动注册MySQL表,提升开发效率和正确性。
  • MySQL Catalog提供的表可以直接作为Flink SQL作业中的MySQL CDC源表、MySQL结果表和MySQL维表。
  • 支持RDS MySQL、PolarDB MySQL或自建MySQL。
  • 支持直接访问分库分表逻辑表。
  • 支持配合CDAS和CTAS语法完成基于MySQL数据源的整库同步、分库分表合并同步、表结构变更同步。
本文将从以下方面为您介绍如何管理MySQL Catalog:

使用限制

  • 仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持配置MySQL Catalog。
  • 不支持修改Catalog DDL。
  • 仅支持查询数据库和表,不支持创建数据库和表。
  • 作为源表仅支持流读、不支持批读,支持作为维表和结果表。
  • MySQL仅支持5.7和8.0.x版本。
说明 如果MySQL Catalog提供的表被作为MySQL CDC源表时,则需要在RDS MySQL、PolarDB MySQL或者自建MySQL上开启Binlog等配置,详情请参见配置MySQL

配置MySQL Catalog

支持UI与SQL命令两种方式配置MySQL Catalog,推荐使用UI方式配置MySQL Catalog。

UI方式

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 在左侧,单击Schemas页签。
  5. 单击加号图标,在菜单栏下拉框中选择创建Catalog
  6. 创建Catalog页面,选择MySQL
  7. 填写参数配置信息。
    注意 Catalog创建完成后,以下配置信息都不支持修改。如果需要修改,则您需要删除掉已创建的Catalog,重新进行创建。
    配置信息
    参数 说明 是否必填
    catalogname MySQL Catalog名称。
    type 类型,固定值为mysql。
    hostname MySQL数据库的IP地址或者Hostname。
    port MySQL数据库服务的端口号,默认值为3306。
    default-database 默认的MySQL数据库名称。
    username MySQL数据库服务的用户名。
    password MySQL数据库服务的密码。
  8. 单击确认
  9. 单击刷新图标,刷新查看新建的MySQL Catalog。
    刷新按钮

SQL命令

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 在页面左上角,单击新建,文件类型选择SQL。
  5. 在文本编辑区域,输入配置MySQL Catalog的命令。
    CREATE CATALOG <catalogname> WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '<password>',
      'default-database' = '<dbname>'
    );
    参数 说明 是否必填
    catalogname MySQL Catalog名称。
    type 类型,固定值为mysql。
    hostname MySQL数据库的IP地址或者Hostname。
    port MySQL数据库服务的端口号,默认值为3306。
    default-database 默认的MySQL数据库名称。
    username MySQL数据库服务的用户名。
    password MySQL数据库服务的密码。
  6. 单击执行
    执行完会提示Query has been executed。
  7. 在左侧,单击Schemas页签。
  8. 单击刷新图标,刷新查看新建的MySQL Catalog。
    刷新按钮

查看MySQL Catalog

MySQL Catalog配置成功后,您可以通过以下步骤查看MySQL元数据。

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 单击Schemas页签。
  5. 在顶部菜单栏下拉框中,切换到目标MySQL Catalog名称,本文以mysql为例。
    mysql catalog
  6. 查看目标MySQL Catalog下的数据库、表和函数信息。
    表名

使用MySQL Catalog

  • 从MySQL源表中读取数据。
    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */;
    说明 如果将MySQL Catalog作为MySQL CDC源表,建议使用SQL Hints来为作业指定不同的 server-id。如果源表需要多并发读取,server-id还需要配置成范围格式,范围中的server-id个数需要大于等于并发度。
  • 读取MySQL分库分表逻辑表。
    MySQL Catalog支持使用正则表达式,将库名和表名作为逻辑表名,来读取分库分表的数据。例如,有一个分库分表的MySQL数据库,包括user01、user02和user99等多个表,分散在db01~db10等数据库中,且所有表的Schema都相互兼容,则可以通过如下正则表达式的库名表名来访问到所有user的分库分表。
    SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;

    分库分表的逻辑表会返回额外的_db_name (STRING) 和_table_name (STRING)两个系统字段,且这两个字段与原分表的主键会作为逻辑表的新联合主键以保证主键的唯一性。如果user01~user99的主键均为id,则user逻辑表的联合主键为(_db_name, _table_name, id)。MySQL Catalog支持结合正则表达式读取分库分表数据,具体示例请参见CREATE TABLE AS(CTAS)语句

  • 使用CTAS和CDAS实时同步MySQL数据变更和结构变更。
    USE CATALOG `${target_catalog}`;
    
    -- 单表同步,实时同步表级别的表结构变更和数据变更。
    CREATE TABLE IF NOT EXISTS `${target_table_name}`
    WITH (...)
    AS TABLE `${mysql_catalog}`.`${db_name}`.`${table_name}`
    /*+ OPTIONS('server-id'='6000-6018') */;
    
    -- 整库同步,实时同步整库级别的表结构变更和数据变更。
    CREATE DATABASE `${target_db_name}` WITH (...)
    AS DATABASE `${mysql_catalog}`.`${db_name}` INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='6000-6018') */;
                        

    更多示例请参见CREATE TABLE AS(CTAS)语句CREATE DATABASE AS(CDAS)语句

  • 从MySQL维表中读取数据。
    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM ${other_source_table} AS e
    JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • 写入结果数据至MySQL表中。
    INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}`
    SELECT ...
    FROM ${other_source_table}

删除MySQL Catalog

支持UI与SQL命令两种方式删除MySQL Catalog,推荐使用UI方式删除MySQL Catalog。

UI方式

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 在左侧,单击Schemas页签。
  5. 在顶部菜单栏下拉框中,选择目标MySQL Catalog名称,单击删除图标图标。
    删除按钮
  6. 在弹出的页面中,单击删除
  7. 单击刷新图标,刷新查看新建的MySQL Catalog是否已被删除。

SQL命令方式

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 在页面左上角,单击新建,文件类型选择SQL。
  5. 在文本编辑区域,输入以下命令。
    DROP CATALOG ${catalog_name}
    其中,catalog_name为您要删除的在Flink全托管开发控制台上显示的MySQL Catalog名称。
    说明 删除MySQL Catalog不会影响已运行的作业,但会导致使用该Catalog下表的作业,在上线或重启时报无法找到该表的错误,请您谨慎操作。
  6. 单击执行
  7. 在左侧,单击Schemas页签。
  8. 单击刷新图标,刷新查看新建的MySQL Catalog是否已被删除。