本文为您介绍MySQL结果表的DDL定义、WITH参数和类型映射。

什么是MySQL结果表

MySQL结果表支持所有兼容MySQL协议的数据库,包括RDS MySQL、PolarDB for MySQL或者自建MySQL,而云数据库RDS MySQL结果表专为云数据库RDS MySQL定制,支持更丰富的配置参数。

前提条件

使用限制

仅Flink计算引擎VVR 4.0.11及以上版本支持MySQL Connector。

DDL定义

CREATE TABLE mysql_sink (
  id INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);
说明
  • RDS MySQL数据库支持自增主键,因此在DDL中不声明该自增字段。例如ID是自增字段,Flink DDL不声明该自增字段,则数据库在一行数据写入过程中会自动填补相关自增字段。
  • DDL声明的字段必须至少存在一个非主键的字段,否则产生报错。

WITH参数

参数 说明 数据类型 是否必填 备注
connector 结果表类型。 String 固定值为mysql
hostname MySQL数据库的IP地址或者hostname。 String 建议填写专有网络VPC地址。
说明 如果MySQL与Flink全托管不在同一VPC,则需填写公网地址。
username MySQL数据库服务的用户名。 String 无。
password MySQL数据库服务的密码。 String 无。
database-name MySQL数据库名称。 String 无。
table-name MySQL表名。 String 无。
port MySQL数据库服务的端口号。 Integer 默认值为3306。
sink.max-retries 写入数据失败后,重试写入的最大次数。 Integer 默认值为3。
sink.buffer-flush.max-rows 内存中缓存的数据条数。 Integer 默认值为100。
sink.buffer-flush.interval 清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 Duration 默认值为1秒(1s)。

类型映射

MySQL字段类型 Flink字段类型
TINYINT TINYINT
SMALLINT SMALLINT
TINYINT UNSIGNED
INT INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT BIGINT
INT UNSIGNED
BIGINT UNSIGNED DECIMAL(20, 0)
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DOUBLE PRECISION
NUMERIC(p, s) DECIMAL(p, s)
说明 其中p <= 38。
DECIMAL(p, s)
BOOLEAN BOOLEAN
TINYINT(1)
DATE DATE
TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)
BIT(n) BINARY(⌈n/8⌉)
BINARY(n) BINARY(n)
VARBINARY(N) VARBINARY(N)
TINYTEXT STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB BYTES
说明 Flink仅支持小于等于2,147,483,647(2^31 - 1)的MySQL BLOB类型的记录。
BLOB
MEDIUMBLOB
LONGBLOB

代码示例

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

INSERT INTO mysql_sink
SELECT  * FROM datagen_source;