
AnalyticDB: Use Apache Flink to synchronize data from Kafka to AnalyticDB for PostgreSQL

Last Updated: Mar 28, 2026

Stream data from a Kafka topic into AnalyticDB for PostgreSQL using Apache Flink SQL connectors. This guide walks you through creating the Kafka source table, the AnalyticDB for PostgreSQL destination table, and the Flink sink table, then running the synchronization job.

Prerequisites

Before you begin, ensure that you have:

  • Added the Flink client IP address to the IP address whitelist of your AnalyticDB for PostgreSQL instance. See Configure an IP address whitelist.

  • Deployed the Kafka connector JAR package in the $FLINK_HOME/lib directory of the Flink client. This guide uses the following Maven dependency:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.17-SNAPSHOT</version>
    </dependency>

    For the full Kafka connector reference, see Apache Kafka SQL Connector.

  • Deployed the AnalyticDB for PostgreSQL connector JAR package in the $FLINK_HOME/lib directory. This guide uses connector version 1.13. Select a connector version that matches your Flink version. Download the JAR from AnalyticDB PostgreSQL Connector releases.

Usage notes

  • Reserved keywords: AnalyticDB for PostgreSQL treats partition and offset as reserved keywords. The example in this guide replaces them with shard and meta_offset in both the Flink source table and the destination table.

  • Conflict handling: If the destination table has a primary key, configure conflictmode to control what happens when a conflict occurs. The default value is upsert, which uses INSERT ON CONFLICT or COPY ON CONFLICT.

  • Write modes: The connector supports three modes: BATCH INSERT (writemode=0), COPY API (writemode=1, default), and BATCH UPSERT (writemode=2).

If you use Alibaba Cloud Realtime Compute for Apache Flink, the setup steps in this guide also apply. See What is Alibaba Cloud Realtime Compute for Apache Flink?

Synchronize data from Kafka to AnalyticDB for PostgreSQL

The setup involves four steps:

  1. Create the Kafka source table in Flink.

  2. Create the destination table in AnalyticDB for PostgreSQL.

  3. Create the AnalyticDB for PostgreSQL sink table in Flink.

  4. Run the synchronization job.

Step 1: Create the Kafka source table in Flink

On the Flink client, define a source table that reads from the Kafka topic:

CREATE TABLE KafkaSourceTable (
  `user_id`     BIGINT,
  `item_id`     BIGINT,
  `behavior`    STRING,
  `event_time`  TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  `shard`       BIGINT METADATA FROM 'partition' VIRTUAL,
  `meta_offset` BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id'          = 'testGroup',
  'scan.startup.mode'            = 'earliest-offset',
  'value.format'                 = 'debezium-json'
);

The event_time, shard, and meta_offset columns capture message metadata rather than payload fields. event_time maps from the value.source.timestamp metadata key, which the debezium-json format exposes. shard maps from the Kafka partition metadata key, and meta_offset maps from the offset metadata key. These two columns are renamed because partition and offset are reserved keywords in AnalyticDB for PostgreSQL. For all available metadata fields, see Available Metadata.
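
To check that the metadata columns resolve as expected before wiring up the sink, you can preview the source table from the Flink SQL client. The query below is a minimal sketch. It assumes the user_behavior topic already contains Debezium-formatted messages and that the client session can reach the Kafka brokers.

```sql
-- Preview payload fields next to the Kafka metadata columns.
-- shard and meta_offset are the renamed 'partition' and 'offset' metadata keys.
SELECT
  `user_id`,
  `item_id`,
  `behavior`,
  `event_time`,
  `shard`,
  `meta_offset`
FROM KafkaSourceTable;
```

Because the source is unbounded, the query keeps running until you cancel it.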

Kafka connector parameters

Parameter | Required | Type | Default | Description
----------|----------|------|---------|------------
connector | Yes | String | | Connector type. Set to kafka.
topic | Yes | String | | Kafka topic name.
properties.bootstrap.servers | Yes | String | | Kafka bootstrap server address in host:port format.
properties.group.id | Yes | String | | Kafka consumer group ID.
scan.startup.mode | No | Enum: earliest-offset, latest-offset, group-offsets, timestamp, specific-offsets | | Consumer start position. See Start reading position.
value.format | Yes | String | | Serialization format for Kafka message values. See Formats.

For the full parameter reference, see Connector options.
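
As an illustration of a different start position, the sketch below reads from a point in time instead of the earliest offset. The table name KafkaSourceFromTimestamp and the timestamp value are hypothetical. scan.startup.timestamp-millis is the option that the open-source Kafka SQL connector pairs with the timestamp mode, so confirm it against the connector version you deploy.

```sql
CREATE TABLE KafkaSourceFromTimestamp (
  `user_id`     BIGINT,
  `item_id`     BIGINT,
  `behavior`    STRING,
  `shard`       BIGINT METADATA FROM 'partition' VIRTUAL,
  `meta_offset` BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
  'connector'                     = 'kafka',
  'topic'                         = 'user_behavior',
  'properties.bootstrap.servers'  = 'localhost:9092',
  'properties.group.id'           = 'testGroup',
  'scan.startup.mode'             = 'timestamp',
  -- Epoch milliseconds; example value only (2026-01-01 00:00:00 UTC).
  'scan.startup.timestamp-millis' = '1767225600000',
  'value.format'                  = 'debezium-json'
);
```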

Step 2: Create the destination table in AnalyticDB for PostgreSQL

Connect to the AnalyticDB for PostgreSQL instance and create the destination table. Use a schema that matches the Flink source table, replacing reserved keywords with alternative column names:

CREATE TABLE ADBPGTargetTable (
  user_id     BIGINT PRIMARY KEY,
  item_id     BIGINT,
  behavior    VARCHAR,
  event_time  TIMESTAMP,
  shard       BIGINT,       -- replaces 'partition' (reserved keyword)
  meta_offset BIGINT        -- replaces 'offset' (reserved keyword)
);
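
Because the table name is not quoted, PostgreSQL folds it to lowercase when storing it in the catalog. A query along the following lines, run on the AnalyticDB for PostgreSQL instance, is one way to confirm the column layout before creating the Flink sink. It assumes the table was created in the public schema.

```sql
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name   = 'adbpgtargettable'  -- unquoted identifiers are stored in lowercase
ORDER BY ordinal_position;
```

The result should list the six columns in declaration order.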

Step 3: Create the AnalyticDB for PostgreSQL sink table in Flink

On the Flink client, define a sink table that maps to the AnalyticDB for PostgreSQL destination table:

CREATE TABLE ADBPGTargetTable (
  `user_id`     BIGINT PRIMARY KEY NOT ENFORCED,
  `item_id`     BIGINT,
  `behavior`    STRING,
  `event_time`  TIMESTAMP(3),
  `shard`       BIGINT,
  `meta_offset` BIGINT
) WITH (
  'connector'           = 'adbpg-nightly-1.13',
  'url'                 = 'jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres',
  'tablename'           = 'ADBPGTargetTable',
  'username'            = 'user01',
  'password'            = 'Password01',
  'targetschema'        = 'public',
  'writemode'           = '1',
  'batchsize'           = '50000',
  'conflictmode'        = 'ignore',
  'exceptionmode'       = 'ignore',
  'maxretrytimes'       = '2',
  'retrywaittime'       = '200',
  'connectionmaxactive' = '5',
  'casesensitive'       = '0',
  'usecopy'             = '0'
);

AnalyticDB for PostgreSQL connector parameters

Parameter | Required | Type | Default | Description
----------|----------|------|---------|------------
connector | Yes | String | | Connector name. Format: adbpg-nightly-{version}. For connector 1.13, use adbpg-nightly-1.13.
url | Yes | String | | JDBC connection URL. Format: jdbc:postgresql://<endpoint:port>/<database>. Example: jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres.
tablename | Yes | String | | Name of the destination table in AnalyticDB for PostgreSQL.
username | Yes | String | | Database account name.
password | Yes | String | | Password for the database account.
targetschema | No | String | public | Schema of the destination table.
writemode | No | Enum: 0 (BATCH INSERT), 1 (COPY API), 2 (BATCH UPSERT) | 1 | Write method. COPY API (1) provides higher throughput for bulk ingestion.
batchsize | No | Integer | 50000 | Maximum number of rows per write batch.
conflictmode | No | Enum: ignore, strict, update, upsert | upsert | Primary key conflict handling. ignore keeps the existing row. strict triggers a failover and reports an error. update updates the conflicting row. upsert performs an upsert using INSERT ON CONFLICT or COPY ON CONFLICT. For partitioned tables, the instance must run minor version 6.3.6.1 or later. See Update the minor engine version.
exceptionmode | No | Enum: ignore, strict | ignore | Exception handling during writes. ignore skips error rows. strict triggers a failover and reports an error.
maxretrytimes | No | Integer | 3 | Maximum retries after a statement execution failure.
retrywaittime | No | Integer | 100 | Wait time between retries, in milliseconds.
batchwritetimeoutms | No | Integer | 50000 | Timeout for a write batch, in milliseconds. After this period, the accumulated batch is flushed.
connectionmaxactive | No | Integer | 5 | Maximum active connections in the connection pool per task manager.
casesensitive | No | Enum: 0 (case-insensitive), 1 (case-sensitive) | 0 | Case sensitivity for column and table names.
usecopy | No | Enum: 0, 1 | | Whether to use COPY ON CONFLICT for upsert operations.
verbose | No | Enum: 0 (disabled), 1 (enabled) | 0 | Connector log output.
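
To illustrate how the conflict and exception parameters combine, here is a sketch of a stricter sink definition that upserts rows instead of skipping conflicts. The table name ADBPGUpsertSink is hypothetical, the endpoint and credentials are the placeholders used in this guide, and omitted parameters fall back to the defaults listed in the table. The NOT ENFORCED qualifier is what open-source Flink SQL expects on primary key declarations.

```sql
CREATE TABLE ADBPGUpsertSink (
  `user_id`     BIGINT,
  `item_id`     BIGINT,
  `behavior`    STRING,
  `event_time`  TIMESTAMP(3),
  `shard`       BIGINT,
  `meta_offset` BIGINT,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector'     = 'adbpg-nightly-1.13',
  'url'           = 'jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres',
  'tablename'     = 'ADBPGTargetTable',
  'username'      = 'user01',
  'password'      = 'Password01',
  'targetschema'  = 'public',
  'conflictmode'  = 'upsert',  -- upsert the row on a primary key conflict
  'exceptionmode' = 'strict'   -- fail the job instead of skipping error rows
);
```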

Step 4: Run the synchronization job

On the Flink client, run the following statement to start continuous data ingestion from Kafka into AnalyticDB for PostgreSQL:

INSERT INTO ADBPGTargetTable SELECT * FROM KafkaSourceTable;

The Flink client console shows the job execution status.
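
One way to confirm that rows are arriving is to query the destination table on the AnalyticDB for PostgreSQL instance while the job runs. This is a minimal sketch, and the column aliases are illustrative.

```sql
SELECT
  COUNT(*)         AS synced_rows,
  MAX(meta_offset) AS latest_offset,
  MAX(event_time)  AS latest_event_time
FROM ADBPGTargetTable;
```

Because the example sink sets conflictmode to ignore, only messages whose user_id is not yet present in the table add rows.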
