This topic describes how to use the AnalyticDB for PostgreSQL connector.
Background information
AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehousing service that provides online analysis for large amounts of data.
The following table describes the capabilities supported by the AnalyticDB for PostgreSQL connector.
Item | Description |
Table type | Source table (beta), dimension table, and sink table. Note: To read data from an AnalyticDB for PostgreSQL source table, you currently need to configure a custom connector. For more information, see Use Flink CDC to subscribe to full and incremental data in real time. |
Running mode | Streaming and batch mode. |
Data format | N/A |
Metric | Note: For more information about the metrics, see Metrics. |
API type | SQL |
Data update or deletion in a sink table | Supported |
Prerequisites
An AnalyticDB for PostgreSQL instance and an AnalyticDB for PostgreSQL table are created. For more information, see Create an instance and CREATE TABLE.
An IP address whitelist is configured for the AnalyticDB for PostgreSQL instance. For more information, see Configure an IP address whitelist.
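For reference, the following statement is a sketch of an AnalyticDB for PostgreSQL table that matches the schema used in the Syntax section of this topic. The table name, column length, and distribution key are placeholders; adjust them to your own business requirements.
CREATE TABLE adbpg_table (
    id      INT,
    len     INT,
    content VARCHAR(1024),
    PRIMARY KEY (id)
)
DISTRIBUTED BY (id);   -- distribute rows by the primary key column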
Limitations
Only VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.
Self-managed PostgreSQL databases are not supported.
Syntax
CREATE TEMPORARY TABLE adbpg_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='adbpg',
'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
'tableName'='<yourDatabaseTableName>',
'userName'='<yourDatabaseUserName>',
'password'='<yourDatabasePassword>'
);
Connector options
General
Option | Description | Data type | Required | Default value | Remarks |
connector | The connector to use. | STRING | Yes | No default value | Set the value to adbpg. |
url | The Java Database Connectivity (JDBC) URL of the database. | STRING | Yes | No default value | The URL is in the jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName> format. |
tableName | The name of the table in the database. | STRING | Yes | No default value | N/A. |
userName | The username that is used to access the AnalyticDB for PostgreSQL database. | STRING | Yes | No default value | N/A. |
password | The password that is used to access the AnalyticDB for PostgreSQL database. | STRING | Yes | No default value | N/A. |
maxRetryTimes | The maximum number of retries that are allowed to write data to the table if a data writing attempt fails. | INTEGER | No | 3 | N/A. |
targetSchema | The name of the schema. | STRING | No | public | N/A. |
caseSensitive | Specifies whether to enable case sensitivity. | STRING | No | false | Valid values: true: case-sensitive. false (default): case-insensitive. |
connectionMaxActive | The maximum number of connections in the connection pool. | INTEGER | No | 5 | The system automatically releases idle connections to the database service. Important: If this option is set to an excessively large value, an excessively large number of server connections may be created. |
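The following DDL is a sketch of how the general options can be combined in a single WITH clause. The endpoint, database, and credential values are placeholders, and the values shown for targetSchema, maxRetryTimes, caseSensitive, and connectionMaxActive are chosen only for illustration.
CREATE TEMPORARY TABLE adbpg_table (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'targetSchema' = 'public',        -- schema that contains the table
  'maxRetryTimes' = '5',            -- retry a failed write up to 5 times
  'caseSensitive' = 'false',        -- treat identifiers as case-insensitive
  'connectionMaxActive' = '5'       -- keep at most 5 connections in the pool
);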
Source-specific (beta)
Option | Description | Data type | Required | Remarks |
schema-name | The schema name. | STRING | Yes | This option supports regular expressions. You can subscribe to multiple schemas at a time. |
port | The port of the AnalyticDB for PostgreSQL instance. | INTEGER | Yes | Set it to the port that is used to connect to the instance, which is 5432 by default. |
decoding.plugin.name | The name of the PostgreSQL logical decoding plug-in. | STRING | Yes | Set it to the logical decoding plug-in that is installed on the server, such as pgoutput. |
slot.name | The name of the logical decoding slot. | STRING | Yes | We recommend that you configure a separate slot for each table that you subscribe to, to avoid slot conflicts. |
debezium.* | Fine-grained options that control the behavior of the Debezium client. | STRING | Yes | For example, set 'debezium.snapshot.mode' = 'never' to specify that a snapshot is never performed when the connector starts. |
scan.incremental.snapshot.enabled | Specifies whether to enable the incremental snapshot feature. | BOOLEAN | No | Valid values: true: the incremental snapshot feature is enabled. false: the incremental snapshot feature is disabled. |
scan.startup.mode | The startup mode for data consumption. | STRING | No | Valid values: initial: a full snapshot of the table is read when the connector starts for the first time, and then the latest write-ahead logging (WAL) changes are read. latest-offset: no snapshot is read, and only the WAL changes that are generated after the connector starts are read. |
changelog-mode | Specifies how change events are encoded within the change stream. | STRING | No | Valid values: all: all types of change events are encoded, including INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE events. upsert: only upsert change events are encoded, including INSERT, UPDATE_AFTER, and DELETE events. |
heartbeat.interval.ms | The interval at which heartbeat packets are sent. Unit: milliseconds. | DURATION | No | Default value: 30 seconds. The AnalyticDB for PostgreSQL CDC connector actively sends heartbeat packets to the database to ensure that the slot offset keeps advancing. If the data in the table does not change frequently, set this option to an appropriate value so that write-ahead logging (WAL) logs are cleared regularly and disk space is not wasted. |
scan.incremental.snapshot.chunk.key-column | Specifies a chunk key column during snapshot reading. | STRING | No | Defaults to the first column of the primary key. |
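Because the source table is in beta and is provided through a custom connector (see Use Flink CDC to subscribe to full and incremental data in real time), the exact connector name and set of required options may differ in your environment. The following DDL is only a sketch of how the source-specific options listed above fit into a table declaration; the connector value, connection options, slot name, and plug-in name are assumptions.
CREATE TEMPORARY TABLE adbpg_cdc_source (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'adbpg',                     -- assumed name of the custom source connector
  'url' = 'jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'port' = '<yourPortId>',
  'schema-name' = 'public',                  -- regular expressions are supported
  'decoding.plugin.name' = 'pgoutput',       -- assumed logical decoding plug-in
  'slot.name' = 'flink_slot',                -- hypothetical slot name; keep it unique per table
  'scan.startup.mode' = 'initial',           -- full snapshot first, then incremental changes
  'changelog-mode' = 'all'                   -- emit all change event types
);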
Sink-specific
Option | Description | Data type | Required | Default value | Remarks |
retryWaitTime | The interval between retries, in milliseconds. | INTEGER | No | 100 | N/A. |
batchSize | The number of data records that can be written to the table at a time. | INTEGER | No | 500 | N/A. |
flushIntervalMs | The interval at which buffered data is flushed to the sink table. Unit: milliseconds. | INTEGER | No | N/A. | If the number of buffered data records does not reach the batchSize upper limit within the specified period of time, all buffered data is written to the sink table. |
writeMode | The write mode in which the system attempts to write data to the table for the first time. | STRING | No | insert | Valid values: insert (default): data is written by using INSERT INTO statements. copy: data is written by using the COPY API. upsert: data is written and updated by using INSERT ON CONFLICT statements. |
conflictMode | The policy based on which a primary key conflict or unique index conflict is handled when data is inserted into a table. | STRING | No | strict | Valid values: strict (default): an error is reported when a conflict occurs. ignore: the conflict is ignored and the existing data is retained. update: the conflicting row is updated by using UPDATE statements. upsert: the conflicting row is updated by using INSERT ON CONFLICT statements. |
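The sketch below shows how the sink-specific options can be combined with the general connection options. The writeMode and conflictMode values are chosen only for illustration and assume that the destination table has a primary key.
CREATE TEMPORARY TABLE adbpg_sink (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'writeMode' = 'copy',            -- write data by using the COPY API
  'conflictMode' = 'upsert',       -- update the row when a primary key conflict occurs
  'batchSize' = '500',             -- flush after 500 buffered records
  'flushIntervalMs' = '1000',      -- or flush at least once per second
  'retryWaitTime' = '200',         -- wait 200 milliseconds between retries
  'maxRetryTimes' = '3'
);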
Dimension table-specific
Option | Description | Data type | Required | Default value | Remarks |
maxJoinRows | The maximum number of rows in the dimension table that a single input row can be joined with. | INTEGER | No | 1024 | N/A. |
cache | The cache policy. | STRING | No | ALL | Valid values: None: no data is cached. Each join request triggers a query to the database. LRU: only some data in the dimension table is cached. The system searches the cache for each input record and, if no match is found, queries the database. ALL (default): all data in the dimension table is cached. Before the deployment starts to process data, the system loads all data in the dimension table to the cache and searches the cache for each subsequent join request. The cache is reloaded after the cache entries expire. |
cacheSize | The maximum number of rows of data that can be cached. | LONG | No | 100000 | The cacheSize option takes effect only when you set the cache option to LRU. |
cacheTTLMs | The cache timeout period. Unit: milliseconds. | LONG | No | Long.MAX_VALUE | The effect of the cacheTTLMs option varies based on the value of the cache option. If cache is set to LRU, cacheTTLMs specifies the period of time after which a cache entry expires. If cache is set to ALL, cacheTTLMs specifies the interval at which the cache is reloaded. By default, cache entries do not expire. |
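The following sketch declares a dimension table that uses the LRU cache policy; the cache-related values are chosen only for illustration.
CREATE TEMPORARY TABLE adbpg_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'cache' = 'LRU',            -- cache only recently used rows
  'cacheSize' = '100000',     -- takes effect only when cache is set to LRU
  'cacheTTLMs' = '600000',    -- expire cached rows after 10 minutes
  'maxJoinRows' = '1024'      -- join at most 1024 dimension rows per input row
);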
Data type mappings
Data type of AnalyticDB for PostgreSQL | Data type of Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
SMALLINT | INT |
INT | INT |
BIGINT | BIGINT |
FLOAT | DOUBLE |
VARCHAR | VARCHAR |
TEXT | VARCHAR |
TIMESTAMP | TIMESTAMP |
DATE | DATE |
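For example, based on the mappings above, an AnalyticDB for PostgreSQL table that contains SMALLINT, FLOAT, and TEXT columns would be declared in Realtime Compute for Apache Flink as shown in the following sketch; the table and column names are hypothetical.
-- Table definition in AnalyticDB for PostgreSQL
CREATE TABLE orders (
  order_id BIGINT,
  quantity SMALLINT,
  price    FLOAT,
  note     TEXT,
  PRIMARY KEY (order_id)
);

-- Matching declaration in Realtime Compute for Apache Flink
CREATE TEMPORARY TABLE orders (
  order_id BIGINT,
  quantity INT,          -- SMALLINT maps to INT
  price    DOUBLE,       -- FLOAT maps to DOUBLE
  note     VARCHAR,      -- TEXT maps to VARCHAR
  PRIMARY KEY (order_id)
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
  'tableName' = 'orders',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);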
Sample code
Source table (beta)
See Use Flink CDC to subscribe to full and incremental data in real time.
Sink table:
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) COMMENT 'datagen source table'
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adbpg_sink (
  name VARCHAR,
  age INT
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);

INSERT INTO adbpg_sink SELECT * FROM datagen_source;
Dimension table:
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) COMMENT 'datagen source table'
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE adbpg_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) COMMENT 'blackhole sink table'
WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adbpg_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;