Stream data from a Kafka topic into AnalyticDB for PostgreSQL using Apache Flink SQL connectors. This guide walks you through creating the Kafka source table, the AnalyticDB for PostgreSQL destination table, and the Flink sink table, then running the synchronization job.
Prerequisites
Before you begin, ensure that you have:
Added the Flink client IP address to the IP address whitelist of your AnalyticDB for PostgreSQL instance. See Configure an IP address whitelist.
Deployed the Kafka connector JAR package in the $FLINK_HOME/lib directory of the Flink client. This guide uses the following Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>
For the full Kafka connector reference, see Apache Kafka SQL Connector.
Deployed the AnalyticDB for PostgreSQL connector JAR package in the $FLINK_HOME/lib directory. This guide uses connector version 1.13. Select a connector version that matches your Flink version. Download the JAR from AnalyticDB PostgreSQL Connector releases.
Usage notes
Reserved keywords: AnalyticDB for PostgreSQL treats partition and offset as reserved keywords. The example in this guide replaces them with shard and meta_offset in both the Flink source table and the destination table.
Conflict handling: If the destination table has a primary key, configure conflictmode to control what happens when a conflict occurs. The default value is upsert, which uses INSERT ON CONFLICT or COPY ON CONFLICT.
Write modes: The connector supports three modes: BATCH INSERT (writemode=0), COPY API (writemode=1, default), and BATCH UPSERT (writemode=2).
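To make the conflict modes more concrete, the following is a rough sketch of the plain SQL that the ignore and upsert modes resemble. It assumes the instance supports INSERT ON CONFLICT and uses a hypothetical table named conflict_demo. It illustrates the semantics only and is not the statement the connector actually generates.

```sql
-- Hypothetical demo table; not part of the pipeline in this guide.
CREATE TABLE conflict_demo (
    user_id  BIGINT PRIMARY KEY,
    behavior VARCHAR
);

INSERT INTO conflict_demo VALUES (1, 'pv');

-- Resembles conflictmode=ignore: the existing row is kept.
INSERT INTO conflict_demo VALUES (1, 'buy')
ON CONFLICT (user_id) DO NOTHING;

-- Resembles conflictmode=upsert: the existing row is overwritten.
INSERT INTO conflict_demo VALUES (1, 'cart')
ON CONFLICT (user_id) DO UPDATE SET behavior = EXCLUDED.behavior;

-- The row for user_id = 1 now has behavior = 'cart'.
SELECT user_id, behavior FROM conflict_demo;
```

Run the statements in a scratch database and drop conflict_demo afterward.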
If you use Alibaba Cloud Realtime Compute for Apache Flink, the setup steps in this guide also apply. See What is Alibaba Cloud Realtime Compute for Apache Flink?
Synchronize data from Kafka to AnalyticDB for PostgreSQL
The setup involves four steps:
Create the Kafka source table in Flink.
Create the destination table in AnalyticDB for PostgreSQL.
Create the AnalyticDB for PostgreSQL sink table in Flink.
Run the synchronization job.
Step 1: Create the Kafka source table in Flink
On the Flink client, define a source table that reads from the Kafka topic:
CREATE TABLE KafkaSourceTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
`shard` BIGINT METADATA FROM 'partition' VIRTUAL,
`meta_offset` BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
The shard and meta_offset columns capture Kafka message metadata: shard maps from the partition metadata field, and meta_offset maps from the offset metadata field. Both are renamed to avoid conflicts with AnalyticDB for PostgreSQL reserved keywords. The event_time column reads the value.source.timestamp metadata field, which the debezium-json value format exposes. For all available metadata fields, see Available Metadata.
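Before wiring up the sink, it can help to preview the source table and confirm that the metadata columns are populated. The following is a minimal sketch for the Flink SQL client. The SET statement assumes the quoted-key syntax available in Flink 1.13 and later, and is optional.

```sql
-- Optional: print results inline in the SQL client (Flink 1.13+ syntax assumed).
SET 'sql-client.execution.result-mode' = 'tableau';

-- Preview the payload columns together with the renamed metadata columns.
-- This is an unbounded streaming query; cancel it once rows appear.
SELECT user_id, behavior, event_time, shard, meta_offset
FROM KafkaSourceTable;
```

If shard and meta_offset show values that match the topic's partitions and offsets, the metadata mapping is working as intended.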
Kafka connector parameters
| Parameter | Required | Type | Default | Description |
|---|---|---|---|---|
| connector | Yes | String | (none) | Connector type. Set to kafka. |
| topic | Yes | String | (none) | Kafka topic name. |
| properties.bootstrap.servers | Yes | String | (none) | Kafka bootstrap server address in host:port format. |
| properties.group.id | Yes | String | (none) | Kafka consumer group ID. |
| scan.startup.mode | No | Enum: earliest-offset, latest-offset, group-offsets, timestamp, specific-offsets | (none) | Consumer start position. See Start reading position. |
| value.format | Yes | String | (none) | Serialization format for Kafka message values. See Formats. |
For the full parameter reference, see Connector options.
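As an illustration of a different start position, the sketch below defines a second source table that begins reading at a fixed point in time instead of the earliest offset. The table name KafkaSourceFromTimestamp and the timestamp value are placeholders chosen for this example; scan.startup.timestamp-millis is the Kafka connector option that accompanies the timestamp mode.

```sql
-- Hypothetical variant of the source table that starts from a timestamp.
CREATE TABLE KafkaSourceFromTimestamp (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    `shard` BIGINT METADATA FROM 'partition' VIRTUAL,
    `meta_offset` BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    -- Start from records at or after this epoch time in milliseconds
    -- (placeholder: 2024-01-01 00:00:00 UTC).
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1704067200000',
    'value.format' = 'debezium-json'
);
```

A timestamp start is useful when you want to replay only recent data, for example after the destination table has been restored from a backup.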
Step 2: Create the destination table in AnalyticDB for PostgreSQL
Connect to the AnalyticDB for PostgreSQL instance and create the destination table. Use a schema that matches the Flink source table, replacing reserved keywords with alternative column names:
CREATE TABLE ADBPGTargetTable (
user_id BIGINT PRIMARY KEY,
item_id BIGINT,
behavior VARCHAR,
event_time TIMESTAMP,
shard BIGINT, -- replaces 'partition' (reserved keyword)
meta_offset BIGINT -- replaces 'offset' (reserved keyword)
);
Step 3: Create the AnalyticDB for PostgreSQL sink table in Flink
On the Flink client, define a sink table that maps to the AnalyticDB for PostgreSQL destination table:
CREATE TABLE ADBPGTargetTable (
`user_id` BIGINT PRIMARY KEY NOT ENFORCED,
`item_id` BIGINT,
`behavior` STRING,
`event_time` TIMESTAMP(3),
`shard` BIGINT,
`meta_offset` BIGINT
) WITH (
'connector' = 'adbpg-nightly-1.13',
'url' = 'jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres',
'tablename' = 'ADBPGTargetTable',
'username' = 'user01',
'password' = 'Password01',
'targetschema' = 'public',
'writemode' = '1',
'batchsize' = '50000',
'conflictmode' = 'ignore',
'exceptionmode' = 'ignore',
'maxretrytimes' = '2',
'retrywaittime' = '200',
'connectionmaxactive' = '5',
'casesensitive' = '0',
'usecopy' = '0'
);
AnalyticDB for PostgreSQL connector parameters
| Parameter | Required | Type | Default | Description |
|---|---|---|---|---|
| connector | Yes | String | (none) | Connector name. Format: adbpg-nightly-{version}. For connector 1.13, use adbpg-nightly-1.13. |
| url | Yes | String | (none) | JDBC connection URL. Format: jdbc:postgresql://<endpoint:port>/<database>. Example: jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres. |
| tablename | Yes | String | (none) | Name of the destination table in AnalyticDB for PostgreSQL. |
| username | Yes | String | (none) | Database account name. |
| password | Yes | String | (none) | Password for the database account. |
| targetschema | No | String | public | Schema of the destination table. |
| writemode | No | Enum: 0 (BATCH INSERT), 1 (COPY API), 2 (BATCH UPSERT) | 1 | Write method. COPY API (1) provides higher throughput for bulk ingestion. |
| batchsize | No | Integer | 50000 | Maximum number of rows per write batch. |
| conflictmode | No | Enum: ignore, strict, update, upsert | upsert | Primary key conflict handling. ignore keeps the existing row. strict triggers a failover and reports an error. update updates the conflicting row. upsert performs an upsert using INSERT ON CONFLICT or COPY ON CONFLICT. For partitioned tables, the instance must run minor version 6.3.6.1 or later. See Update the minor engine version. |
| exceptionmode | No | Enum: ignore, strict | ignore | Exception handling during writes. ignore skips error rows. strict triggers a failover and reports an error. |
| maxretrytimes | No | Integer | 3 | Maximum retries after a statement execution failure. |
| retrywaittime | No | Integer | 100 | Wait time between retries, in milliseconds. |
| batchwritetimeoutms | No | Integer | 50000 | Timeout for a write batch, in milliseconds. After this period, the accumulated batch is flushed. |
| connectionmaxactive | No | Integer | 5 | Maximum active connections in the connection pool per task manager. |
| casesensitive | No | Enum: 0 (case-insensitive), 1 (case-sensitive) | 0 | Case sensitivity for column and table names. |
| usecopy | No | Enum: 0, 1 | (none) | Whether to use COPY ON CONFLICT for upsert operations. |
| verbose | No | Enum: 0 (disabled), 1 (enabled) | 0 | Connector log output. |
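Because most parameters are optional, a sink definition can be much shorter than the full example. The following is a minimal sketch, not a tested configuration: it sets only the required parameters plus two overrides, and relies on the defaults listed in the table for everything else. The table name ADBPGUpsertSink and the angle-bracket placeholders are assumptions for illustration.

```sql
-- Hypothetical minimal sink: required parameters plus two explicit overrides.
CREATE TABLE ADBPGUpsertSink (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    `event_time` TIMESTAMP(3),
    `shard` BIGINT,
    `meta_offset` BIGINT,
    -- Flink requires primary keys to be declared NOT ENFORCED.
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'adbpg-nightly-1.13',
    'url' = 'jdbc:postgresql://<endpoint:port>/<database>',
    'tablename' = 'ADBPGTargetTable',
    'username' = '<username>',
    'password' = '<password>',
    -- Overwrite rows that share a primary key instead of keeping the old row.
    'conflictmode' = 'upsert',
    -- Fail the job on write errors instead of skipping the offending rows.
    'exceptionmode' = 'strict'
);
```

Setting exceptionmode to strict trades availability for visibility: the job stops on a bad row, which makes data problems harder to miss than with the default ignore.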
Step 4: Run the synchronization job
On the Flink client, run the following statement to start continuous data ingestion from Kafka into AnalyticDB for PostgreSQL:
INSERT INTO ADBPGTargetTable SELECT * FROM KafkaSourceTable;
The Flink client console shows the job execution status.
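Once the job is running, one way to confirm that rows are arriving is to query the destination table directly. The following sketch assumes you are connected to the AnalyticDB for PostgreSQL instance and that the destination table from Step 2 lives in the default schema.

```sql
-- Count the rows written so far; rerun to see whether the number changes.
SELECT count(*) AS synced_rows
FROM ADBPGTargetTable;

-- Inspect the most recent rows, including the renamed Kafka metadata columns.
SELECT user_id, item_id, behavior, event_time, shard, meta_offset
FROM ADBPGTargetTable
ORDER BY event_time DESC
LIMIT 10;
```

Because user_id is the primary key, the row count reflects distinct users rather than the number of Kafka messages consumed.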

What's next
To learn more about the Kafka connector options, see Apache Kafka SQL Connector.
To handle primary key conflicts with upsert semantics, set conflictmode to upsert and review Use INSERT ON CONFLICT to overwrite data.