This topic describes how to create an AnalyticDB for PostgreSQL result table. This topic also describes the parameters in the WITH clause and data type mappings used when you create an AnalyticDB for PostgreSQL result table.
Notice This topic applies to only Blink 3.6.0 and later.
Principles
Realtime Compute for Apache Flink writes data to an AnalyticDB for PostgreSQL result
table in two steps:
- Converts each row of output data to a line of SQL statement.
- Writes and executes the SQL statement in the destination database.
DDL syntax
In Realtime Compute for Apache Flink, you can use AnalyticDB for PostgreSQL to store
output data. The following sample code shows how to create an AnalyticDB for PostgreSQL
result table.
create table rds_output(
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) with (
type='adbpg',
url='jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>',
tableName='<yourDatabaseTableName>',
userName='<yourDatabaseUserName>',
password='<yourDatabasePassword>'
);
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
type | The type of the source table. | Yes | Set the value to adbpg .
|
url | The Java Database Connectivity (JDBC) URL of the database. | Yes | The JDBC URL used to access the AnalyticDB for PostgreSQL database. The format is
'jdbc:postgresql://<yourNetworkAddress>:<PortId>/<yourDatabaseName>'.
|
tableName | The name of the table. | Yes | N/A |
username | The account that is used to access the AnalyticDB for PostgreSQL database. | Yes | N/A |
password | The password that is used to access the AnalyticDB for PostgreSQL database. | Yes | N/A |
maxRetryTimes | The maximum number of retries for writing data to the table. | No | Default value: 3. |
useCopy | Specifies whether to use the copy API to write data. | No | Valid values:
|
batchSize | The number of data records that can be written at a time. | No | Default value: 5000. |
exceptionMode | The policy that is used to handle exceptions during data writing. | No | Valid values:
|
conflictMode | The policy that is used to handle primary key conflicts or unique index conflicts. | No | Valid values:
|
targetSchema | The name of the schema. | No | Default value: public. |
connectionMaxActive | The maximum number of connections allowed for a single task. | No | Configure this parameter based on the actual number of parallel tasks and the maximum number of connections allowed to the destination database. |
Data type mapping
The following table lists the mappings between the field data types of AnalyticDB
for PostgreSQL and Realtime Compute for Apache Flink.
Data type of AnalyticDB for PostgreSQL | Data type of Realtime Compute for Apache Flink |
---|---|
BOOLEAN | BOOLEAN |
SMALLINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DOUBLE PRECISION | DOUBLE |
TEXT | VARCHAR |
TIMESTAMP | DATETIME |
DATE | DATE |
REAL | FLOAT |
DOUBLE PRECISION | DECIMAL |
TIME | TIME |
TIMESTAMP | TIMESTAMP |