Realtime Compute for Apache Flink:AnalyticDB for PostgreSQL connector

Last Updated: Mar 26, 2026

The AnalyticDB for PostgreSQL connector lets you use AnalyticDB for PostgreSQL as a source (beta), dimension, or sink table in Realtime Compute for Apache Flink SQL jobs. AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehouse for large-scale online analytics.

Supported: Source (beta) · Dimension · Sink | Streaming · Batch | SQL API | Sink supports updates and deletes

Reading from a source table requires a custom connector configured via Flink CDC. For setup instructions, see Use Flink CDC to subscribe to full and incremental data in real time.

Prerequisites

Before you begin, ensure that you have:

Limitations

  • AnalyticDB for PostgreSQL V7.0 requires VVR 8.0.1 or later.

  • Self-managed PostgreSQL databases are not supported.

Syntax

Use connector='adbpg' for dimension and sink tables, and connector='adbpg-cdc' for source tables.

CREATE TEMPORARY TABLE adbpg_table (
  id      INT,
  len     INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'  = 'adbpg',
  'url'        = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName'  = '<table>',
  'userName'   = '<username>',
  'password'   = '<password>'
);

Connector options

General

These options apply to all table types.

Option | Required | Default | Type | Description
connector | Yes | - | STRING | Set to adbpg-cdc for source tables, or adbpg for dimension and sink tables.
url | Yes | - | STRING | The Java Database Connectivity (JDBC) URL in the format jdbc:postgresql://<host>:<port>/<database>.
tableName | Yes | - | STRING | The name of the table in the database.
userName | Yes | - | STRING | The username for the AnalyticDB for PostgreSQL database.
password | Yes | - | STRING | The password for the AnalyticDB for PostgreSQL database.
maxRetryTimes | No | 3 | INTEGER | The maximum number of retry attempts when a write fails.
targetSchema | No | public | STRING | The name of the database schema.
caseSensitive | No | false | STRING | Specifies whether to treat identifiers as case-sensitive. Valid values: true, false.
connectionMaxActive | No | 5 | INTEGER | The maximum number of connections in the connection pool. The connector releases idle connections automatically. Setting this value too high may cause abnormal server connection counts.

Source-specific (beta)

These options apply only to source tables (connector='adbpg-cdc').

Option | Required | Default | Type | Description
schema-name | Yes | - | STRING | The schema name. Supports regular expressions to subscribe to multiple schemas at once.
port | Yes | 5432 | INTEGER | The port of the AnalyticDB for PostgreSQL instance.
decoding.plugin.name | Yes | pgoutput | STRING | The PostgreSQL logical decoding plug-in. Set to pgoutput.
slot.name | Yes | - | STRING | The name of the logical decoding slot. See Slot name guidance.
debezium.* | Yes | - | STRING | Debezium client configuration properties. For example, set 'debezium.snapshot.mode' = 'never' to skip the initial snapshot. See Connector properties.
scan.incremental.snapshot.enabled | No | false | BOOLEAN | Specifies whether to enable incremental snapshot. Valid values: true, false.
scan.startup.mode | No | initial | STRING | The startup mode for data consumption. See Startup modes.
changelog-mode | No | ALL | STRING | How change events are encoded in the change stream. Valid values: ALL, UPSERT.
heartbeat.interval.ms | No | 30 seconds | DURATION | The interval for sending heartbeat packets, in milliseconds. See Heartbeat and WAL retention.
scan.incremental.snapshot.chunk.key-column | No | First primary key column | STRING | The column used as the chunk key during snapshot reads.
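
The options above can be combined into a source table DDL. The following is a minimal sketch that uses only options listed in this document; the host, database, table, and slot values are placeholders, and the column list is illustrative, not taken from a real schema.

```sql
-- Sketch of a CDC source table (beta). All connection values are
-- placeholders; slot name "flink_slot" is an example, not a convention.
CREATE TEMPORARY TABLE adbpg_cdc_source (
  id      INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'            = 'adbpg-cdc',
  'url'                  = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName'            = '<table>',
  'userName'             = '<username>',
  'password'             = '<password>',
  'schema-name'          = 'public',
  'port'                 = '5432',
  'decoding.plugin.name' = 'pgoutput',
  'slot.name'            = 'flink_slot',
  'scan.startup.mode'    = 'initial'
);
```

Note that Flink SQL requires `NOT ENFORCED` on primary keys declared for connector tables, because the connector does not validate the constraint itself.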

Startup modes

Value | Behavior
initial (default) | Performs a full historical data scan on first startup, then reads from the latest Write-Ahead Logging (WAL) position.
latest-offset | Skips the historical data scan and reads only from the current end of the WAL.
snapshot | Performs a full historical data scan and captures WAL data generated during that scan. Stops after the scan completes.

Changelog modes

Value | Events captured
ALL (default) | INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER
UPSERT | INSERT, DELETE, UPDATE_AFTER

Slot name guidance

Replication slots must be managed carefully to avoid conflicts:

  • Within a single Flink job: Use the same slot.name for all source tables in the job.

  • Across different Flink jobs: Assign a unique slot.name per job. Reusing a slot name across jobs causes the error PSQLException: ERROR: replication slot "debezium" is active for PID 974.

Heartbeat and WAL retention

When a source table has infrequent updates, the connector may not advance the replication slot offset. This prevents AnalyticDB for PostgreSQL from reclaiming WAL disk space, because the database retains WAL files until the slot offset advances past them. Sending periodic heartbeats keeps the slot offset moving forward, allowing old WAL files to be freed. Set heartbeat.interval.ms to a value appropriate for your update frequency when table changes are infrequent.
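
To verify whether a slot is holding back WAL reclamation, you can inspect the slot state directly in the database. This query uses the standard PostgreSQL `pg_replication_slots` catalog view, assuming your AnalyticDB for PostgreSQL instance exposes it:

```sql
-- A slot whose restart_lsn stops advancing (while the table sees few or
-- no changes) forces the database to retain WAL from that position on.
SELECT slot_name, active, restart_lsn
FROM pg_replication_slots;
```

If `restart_lsn` is stale for an active slot on a low-traffic table, lowering heartbeat.interval.ms is the documented remedy.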

Sink-specific

These options apply only to sink tables (connector='adbpg').

Option | Required | Default | Type | Description
writeMode | No | insert | STRING | The write strategy for the first write attempt. See Write modes and conflict resolution.
conflictMode | No | strict | STRING | How to handle primary key or index conflicts. See Write modes and conflict resolution.
batchSize | No | 500 | INTEGER | The number of records written per batch.
flushIntervalMs | No | - | INTEGER | The maximum time the connector waits before flushing cached records, in milliseconds. If the buffer has not reached batchSize when this interval elapses, all buffered records are flushed immediately.
retryWaitTime | No | 100 | INTEGER | The interval between retry attempts, in milliseconds.

Write modes and conflict resolution

writeMode and conflictMode work together to control how records are written and how conflicts are handled.

writeMode | Description | Notes
insert (default) | Inserts records directly. | Conflict handling is determined by conflictMode. Suitable for most use cases.
upsert | Automatically updates existing records on conflict. | Requires a primary key.
copy | Inserts records using the PostgreSQL COPY command. | Requires VVR 11.1 or later.

conflictMode | Behavior on conflict | Notes
strict (default) | Reports an error. | -
ignore | Silently discards the conflicting record. | -
update | Updates the conflicting record. | Suitable for tables without a primary key. Reduces write throughput.
upsert | Updates the conflicting record. | Requires a primary key. More efficient than update for keyed tables.
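
The combination described above can be sketched as a sink DDL. This is a minimal example using only options from this document; the connection values are placeholders, and the batch settings are illustrative, not recommendations.

```sql
-- Sketch of a sink that inserts by default and updates the existing row
-- on primary-key conflict. Requires a primary key on the target table.
CREATE TEMPORARY TABLE adbpg_upsert_sink (
  id   INT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'       = 'adbpg',
  'url'             = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName'       = '<table>',
  'userName'        = '<username>',
  'password'        = '<password>',
  'writeMode'       = 'insert',
  'conflictMode'    = 'upsert',
  'batchSize'       = '1000',
  'flushIntervalMs' = '1000'
);
```

With this configuration, a conflicting record is converted into an update rather than failing the job (strict) or being dropped (ignore).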

Dimension table-specific

These options apply only to dimension tables (connector='adbpg' used in a JOIN with FOR SYSTEM_TIME AS OF).

Option | Required | Default | Type | Description
cache | No | ALL | STRING | The cache policy for dimension table lookups. See Cache policies.
cacheSize | No | 100000 | LONG | The maximum number of rows held in cache. Takes effect only when cache=LRU.
cacheTTLMs | No | Long.MAX_VALUE | LONG | The cache entry lifetime in milliseconds. Behavior depends on the cache setting. See Cache policies.
maxJoinRows | No | 1024 | INTEGER | The maximum number of rows to join per input record.

Cache policies

Choosing the right cache policy balances query throughput against data freshness.

cache value | Behavior | When to use
ALL (default) | Loads the entire dimension table into memory before the job starts. All lookups hit the cache. On cache expiry (cacheTTLMs), reloads the full table. If a join key is not found in the cache, it does not exist. | Small-to-medium dimension tables where freshness can tolerate periodic reloads.
LRU | Caches a subset of rows. On a cache miss, fetches from the database and updates the cache. Evicts the least recently used entries when the cache reaches cacheSize. cacheTTLMs controls per-entry expiry. | Large dimension tables where caching everything is not feasible, or when low-latency partial caching is acceptable.
None | No caching. Every lookup goes to the database. | When data must always be read fresh from the database.
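
For a large table where loading everything is not feasible, the LRU policy can be declared as follows. This is a sketch using only options from this document; connection values are placeholders, and the cache size and TTL are illustrative values, not recommendations.

```sql
-- Sketch of a dimension table with partial (LRU) caching: at most
-- 200,000 rows are cached, and each entry expires after 10 minutes.
CREATE TEMPORARY TABLE adbpg_dim_lru (
  a INT,
  b VARCHAR
) WITH (
  'connector'  = 'adbpg',
  'url'        = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName'  = '<table>',
  'userName'   = '<username>',
  'password'   = '<password>',
  'cache'      = 'LRU',
  'cacheSize'  = '200000',
  'cacheTTLMs' = '600000'
);
```

A lower cacheTTLMs trades more database lookups for fresher dimension data; cacheSize bounds memory use at the cost of more cache misses.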

Data type mappings

AnalyticDB for PostgreSQL type | Flink SQL type
BOOLEAN | BOOLEAN
SMALLINT | INT
INT | INT
BIGINT | BIGINT
FLOAT | DOUBLE
VARCHAR | VARCHAR
TEXT | VARCHAR
TIMESTAMP | TIMESTAMP
DATE | DATE

Examples

Sink table

This example reads from a datagen source and writes to an AnalyticDB for PostgreSQL sink table.

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age`  INT
)
COMMENT 'datagen source table'
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adbpg_sink (
  name VARCHAR,
  age  INT
) WITH (
  'connector' = 'adbpg',
  'url'       = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName' = '<table>',
  'userName'  = '<username>',
  'password'  = '<password>'
);

INSERT INTO adbpg_sink
SELECT * FROM datagen_source;

Dimension table

This example joins a datagen stream with an AnalyticDB for PostgreSQL dimension table using a temporal join.

CREATE TEMPORARY TABLE datagen_source (
  a         INT,
  b         BIGINT,
  c         STRING,
  `proctime` AS PROCTIME()
)
COMMENT 'datagen source table'
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adbpg_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'adbpg',
  'url'       = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName' = '<table>',
  'userName'  = '<username>',
  'password'  = '<password>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
)
COMMENT 'blackhole sink table'
WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adbpg_dim FOR SYSTEM_TIME AS OF T.proctime AS H
  ON T.a = H.a;

Source table (beta)

For source table configuration and examples, see Use Flink CDC to subscribe to full and incremental data in real time.

Metrics

Sink table metrics are available in the Realtime Compute for Apache Flink console. Dimension tables do not expose metrics. For metric definitions, see Metrics.

Metric | Description
numRecordsOut | Total number of records written to the sink.
numRecordsOutPerSecond | Records written per second.
numBytesOut | Total bytes written to the sink.
numBytesOutPerSecond | Bytes written per second.
currentSendTime | Time taken for the most recent write operation.

What's next