本文为您介绍如何使用JDBC连接器。
背景信息
此连接器为开源Flink的JDBC连接器,JDBC连接器提供了对MySQL、PostgreSQL和Oracle等常见的数据库读写支持。JDBC连接器支持的信息如下。
类别 | 详情 |
---|---|
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
连接的数据库和表都已被创建。
使用限制
- 仅实时计算引擎VVR 6.0.1及以上版本支持JDBC连接器。
- JDBC源表为Bounded Source,表中数据读取完,对应的Task就会结束。如果需要捕获实时变更数据,则请使用CDC连接器,详情请参见MySQL的CDC源表和Postgres的CDC源表(公测中)。
- 使用JDBC结果表连接PostgreSQL数据库时,需要数据库版本为PostgreSQL 9.5及以上。因为DDL中定义主键的情况下,PostgreSQL采用ON CONFLICT语法进行插入或更新,此语法需要PostgreSQL 9.5及以上版本才支持。
- Flink全托管中只提供了开源JDBC连接器的实现,不包含具体的数据库的Driver。在使用JDBC连接器时,需要手动上传目标数据库Driver的JAR包。目前支持的Driver如下表所示。
Driver Group Id Artifact Id MySQL mysql mysql-connector-java Oracle com.oracle.database.jdbc ojdbc8 PostgreSQL org.postgresql postgresql 说明 如果您采用非列表中的JDBC Driver,则其正确性和可用性需要您自行充分测试并保证。
语法结构
CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
WITH参数
- 通用
参数 说明 数据类型 是否必填 默认值 备注 connector 表类型。 String 是 无 固定值为jdbc。 url 数据库的URL。 String 是 无 无。 table-name JDBC表的名称。 String 是 无 无。 username JDBC用户名称。 String 否 无 如果指定了username和password中的任一参数,则两者必须都被指定。 password JDBC用户密码。 String 否 无 - 源表独有
参数 说明 数据类型 是否必填 默认值 备注 scan.partition.column 对输入进行分区的列名。 String 否 无 该列必须是数值类型、日期类型和时间戳类型等。关于分区扫描的详情请参见Partitioned Scan。 scan.partition.num 分区数。 Integer 否 无 无。 scan.partition.lower-bound 第一个分区的最小值。 Integer 否 无 无。 scan.partition.upper-bound 最后一个分区的最大值。 Integer 否 无 无。 scan.fetch-size 每次循环读取时,从数据库中获取的行数。 Integer 否 0 如果指定的值为0,则该配置项会被忽略。 scan.auto-commit 是否开启auto-commit。 Boolean 否 true 无。 - 结果表独有
参数 说明 数据类型 是否必填 默认值 备注 sink.buffer-flush.max-rows flush数据前,缓存记录的最大值。 Integer 否 100 您可以设置为0来禁用它,即不再缓存记录,直接flush数据。 sink.buffer-flush.interval flush数据的时间间隔。数据在Flink中缓存的时间超过该参数指定的时间后,异步线程将flush数据到数据库中。 Duration 否 1 s 您可以设置为0来禁用它,即不再缓存记录,直接flush数据。 说明 如果您需要完全异步地处理缓存的flush事件,则可以将sink.buffer-flush.max-rows设置为0,并配置适当的flush时间间隔。sink.max-retries 写入记录到数据库失败后的最大重试次数。 Integer 否 3 无。 - 维表独有
参数 说明 数据类型 是否必填 默认值 备注 lookup.cache.max-rows 指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。 Integer 否 无 默认情况下,维表Cache是未开启的。您可以设置lookup.cache.max-rows和lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。 lookup.cache.ttl 指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。 Duration 否 无 lookup.cache.caching-missing-key 是否缓存空的查询结果。 Boolean 否 true 参数取值如下: - true(默认值):缓存空的查询结果。
- false:不缓存空的查询结果。
lookup.max-retries 查询数据库失败的最大重试次数。 Integer 否 3 无。
类型映射
MySQL类型 | Oracle类型 | PostgreSQL类型 | FlinkSQL类型 |
---|---|---|---|
TINYINT | 无 | 无 | TINYINT |
| 无 |
| SMALLINT |
| 无 |
| INT |
| 无 |
| BIGINT |
BIGINT UNSIGNED | 无 | 无 | DECIMAL(20, 0) |
BIGINT | 无 | BIGINT | BIGINT |
FLOAT | BINARY_FLOAT |
| FLOAT |
| BINARY_DOUBLE |
| DOUBLE |
|
|
| DECIMAL(p, s) |
| 无 | BOOLEANcan | BOOLEAN |
DATE | DATE | DATE | DATE |
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
|
|
| STRING |
|
| BYTEA | BYTES |
无 | 无 | ARRAY | ARRAY |
使用示例
- 源表
CREATE TEMPORARY TABLE jdbc_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
- 结果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); INSERT INTO jdbc_sink SELECT * FROM datagen_source;
- 维表
CREATE TEMPORARY TABLE datagen_source( `id` INT, `data` BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_dim ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `data` BIGINT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.`id`,T.`data`, H.`name` FROM datagen_source AS T JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;