This topic describes how to use the AnalyticDB for PostgreSQL connector.
Background information
AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehousing service that provides online analysis services for large amounts of data.
The following table describes the capabilities supported by the AnalyticDB for PostgreSQL connector.
| Item | Description |
| --- | --- |
| Table type | Dimension table and sink table |
| Running mode | Streaming mode and batch mode |
| Data format | N/A |
| Metric | Note: For more information about the metrics, see Metrics. |
| API type | SQL |
| Data update or deletion in a sink table | Supported |
Prerequisites
An AnalyticDB for PostgreSQL instance and an AnalyticDB for PostgreSQL table are created. For more information, see Create an instance and CREATE TABLE.
An IP address whitelist is configured for the AnalyticDB for PostgreSQL instance. For more information, see Configure an IP address whitelist.
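The table that the connector writes to or joins must already exist in the AnalyticDB for PostgreSQL database. The following statement is a minimal sketch of such a table that matches the placeholder columns used in the syntax example of this topic; the table name, column definitions, and distribution key are illustrative and must be adapted to your own schema.
-- Minimal sketch of the table in AnalyticDB for PostgreSQL.
-- The name, columns, and distribution key are placeholders.
CREATE TABLE adbpg_table (
    id INT,
    len INT,
    content VARCHAR(1024),
    PRIMARY KEY (id)
) DISTRIBUTED BY (id);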
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.0 or later supports the AnalyticDB for PostgreSQL connector.
Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.
Self-managed PostgreSQL databases are not supported.
Syntax
CREATE TEMPORARY TABLE adbpg_table (
    id INT,
    len INT,
    content VARCHAR,
    PRIMARY KEY(id)
) WITH (
    'connector'='adbpg',
    'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
    'tableName'='<yourDatabaseTableName>',
    'userName'='<yourDatabaseUserName>',
    'password'='<yourDatabasePassword>'
);
Connector options in the WITH clause
| Category | Option | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- | --- |
| Common options | connector | The type of the table. | STRING | Yes | No default value | Set the value to adbpg. |
| | url | The Java Database Connectivity (JDBC) URL of the database. | STRING | Yes | No default value | The URL is in the jdbc:postgresql://<Address>:<PortId>/<DatabaseName> format. |
| | tableName | The name of the table in the database. | STRING | Yes | No default value | N/A. |
| | userName | The username that is used to access the AnalyticDB for PostgreSQL database. | STRING | Yes | No default value | N/A. |
| | password | The password that is used to access the AnalyticDB for PostgreSQL database. | STRING | Yes | No default value | N/A. |
| | maxRetryTimes | The maximum number of retries that are allowed if an attempt to write data to the table fails. | INTEGER | No | 3 | N/A. |
| | targetSchema | The name of the schema. | STRING | No | public | N/A. |
| | caseSensitive | Specifies whether to enable case sensitivity. | STRING | No | false | Valid values: true and false. |
| | connectionMaxActive | The maximum number of connections in the connection pool. | INTEGER | No | 5 | The system automatically releases idle connections to the database service. Important: If this option is set to an excessively large value, the number of server connections may become abnormal. |
| Sink-exclusive options | retryWaitTime | The interval between retries. | INTEGER | No | 100 | Unit: milliseconds. |
| | batchSize | The number of data records that can be written to the table at a time. | INTEGER | No | 500 | N/A. |
| | flushIntervalMs | The interval at which the cache is flushed. | INTEGER | No | N/A | If the number of cached data records does not reach the upper limit within the specified period of time, all cached data is written to the sink table. Unit: milliseconds. |
| | writeMode | The write mode in which the system attempts to write data to the table for the first time. | STRING | No | insert | Valid values: |
| | conflictMode | The policy based on which a primary key conflict or index conflict is handled when data is inserted into the table. | STRING | No | strict | Valid values: |
| Dimension-exclusive options | maxJoinRows | The maximum number of rows that can be joined for a single row of input data. | INTEGER | No | 1024 | N/A. |
| | cache | The cache policy. | STRING | No | ALL | Valid values include LRU and ALL. |
| | cacheSize | The maximum number of rows of data that can be cached. | LONG | No | 100000 | The cacheSize option takes effect only when you set the cache option to LRU. |
| | cacheTTLMs | The cache timeout period. | LONG | No | Long.MAX_VALUE | The configuration of this option varies based on the value of the cache option. Unit: milliseconds. |
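For reference, the following sink table DDL is a sketch that combines several of the sink-side options described above. The option values are examples only, not recommendations; the placeholders follow the same conventions as the syntax example.
CREATE TEMPORARY TABLE adbpg_sink_with_options (
    id INT,
    len INT,
    content VARCHAR,
    PRIMARY KEY(id)
) WITH (
    'connector'='adbpg',
    'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
    'tableName'='<yourDatabaseTableName>',
    'userName'='<yourDatabaseUserName>',
    'password'='<yourDatabasePassword>',
    'targetSchema'='public',
    'maxRetryTimes'='5',
    'retryWaitTime'='200',
    'batchSize'='1000',
    'flushIntervalMs'='1000',
    'writeMode'='insert',
    'conflictMode'='strict'
);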
Data type mappings
| Data type of AnalyticDB for PostgreSQL | Data type of Realtime Compute for Apache Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| SMALLINT | INT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | DOUBLE |
| VARCHAR | VARCHAR |
| TEXT | VARCHAR |
| TIMESTAMP | TIMESTAMP |
| DATE | DATE |
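As a quick cross-check of these mappings, the following sketch pairs a hypothetical AnalyticDB for PostgreSQL table with the Flink table declaration that corresponds to it. The table and column names are illustrative only.
-- Hypothetical table in AnalyticDB for PostgreSQL
CREATE TABLE type_demo (
    c_bool   BOOLEAN,
    c_small  SMALLINT,
    c_big    BIGINT,
    c_float  FLOAT,
    c_text   TEXT,
    c_ts     TIMESTAMP,
    c_date   DATE
);

-- Corresponding Flink table declaration, following the mapping table above
CREATE TEMPORARY TABLE type_demo_sink (
    c_bool   BOOLEAN,
    c_small  INT,
    c_big    BIGINT,
    c_float  DOUBLE,
    c_text   VARCHAR,
    c_ts     TIMESTAMP,
    c_date   DATE
) WITH (
    'connector'='adbpg',
    'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
    'tableName'='type_demo',
    'userName'='<yourDatabaseUserName>',
    'password'='<yourDatabasePassword>'
);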
Sample code
Sink table:
CREATE TEMPORARY TABLE datagen_source (
    `name` VARCHAR,
    `age` INT
) COMMENT 'datagen source table'
WITH (
    'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adbpg_sink (
    name VARCHAR,
    age INT
) WITH (
    'connector'='adbpg',
    'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
    'tableName'='<yourDatabaseTableName>',
    'userName'='<yourDatabaseUserName>',
    'password'='<yourDatabasePassword>'
);

INSERT INTO adbpg_sink
SELECT * FROM datagen_source;
Dimension table:
CREATE TEMPORARY TABLE datagen_source (
    a INT,
    b BIGINT,
    c STRING,
    `proctime` AS PROCTIME()
) COMMENT 'datagen source table'
WITH (
    'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adbpg_dim (
    a INT,
    b VARCHAR,
    c VARCHAR
) WITH (
    'connector'='adbpg',
    'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
    'tableName'='<yourDatabaseTableName>',
    'userName'='<yourDatabaseUserName>',
    'password'='<yourDatabasePassword>'
);

CREATE TEMPORARY TABLE blackhole_sink (
    a INT,
    b STRING
) COMMENT 'blackhole sink table'
WITH (
    'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adbpg_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
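If the dimension table is large or is looked up frequently, the dimension-exclusive options described earlier can be added to the WITH clause. The following variant of the adbpg_dim table is a sketch with illustrative values; LRU caching is assumed here, and cacheSize and cacheTTLMs take effect only as described in the option table.
CREATE TEMPORARY TABLE adbpg_dim (
    a INT,
    b VARCHAR,
    c VARCHAR
) WITH (
    'connector'='adbpg',
    'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
    'tableName'='<yourDatabaseTableName>',
    'userName'='<yourDatabaseUserName>',
    'password'='<yourDatabasePassword>',
    -- Dimension-exclusive options; the values below are examples only.
    'cache'='LRU',
    'cacheSize'='100000',
    'cacheTTLMs'='60000',
    'maxJoinRows'='1024'
);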