本文为您介绍云原生数据仓库AnalyticDB MySQL版3.0维表的DDL定义、WITH参数、CACHE参数和类型映射。

什么是云原生数据仓库AnalyticDB MySQL版

云原生数据仓库AnalyticDB MySQL版是融合数据库、大数据技术于一体的云原生企业级数据仓库服务。AnalyticDB MySQL版支持高吞吐的数据实时增删改、低延时的实时分析和复杂ETL,兼容上下游生态工具,可用于构建企业级报表系统、数据仓库和数据服务引擎。

前提条件

使用限制

仅Flink计算引擎VVR 2.0.0及以上版本支持云原生数据仓库AnalyticDB MySQL版3.0 Connector。

DDL定义

CREATE TABLE adb30_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'cache' = 'ALL',
  'cacheSize' = '500'
);

WITH参数

参数 说明 是否必选 备注
connector 维表类型。 固定值为adb3.0
password 密码。 无。
tableName 表名。 无。
url URL地址。 云原生数据仓库AnalyticDB MySQL版3.0的专有网络VPC地址。
username 用户名。 无。
maxRetryTimes 写入数据失败后,重试写入的次数。 该参数默认值和VVR版本有关,详情如下:
  • 当实时计算引擎VVR版本为3.x时,则该参数默认值为3。
  • 当实时计算引擎VVR版本为10.x时,则该参数默认值为10。
CACHE 缓存策略、缓存大小和缓存超时时间。 详情请参见CACHE参数

CACHE参数

参数 说明 是否必填 备注
cache 缓存策略。 云原生数据仓库AnalyticDB MySQL版3.0维表支持以下三种缓存策略:
  • None:无缓存。
  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
  • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

说明
  • 如果使用CACHE ALL时,请注意节点内存大小,防止出现OOM。
  • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
cacheSize 缓存大小,即缓存多少行数据。 cacheSize配置和cache为LRU有关。当cache配置为LRU时,必须配置cacheSize参数。默认值为100000行。
说明
  • 如果cache配置为None,则不用配置cacheSize参数,默认为空。
  • 如果cache配置为ALL,则不用配置cacheSize参数。因为cache配置为ALL,表示将物理表的数据都加载到缓存中,因此无需配置缓存大小。
cacheTTLMs 缓存超时时间,单位为毫秒。 cacheTTLMs配置和cache配置为LRUALL有关:
  • 如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间。默认值为Long.MAX_VALUE,即代表缓存不过期。
  • 如果cache配置为ALL,则cacheTTLMs为物理表数据被重新加载的间隔时间。默认值为Long.MAX_VALUE,即代表不重新加载物理表数据。
说明 如果cache配置为None,则cacheTTLMs不用配置。因为cache配置为None,表示没有缓存,因此不用配置缓存超时时间。
maxJoinRows 主表中每一条数据查询维表时,匹配后最多返回的结果数。 默认值为1024。如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows='n',以确保实时计算匹配处理效率。
说明 进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。

类型映射

云原生数据仓库AnalyticDB MySQL版3.0字段类型 Flink字段类型
BOOLEAN BOOLEAN
TINYINT INT
SMALLINT INT
INT INT
BIGINT BIGINT
DOUBLE DOUBLE
VARCHAR VARCHAR
DATETIME TIMESTAMP
DATE DATE

代码示例

CREATE TEMPORARY TABLE datagen_source(
  a INT,
  b VARCHAR,
  c STRING,
  `proctime` AS PROCTIME()
) with (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) with (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  b VARCHAR
) with (
  'connector' = 'blackhole'
);

insert into blackhole_sink select T.a,H.b
FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;