本文为您介绍如何创建分析型数据库PostgreSQL版结果表,以及创建过程中涉及到的WITH参数和类型映射。
注意 本文仅适用于Blink 3.6.0及以上版本。
实现原理
实时计算Flink版写入分析型数据库PostgreSQL结果表可以分为以下两个阶段:
- 将实时计算Flink版每行的结果数据拼接为一行SQL。
- 在目标数据库执行拼接后的SQL。
DDL定义
实时计算Flink版支持使用分析型数据库PostgreSQL版作为结果输出。示例代码如下。
create table rds_output(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) with (
type='adbpg',
url='jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
tableName='<yourDatabaseTableName>',
userName='<yourDatabaseUserName>',
password='<yourDatabasePassword>'
);
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
type | 源表类型。 | 是 | 固定值为adbpg 。
|
url | JDBC连接地址。 | 是 | 分析型数据库PostgreSQL版数据库的JDBC连接地址。格式为'jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',其中:
|
tableName | 表名。 | 是 | 无。 |
username | 账号。 | 是 | 无。 |
password | 密码。 | 是 | 无。 |
maxRetryTimes | 写入重试次数。 | 否 | 默认值为3。 |
useCopy | 是否采用Copy API写入数据。 | 否 | 参数取值如下:
|
batchSize | 一次批量写入的条数。 | 否 | 默认值为5000。 |
exceptionMode | 数据写入过程中出现异常时的处理策略。 | 否 | 支持以下两种处理策略:
|
conflictMode | 当主键冲突或唯一索引出现冲突时的处理策略。 | 否 | 支持以下三种处理策略:
|
targetSchema | Schema名称。 | 否 | 默认值为public。 |
connectionMaxActive | 单个task允许的最大连接数。 | 否 | 请根据实际的并发task个数,以及目标端数据库允许的最大连接数进行设置。 |
类型映射
分析型数据库PostgreSQL和实时计算Flink版字段类型对应关系如下。
分析型数据库PostgreSQL版字段类型 | 实时计算Flink版字段类型 |
---|---|
BOOLEAN | BOOLEAN |
SMALLINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DOUBLE PRECISION | DOUBLE |
TEXT | VARCHAR |
TIMESTAMP | DATETIME |
DATE | DATE |
REAL | FLOAT |
DOUBLE PRECISION | DECIMAL |
TIME | TIME |
TIMESTAMP | TIMESTAMP |