本文为您介绍如何使用云原生数据仓库AnalyticDB MySQL版3.0连接器。

背景信息

云原生数据仓库AnalyticDB MySQL版3.0是融合数据库、大数据技术于一体的云原生企业级数据仓库服务。AnalyticDB MySQL版支持高吞吐的数据实时增删改、低延时的实时分析和复杂ETL,兼容上下游生态工具,可用于构建企业级报表系统、数据仓库和数据服务引擎。

ADB MySQL 3.0连接器支持的信息如下。
类别详情
支持类型维表和结果表
运行模式流模式和批模式
数据格式暂不适用
特有监控指标暂无
API种类SQL
是否支持更新或删除结果表数据

前提条件

使用限制

  • 仅支持作为维表和结果表,不支持作为源表
  • 仅Flink计算引擎VVR 3.x及以上版本支持云原生数据仓库AnalyticDB MySQL版3.0连接器

语法结构

CREATE TEMPORARY TABLE adb_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);

WITH参数

  • 通用
    参数说明数据类型是否必填默认值备注
    connector结果表类型。String固定值为adb3.0。
    urlJDBC连接地址。String云原生数据仓库AnalyticDB MySQL版数据库地址。示例:url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'
    说明
    • 云原生数据仓库AnalyticDB MySQL版数据库连接信息,请参见URL地址查询
    • databaseName为云原生数据仓库AnalyticDB MySQL版数据库名称。
    userName用户名。String无。
    password密码。String无。
    tableName表名。String无。
    maxRetryTimes写入数据失败后,重试写入的最大次数。Integer参数默认值取值情况如下。
    • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为3。
    • 在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为10。
    无。
  • 结果表独有
    参数说明数据类型是否必填默认值备注
    batchSize一次批量写入的条数。Integer参数默认值取值情况如下:
    • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为100。
    • 在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为1000。
    需指定主键后,该参数才生效。
    bufferSize内存中缓存的数据条数。batchSizebufferSize任一到达阈值都会触发写入。Integer1000需指定主键后,该参数才生效。
    说明 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
    flushIntervalMs清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。Integer参数默认值取值情况如下:
    • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为1000。
    • 在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为3000。
    单位为毫秒。
    ignoreDelete是否忽略Delete操作。Booleanfalse参数取值如下:
    • true:忽略Delete操作。
    • false:接受Delete操作。
    说明 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
    replaceMode是否采用replace into语法插入数据。Booleantrue该参数取值如下:
    • true:采用replace into语法插入数据。
    • false:采用insert into on duplicate key语法插入数据。
    说明
    • 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
    • 仅AnalyticDB MySQL 3.1.3.5及以上版本支持该参数。
    excludeUpdateColumns表示更新主键值相同的数据时,忽略指定字段的更新。String空字符串如果忽略指定的字段为多个时,则需要使用英文逗号(,)分割。例如excludeUpdateColumns=column1,column2
    说明
    • 仅在replaceMode=false时,该参数才生效。在replaceMode=true时,对应字段会被更新为null。
    • 要忽略的多个字段需要写在一行中,不能换行。
    connectionMaxActive线程池大小。Integer40仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
  • 维表独有
    参数说明数据类型是否必填默认值备注
    cache缓存策略。StringALL云原生数据仓库AnalyticDB MySQL版3.0维表支持以下三种缓存策略:
    • None:无缓存。
    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
    • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
    适用于远程表数据量小且MISS KEY在源表数据和维表JOIN时,ON条件无法关联特别多的场景。
    说明
    • 如果使用CACHE ALL时,请注意节点内存大小,防止出现OOM。
    • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
    cacheSize缓存大小,即缓存多少行数据。Integer100000cacheSize配置和cache为LRU有关。当cache配置为LRU时,必须配置cacheSize参数。
    cacheTTLMs缓存超时时间,单位为毫秒。IntegerLong.MAX_VALUEcacheTTLMs配置和cache配置为LRU或ALL有关:
    • 如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间。默认值为Long.MAX_VALUE,即代表缓存不过期。
    • 如果cache配置为ALL,则cacheTTLMs为物理表数据被重新加载的间隔时间。默认值为Long.MAX_VALUE,即代表不重新加载物理表数据。
    说明 如果cache配置为None,则cacheTTLMs不用配置。因为cache配置为None,表示没有缓存,因此不用配置缓存超时时间。
    maxJoinRows主表中每一条数据查询维表时,匹配后最多返回的结果数。Integer1024如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows='n',以确保实时计算匹配处理效率。
    说明 进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。

类型映射

云原生数据仓库AnalyticDB MySQL版3.0字段类型Flink字段类型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMAL(p, s) 或NUMERIC(p, s)DECIMAL(p, s)
VARCHARSTRING
BINARYBYTES
DATEDATE
TIMETIME
DATETIMETIMESTAMP
TIMESTAMPTIMESTAMP
POINTSTRING

使用示例

  • 结果表
    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO adb_sink
    SELECT * FROM datagen_source;
  • 维表
    CREATE TEMPORARY TABLE datagen_source(
      `a` INT,
      `b` VARCHAR,
      `c` STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_dim (
      `a` INT,
      `b` VARCHAR,
      `c` VARCHAR
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `a` INT,
      `b` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;