The PolarDB for PostgreSQL (Compatible with Oracle) 1.0 connector is being deprecated and will be removed in a future release. It will no longer receive updates or maintenance, and its configuration options will be removed from the console. For details, see End-of-support (EOS) for the PolarDB for PostgreSQL (Compatible with Oracle) 1.0 connector. Migrate your workloads promptly to avoid service interruptions.
Supported type: Sink | Running modes: Streaming and batch | API type: SQL | Update and delete support: Supported
Use this connector to write Flink data to a PolarDB for PostgreSQL (Compatible with Oracle) 1.0 cluster. PolarDB for PostgreSQL (Compatible with Oracle) is a cloud-native database that separates storage from compute and is highly compatible with Oracle.
Prerequisites
Before you begin, ensure that you have:
- A PolarDB for PostgreSQL (Compatible with Oracle) 1.0 cluster and a table. See Create a PolarDB for PostgreSQL (Compatible with Oracle) cluster and Create a table.
- A whitelist configured for the cluster. See Configure a cluster whitelist.
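For reference, a minimal sketch of a target table that matches the Flink DDL in the Syntax section below. The column names and the VARCHAR length are illustrative; adjust them to your schema.
CREATE TABLE polardbo_table (
  id INT,
  len INT,
  content VARCHAR(1024),
  PRIMARY KEY(id)
);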
Limitations
- This connector supports only PolarDB 1.0. For PolarDB 2.0, use the JDBC connector instead.
- This connector requires Ververica Runtime (VVR) 8.0.5 or later.
Syntax
CREATE TABLE polardbo_table (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
Connector options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | N/A | Must be polardbo. |
| url | String | Yes | N/A | The JDBC URL of the database. Format: jdbc:postgresql://<Address>:<PortId>/<DatabaseName>. |
| tableName | String | Yes | N/A | The name of the database table. |
| userName | String | Yes | N/A | The username for connecting to the database. |
| password | String | Yes | N/A | The password for connecting to the database. Store credentials using project variables rather than hardcoding them in plaintext. |
| maxRetryTimes | Integer | No | 3 | The maximum number of retries for a failed write operation. |
| targetSchema | String | No | public | The name of the schema. |
| caseSensitive | String | No | false | Specifies whether table and field names are case-sensitive. Valid values: true, false. |
| connectionMaxActive | Integer | No | 5 | The maximum number of active connections in the connection pool. The system automatically releases idle connections. Setting this value too high may overload the database server. |
| retryWaitTime | Integer | No | 100 | The interval between retries, in milliseconds. |
| batchSize | Integer | No | 500 | The number of records written in a single batch. |
| flushIntervalMs | Integer | No | N/A | The flush interval, in milliseconds. If the number of buffered records does not reach batchSize within this interval, the connector flushes all buffered data. |
| writeMode | String | No | insert | The write mode. See Write modes for details. |
| conflictMode | String | No | strict | The conflict handling policy for insert operations. See Write modes for details. |
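For example, the batching and retry options above can be combined in the WITH clause to trade latency for throughput. The following is a minimal sketch; the table definition and option values are illustrative, not recommendations.
CREATE TABLE polardbo_batched_sink (
  id INT,
  content VARCHAR,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' = '1000',        -- flush after 1,000 buffered records
  'flushIntervalMs' = '2000',  -- or after 2 seconds, whichever comes first
  'maxRetryTimes' = '5',       -- retry a failed write up to 5 times
  'retryWaitTime' = '200'      -- wait 200 ms between retries
);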
Write modes
The connector supports two write modes, controlled by writeMode and conflictMode.
`writeMode=insert` (default)
The connector performs a direct INSERT. When a primary key or unique index conflict occurs, the behavior is controlled by conflictMode:
| conflictMode | Behavior |
|---|---|
| strict (default) | Throws an error on conflict. |
| ignore | Ignores the conflicting record. |
| update | Updates the existing record on conflict. Works on tables without a primary key, but has lower write performance. |
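For example, to update the existing row instead of failing on a key conflict, set conflictMode in the WITH clause. A minimal sketch; the table definition and connection values are placeholders.
CREATE TABLE polardbo_insert_sink (
  id INT,
  content VARCHAR,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'writeMode' = 'insert',      -- default write mode
  'conflictMode' = 'update'    -- update the existing row on conflict
);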
`writeMode=upsert`
The connector performs an INSERT or UPDATE based on whether a conflict exists. This mode requires the sink table to have a primary key defined in the DDL.
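A minimal sketch of an upsert sink follows; the table definition is illustrative. Note the primary key in the DDL, which this mode requires.
CREATE TABLE polardbo_upsert_sink (
  id INT,
  content VARCHAR,
  PRIMARY KEY(id)            -- required for writeMode=upsert
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'writeMode' = 'upsert'     -- insert new rows, update existing ones
);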
Metrics
The following metrics are available for sink tables. For descriptions of each metric, see Monitoring metrics.
- numRecordsOut
- numRecordsOutPerSecond
- numBytesOut
- numBytesOutPerSecond
- currentSendTime
Data type mappings
The following table maps PolarDB for PostgreSQL (Compatible with Oracle) 1.0 data types to Flink SQL data types for sink tables.
| PolarDB type | Flink type |
|---|---|
| BOOLEAN | BOOLEAN |
| INT | INT |
| NUMBER | BIGINT |
| NUMBER | DOUBLE |
| VARCHAR | VARCHAR |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | DATE |
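For example, a sink DDL that follows these mappings might look like the sketch below. The column names are illustrative, and the choice between BIGINT and DOUBLE for a NUMBER column is an assumption based on whether the column stores integer or decimal values.
CREATE TABLE polardbo_typed_sink (
  is_active BOOLEAN,     -- PolarDB BOOLEAN
  item_count BIGINT,     -- PolarDB NUMBER (integer values, assumed)
  unit_price DOUBLE,     -- PolarDB NUMBER (decimal values, assumed)
  item_name VARCHAR,     -- PolarDB VARCHAR
  updated_at TIMESTAMP   -- PolarDB TIMESTAMP
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);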
Example
The following example generates data using a datagen source and writes it to a PolarDB sink table.
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
)
COMMENT 'datagen source table'
WITH (
  'connector' = 'datagen'
);

CREATE TABLE polardbo_sink (
  name VARCHAR,
  age INT
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
INSERT INTO polardbo_sink
SELECT * FROM datagen_source;