本文为您介绍云原生数据仓库AnalyticDB MySQL版3.0结果表的DDL定义、WITH参数和类型映射。

什么是云原生数据仓库AnalyticDB MySQL版

云原生数据仓库AnalyticDB MySQL版是融合数据库、大数据技术于一体的云原生企业级数据仓库服务。AnalyticDB MySQL版支持高吞吐的数据实时增删改、低延时的实时分析和复杂ETL,兼容上下游生态工具,可用于构建企业级报表系统、数据仓库和数据服务引擎。

前提条件

使用限制

仅Flink计算引擎VVR 2.0.0及以上版本支持云原生数据仓库AnalyticDB MySQL版3.0 Connector。

DDL定义

CREATE TABLE adb_sink (
  id INT,
  num BIGINT
) WITH (
  'connector' = 'adb3.0',
  'tableName' = '<yourTablename>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'url' = 'jdbc:mysql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>'
);

WITH参数

参数 注释说明 是否必选 备注
connector 结果表类型。 固定值为adb3.0
tableName 表名。 无。
username 用户名。 无。
password 密码。 无。
url JDBC连接地址。 云原生数据仓库AnalyticDB MySQL版数据库地址。示例:url='jdbc:mysql://databaseName****-cn-shenzhen-a.ads.aliyuncs.com:10014/databaseName'
说明
  • 云原生数据仓库AnalyticDB MySQL版数据库连接信息,请参见URL地址查询
  • databaseName为云原生数据仓库AnalyticDB MySQL版数据库名称。
maxRetryTimes 写入数据失败后,重试写入的最大次数。 参数默认值取值情况如下:
  • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为3。
  • 在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为10。
batchSize 一次批量写入的条数。 参数默认值取值情况如下:
  • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为100。
  • 在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为1000。
bufferSize 内存中缓存的数据条数。batchSize或bufferSize任一到达阈值都会触发写入。 默认值为1000。
说明
  • 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
  • 需指定主键后,该参数才生效。
flushIntervalMs 清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 单位为毫秒,参数默认值取值情况如下:
  • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为1000。
  • 在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为3000。
ignoreDelete 是否忽略Delete操作。 参数取值如下:
  • true:忽略Delete操作。
  • false(默认值):接受Delete操作。
说明 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
replaceMode 是否采用replace into语法插入数据。 参数取值如下:
  • true(默认值):采用replace into语法插入数据。
  • false:采用insert into on duplicate key语法插入数据。
说明
  • 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
  • 仅AnalyticDB MySQL 3.1.3.5及以上版本支持该参数。
excludeUpdateColumns 表示更新主键值相同的数据时,忽略指定字段的更新。 如果忽略指定的字段为多个时,则需要使用英文逗号(,)分割。例如excludeUpdateColumns=column1,column2
说明 仅在 replaceMode=false时,该参数才生效。在 replaceMode=true时,对应字段会被更新为null。
connectionMaxActive 线程池大小 默认值为40。
说明 仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。

类型映射

云原生数据仓库AnalyticDB MySQL版3.0字段类型 Flink字段类型
BOOLEAN BOOLEAN
TINYINT INT
SMALLINT INT
INT INT
BIGINT BIGINT
DOUBLE DOUBLE
VARCHAR VARCHAR
DATETIME TIMESTAMP
DATE DATE
说明 因为MySQL的JDBC Driver在获取数据时,由于精度问题,Flink可能会采用不同的数据类型进行数据承接。

代码示例

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adb_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'adb3.0',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

INSERT INTO adb_sink
SELECT  * FROM datagen_source;