本文为您介绍MySQL结果表的DDL定义、WITH参数和类型映射。
什么是MySQL结果表
MySQL结果表支持所有兼容MySQL协议的数据库,包括RDS MySQL、PolarDB for MySQL或者自建MySQL,而云数据库RDS MySQL结果表专为云数据库RDS MySQL定制,支持更丰富的配置参数。
前提条件
- 已创建MySQL数据库和表,详情请参见RDS MySQL创建数据库和账号、PolarDB MySQL创建数据库和账号或自建MySQL创建数据库和账号。
- 已设置IP白名单,详情请参见RDS MySQL白名单设置、PolarDB MySQL白名单设置或自建MySQL白名单设置。
使用限制
仅Flink计算引擎VVR 4.0.11及以上版本支持MySQL Connector。
DDL定义
CREATE TABLE mysql_sink (
id INT,
num BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
说明
- RDS MySQL数据库支持自增主键,因此在DDL中不声明该自增字段。例如ID是自增字段,Flink DDL不声明该自增字段,则数据库在一行数据写入过程中会自动填补相关自增字段。
- DDL声明的字段必须至少存在一个非主键的字段,否则产生报错。
WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 备注 |
---|---|---|---|---|
connector | 结果表类型。 | String | 是 | 固定值为mysql 。
|
hostname | MySQL数据库的IP地址或者hostname。 | String | 是 | 建议填写专有网络VPC地址。
说明 如果MySQL与Flink全托管不在同一VPC,则需填写公网地址。
|
username | MySQL数据库服务的用户名。 | String | 是 | 无。 |
password | MySQL数据库服务的密码。 | String | 是 | 无。 |
database-name | MySQL数据库名称。 | String | 是 | 无。 |
table-name | MySQL表名。 | String | 是 | 无。 |
port | MySQL数据库服务的端口号。 | Integer | 否 | 默认值为3306。 |
sink.max-retries | 写入数据失败后,重试写入的最大次数。 | Integer | 否 | 默认值为3。 |
sink.buffer-flush.max-rows | 内存中缓存的数据条数。 | Integer | 否 | 默认值为100。 |
sink.buffer-flush.interval | 清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 | Duration | 否 | 默认值为1秒(1s)。 |
类型映射
MySQL字段类型 | Flink字段类型 |
---|---|
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
BIGINT | BIGINT |
INT UNSIGNED | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s)
说明 其中p <= 38。
|
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] | |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n/8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
TINYTEXT | STRING |
TEXT | |
MEDIUMTEXT | |
LONGTEXT | |
TINYBLOB | BYTES
说明 Flink仅支持小于等于2,147,483,647(2^31 - 1)的MySQL BLOB类型的记录。
|
BLOB | |
MEDIUMBLOB | |
LONGBLOB |
代码示例
CREATE TEMPORARY TABLE datagen_source (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mysql_sink (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
INSERT INTO mysql_sink
SELECT * FROM datagen_source;