All Products
Search
Document Center

AnalyticDB:Use Realtime Compute for Apache Flink to read and write full data in real time

Last Updated:Mar 28, 2026

This topic shows how to connect Realtime Compute for Apache Flink to AnalyticDB for PostgreSQL as both a dimension table source and a result table sink.

Limits

  • Realtime Compute for Apache Flink cannot read data from AnalyticDB for PostgreSQL in Serverless mode.

  • Only Ververica Runtime (VVR) 6.0.0 or later supports the AnalyticDB for PostgreSQL connector.

  • Only VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.

Note

If you use a custom connector, follow the instructions in Manage custom connectors.

Prerequisites

Before you begin, ensure that you have:

  • A fully managed Flink workspace. For more information, see Activate Realtime Compute for Apache Flink.

  • An AnalyticDB for PostgreSQL instance. For more information, see Create an instance.

  • The AnalyticDB for PostgreSQL instance and the fully managed Flink workspace in the same virtual private cloud (VPC).

Overview

This tutorial walks through four steps:

  1. Configure an AnalyticDB for PostgreSQL instance — create the dimension table and result table.

  2. Create a Flink draft — open the SQL Editor and set up a blank stream draft.

  3. Write and deploy the draft — paste the SQL code, update the connection parameters, and start the deployment.

  4. Query the results — verify that Flink wrote data to the result table.

Step 1: Configure an AnalyticDB for PostgreSQL instance

  1. Log on to the AnalyticDB for PostgreSQL console.

  2. Add the CIDR block of the fully managed Flink workspace to an IP address whitelist of the AnalyticDB for PostgreSQL instance. For more information, see Configure an IP address whitelist.

  3. Click Log On to Database. For more information, see Client connection.

  4. Create a dimension table named adbpg_dim_table and insert 50 rows of test data:

    -- Create a dimension table.
    CREATE TABLE adbpg_dim_table(
      id int,
      username text,
      PRIMARY KEY(id)
    );
    
    -- Insert 50 rows. The id field ranges from 1 to 50,
    -- and the username field is the string "username" followed by the row number.
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  5. Create a result table named adbpg_sink_table to store the data that Flink writes:

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );

Step 2: Create a Flink draft

  1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace and click Console in the Actions column.

  2. In the left-side navigation pane, click SQL Editor. In the upper-left corner, click New.

  3. In the New Draft dialog box, on the SQL Scripts tab, click Blank Stream Draft, then click Next.

  4. Configure the following parameters and click Create.

    ParameterDescriptionExample
    NameThe draft name. Must be unique in the current project.adbpg-test
    LocationThe folder where the draft is saved. Click the icon next to an existing folder to create a subfolder.Draft
    Engine VersionThe Flink engine version. For version details, see Engine version.vvr-6.0.7-flink-1.15

Step 3: Write and deploy the draft

  1. Copy the following SQL into the code editor. The code creates three tables — a Datagen source, an AnalyticDB for PostgreSQL dimension table, and an AnalyticDB for PostgreSQL result table — then joins the source with the dimension table and writes the result to AnalyticDB for PostgreSQL.

    -- Source table: generates 50 rows with sequential IDs (1-50) and random scores (70-100).
    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      score INT
    ) WITH (
      'connector' = 'datagen',
      'fields.id.kind' = 'sequence',
      'fields.id.start' = '1',
      'fields.id.end' = '50',
      'fields.score.kind' = 'random',
      'fields.score.min' = '70',
      'fields.score.max' = '100'
    );
    
    -- Dimension table: reads from adbpg_dim_table using a lookup join.
    CREATE TEMPORARY TABLE dim_adbpg(
      id int,
      username varchar,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector' = 'adbpg',
      'url' = 'jdbc:postgresql://<internal-endpoint>:<port>/<database>',
      'tablename' = 'adbpg_dim_table',
      'username' = '<db-username>',
      'password' = '<db-password>',
      'maxJoinRows' = '100',
      'maxRetryTimes' = '1',
      'cache' = 'lru',
      'cacheSize' = '1000'
    );
    
    -- Result table: writes joined data to adbpg_sink_table.
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg',
      'url' = 'jdbc:postgresql://<internal-endpoint>:<port>/<database>',
      'tablename' = 'adbpg_sink_table',
      'username' = '<db-username>',
      'password' = '<db-password>',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- Join the source with the dimension table and insert results into the result table.
    INSERT INTO sink_adbpg
    SELECT ts.id, ts.username, ds.score
    FROM datagen_source AS ds
    JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    ON ds.id = ts.id;
  2. Replace the placeholders in the dim_adbpg and sink_adbpg WITH clauses with your actual values.

    Note

    For the full list of connector parameters and data type mappings, see AnalyticDB for PostgreSQL connector.

    ParameterRequiredDescription
    urlYesThe JDBC URL for the AnalyticDB for PostgreSQL instance. Format: jdbc:postgresql://<internal-endpoint>:<port>/<database>. Example: jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres
    tablenameYesThe table name in the AnalyticDB for PostgreSQL database.
    usernameYesThe database account name.
    passwordYesThe database account password.
  3. In the upper-right corner of the SQL Editor page, click Validate to check for syntax errors.

  4. Click Deploy.

  5. On the Deployments page, find the deployment and click Start in the Actions column.

Step 4: Query the result table

  1. Log on to the AnalyticDB for PostgreSQL console.

  2. Click Log On to Database. For more information, see Client connection.

  3. Run the following query to verify that Flink wrote data to the result table:

    SELECT * FROM adbpg_sink_table;

    The result should contain 50 rows, each with an id (1-50), a username, and a score between 70 and 100.

image.png

What's next

References