本文为您介绍MySQL维表的DDL定义、WITH参数和类型映射。

什么是MySQL维表

MySQL维表支持所有兼容MySQL协议的数据库,包括RDS MySQL、PolarDB for MySQL或者自建MySQL,而云数据库RDS MySQL维表专为云数据库RDS MySQL定制,支持更丰富的配置参数。

前提条件

使用限制

仅Flink计算引擎VVR 4.0.11及以上版本支持MySQL Connector。

DDL定义

CREATE TABLE mysql_dim (
   id INT,
   name STRING,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'mysql',
   'hostname' = '<yourHostname>',
   'port' = '3306',
   'username' = '<yourUsername>',
   'password' = '<yourPassword>',
   'database-name' = '<yourDatabaseName>',
   'table-name' = '<yourTableName>'
);

WITH参数

参数 说明 数据类型 是否必填 备注
connector 维表类型。 String 固定值为mysql
hostname MySQL数据库的IP地址或者hostname。 String 建议填写专有网络VPC地址。
说明 如果MySQL与Flink全托管不在同一VPC,则需填写公网地址。
username MySQL数据库服务的用户名。 String 无。
password MySQL数据库服务的密码。 String 无。
database-name MySQL数据库名称。 String 无。
table-name MySQL表名。 String 无。
port MySQL数据库服务的端口号。 Integer 默认值为3306。
lookup.cache.strategy 缓存策略。 String 支持以下三种缓存策略:
  • None(默认值):无缓存。
  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。
  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

    适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

说明
  • 使用LRU缓存策略时,必须配置lookup.cache.max-rows参数。
  • 如果使用CACHE ALL时,请注意节点内存大小,防止出现OOM。
  • 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
lookup.cache.max-rows 最大缓存条数。 Integer
  • 当选择LRU缓存策略后,必须设置缓存大小。
  • 当选择ALL缓存策略后,可以不设置缓存大小。
lookup.cache.ttl 缓存超时时间。 Duration lookup.cache.ttl的配置和lookup.cache.strategy有关,详情如下:
  • 如果lookup.cache.strategy配置为None,则lookup.cache.ttl可以不配置,表示缓存不超时。
  • 如果lookup.cache.strategy配置为LRU,则lookup.cache.ttl为缓存超时时间。默认不过期。
  • 如果lookup.cache.strategy配置为ALL,则lookup.cache.ttl为缓存加载时间。默认不重新加载。

类型映射

MySQL字段类型 Flink字段类型
TINYINT TINYINT
SMALLINT SMALLINT
TINYINT UNSIGNED
INT INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT BIGINT
INT UNSIGNED
BIGINT UNSIGNED DECIMAL(20, 0)
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DOUBLE PRECISION
NUMERIC(p, s) DECIMAL(p, s)
说明 其中p <= 38。
DECIMAL(p, s)
BOOLEAN BOOLEAN
TINYINT(1)
DATE DATE
TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)
BIT(n) BINARY(⌈n/8⌉)
BINARY(n) BINARY(n)
VARBINARY(N) VARBINARY(N)
TINYTEXT STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB BYTES
说明 Flink仅支持小于等于2,147,483,647(2^31 - 1)的MySQL BLOB类型的记录。
BLOB
MEDIUMBLOB
LONGBLOB

代码示例

CREATE TEMPORARY TABLE datagen_source(
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mysql_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;