Flink全托管产品提供丰富强大的数据实时入仓入湖能力。本文为您介绍如何在Flink全托管控制台上快速构建一个从MySQL到Hologres的数据同步作业。

背景信息

假设MySQL实例中有一个tpc_ds库,里面有24张表结构不相同的业务表。另外还有user_db1~user_db3三个库,由于进行了分库分表的设计,每个库中分别有3张表结构相同的表,共包含名称为user01~user09的9张表。在阿里云DMS控制台观察到MySQL中的库和表情况如下图所示。数据库和表情况
此时,如果您希望开发一个数据同步的作业,将这些表和数据都同步到Hologres中,其中user分库分表能合并到Hologres的一张表中,则可以按照以下步骤进行:

本文使用Flink全托管提供的CREATE TABLE AS(CTAS)语句CREATE DATABASE AS(CDAS)语句来完成整库同步、分库分表合并同步,一键完成数据的全量和增量同步,以及实时的表结构变更同步。

前提条件

准备测试数据

  1. 单击tpc_ds.sqluser_db1.sqluser_db2.sqluser_db3.sql下载测试数据到本地。
  2. 在DMS数据管理控制台上,准备RDS MySQL的测试数据。
    1. 通过DMS登录RDS MySQL。
      详情请参见通过DMS登录RDS MySQL
    2. 在已登录的SQLConsole窗口,输入如下命令后单击执行
      创建tpc_ds、user_db1、user_db2和user_db3四个数据库。
      CREATE DATABASE tpc_ds;
      CREATE DATABASE user_db1;
      CREATE DATABASE user_db2;
      CREATE DATABASE user_db3;
    3. 在DMS控制台顶部菜单栏,单击数据导入
    4. 单击批量数据导入
    5. 选择需要导入的数据库,上传对应的SQL文件,单击提交申请后,单击执行变更
      同样的操作依次为tpc_ds、user_db1、user_db2和user_db3数据库导入对应的数据文件。导入数据
  3. 在Hologres控制台创建my_user数据库,用于存放合并后的user表数据。
    操作步骤详情请参见创建数据库

配置IP白名单

为了让Flink能访问MySQL和Hologres实例,您需要将Flink全托管实例的网段添加到在MySQL和Hologres的白名单中。

  1. 获取Flink全托管实例的VPC网段。
    1. 登录实时计算控制台
    2. 在目标工作空间右侧操作列,选择其他 > 工作空间详情
    3. 工作空间详情对话框,查看Flink全托管虚拟交换机的网段信息。
      网段信息
  2. 在RDS MySQ的IP白名单中,添加Flink全托管网段信息。
    操作步骤详情请参见设置IP白名单RDS白名单
  3. 在Hologres的IP白名单中,添加Flink全托管网段信息。
    操作步骤详情请参见IP白名单Holo白名单

步骤一:创建Catalog

整库同步、分库分表合并同步、单表同步都需要依赖目标Catalog来创建目标表,也依赖源Catalog来获取源表列表和信息。因此,您需要通过控制台创建源Catalog和目标Catalog。本文将以源Catalog为MySQL Catalog和目标Catalog为Hologres Catalog为例,为您进行介绍。

  1. 创建名称为mysql的MySQL Catalog。
    操作步骤详情请参见配置MySQL Catalogmysql catalog
  2. 创建名称为holo的Hologres Catalog。
    操作步骤详情请参见配置Hologres CatalogHolo Catalog
  3. Schemas页签,确认已创建名为mysql和holo的Catalog。
    刷新查看

步骤二:开发数据同步作业

  1. 登录Flink全托管开发控制台,新建作业。
    1. 登录实时计算管理控制台
    2. Flink全托管页签,单击目标工作空间操作列下的控制台
    3. 在左侧导航栏,单击作业开发
    4. 单击新建
    5. 新建文件对话框,填写作业配置信息。
      作业参数 示例 说明
      文件名称 flink-test 作业的名称。
      说明 作业名称在当前项目中必须保持唯一。
      文件类型 流作业/SQL 数据同步作业仅支持流作业/SQL类型。
      部署目标 vvp-workload 选择作业需要部署的集群名称。
      存储位置 作业开发 指定该作业的代码文件所属的文件夹。默认存放在作业开发目录。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    6. 单击确认
  2. 将以下作业代码拷贝到作业文本编辑区。
    将tpc_ds库中所有表同步至Hologres,并将user的分库分表合并同步到 Hologres的单表中。代码示例如下所示。
    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- 同步TPCDS整库到Hologres的tpc_ds库中。
    CREATE DATABASE IF NOT EXISTS tpc_ds
    AS DATABASE mysql.tpc_ds INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    
    -- 同步user分库分表到Hologres的my_user.users表中。
    CREATE TABLE IF NOT EXISTS my_user.users
    AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;
    将tpc_ds库中所有表同步至Hologres使用CDAS (CREATE DATABASE AS) 语法来实现,将user的分库分表合并同步到 Hologres的单表使用CTAS (CREATE TABLE AS) 语法来实现,最后再使用STATEMENT SET语法将这两条SQL语句合并在一个作业中提交。Flink会自动为Source进行优化,复用一个Source节点读取多张MySQL表的数据,这能显著降低MySQL的连接数和读取压力,提升稳定性。
    说明 如果只想同步库中的某些表,您也可以在CDAS语法中使用 INCLUDING TABLE或EXCLUDING TABLE 语法来指定具体需要同步的表。例如INCLUDING TABLE 'web.*'表示只同步中所有web开头的表。

步骤三:启动作业

  1. 作业开发页面,单击上线
  2. 作业运维页面,单击目标作业名称操作列中的启动
    单击确认启动后,您可以看到作业从当前状态到期望状态的变化过程及最终结果。直到状态变为RUNNING,则代表作业运行正常。可以在作业运维页面观察作业的运行信息和状态。状态变化

步骤四:观察全量同步结果

  1. 登录Hologres管理控制台
  2. 元数据管理页签,查看Hologres实例下的tpc_ds数据库中24张表和表数据。
    holo表数据
  3. 元数据管理页签,查看my_user库下users表结构。
    同步后的表结构和数据如下图所示。
    • 表结构表结构

      users表的表结构比MySQL源表中多了_db_name和_table_name两列,代表数据来源的库名和表名,且作为联合主键的一部分来保证分库分表合并后的数据唯一性。

    • 表数据
      在users表信息页面右上角,单击查询表后,输入如下命令,单击运行
      select * from users order by _db_name,_table_name,id;
      表数据结果如下图所示。表数据

步骤五:观察增量同步结果

同步作业会在全量数据同步完以后自动切换到增量数据同步阶段,无需干预。您可以通过数据曲线页签的currentEmitEventTimeLag值来确定数据同步的阶段。

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击对应工作空间操作列下的控制台
  3. 在左侧导航栏上,选择应用 > 作业运维
  4. 单击目标作业名称。
  5. 单击数据曲线页签。
  6. 观察currentEmitEventTimeLag曲线图,确定数据同步阶段。
    数据曲线
    • 值为0时,代表还在全量同步阶段。
    • 值大于0时,代表已经进入增量同步阶段。
  7. 验证实时同步数据变更和结构变更的能力。
    MySQL CDC数据源支持在增量同步阶段,实时同步表的数据变更以及表的结构变更。您可以在作业进入到增量同步阶段后,通过修改MySQL的user分表的表结构和数据,来验证实时同步数据变更和结构变更的能力。
    1. 通过DMS登录RDS MySQL。
      详情请参见通过DMS登录RDS MySQL
    2. 在user_db2数据库下,执行如下命令修改user02表的表结构,并插入和更新数据。
      USE DATABASE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;   -- 添加age列。
      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- 插入带有age的数据。
      UPDATE `user05` SET name='JARK' WHERE id=15;  -- 更新另一张表,名字改成大写。
    3. 在Hologres控制台,查看users表结构和数据的变化。
      在users表信息页面右上角,单击查询表后,输入如下命令,单击运行
      select * from users order by _db_name,_table_name,id;
      表数据结果如下图所示。表结构和数据变化虽然多张分表的Schema并不一致,但是在user02上的表结构变更,以及数据变更都能实时地同步到下游表中。在Hologres的users表中,看到了新增的age字段,插入的Tony数据以及更新成大写的JARK数据。

(可选)步骤六:作业资源配置

根据数据量的不同,我们往往需要调节不同节点的并发和资源,以达到更优的作业性能。您可以使用资源配置的基础模式简单配置作业并发度和CU数,也可以使用资源配置的专家模式细粒度地调整节点的并发和资源。

  1. 作业运维页面,作业详情页面右上角,单击编辑
  2. 在页面右侧,单击资源配置页签。
  3. 资源模式选择为专家模式
  4. 单击立刻获取
  5. 单击展开全部
    观察完整的拓扑图,通过完整的拓扑图能了解到整个数据的同步计划,即具体同步哪些表。
  6. 手动设置每个节点的并发。
    设置作业为4并发;由于tpc_ds中的store_sales表数据量最大,可以单独设置holo.tpc_ds.store_sales Sink节点并发为8,提升 Hologres的写入性能。资源配置步骤详情请参见配置细粒度资源。经过调节后的作业资源配置计划如下图所示。资源配置计划
  7. 单击保存配置计划
  8. 单击上线
  9. 运维页面,查看调整效果。
    调整效果

常见问题

  • Q:更新了MySQL中的表结构,但是下游的表结构没有变化是怎么回事?

    A:表结构的变更同步并不识别具体的DDL,而是捕获前后两条数据之间的Schema变化。如果仅仅发生了DDL变更,但是上游无任何新增数据或者数据变更,则不会触发下游的数据变更。详细说明请参见表结构变更的同步策略

  • Q:Source出现finish split response timeout异常,是什么原因?

    A:该异常是因为task的CPU使用率过高导致来不及响应Coordinator的RPC请求。此时,您需要在资源配置页面增加Task Manager的CPU资源。

  • Q:在全量阶段发生表结构变更有什么影响?

    A:如果在全量读取阶段发生了源表的表结构变更,则Flink会忽略该表的表结构变更,导致表结构变更并不会同步到下游表。

  • Q:如果同步期间发生了不支持的表结构变更,导致作业同步失败,该怎么解决?

    如果发生了这种情况,需要重新同步该表的数据,即先停止作业,然后删掉下游表并重新启动同步作业,详情请参见CTAS使用限制CDAS使用限制

相关文档