This topic describes how to create an AnalyticDB for PostgreSQL result table in Realtime Compute for Apache Flink. It also describes the parameters in the WITH clause and the data type mappings that apply when you create such a table.

Notice This topic applies only to Blink 3.6.0 and later.

Principles

Realtime Compute for Apache Flink writes data to an AnalyticDB for PostgreSQL result table in two steps:
  1. Converts each row of output data into an SQL statement.
  2. Executes the SQL statement in the destination database.
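As a sketch of this two-step process, assume a hypothetical destination table t with columns id and content. An output row (1, 'hello') is converted into a statement similar to the following and then executed in the destination database:

```sql
-- Step 1: the output row (1, 'hello') is rendered as an SQL statement.
-- Step 2: the statement is executed in the AnalyticDB for PostgreSQL database.
INSERT INTO t (id, content) VALUES (1, 'hello');
```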

DDL syntax

In Realtime Compute for Apache Flink, you can use AnalyticDB for PostgreSQL to store output data. The following sample code shows how to create an AnalyticDB for PostgreSQL result table.
create table rds_output(
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY(id)
) with (
  type='adbpg',
  url='jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
  tableName='<yourDatabaseTableName>',
  userName='<yourDatabaseUserName>',
  password='<yourDatabasePassword>'
);
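A result table is typically used as the sink of an INSERT INTO statement. The following sketch assumes a hypothetical source table named rds_input whose columns match those of rds_output; the source type and column values are illustrative only:

```sql
-- Hypothetical source table for illustration only.
CREATE TABLE rds_input (
  id INT,
  len INT,
  content VARCHAR
) WITH (
  type = 'random'
);

-- Write each source row to the AnalyticDB for PostgreSQL result table.
INSERT INTO rds_output
SELECT id, len, content
FROM rds_input;
```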

Parameters in the WITH clause

Parameter Description Required Remarks
type The type of the result table. Yes Set the value to adbpg.
url The Java Database Connectivity (JDBC) URL of the database. Yes The JDBC URL used to access the AnalyticDB for PostgreSQL database. The format is 'jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>'.
  • yourNetworkAddress: the internal IP address.
  • PortId: the port that is used to log on to the database.
  • yourDatabaseName: the name of the database.
For example: url='jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres'.
tableName The name of the AnalyticDB for PostgreSQL table to which data is written. Yes N/A
userName The account that is used to access the AnalyticDB for PostgreSQL database. Yes N/A
password The password that is used to access the AnalyticDB for PostgreSQL database. Yes N/A
maxRetryTimes The maximum number of retries for writing data to the table. No Default value: 3.
useCopy Specifies whether to use the copy API to write data. No Valid values:
  • 0: indicates that the INSERT statement is executed to write data to the AnalyticDB for PostgreSQL database. This is the default value.
  • 1: indicates that the copy API is used to write data to the AnalyticDB for PostgreSQL database.
batchSize The number of data records that can be written at a time. No Default value: 5000.
exceptionMode The policy that is used to handle exceptions during data writing. No Valid values:
  • ignore: The system ignores the data that fails to be written when an exception occurs. This is the default value.
  • strict: If an exception occurs during data writing, an error message appears on the Failover tab.
conflictMode The policy that is used to handle primary key conflicts or unique index conflicts. No Valid values:
  • ignore: Primary key conflicts are ignored and the existing data is retained. This is the default value.
  • strict: If a primary key conflict occurs, an error message appears on the Failover tab.
  • update: If a primary key conflict occurs, data is updated.
targetSchema The name of the schema. No Default value: public.
connectionMaxActive The maximum number of connections allowed for a single task. No Configure this parameter based on the actual number of parallel tasks and the maximum number of connections allowed to the destination database.
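The optional parameters can be combined in the WITH clause. The following sketch extends the earlier DDL example; the values for useCopy, batchSize, conflictMode, and targetSchema are illustrative, not recommendations:

```sql
CREATE TABLE rds_output (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id)
) WITH (
  type = 'adbpg',
  url = 'jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
  tableName = '<yourDatabaseTableName>',
  userName = '<yourDatabaseUserName>',
  password = '<yourDatabasePassword>',
  useCopy = '1',           -- write data through the copy API
  batchSize = '1000',      -- number of records written per batch
  conflictMode = 'update', -- update rows on primary key conflicts
  targetSchema = 'public'  -- destination schema
);
```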

Data type mapping

The following table lists the mappings between the field data types of AnalyticDB for PostgreSQL and Realtime Compute for Apache Flink.
Data type of AnalyticDB for PostgreSQL Data type of Realtime Compute for Apache Flink
BOOLEAN BOOLEAN
SMALLINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
DOUBLE PRECISION DOUBLE
TEXT VARCHAR
TIMESTAMP DATETIME
DATE DATE
REAL FLOAT
DOUBLE PRECISION DECIMAL
TIME TIME
TIMESTAMP TIMESTAMP
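Based on the mappings above, the rds_output result table from the DDL syntax section could correspond to a destination table created in AnalyticDB for PostgreSQL as follows. This is a sketch: INT maps to INT and VARCHAR maps to TEXT, and the column names are taken from the earlier example:

```sql
CREATE TABLE public.rds_output (
  id INT,
  len INT,
  content TEXT,
  PRIMARY KEY (id)
);
```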