This topic provides the DDL syntax that is used to create an AnalyticDB for PostgreSQL result table, describes the parameters in the WITH clause, and provides data type mappings.

What is AnalyticDB for PostgreSQL?

AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehousing service that Alibaba Cloud developed based on the open source Greenplum Database. It is compatible with ANSI SQL 2003 and with the PostgreSQL and Oracle database ecosystems, and it supports both row store and column store modes. The service processes petabytes of data offline at a high performance level and supports highly concurrent online queries, which makes it a competitive data warehousing solution for online analysis of large volumes of data in various industries.

Prerequisites

Limits

Only Flink that uses Ververica Runtime (VVR) 5.0.0 or later supports AnalyticDB for PostgreSQL connectors.

DDL syntax

CREATE TABLE adbpg_sink (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);
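
The DDL above declares only the Flink side of the result table. As a minimal sketch, assuming the physical table is created in advance in the AnalyticDB for PostgreSQL database (this topic does not state whether the connector creates it), a matching table could look like the following. The table name adbpg_sink_target and the DISTRIBUTED BY clause are illustrative assumptions; the tableName parameter in the Flink DDL would then be set to this name.

```sql
-- Run this in the AnalyticDB for PostgreSQL database, not in Flink.
-- adbpg_sink_target is a hypothetical table name used only for illustration.
CREATE TABLE public.adbpg_sink_target (
  id      INT PRIMARY KEY,   -- same key column as the Flink DDL
  len     INT,
  content VARCHAR
)
DISTRIBUTED BY (id);         -- Greenplum-style distribution key (assumption)
```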

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the result table. | Yes | Set the value to adbpg. |
| url | The Java Database Connectivity (JDBC) URL of the database. | Yes | Format: jdbc:postgresql://<Internal endpoint>:<Port number>/<Name of the connected database>. Example: jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres. |
| tableName | The name of the table. | Yes | N/A |
| userName | The username that is used to access the AnalyticDB for PostgreSQL database. | Yes | N/A |
| password | The password that is used to access the AnalyticDB for PostgreSQL database. | Yes | N/A |
| maxRetryTimes | The maximum number of retries that are allowed if an attempt to write data to the table fails. | No | Default value: 3. |
| retryWaitTime | The interval between retries. | No | Default value: 100. Unit: milliseconds. |
| batchSize | The number of data records that can be written to the table at the same time. | No | Default value: 500. |
| flushIntervalMs | The interval at which cached data is flushed to the table. | No | Default value: 5000. Unit: milliseconds. If the number of cached data records does not reach the upper limit within this interval, all cached data is written to the result table. |
| writeMode | The write mode in which the system attempts to write data to the table for the first time. | No | Valid values: insert and upsert. insert (default): The system directly inserts data into the table. If a conflict occurs, the system handles the conflict based on the setting of the conflictMode parameter. upsert: If a conflict occurs, the system automatically updates data in the table. This value is suitable only for tables that have a primary key. |
| conflictMode | The policy based on which a primary key conflict or index conflict is handled when data is inserted into a table. | No | Valid values: strict, ignore, update, and upsert. strict (default): If a conflict occurs, the system reports an error. ignore: If a conflict occurs, the system ignores the conflict. update: If a conflict occurs, the system automatically updates data. This value is suitable for tables that do not have a primary key. This policy reduces the data processing efficiency. upsert: If a conflict occurs, the system automatically updates data in the table. This value is suitable only for tables that have a primary key. |
| targetSchema | The name of the schema. | No | Default value: public. |
| caseSensitive | Specifies whether field names are case-sensitive. | No | Valid values: true and false. true: Field names are case-sensitive. false (default): Field names are not case-sensitive. |
| connectionMaxActive | The maximum number of connections in the connection pool. | No | Default value: 5. Note: The system automatically releases idle connections from the database. If you set this parameter to a large value, the number of server connections may become abnormal. |
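
To show how the optional parameters fit together, the following is a minimal sketch of a result table that sets several of them explicitly. The table name adbpg_sink_tuned and all numeric values are illustrative assumptions, not recommendations. The primary key uses the NOT ENFORCED form that open source Flink SQL expects.

```sql
-- Sketch only: a result table that writes in upsert mode with tuned batching.
-- adbpg_sink_tuned is a hypothetical name; the values are examples.
CREATE TEMPORARY TABLE adbpg_sink_tuned (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED      -- upsert is suitable only with a primary key
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'targetSchema' = 'public',         -- default value
  'writeMode' = 'upsert',            -- update the existing row on a conflict
  'batchSize' = '1000',              -- default is 500
  'flushIntervalMs' = '2000',        -- default is 5000 milliseconds
  'maxRetryTimes' = '5',             -- default is 3
  'retryWaitTime' = '200',           -- default is 100 milliseconds
  'connectionMaxActive' = '5'        -- default value
);
```

With the default writeMode of insert, the conflictMode parameter decides what happens on a conflict, so a job that should skip duplicate rows instead of updating them could keep 'writeMode' = 'insert' and add 'conflictMode' = 'ignore'.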

Data type mappings

| Data type of AnalyticDB for PostgreSQL | Data type of Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| SMALLINT | INT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | DOUBLE |
| VARCHAR | VARCHAR |
| TIMESTAMP | TIMESTAMP |
| DATE | DATE |
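
As an illustration of these mappings, the following sketch pairs a table in AnalyticDB for PostgreSQL with a Flink result table that declares the mapped types. The table names type_demo and type_demo_sink and all column names are hypothetical.

```sql
-- AnalyticDB for PostgreSQL side (run in the database; hypothetical table).
CREATE TABLE public.type_demo (
  flag       BOOLEAN,
  small_num  SMALLINT,
  num        INT,
  big_num    BIGINT,
  ratio      FLOAT,
  label      VARCHAR,
  created_at TIMESTAMP,
  created_on DATE
);

-- Flink side: each column uses the Flink type from the mapping table.
CREATE TEMPORARY TABLE type_demo_sink (
  flag       BOOLEAN,
  small_num  INT,          -- SMALLINT maps to INT
  num        INT,
  big_num    BIGINT,
  ratio      DOUBLE,       -- FLOAT maps to DOUBLE
  label      VARCHAR,
  created_at TIMESTAMP,
  created_on DATE
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = 'type_demo',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```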

Sample code

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
)
COMMENT 'datagen source table'
WITH (
  'connector' = 'datagen'
);

CREATE TABLE adbpg_sink (
  name VARCHAR,
  age INT
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);

INSERT INTO adbpg_sink
SELECT * FROM datagen_source;
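
With its default settings, the datagen source above produces an unbounded stream of random rows. For a gentler test run, a variant with a limited rate and bounded values can be used. This is a sketch that assumes the datagen options of open source Flink ('rows-per-second', 'fields.<name>.length', 'fields.<name>.min', 'fields.<name>.max') are available in the VVR version in use; the table name datagen_source_limited is hypothetical.

```sql
-- Variant of the source table with a limited rate and bounded field values.
CREATE TEMPORARY TABLE datagen_source_limited (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',      -- emit about 10 rows per second
  'fields.name.length' = '8',    -- random strings of 8 characters
  'fields.age.min' = '1',        -- lower bound for age
  'fields.age.max' = '100'       -- upper bound for age
);

-- Listing the columns explicitly keeps the mapping to the result table visible.
INSERT INTO adbpg_sink
SELECT `name`, `age` FROM datagen_source_limited;
```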