Realtime Compute for Apache Flink: PolarDB for PostgreSQL (Compatible with Oracle) 1.0 (deprecating)

Last Updated: Mar 26, 2026
Important

The PolarDB for PostgreSQL (Compatible with Oracle) 1.0 connector is being deprecated and will be removed in a future release. It will no longer receive updates or maintenance, and its configuration options will be removed from the console. For details, see End-of-support (EOS) for the PolarDB for PostgreSQL (Compatible with Oracle) 1.0 connector. Migrate your workloads promptly to avoid service interruptions.

Supported type: Sink | Running modes: Streaming and batch | API type: SQL | Update and delete support: Supported

Use this connector to write data from Flink to a PolarDB for PostgreSQL (Compatible with Oracle) 1.0 cluster. PolarDB for PostgreSQL (Compatible with Oracle) is a cloud-native database that separates storage from compute and is highly compatible with Oracle.

Prerequisites

Before you begin, ensure that you have:

Limitations

  • This connector supports only PolarDB for PostgreSQL (Compatible with Oracle) 1.0. For version 2.0, use the JDBC connector instead.

  • This connector requires Ververica Runtime (VVR) 8.0.5 or later.
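
For clusters that are already on version 2.0, a sink defined on the JDBC connector might look like the following minimal sketch. It assumes the option names of the open-source Apache Flink JDBC connector (`table-name`, `username`, `password`) and a PostgreSQL-protocol JDBC URL. The table name `jdbc_sink` and all placeholder values are hypothetical, so check the JDBC connector documentation for the options and driver that your VVR version expects.

```sql
-- Sketch only: option names follow the open-source Flink JDBC connector.
-- Table name, columns, and placeholders are illustrative assumptions.
CREATE TEMPORARY TABLE jdbc_sink (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'table-name' = '<yourDatabaseTableName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```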

Syntax

CREATE TABLE polardbo_table (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'='polardbo',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);

Connector options

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | String | Yes | N/A | Must be polardbo. |
| url | String | Yes | N/A | The JDBC URL of the database. Format: jdbc:postgresql://<Address>:<PortId>/<DatabaseName>. |
| tableName | String | Yes | N/A | The name of the database table. |
| userName | String | Yes | N/A | The username for connecting to the database. |
| password | String | Yes | N/A | The password for connecting to the database. Store credentials using project variables rather than hardcoding them in plaintext. |
| maxRetryTimes | Integer | No | 3 | The maximum number of retries for a failed write operation. |
| targetSchema | String | No | public | The name of the schema. |
| caseSensitive | String | No | false | Specifies whether table and field names are case-sensitive. Valid values: true, false. |
| connectionMaxActive | Integer | No | 5 | The maximum number of active connections in the connection pool. The system automatically releases idle connections. Setting this value too high may overload the database server. |
| retryWaitTime | Integer | No | 100 | The interval between retries, in milliseconds. |
| batchSize | Integer | No | 500 | The number of records written in a single batch. |
| flushIntervalMs | Integer | No | N/A | The flush interval, in milliseconds. If the number of buffered records does not reach batchSize within this interval, the connector flushes all buffered data. |
| writeMode | String | No | insert | The write mode. See Write modes for details. |
| conflictMode | String | No | strict | The conflict handling policy for insert operations. See Write modes for details. |
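
As an illustration of how the optional settings combine, the following sketch overrides the batching, retry, and connection pool defaults. The table name `polardbo_tuned_sink` and the specific values are assumptions chosen for the example, not recommendations. Option values are written as quoted strings, as Flink SQL requires in a WITH clause.

```sql
-- Sketch only: the numeric values below are illustrative, not tuned advice.
CREATE TEMPORARY TABLE polardbo_tuned_sink (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'targetSchema' = 'public',        -- schema that contains the target table
  'batchSize' = '1000',             -- write up to 1000 records per batch
  'flushIntervalMs' = '2000',       -- flush buffered records every 2 seconds even if the batch is not full
  'maxRetryTimes' = '5',            -- retry a failed write up to 5 times
  'retryWaitTime' = '200',          -- wait 200 ms between retries
  'connectionMaxActive' = '5'       -- cap on active pooled connections
);
```

With these settings, a batch is written when 1000 records are buffered or when 2 seconds have elapsed, whichever comes first.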

Write modes

The connector supports two write modes, controlled by writeMode and conflictMode.

`writeMode=insert` (default)

The connector performs a direct INSERT. When a primary key or unique index conflict occurs, the behavior is controlled by conflictMode:

| conflictMode | Behavior |
| --- | --- |
| strict (default) | Throws an error on conflict. |
| ignore | Ignores the conflicting record. |
| update | Updates the existing record on conflict. Works on tables without a primary key, but has lower write performance. |
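
For example, a sink that keeps the default insert mode but skips conflicting records instead of failing could be declared as in this sketch. The table name `polardbo_insert_ignore_sink` is hypothetical; the option names and values are the ones listed above.

```sql
-- Sketch only: insert mode with conflicts ignored rather than raised as errors.
CREATE TEMPORARY TABLE polardbo_insert_ignore_sink (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'writeMode' = 'insert',       -- the default, shown explicitly for clarity
  'conflictMode' = 'ignore'     -- drop records that hit a primary key or unique index conflict
);
```

Setting `'conflictMode' = 'update'` in the same definition would overwrite the existing row instead of dropping the incoming record.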

`writeMode=upsert`

The connector performs an INSERT or UPDATE based on whether a conflict exists. This mode requires the sink table to have a primary key defined in the DDL.
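
A minimal upsert sink might look like the following sketch. The table name `polardbo_upsert_sink` is hypothetical. The primary key is declared in the Flink DDL because this mode requires it.

```sql
-- Sketch only: upsert mode requires a primary key in the sink DDL.
CREATE TEMPORARY TABLE polardbo_upsert_sink (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED   -- key used to decide between INSERT and UPDATE
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'writeMode' = 'upsert'
);
```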

Metrics

The following metrics are available for sink tables. For a description of each metric, see Monitoring metrics.

  • numRecordsOut

  • numRecordsOutPerSecond

  • numBytesOut

  • numBytesOutPerSecond

  • currentSendTime

Data type mappings

The following table maps PolarDB for PostgreSQL (Compatible with Oracle) 1.0 data types to Flink SQL data types for sink tables.

| PolarDB type | Flink type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INT | INT |
| NUMBER | BIGINT |
| NUMBER | DOUBLE |
| VARCHAR | VARCHAR |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | DATE |
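
To show how the mappings are applied in practice, the sketch below declares one Flink column for each row of the table, with the PolarDB column type it is expected to target noted in a comment. The table and column names are hypothetical.

```sql
-- Sketch only: one column per mapping; comments give the PolarDB-side type.
CREATE TEMPORARY TABLE polardbo_typed_sink (
  is_active   BOOLEAN,     -- PolarDB: BOOLEAN
  quantity    INT,         -- PolarDB: INT
  total_count BIGINT,      -- PolarDB: NUMBER
  unit_price  DOUBLE,      -- PolarDB: NUMBER
  label       VARCHAR,     -- PolarDB: VARCHAR
  created_at  TIMESTAMP,   -- PolarDB: TIMESTAMP
  created_on  DATE         -- PolarDB: VARCHAR
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```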

Example

The following example generates data using a datagen source and writes it to a PolarDB sink table.

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
)
COMMENT 'datagen source table'
WITH (
  'connector' = 'datagen'
);

CREATE TABLE polardbo_sink (
  name VARCHAR,
  age INT
) WITH (
  'connector'='polardbo',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);

INSERT INTO polardbo_sink
SELECT * FROM datagen_source;
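
When trying this out against a small test cluster, it may help to limit how fast and how much the datagen source produces. The following variant of the source table is a sketch that assumes the standard datagen options of Apache Flink (`rows-per-second`, `number-of-rows`, and the `fields.*` settings); the table name and the specific values are illustrative.

```sql
-- Sketch only: a throttled, bounded datagen source for test runs.
CREATE TEMPORARY TABLE datagen_source_bounded (
  `name` VARCHAR,
  `age` INT
)
COMMENT 'bounded datagen source table'
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',      -- emit 10 rows per second
  'number-of-rows' = '1000',     -- stop after 1000 rows
  'fields.name.length' = '8',    -- random 8-character names
  'fields.age.min' = '1',        -- random ages between 1 and 100
  'fields.age.max' = '100'
);
```

Replacing `datagen_source` with `datagen_source_bounded` in the INSERT statement leaves the rest of the example unchanged.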