CDAS支持整库级别的表结构和数据的实时同步,此外还支持表结构变更的同步。本文为您介绍CREATE DATABASE AS(CDAS)的背景信息、前提条件、使用限制、基本语法和示例。

背景信息

CDAS是CTAS语法的一个语法糖,以实现整库同步、多表同步的功能。阿里云Flink引擎会将CDAS语句为每个需要同步的表翻译成一个对应的CTAS语句。因此CDAS还拥有CTAS的数据同步,以及表结构变更同步的能力,常用于全自动化的数据集成场景,并且阿里云Flink还能对Source进行优化,复用一个Source节点读取多业务表的数据,这对于MySQL CDC数据源场景尤为适用。因为这不仅可以减少数据库的连接数,还能避免重复拉取Binlog数据,以降低数据库的读取压力。关于CDAS相关信息如下所示:
  • 功能特性
    功能 详情
    整库同步 支持实时同步整库(或者多张表)的全量和增量数据到每张对应的目标表中。
    表结构变更同步 在实时同步整库数据的同时,还支持将每张源表的表结构变更(加列等)实时同步到对应的目标表中。
    分库合并同步 支持使用正则表达式定义库名,匹配数据源的多个分库下的源表,合并后同步到下游每张对应表名的目标表中。
    多CDAS&CTAS语句 支持使用STATEMENT SET语法将多个CDAS和CTAS语句作为一个作业一起提交,并支持对Source节点的合并复用,降低对数据源的压力。
  • 启动流程
    当执行CDAS语句时,阿里云Flink将会按照以下流程执行:
    1. 检查目标存储中是否存在目标库和目标表。

      如果不存在目标库,则通过目标端Catalog去目标存储中创建相应的目标库;如果存在,则跳过建库,并检查目标库是否存在该目标表。如果不存在,则在目标库中创建相应的目标表,该目标表具有和源库中表相同的表名和Schema。如果存在,则跳过建表。

    2. 提交和启动相应的数据同步作业。

      将源库中的数据以及Schema变更同步到目标库下的表中。

    例如从MySQL到Hologres的CDAS数据同步流程如下图所示。CDAS示意图
  • 表结构变更同步策略

    因为CDAS是CTAS语法的一个语法糖,所以表结构变更能力与CTAS一致,详细的表结构变更同步策略请参见CREATE TABLE AS(CTAS)语句

前提条件

执行CDAS语法前,需要在您的工作空间中已注册目标端的Catalog。详情请参见管理Hive Metastore管理Hologres Catalog管理MySQL Catalog

使用限制

  • 仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持CDAS语法。
  • 仅Flink计算引擎vvr-4.0.13-flink-1.13及以上版本支持分库合并同步。
  • 目标端的Catalog仅支持Hologres Catalog。
  • CDAS不支持作业调试,详情请参见作业调试

基本语法

CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]

<target_database>:
  [catalog_name.]db_name

<source_database>:
  [catalog_name.]db_name
CDAS语法复用了CREATE DATABASE语法的基本结构,其中的参数解释如下表所示。
参数 说明
target_database 数据同步的目标数据库名,可以指定具体的Catalog名称。
COMMENT 目标库的描述,默认使用source_database的描述。
WITH 目标库的参数,详情请参见对应的Catalog文档。例如管理Hologres Catalog
说明 key和value都需要为字符串类型,例如'sink.parallelism' = '4'
source_database 数据同步的源库名称,可以指定具体的Catalog名称。
INCLUDING ALL TABLES 同步源库中的所有表。
INCLUDING TABLE 同步源库中指定的表。支持使用竖线(|)分隔指定多个表,也可以使用正则表达式指定符合某一规则的表。例如INCLUDING TABLE 'web.*'表示要同步源库中所有web开头的表。
EXCLUDING TABLE 用于指定不需要同步的表,支持使用竖线(|)分隔指定多个表,也可以使用正则表达式指定符合某一规则的表,例如INCLUDING ALL TABLES EXCLUDING TABLE 'web.*'表示同步源库中所有不是web开头的表。
OPTIONS 源表的参数,详情请参见每个源表Connector文档的WITH参数。Connector文档请参见数据源表
说明 key和value都需要为字符串类型,例如'server-id' = '65500'
说明 因为IF NOT EXISTS关键字为必填,所以如果目标库或目标表在目标存储中并不存在,则会先创建该目标库和目标表,否则跳过创建步骤。创建的目标表Schema会使用源表的Schema,包括主键以及物理字段的字段名和字段类型,不包括计算列、meta字段、Watermark。其中源表到目标表的字段类型会经过类型映射,详见各个Connector页面的类型映射表。

示例

  • 示例一:整库同步

    通常,CDAS都会配合数据源的Catalog和目标的Catalog一起使用,例如MySQL Catalog和Hologres Catalog结合CDAS语法,完成 MySQL到Hologres的全量和增量数据同步。使用MySQL Catalog可以自动解析源表的Schema及相应的参数,而不用手动编写DDL。

    假设我们已在工作空间中注册了名为holo的Hologres Catalog和名为mysql的MySQL Catalog,MySQL中有一个tpcds的库。您可以使用以下语句将tpcds库下的24张表全部同步到Hologres中,包括未来的数据变更和表结构变更,无需提前在Hologres中创建表。
    USE CATALOG holo;
    
    CREATE IF NOT EXISTS DATABASE holo_tpcds  -- 在hologres中创建holo_tpcds库。
    WITH ('sink.parallelism' = '4') -- 可选,指定目标库的参数,每个holo sink默认使用4并发。
    AS DATABASE mysql.tpcds INCLUDING ALL TABLES  -- 同步mysql中tpcds库下所有表。
    /*+ OPTIONS('server-id'='8001-8004') */ ; -- 可选,指定mysql-cdc源表的额外参数。
    说明 Hologres支持在创建目标Database时指定WITH参数,这些参数仅对当前作业生效,用于控制写入目标表时的行为,不会持久化到Hologres中。目前支持的参数类型及其具体作用详情请参见Hologres结果表
  • 示例二:分库合并同步

    对于分库合并同步的场景,需要利用正则表达式的库名来匹配所要同步的多个分库。使用CDAS可以将上游多个分库下相同表名的数据合并同步到Hologres目标库对应表名的同一张表中,库名和表名会作为额外的两个字段写入到每张目标表中。为保证主键唯一性,库名、表名和原主键一起作为对应Hologres表的新联合主键。

    假设MySQL实例中有order_db01~order_db99多个分库,每个分库下都有order、order_detail等多张表。您可以使用以下语句将99个分库下的order、order_detail等表全部同步到Hologres中,包括未来的数据变更和表结构变更,无需提前在Hologres中创建表。order1
    USE CATALOG holo;
    
    CREATE DATABASE IF NOT EXISTS holo_order  -- 在Hologres中创建holo_order库,包括mysql中order分库的所有表。
    WITH ('sink.parallelism' = '4')         -- 可选,指定目标库的参数,每个Hologres Sink默认并发为4。
    AS DATABASE mysql.`order_db[0-9]+` INCLUDING ALL TABLES   -- 同步mysql中order_db分库下所有表。
    /*+ OPTIONS('server-id'='8001-8004') */ ;  -- 可选,指定mysql-cdc源表的额外参数。
  • 示例三:多CDAS&CTAS语句

    阿里云Flink也支持使用STATEMENT SET语法将多个CDAS和CTAS语句作为一个作业一起提交,并且阿里云Flink还能对Source进行优化,复用一个Source节点读取多业务表的数据,这对于MySQL CDC数据源场景尤为适用,因为这可以减少server-id的使用,从而减少对数据库的连接数和读取压力。

    假设MySQL实例中有tpcds、tpch、user_db01~user_db99(分库分表)多个库。您可以通过组合多条CDAS和CTAS语句,将MySQL实例下的所有库和表都同步到Hologres,只需一个Flink作业便能完成所有表的同步,只需一个Source便能读取所有表的数据,代码示例如下。
    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- 同步user分库分表。
    CREATE TABLE IF NOT EXISTS user
    AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- 同步TPCDS库。
    CREATE DATABASE IF NOT EXISTS holo_tpcds
    AS DATABASE mysql.tpcds INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    -- 同步TPCH库。
    CREATE DATABASE IF NOT EXISTS holo_tpch
    AS DATABASE mysql.tpch INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    
    END;