本文为您介绍如何使用云原生数据仓库AnalyticDB MySQL版3.0连接器。
背景信息
云原生数据仓库AnalyticDB MySQL版3.0是融合数据库、大数据技术于一体的云原生企业级数据仓库服务。AnalyticDB MySQL版支持高吞吐的数据实时增删改、低延时的实时分析和复杂ETL,兼容上下游生态工具,可用于构建企业级报表系统、数据仓库和数据服务引擎。
ADB MySQL 3.0连接器支持的信息如下。
类别 | 详情 |
---|---|
支持类型 | 维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
- 已创建AnalyticDB MySQL集群并创建表,详情请参见创建集群和CREATE TABLE。
- 已设置白名单,详情请参见设置白名单。
使用限制
- 仅支持作为维表和结果表,不支持作为源表
- 仅Flink计算引擎VVR 3.x及以上版本支持云原生数据仓库AnalyticDB MySQL版3.0连接器
语法结构
CREATE TEMPORARY TABLE adb_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
WITH参数
- 通用
参数 说明 数据类型 是否必填 默认值 备注 connector 结果表类型。 String 是 无 固定值为adb3.0。 url JDBC连接地址。 String 是 无 云原生数据仓库AnalyticDB MySQL版数据库地址。示例: url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'
。说明- 云原生数据仓库AnalyticDB MySQL版数据库连接信息,请参见URL地址查询。
- databaseName为云原生数据仓库AnalyticDB MySQL版数据库名称。
userName 用户名。 String 是 无 无。 password 密码。 String 是 无 无。 tableName 表名。 String 是 无 无。 maxRetryTimes 写入数据失败后,重试写入的最大次数。 Integer 否 参数默认值取值情况如下。 - 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为3。
- 在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为10。
无。 - 结果表独有
参数 说明 数据类型 是否必填 默认值 备注 batchSize 一次批量写入的条数。 Integer 否 参数默认值取值情况如下: - 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为100。
- 在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为1000。
需指定主键后,该参数才生效。 bufferSize 内存中缓存的数据条数。batchSize或bufferSize任一到达阈值都会触发写入。 Integer 否 1000 需指定主键后,该参数才生效。 说明 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。flushIntervalMs 清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 Integer 否 参数默认值取值情况如下: - 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为1000。
- 在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为3000。
单位为毫秒。 ignoreDelete 是否忽略Delete操作。 Boolean 否 false 参数取值如下: - true:忽略Delete操作。
- false:接受Delete操作。
说明 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。replaceMode 是否采用replace into语法插入数据。 Boolean 否 true 该参数取值如下: - true:采用
replace into
语法插入数据。 - false:采用
insert into on duplicate key
语法插入数据。
说明- 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
- 仅AnalyticDB MySQL 3.1.3.5及以上版本支持该参数。
excludeUpdateColumns 表示更新主键值相同的数据时,忽略指定字段的更新。 String 否 空字符串 如果忽略指定的字段为多个时,则需要使用英文逗号(,)分割。例如 excludeUpdateColumns=column1,column2
。说明- 仅在replaceMode=false时,该参数才生效。在replaceMode=true时,对应字段会被更新为null。
- 要忽略的多个字段需要写在一行中,不能换行。
connectionMaxActive 线程池大小。 Integer 否 40 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。 - 维表独有
参数 说明 数据类型 是否必填 默认值 备注 cache 缓存策略。 String 否 ALL 云原生数据仓库AnalyticDB MySQL版3.0维表支持以下三种缓存策略: - None:无缓存。
- LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
- ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
说明- 如果使用CACHE ALL时,请注意节点内存大小,防止出现OOM。
- 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
cacheSize 缓存大小,即缓存多少行数据。 Integer 否 100000 cacheSize配置和cache为LRU有关。当cache配置为LRU时,必须配置cacheSize参数。 cacheTTLMs 缓存超时时间,单位为毫秒。 Integer 否 Long.MAX_VALUE cacheTTLMs配置和cache配置为LRU或ALL有关: - 如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间。默认值为
Long.MAX_VALUE
,即代表缓存不过期。 - 如果cache配置为ALL,则cacheTTLMs为物理表数据被重新加载的间隔时间。默认值为
Long.MAX_VALUE
,即代表不重新加载物理表数据。
说明 如果cache配置为None,则cacheTTLMs不用配置。因为cache配置为None,表示没有缓存,因此不用配置缓存超时时间。maxJoinRows 主表中每一条数据查询维表时,匹配后最多返回的结果数。 Integer 否 1024 如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows='n',以确保实时计算匹配处理效率。 说明 进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。
类型映射
云原生数据仓库AnalyticDB MySQL版3.0字段类型 | Flink字段类型 |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) 或NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
使用示例
- 结果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO adb_sink SELECT * FROM datagen_source;
- 维表
CREATE TEMPORARY TABLE datagen_source( `a` INT, `b` VARCHAR, `c` STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_dim ( `a` INT, `b` VARCHAR, `c` VARCHAR ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); CREATE TEMPORARY TABLE blackhole_sink( `a` INT, `b` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;