Realtime Compute for Apache Flink: AnalyticDB for PostgreSQL connector

Last Updated: Apr 15, 2024

This topic describes how to use the AnalyticDB for PostgreSQL connector.

Background information

AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehouse that provides online analysis services for large volumes of data.

The following table describes the capabilities supported by the AnalyticDB for PostgreSQL connector.

Item

Description

Table type

Dimension table and result table

Running mode

Streaming mode and batch mode.

Data format

N/A

Metric

  • Metrics for result tables

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

  • Metrics for dimension tables: none

Note

For more information about the metrics, see Metrics.

API type

SQL

Data update or deletion in a result table

Supported

Prerequisites

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.0 or later supports the AnalyticDB for PostgreSQL connector.

  • Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.

  • Self-managed PostgreSQL databases are not supported.

Syntax

CREATE TABLE adbpg_table (
 id INT,
 len INT,
 content VARCHAR, 
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='adbpg',
 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

Parameters in the WITH clause

Category

Parameter

Description

Data type

Required

Default value

Remarks

Common parameters

connector

The type of the table.

STRING

Yes

No default value

Set the value to adbpg.

url

The Java Database Connectivity (JDBC) URL of the database.

STRING

Yes

No default value

The URL is in the jdbc:postgresql://<Address>:<PortId>/<DatabaseName> format.

tableName

The name of the table in the database.

STRING

Yes

No default value

N/A.

userName

The username that is used to access the AnalyticDB for PostgreSQL database.

STRING

Yes

No default value

N/A.

password

The password that is used to access the AnalyticDB for PostgreSQL database.

STRING

Yes

No default value

N/A.

maxRetryTimes

The maximum number of retries that are allowed if an attempt to write data to the table fails.

INTEGER

No

3

N/A.

targetSchema

The name of the schema.

STRING

No

public

N/A.

caseSensitive

Specifies whether to enable case sensitivity.

STRING

No

false

Valid values:

  • true: Case sensitivity is enabled.

  • false: Case sensitivity is disabled. This is the default value.

connectionMaxActive

The maximum number of connections in the connection pool.

INTEGER

No

5

The system automatically releases idle connections to the database service.

Important

If this parameter is set to an excessively large value, the number of connections to the database server may become abnormal.

Parameters only for result tables

retryWaitTime

The interval between retries.

INTEGER

No

100

Unit: milliseconds.

batchSize

The number of data records that can be written to the table at a time.

INTEGER

No

500

N/A.

flushIntervalMs

The interval at which cached data is flushed to the result table.

INTEGER

No

N/A.

If the number of cached data records does not reach the upper limit within the specified period of time, all cached data is written to the result table. Unit: milliseconds.

writeMode

The write mode in which the system attempts to write data to the table for the first time.

STRING

No

insert

Valid values:

  • insert: Data is directly inserted into the result table. If a conflict occurs, the handling policy is determined by the conflictMode parameter. This is the default value.

  • upsert: Data in the table is automatically updated when a conflict occurs. This value is suitable only for tables that have a primary key.

conflictMode

The policy based on which a primary key conflict or index conflict is handled when data is inserted into a table.

STRING

No

strict

Valid values:

  • strict: If a conflict occurs, the system reports an error. This is the default value.

  • ignore: If a conflict occurs, the system ignores the conflict.

  • update: If a conflict occurs, the system automatically updates data. This value is suitable for tables that do not have a primary key. This policy reduces the data processing efficiency.

  • upsert: If a conflict occurs, the system automatically updates data in the table. This value is suitable only for tables that have a primary key.

Parameters only for dimension tables

maxJoinRows

The maximum number of dimension table rows that can be joined with a single row of input data.

INTEGER

No

1024

N/A.

cache

The cache policy.

STRING

No

ALL

Valid values:

  • ALL: All data in the dimension table is cached. This is the default value. Before a deployment runs, the system loads all data in the dimension table to the cache. This way, the cache is searched for all subsequent queries in the dimension table. If the system does not find the data record in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire.

  • LRU: Partial data in the dimension table is cached. The system searches for data in the cache each time a data record is read from the source table. If the data is not found, the system searches for the data in the physical dimension table.

  • None: No data is cached.

cacheSize

The maximum number of rows of data that can be cached.

LONG

No

100000

The cacheSize parameter takes effect only when you set the cache parameter to LRU.

cacheTTLMs

The cache timeout period.

LONG

No

Long.MAX_VALUE

The configuration of the cacheTTLMs parameter varies based on the cache parameter.

  • If the cache parameter is set to LRU, the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.

  • If the cache parameter is set to ALL, the cacheTTLMs parameter specifies the interval at which the system refreshes the cache. By default, the cache is not refreshed.

Unit: milliseconds.
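
The following sketch shows one way the common parameters and the result table parameters might be combined for a sink that updates existing rows on a primary key conflict. The table name adbpg_upsert_sink, the columns, and every numeric value are illustrative assumptions rather than recommended settings. Adjust them to your workload.

```sql
-- Illustrative sketch only: the table name, columns, and tuning values are assumptions.
CREATE TEMPORARY TABLE adbpg_upsert_sink (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED          -- upsert is suitable only for tables that have a primary key
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'targetSchema' = 'public',             -- schema of the target table (default: public)
  'writeMode' = 'upsert',                -- update the existing row when a conflict occurs
  'batchSize' = '1000',                  -- write up to 1,000 records at a time
  'flushIntervalMs' = '2000',            -- write cached records after 2 seconds even if the batch is not full
  'maxRetryTimes' = '5',                 -- retry a failed write up to 5 times
  'retryWaitTime' = '200'                -- wait 200 milliseconds between retries
);
```

Because writeMode is set to upsert in this sketch, conflictMode is left at its default. According to the parameter descriptions above, conflictMode determines conflict handling when writeMode is set to insert.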

Data type mappings

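| Data type of AnalyticDB for PostgreSQL | Data type of Realtime Compute for Apache Flink |
|----------------------------------------|------------------------------------------------|
| BOOLEAN                                | BOOLEAN                                        |
| SMALLINT                               | INT                                            |
| INT                                    | INT                                            |
| BIGINT                                 | BIGINT                                         |
| FLOAT                                  | DOUBLE                                         |
| VARCHAR                                | VARCHAR                                        |
| TEXT                                   | VARCHAR                                        |
| TIMESTAMP                              | TIMESTAMP                                      |
| DATE                                   | DATE                                           |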
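
To make the mappings concrete, the following sketch pairs a hypothetical AnalyticDB for PostgreSQL table with a Flink table declaration that follows the mappings above. The table name orders_demo and all column names are assumptions made for illustration. The first statement would be run in the database, and the second in Realtime Compute for Apache Flink.

```sql
-- Run in AnalyticDB for PostgreSQL (hypothetical table used only for illustration).
CREATE TABLE public.orders_demo (
  order_id   BIGINT,
  quantity   SMALLINT,
  price      FLOAT,
  note       TEXT,
  is_paid    BOOLEAN,
  created_at TIMESTAMP,
  order_date DATE
);

-- Run in Realtime Compute for Apache Flink: the matching declaration.
CREATE TEMPORARY TABLE orders_demo (
  order_id   BIGINT,
  quantity   INT,        -- SMALLINT maps to INT
  price      DOUBLE,     -- FLOAT maps to DOUBLE
  note       VARCHAR,    -- TEXT maps to VARCHAR
  is_paid    BOOLEAN,
  created_at TIMESTAMP,
  order_date DATE
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = 'orders_demo',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```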

Sample code

  • Sample code for a result table

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE adbpg_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    INSERT INTO adbpg_sink
    SELECT * FROM datagen_source;

  • Sample code for a dimension table

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) 
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE adbpg_dim (
     a INT, 
     b VARCHAR, 
     c VARCHAR
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    )
    COMMENT 'blackhole sink table'
    WITH (
     'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adbpg_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
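
As a variation on the dimension table sample, the following sketch sets the cache parameters explicitly instead of relying on the default ALL policy. The table name adbpg_dim_lru and all numeric values are illustrative assumptions. The statement reuses the datagen_source and blackhole_sink tables declared in the dimension table sample.

```sql
-- Illustrative sketch only: the table name and cache values are assumptions.
CREATE TEMPORARY TABLE adbpg_dim_lru (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'cache' = 'LRU',              -- cache only part of the dimension table
  'cacheSize' = '50000',        -- keep at most 50,000 rows in the cache (takes effect only with LRU)
  'cacheTTLMs' = '600000',      -- cache entries expire after 10 minutes
  'maxJoinRows' = '100'         -- join at most 100 dimension table rows with one input row
);

-- Lookup join against the LRU-cached dimension table.
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adbpg_dim_lru FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
```

With the LRU policy, a lookup that misses the cache falls back to the physical dimension table, so a smaller cacheSize reduces memory usage but is likely to send more queries to the database.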
