全部产品
Search
文档中心

实时计算Flink版:PolarDB PostgreSQL版(Oracle语法兼容1.0)

更新时间:Apr 07, 2024

本文为您介绍如何使用PolarDB PostgreSQL版(Oracle语法兼容1.0)连接器。

背景信息

PolarDB PostgreSQL版(兼容Oracle)是阿里巴巴自研的新一代云原生数据库,在存储计算分离架构下,利用了软硬件结合的优势,为您提供具备极致弹性、高性能、海量存储、安全可靠的数据库服务,高度兼容Oracle。

PolarDB PostgreSQL版(Oracle语法兼容1.0)连接器支持的信息如下。

类别

详情

支持类型

结果表

运行模式

流模式和批模式

数据格式

暂不适用

特有监控指标

  • 结果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

说明

指标含义详情,请参见监控指标说明

API种类

SQL

是否支持更新或删除结果表数据

前提条件

使用限制

仅Flink实时计算引擎VVR 8.0.5及以上版本支持PolarDB PostgreSQL版(Oracle语法兼容1.0)连接器。

语法结构

CREATE TABLE polardbo_table (
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY(id)
) WITH (
 'connector'='polardbo',
 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

表类型。

String

固定值为polardbo。

url

JDBC连接地址。

String

格式为jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

tableName

表名。

String

无。

userName

用户名。

String

无。

password

密码。

String

为了避免您的密码信息泄露,建议您通过密钥管理的方式填写密码取值,详情请参见密钥管理

maxRetryTimes

写入数据失败后,重试写入的最大次数。

Integer

3

无。

targetSchema

Schema名称。

String

public

无。

caseSensitive

大小写是否敏感。

String

false

参数取值如下:

  • true:大小写敏感。

  • false(默认值):大小写不敏感。

connectionMaxActive

连接池的最大连接数。

Integer

5

系统会自动释放与数据库服务的空闲连接。

重要

此参数设置过大可能会导致服务端连接数出现异常。

retryWaitTime

重试的时间间隔。

Integer

100

单位毫秒。

batchSize

一次批量写入的数据条数。

Integer

500

无。

flushIntervalMs

清空缓存的时间间隔。

Integer

单位毫秒。如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

writeMode

第一次尝试写入时的写入方式。

String

insert

参数取值如下:

  • insert(默认值):直接插入,冲突时参考conflictMode。

  • upsert:冲突时自动update,只能用于有主键的表。

conflictMode

当Insert写入出现主键冲突或者唯一索引冲突时的处理策略。

String

strict

参数取值如下:

  • strict(默认值):冲突时报错。

  • ignore:冲突时忽略。

  • update:冲突时自动更新,可用于无主键表,执行效率较低。

类型映射

下面是将connector用于结果表的场景下,Flink字段到PolarDB PostgreSQL(Oracle兼容1.0)字段的映射关系。

PolarDB PostgreSQL版(Oracle语法兼容1.0)字段类型

Flink字段类型

boolean

boolean

int

int

number

bigint

number

double

varchar

varchar

timestamp

timestamp

varchar

date

使用示例

  • 结果表

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE polardbo_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='polardbo',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    INSERT INTO polardbo_sink
    SELECT * FROM datagen_source;