This topic provides the DDL syntax that is used to create an AnalyticDB for PostgreSQL dimension table, describes the parameters in the WITH clause, and provides data type mappings.

What is AnalyticDB for PostgreSQL?

AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehousing service that is developed by Alibaba Cloud based on the open source Greenplum project. It is compatible with ANSI SQL 2003 and with the PostgreSQL and Oracle database ecosystems, and supports both row store and column store. AnalyticDB for PostgreSQL delivers high-performance offline processing of petabytes of data and supports highly concurrent online queries, which makes it a competitive data warehousing solution for analyzing large volumes of data online across industries.

Prerequisites

Limits

The AnalyticDB for PostgreSQL connector is supported only in Flink that uses Ververica Runtime (VVR) 5.0.0 or later.

DDL syntax

CREATE TABLE adbpg_dim (
  id INT,
  len INT,
  content VARCHAR, 
  PRIMARY KEY(id)
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the dimension table. Yes Set the value to adbpg.
url The Java Database Connectivity (JDBC) URL of the database. Yes The URL is in the jdbc:postgresql://<Address>:<PortId>/<DatabaseName> format.
tableName The name of the table. Yes N/A.
userName The username that is used to access the AnalyticDB for PostgreSQL database. Yes N/A.
password The password that is used to access the AnalyticDB for PostgreSQL database. Yes N/A.
maxRetryTimes The maximum number of retries that are allowed to write data to the table if a data writing attempt fails. No Default value: 3.
targetSchema The name of the schema. No Default value: public.
caseSensitive Specifies whether fields are case-sensitive. No Valid values:
  • true: Fields are case-sensitive.
  • false: Fields are not case-sensitive. This is the default value.
connectionMaxActive The maximum number of connections in the connection pool. No Default value: 5.
Note
  • The system automatically releases idle database connections.
  • If you set this parameter to an excessively large value, an abnormally large number of connections may be established on the server.
maxJoinRows The maximum number of rows in the dimension table that a single row of input data can join. No Default value: 1024.
cache The cache policy. No Valid values:
  • ALL: indicates that all data in the dimension table is cached. Before a job runs, the system loads all data in the dimension table into the cache, and every subsequent lookup is served from the cache. If a record is not found in the cache, the join key does not exist. After the cache entries expire, the system reloads all data in the dimension table into the cache.
  • LRU: indicates that only part of the data in the dimension table is cached. Each time a record arrives from the source table, the system searches the cache first. If the record is not found in the cache, the system queries the physical dimension table.
  • None: indicates that no data is cached. This is the default value.
An example that combines the caching parameters is provided after this table.
cacheSize The maximum number of data records that can be cached. No The cacheSize parameter takes effect only when you set the cache parameter to LRU. Default value: 100000.
cacheTTLMs The cache timeout period. Unit: milliseconds. No The effect of the cacheTTLMs parameter depends on the cache parameter.
  • If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the time after which a cache entry expires. By default, cache entries do not expire.
  • If you set the cache parameter to ALL, the cacheTTLMs parameter specifies the interval at which the system refreshes the cache. By default, the cache is not refreshed.
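
The following example is a minimal sketch that shows how the optional caching and connection parameters in the preceding table can be combined in the WITH clause. The table name adbpg_dim_cached, the specific parameter values, and the placeholders are assumptions for illustration only; replace them with values that suit your workload.

CREATE TABLE adbpg_dim_cached (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY(id)
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>',
  -- Assumed example values: cache up to 50,000 rows with the LRU policy and
  -- expire entries after 10 minutes (600,000 milliseconds).
  'cache'='LRU',
  'cacheSize'='50000',
  'cacheTTLMs'='600000',
  -- Assumed example values: allow at most 10 pooled connections and 3 retries.
  'connectionMaxActive'='10',
  'maxRetryTimes'='3'
);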

Data type mappings

Data type of AnalyticDB for PostgreSQL    Data type of Flink
BOOLEAN                                   BOOLEAN
SMALLINT                                  INT
INT                                       INT
BIGINT                                    BIGINT
FLOAT                                     DOUBLE
VARCHAR                                   VARCHAR
TIMESTAMP                                 TIMESTAMP
DATE                                      DATE
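
To illustrate the preceding mappings, the following sketch declares a Flink dimension table for a hypothetical AnalyticDB for PostgreSQL table whose columns use the listed data types. The table name, column names, and placeholder connection values are assumptions.

CREATE TABLE adbpg_typed_dim (
  enabled BOOLEAN,       -- AnalyticDB for PostgreSQL BOOLEAN
  qty INT,               -- AnalyticDB for PostgreSQL SMALLINT or INT
  id BIGINT,             -- AnalyticDB for PostgreSQL BIGINT
  price DOUBLE,          -- AnalyticDB for PostgreSQL FLOAT
  name VARCHAR,          -- AnalyticDB for PostgreSQL VARCHAR
  updated_at TIMESTAMP,  -- AnalyticDB for PostgreSQL TIMESTAMP
  created_on DATE,       -- AnalyticDB for PostgreSQL DATE
  PRIMARY KEY(id)
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);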

Sample code

CREATE TEMPORARY TABLE datagen_source(
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
)
COMMENT 'datagen source table'
WITH (
   'connector' = 'datagen'
);

CREATE TABLE adbpg_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  b STRING
)
COMMENT 'blackhole sink table'
WITH (
   'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT T.a, H.b
FROM datagen_source AS T JOIN adbpg_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;