
Write data to AnalyticDB for PostgreSQL using Realtime Compute for Apache Flink

Last Updated: Mar 30, 2026

This topic shows you how to configure a Flink job that streams data into an AnalyticDB for PostgreSQL instance using the AnalyticDB for PostgreSQL connector.

Limitations

  • Realtime Compute for Apache Flink cannot read data from AnalyticDB for PostgreSQL in serverless mode.

  • Only Ververica Runtime (VVR) 6.0.0 or later supports the AnalyticDB for PostgreSQL connector.

  • Only VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.

Note If you use a custom connector, follow the instructions in Manage custom connectors.

Prerequisites

Before you begin, ensure that you have:

  • A fully managed Flink workspace. See Activate fully managed Flink.

  • An AnalyticDB for PostgreSQL instance. See Create an instance.

  • The AnalyticDB for PostgreSQL instance and the fully managed Flink workspace in the same virtual private cloud (VPC).

Configure the AnalyticDB for PostgreSQL instance

  1. Log on to the AnalyticDB for PostgreSQL console.

  2. Add the CIDR block of the fully managed Flink workspace to the IP address whitelist of the instance. See Configure an IP address whitelist.

  3. Click Log On to Database. See Client connection for connection options.

  4. Create the destination table:

    CREATE TABLE test_adbpg_table(
      b1 int,
      b2 int,
      b3 text,
      b4 text,
      PRIMARY KEY(b1)
    );

    The table must contain a column for each field that the Flink sink writes. The sink table defined later in this topic has four columns (B1 through B4), so the destination table includes a matching b4 column.
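You can optionally verify the new table from the same SQL session with a quick sanity check before wiring up Flink. The literal values below are placeholders:

    -- Insert one row, read it back, then remove it so the table
    -- is empty before the Flink job starts writing.
    INSERT INTO test_adbpg_table (b1, b2, b3) VALUES (1, 100, 'hello');
    SELECT * FROM test_adbpg_table;
    DELETE FROM test_adbpg_table WHERE b1 = 1;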

Set up the Flink connector

The AnalyticDB for PostgreSQL connector is distributed as a JAR file hosted on GitHub. Download the file and upload it to your Flink workspace as a custom connector.

  1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find your workspace and click Console in the Actions column.

  2. In the left-side navigation pane, click Connectors.

  3. Click Create Custom Connector and upload the JAR file.

    Note Get the JAR file from GitHub releases. The JAR version must match the Flink engine version of your workspace.
  4. Click Next. The system parses the JAR content. If parsing fails, check that the connector code conforms to Apache Flink community standards.

  5. Click Finish. The connector appears in the connector list.

Create a Flink job

  1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find your workspace and click Console in the Actions column.

  2. In the left-side navigation pane, click SQL Editor. In the upper-left corner, click New.

  3. In the New Draft dialog box, on the SQL Scripts tab, click Blank Stream Draft and click Next.

  4. Configure the draft and click Create.

    • Name: The name of the draft. Must be unique within the current project. Example: adbpg-test.
    • Location: The folder in which the code file of the draft is saved. Example: Draft.
    • Engine Version: The Flink engine version of the draft. See Engine version for supported versions. Example: vvr-6.0.7-flink-1.15.

Write data to AnalyticDB for PostgreSQL

Define the source and sink tables

Copy the following SQL into the code editor. It defines a datagen_source table that generates random data and a test_adbpg_table sink that maps to your AnalyticDB for PostgreSQL instance.

-- Source: generates random data using the built-in datagen connector
CREATE TABLE datagen_source (
  f_sequence INT,
  f_random INT,
  f_random_str STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.f_sequence.kind' = 'sequence',
  'fields.f_sequence.start' = '1',
  'fields.f_sequence.end' = '1000',
  'fields.f_random.min' = '1',
  'fields.f_random.max' = '1000',
  'fields.f_random_str.length' = '10'
);

-- Sink: writes data to AnalyticDB for PostgreSQL via the adbpg connector
CREATE TABLE test_adbpg_table (
  `B1` bigint,
  `B2` bigint,
  `B3` VARCHAR,
  `B4` VARCHAR,
  PRIMARY KEY(B1) NOT ENFORCED
) WITH (
  'connector' = 'adbpg-nightly-1.13',
  'url' = 'jdbc:postgresql://<internal-endpoint>:5432/<database-name>',
  'tablename' = 'test_adbpg_table',
  'username' = '<username>',
  'password' = '<password>',
  'maxretrytimes' = '2',
  'batchsize' = '50000',
  'connectionmaxactive' = '5',
  'conflictmode' = 'ignore',
  'usecopy' = '0',
  'targetschema' = 'public',
  'exceptionmode' = 'ignore',
  'casesensitive' = '0',
  'writemode' = '1',
  'retrywaittime' = '200'
);

Keep the datagen_source parameters as-is. For test_adbpg_table, replace the placeholder values and adjust the optional parameters based on your requirements.
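The job also needs an INSERT statement that moves data from the source to the sink; without it, nothing is written. A minimal version is shown below. The column mapping is an assumption based on the two DDL statements above: the random string is reused for both VARCHAR columns because the source generates only three fields.

INSERT INTO test_adbpg_table
SELECT f_sequence, f_random, f_random_str, f_random_str
FROM datagen_source;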

Connector parameters

Required parameters

  • connector: The connector name. Format: adbpg-nightly-{version}. Example: adbpg-nightly-1.13.
  • url: The Java Database Connectivity (JDBC) URL of the instance. Format: jdbc:postgresql://<internal-endpoint>:<port>/<database>. Example: jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres.
  • tablename: The name of the destination table. Example: test_adbpg_table.
  • username: The name of the database account.
  • password: The password of the database account.

Optional parameters

  • conflictmode (default: upsert): Policy for handling primary key or unique index conflicts. See Conflict handling modes.
  • writemode (default: 1): Write method. 0: BATCH INSERT. 1: COPY API (fastest). 2: BATCH UPSERT.
  • batchsize (default: 50000): Maximum number of rows per write batch.
  • batchwritetimeoutms (default: 50000): Batch write timeout in milliseconds. The batch is flushed after this period even if it is not full.
  • maxretrytimes (default: 3): Maximum number of retries after a write failure.
  • retrywaittime (default: 100): Interval between retries, in milliseconds.
  • exceptionmode (default: ignore): Exception handling policy. ignore: skip data written during an exception. strict: trigger a failover and report an error.
  • targetschema (default: public): The target schema in the AnalyticDB for PostgreSQL database.
  • connectionmaxactive (default: 5): Maximum number of active connections per TaskManager in the connection pool.
  • casesensitive (default: 0): Case sensitivity of column and table names. 0: case-insensitive. 1: case-sensitive.
  • verbose (default: 0): Whether the connector writes logs. 0: disabled. 1: enabled.
Note For the full list of supported parameters and data type mappings, see AnalyticDB for PostgreSQL connector.
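For example, if you would rather fail fast on bad rows than silently drop them, the WITH clause of the sink table can be tuned as follows. This fragment is illustrative, not a recommendation; the values are assumptions you should adjust for your workload:

'conflictmode' = 'upsert',
'exceptionmode' = 'strict',
'maxretrytimes' = '5',
'retrywaittime' = '500'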

Conflict handling modes

The conflictmode parameter controls what happens when a record conflicts with an existing primary key or unique index. AnalyticDB for PostgreSQL uses a combination of INSERT ON CONFLICT and COPY ON CONFLICT statements to perform UPSERT operations.

  • upsert (default): Runs INSERT ON CONFLICT and COPY ON CONFLICT to overwrite existing rows. For partitioned tables, the instance minor version must be V6.3.6.1 or later. See Update the minor engine version.
  • update: Updates the conflicting row with the incoming record.
  • ignore: Retains the existing row and discards the incoming record.
  • strict: Triggers a failover and reports an error.
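Conceptually, the upsert mode maps to a PostgreSQL statement of the following shape. This is a sketch of the semantics, not the exact SQL the connector generates, and the column names assume the example table from this topic:

-- On a primary key conflict on b1, overwrite the existing row
-- with the incoming values instead of failing.
INSERT INTO test_adbpg_table (b1, b2, b3)
VALUES (1, 99, 'new')
ON CONFLICT (b1)
DO UPDATE SET b2 = EXCLUDED.b2, b3 = EXCLUDED.b3;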

Start the deployment

  1. In the upper-right corner of the SQL Editor, click Deploy and then click OK.

    Note Session clusters are suitable for development and testing only. For production deployments, do not publish to a session cluster. See Debug a deployment.
  2. On the Deployments page, find your deployment and click Start in the Actions column.

  3. In the Start Job dialog box, click Start.

Verify the result

  1. Connect to the AnalyticDB for PostgreSQL database. See Client connection.

  2. Query the table to confirm data is being written:

    SELECT * FROM test_adbpg_table;

    Data is written to the AnalyticDB for PostgreSQL database. The returned result is shown in the figure (adbpg2.png).
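Beyond a plain SELECT, a couple of aggregate queries can confirm that the job is making progress. Run them a few seconds apart; the column names follow the destination table created earlier in this topic:

    -- The row count should grow until the datagen sequence is exhausted.
    SELECT count(*) FROM test_adbpg_table;

    -- The highest sequence value written so far.
    SELECT max(b1) FROM test_adbpg_table;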
