The AnalyticDB for PostgreSQL connector lets you use AnalyticDB for PostgreSQL as a source (beta), dimension, or sink table in Realtime Compute for Apache Flink SQL jobs. AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehouse for large-scale online analytics.
Supported: Source (beta) · Dimension · Sink | Streaming · Batch | SQL API | Sink supports updates and deletes
Reading from a source table requires a custom connector configured via Flink CDC. For setup instructions, see Use Flink CDC to subscribe to full and incremental data in real time.
Prerequisites
Before you begin, ensure that you have:
- An AnalyticDB for PostgreSQL instance and table. See Create an instance and CREATE TABLE.
- An IP address whitelist configured for the instance. See Configure an IP address whitelist.
Limitations
- AnalyticDB for PostgreSQL V7.0 requires VVR 8.0.1 or later.
- Self-managed PostgreSQL databases are not supported.
Syntax
Use connector='adbpg' for dimension and sink tables, and connector='adbpg-cdc' for source tables.
CREATE TEMPORARY TABLE adbpg_table (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName' = '<table>',
  'userName' = '<username>',
  'password' = '<password>'
);
Connector options
General
These options apply to all table types.
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | Yes | — | STRING | Set to adbpg-cdc for source tables, or adbpg for dimension and sink tables. |
| url | Yes | — | STRING | The Java Database Connectivity (JDBC) URL in the format jdbc:postgresql://<host>:<port>/<database>. |
| tableName | Yes | — | STRING | The name of the table in the database. |
| userName | Yes | — | STRING | The username for the AnalyticDB for PostgreSQL database. |
| password | Yes | — | STRING | The password for the AnalyticDB for PostgreSQL database. |
| maxRetryTimes | No | 3 | INTEGER | The maximum number of retry attempts when a write fails. |
| targetSchema | No | public | STRING | The name of the database schema. |
| caseSensitive | No | false | STRING | Specifies whether to treat identifiers as case-sensitive. Valid values: true, false. |
| connectionMaxActive | No | 5 | INTEGER | The maximum number of connections in the connection pool. The connector releases idle connections automatically. Setting this value too high may cause abnormal server connection counts. |
Source-specific (beta)
These options apply only to source tables (connector='adbpg-cdc').
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| schema-name | Yes | — | STRING | The schema name. Supports regular expressions to subscribe to multiple schemas at once. |
| port | Yes | 5432 | INTEGER | The port of the AnalyticDB for PostgreSQL instance. |
| decoding.plugin.name | Yes | pgoutput | STRING | The PostgreSQL logical decoding plug-in. Set to pgoutput. |
| slot.name | Yes | — | STRING | The name of the logical decoding slot. See Slot name guidance. |
| debezium.* | Yes | — | STRING | Debezium client configuration properties. For example, set 'debezium.snapshot.mode' = 'never' to skip the initial snapshot. See Connector properties. |
| scan.incremental.snapshot.enabled | No | false | BOOLEAN | Specifies whether to enable incremental snapshots. Valid values: true, false. |
| scan.startup.mode | No | initial | STRING | The startup mode for data consumption. See Startup modes. |
| changelog-mode | No | ALL | STRING | How change events are encoded in the change stream. Valid values: ALL, UPSERT. |
| heartbeat.interval.ms | No | 30 seconds | DURATION | The interval for sending heartbeat packets, in milliseconds. See Heartbeat and WAL retention. |
| scan.incremental.snapshot.chunk.key-column | No | First primary key column | STRING | The column used as the chunk key during snapshot reads. |
Startup modes
| Value | Behavior |
|---|---|
| initial (default) | Performs a full historical data scan on first startup, then reads from the latest Write-Ahead Logging (WAL) position. |
| latest-offset | Skips the historical data scan and reads only from the current end of the WAL. |
| snapshot | Performs a full historical data scan and captures WAL data generated during that scan. Stops after the scan completes. |
Changelog modes
| Value | Events captured |
|---|---|
| ALL (default) | INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER |
| UPSERT | INSERT, DELETE, UPDATE_AFTER |
Slot name guidance
Replication slots must be managed carefully to avoid conflicts:
- Within a single Flink job: use the same slot.name for all source tables in the job.
- Across different Flink jobs: assign a unique slot.name to each job. Reusing a slot name across jobs causes the error PSQLException: ERROR: replication slot "debezium" is active for PID 974.
Heartbeat and WAL retention
When a source table has infrequent updates, the connector may not advance the replication slot offset. This prevents AnalyticDB for PostgreSQL from reclaiming WAL disk space, because the database retains WAL files until the slot offset advances past them. Sending periodic heartbeats keeps the slot offset moving forward, allowing old WAL files to be freed. Set heartbeat.interval.ms to a value appropriate for your update frequency when table changes are infrequent.
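Putting the slot and heartbeat guidance together, a CDC source table might look like the following sketch. The host, credentials, and slot name are placeholders, and it assumes the general options above (url, tableName, userName, password) also apply to source tables:

```sql
CREATE TEMPORARY TABLE adbpg_cdc_source (
  id INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'adbpg-cdc',
  'url' = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName' = '<table>',
  'userName' = '<username>',
  'password' = '<password>',
  'schema-name' = 'public',
  'port' = '5432',
  'decoding.plugin.name' = 'pgoutput',
  'slot.name' = 'flink_job_a_slot',    -- unique per Flink job; shared by all source tables within the job
  'heartbeat.interval.ms' = '30000'    -- advance the slot offset even when the table is idle
);
```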
Sink-specific
These options apply only to sink tables (connector='adbpg').
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| writeMode | No | insert | STRING | The write strategy for the first write attempt. See Write modes and conflict resolution. |
| conflictMode | No | strict | STRING | How to handle primary key or index conflicts. See Write modes and conflict resolution. |
| batchSize | No | 500 | INTEGER | The number of records written per batch. |
| flushIntervalMs | No | — | INTEGER | The maximum time the connector waits before flushing cached records, in milliseconds. When the buffer does not reach batchSize within this period, all buffered records are written immediately. |
| retryWaitTime | No | 100 | INTEGER | The interval between retry attempts, in milliseconds. |
Write modes and conflict resolution
writeMode and conflictMode work together to control how records are written and how conflicts are handled.
| writeMode | Description | Notes |
|---|---|---|
| insert (default) | Inserts records directly. Conflict handling is determined by conflictMode. | Suitable for most use cases. |
| upsert | Automatically updates existing records on conflict. | Requires a primary key. |
| copy | Inserts records using the PostgreSQL COPY command. | Requires VVR 11.1 or later. |

| conflictMode | Behavior on conflict | Notes |
|---|---|---|
| strict (default) | Reports an error. | |
| ignore | Silently discards the conflicting record. | |
| update | Updates the conflicting record. | Suitable for tables without a primary key. Reduces write throughput. |
| upsert | Updates the conflicting record. | Requires a primary key. More efficient than update for keyed tables. |
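As an illustration of how these options combine, the sink sketch below updates rows in place on primary key conflict and tunes batching; all connection values are placeholders:

```sql
CREATE TEMPORARY TABLE adbpg_upsert_sink (
  id INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName' = '<table>',
  'userName' = '<username>',
  'password' = '<password>',
  'writeMode' = 'upsert',       -- update existing rows on primary key conflict
  'conflictMode' = 'upsert',    -- requires a primary key on the target table
  'batchSize' = '1000',         -- write up to 1,000 records per batch
  'flushIntervalMs' = '1000'    -- flush buffered records at least once per second
);
```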
Dimension table-specific
These options apply only to dimension tables (connector='adbpg' used in a JOIN with FOR SYSTEM_TIME AS OF).
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| cache | No | ALL | STRING | The cache policy for dimension table lookups. See Cache policies. |
| cacheSize | No | 100000 | LONG | The maximum number of rows held in cache. Takes effect only when cache=LRU. |
| cacheTTLMs | No | Long.MAX_VALUE | LONG | The cache entry lifetime in milliseconds. Behavior depends on the cache setting. See Cache policies. |
| maxJoinRows | No | 1024 | INTEGER | The maximum number of rows to join per input record. |
Cache policies
Choosing the right cache policy balances query throughput against data freshness.
| cache value | Behavior | When to use |
|---|---|---|
| ALL (default) | Loads the entire dimension table into memory before the job starts. All lookups hit the cache. On cache expiry (cacheTTLMs), reloads the full table. If a join key is not found in the cache, it does not exist. | Small-to-medium dimension tables where freshness can tolerate periodic reloads. |
| LRU | Caches a subset of rows. On a cache miss, fetches from the database and updates the cache. Evicts the least recently used entries when the cache reaches cacheSize. cacheTTLMs controls per-entry expiry. | Large dimension tables where caching everything is not feasible, or when low-latency partial caching is acceptable. |
| None | No caching. Every lookup goes to the database. | When data must always be read fresh from the database. |
Data type mappings
| AnalyticDB for PostgreSQL type | Flink SQL type |
|---|---|
| BOOLEAN | BOOLEAN |
| SMALLINT | INT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | DOUBLE |
| VARCHAR | VARCHAR |
| TEXT | VARCHAR |
| TIMESTAMP | TIMESTAMP |
| DATE | DATE |
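As a worked pairing under this mapping, the hypothetical orders table below shows each database column next to its Flink SQL declaration; table, column, and connection values are illustrative:

```sql
-- AnalyticDB for PostgreSQL side (for reference):
--   CREATE TABLE public.orders (id INT, amount FLOAT, note TEXT, created DATE);
CREATE TEMPORARY TABLE orders (
  id INT,          -- INT maps to INT
  amount DOUBLE,   -- FLOAT maps to DOUBLE
  note VARCHAR,    -- TEXT maps to VARCHAR
  created DATE     -- DATE maps to DATE
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<host>:<port>/<database>',
  'tableName' = 'orders',
  'userName' = '<username>',
  'password' = '<password>'
);
```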
Examples
Sink table
This example reads from a datagen source and writes to an AnalyticDB for PostgreSQL sink table.
CREATE TEMPORARY TABLE datagen_source (
`name` VARCHAR,
`age` INT
)
COMMENT 'datagen source table'
WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE adbpg_sink (
name VARCHAR,
age INT
) WITH (
'connector' = 'adbpg',
'url' = 'jdbc:postgresql://<host>:<port>/<database>',
'tableName' = '<table>',
'userName' = '<username>',
'password' = '<password>'
);
INSERT INTO adbpg_sink
SELECT * FROM datagen_source;
Dimension table
This example joins a datagen stream with an AnalyticDB for PostgreSQL dimension table using a temporal join.
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
`proctime` AS PROCTIME()
)
COMMENT 'datagen source table'
WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE adbpg_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'adbpg',
'url' = 'jdbc:postgresql://<host>:<port>/<database>',
'tableName' = '<table>',
'userName' = '<username>',
'password' = '<password>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
)
COMMENT 'blackhole sink table'
WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adbpg_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
Source table (beta)
For source table configuration and examples, see Use Flink CDC to subscribe to full and incremental data in real time.
Metrics
Sink table metrics are available in the Realtime Compute for Apache Flink console. Dimension tables do not expose metrics. For metric definitions, see Metrics.
| Metric | Description |
|---|---|
| numRecordsOut | Total number of records written to the sink. |
| numRecordsOutPerSecond | Records written per second. |
| numBytesOut | Total bytes written to the sink. |
| numBytesOutPerSecond | Bytes written per second. |
| currentSendTime | Time taken for the most recent write operation. |