本文为您介绍云数据库RDS MySQL结果表的DDL定义、WITH参数和类型映射。

什么是云数据库RDS MySQL

RDS MySQL基于阿里巴巴的MySQL源码分支,经过双十一高并发、大数据量的考验,拥有优良的性能。RDS MySQL支持实例管理、账号管理、数据库管理、备份恢复、白名单、透明数据加密以及数据迁移等基本功能。RDS MySQL详情请参见概述

前提条件

使用限制

仅Flink计算引擎VVR 2.0.0及以上版本支持云数据库RDS MySQL Connector。

DDL定义

CREATE TABLE rds_sink (
   id INT,
   num BIGINT,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'rds',
   'tableName' = '<yourTablename>',
   'userName' = '<yourUsername>',
   'password' = '<yourPassword>',
   'url' = '<yourUrl>'
);
说明
  • Flink写入RDS或DRDS数据库结果表原理:针对Flink每行结果数据,拼接成一行SQL语句,输入至目标端数据库,然后执行。如果使用批量写,需要在URL后面加上参数?rewriteBatchedStatements=true,以提高系统性能。
  • RDS MySQL数据库支持自增主键,因此在DDL中不声明该自增字段。例如ID是自增字段,Flink DDL不声明该自增字段,则数据库在一行数据写入过程中会自动填补相关自增字段。
  • 如果DRDS有分区表,拆分键必须在DDL里PRIMARY KEY()中声明,否则拆分的表无法写入。
  • DDL声明的字段必须至少存在一个非主键的字段,否则产生报错。

WITH参数

参数 说明 数据类型 是否必填 备注
connector 结果表类型。 String 固定值为rds
tableName 表名。 String 无。
userName 用户名。 String 无。
password 密码。 String 无。
url URL地址。 String 云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看或修改内外网地址和端口。URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中databaseName为对应的数据库名称。
说明 如果使用批量写,需要在URL后面加上参数?rewriteBatchedStatements=true,以提高系统性能。
maxRetryTimes 写入数据失败后,重试写入的最大次数。 Integer 参数默认值取值情况如下:
  • 在Flink计算引擎VVR 4.0.7及以上版本,该参数默认值为10。
  • 在Flink计算引擎VVR 4.0.0~4.0.6版本,该参数默认值为3。
  • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为3。
说明 Flink计算引擎版本推荐您使用4.0.10及以上或4.1.1及以上版本。
batchSize 一次批量写入的条数。 Integer 参数默认值取值情况如下:
  • 在Flink计算引擎VVR 4.0.7及以上版本,该参数默认值为4096。
  • 在Flink计算引擎VVR 4.0.0~4.0.6版本,该参数默认值为5000。
  • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为100。
说明 Flink计算引擎版本推荐您使用4.0.10及以上或4.1.1及以上版本。
bufferSize 内存中缓存的数据条数。batchSize或 bufferSize任一到达阈值都会触发写入。 Integer 默认值为10000。
说明
  • 仅Flink计算引擎VVR 4.0.7及以上版本支持该参数。
  • 需指定主键后,该参数才生效。
flushIntervalMs 清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统统会自动输出缓存中的所有数据。 Integer 单位为毫秒,参数默认值取值情况如下:
  • 在Flink计算引擎VVR 4.0.7及以上版本,该参数默认值为2000。
  • 在Flink计算引擎VVR 4.0.0~4.0.6版本,该参数默认值为0。
  • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为1000。
说明 Flink计算引擎版本推荐您使用4.0.10及以上或4.1.1及以上版本。
ignoreDelete 是否忽略Delete操作。 Boolean 参数取值如下:
  • true:忽略Delete操作。
  • false(默认值):接受Delete操作。
说明 仅Flink计算引擎VVR 4.0.7及以上版本支持该参数。
connectionMaxActive 数据库连接池大小。 Integer 默认值为40。
说明
  • 仅Flink计算引擎VVR 4.0.7及以上版本支持该参数。
  • 如果出现获取连接超时的问题,说明连接池不够用,可适当增大连接池的大小。

类型映射

RDS字段类型 Flink字段类型
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
TINYINT UNSIGNED SMALLINT
INT INT
SMALLINT UNSIGNED INT
BIGINT BIGINT
INT UNSIGNED BIGINT
BIGINT UNSIGNED DECIMAL(20,0)
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
VARBINARY VARBINARY

代码示例

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'rds',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>'
);

INSERT INTO rds_sink
SELECT  * FROM datagen_source;

常见问题