本文为您介绍如何使用AnalyticDB PostgreSQL连接器。

背景信息

云原生数据仓库 AnalyticDB PostgreSQL版是一种大规模并行处理(MPP)数据仓库服务,可提供海量数据在线分析服务。

AnalyticDB PostgreSQL版支持的信息如下。
类别详情
支持类型维表和结果表
运行模式流模式和批模式
数据格式暂不适用
特有监控指标
  • 结果表:
    • numRecordsOut
    • numRecordsOutPerSecond
    • numBytesOut
    • numBytesOutPerSecond
    • currentSendTime
  • 维表:无
说明 指标的含义及如何查看监控指标,请参见查看监控指标
API种类SQL
是否支持更新或删除结果表数据

前提条件

使用限制

  • 仅Flink实时计算引擎VVR 6.0.0及以上版本支持云原生数据仓库AnalyticDB PostgreSQL版连接器。
  • 暂不支持自建的Postgres SQL。

语法结构

CREATE TABLE adbpg_table (
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY(id)
) WITH (
 'connector'='adbpg',
 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

WITH参数

类型参数说明数据类型是否必填默认值备注
通用connector表类型。String固定值为adbpg。
urlJDBC连接地址。String格式为jdbc:postgresql://<Address>:<PortId>/<DatabaseName>
tableName表名。String无。
userName用户名。String无。
password密码。String无。
maxRetryTimes写入数据失败后,重试写入的最大次数。Integer3无。
targetSchemaSchema名称。Stringpublic无。
caseSensitive大小写是否敏感。Stringfalse参数取值如下:
  • true:大小写敏感。
  • false:大小写不敏感。
connectionMaxActive连接池的最大连接数。Integer5系统会自动释放与数据库服务的空闲连接。
重要 此参数设置过大可能会导致服务端连接数出现异常。
结果表独有retryWaitTime重试的时间间隔。Integer100单位毫秒。
batchSize一次批量写入的数据条数。Integer500无。
flushIntervalMs清空缓存的时间间隔。Integer如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。单位毫秒。
writeMode第一次尝试写入时的写入方式。Stringinsert参数取值如下:
  • insert:直接插入,冲突时参考conflictMode
  • upsert:冲突时自动update,只能用于有主键的表。
conflictMode当Insert写入出现主键冲突或者唯一索引冲突时的处理策略。Stringstrict参数取值如下:
  • strict:冲突时报错。
  • ignore:冲突时忽略。
  • update:冲突时自动更新,可用于无主键表,执行效率较低。
  • upsert:冲突时自动更新,只能用于有主键表。
维表独有maxJoinRows单行数据Join的最多行数。Integer1024无。
cache缓存策略。StringALL支持以下三种缓存策略:
  • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
  • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。
  • None:无缓存。
cacheSize缓存大小,即缓存多少行数据。Long100000仅当选择LRU缓存策略时,cacheSize参数生效。
cacheTTLMs缓存失效的超时时间。LongLong.MAX_VALUEcacheTTLMs配置和cache配置有关:
  • 如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间。默认不过期。
  • 如果cache配置为ALL,则cacheTTLMs为设置缓存重新加载的间隔时间,默认不重新加载。
单位为毫秒。

类型映射

云原生数据仓库AnalyticDB Postgre版字段类型Flink字段类型
booleanboolean
smallintint
intint
bigintbigint
floatdouble
varcharvarchar
timestamptimestamp
datedate

使用示例

  • 结果表
    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE adbpg_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    INSERT INTO adbpg_sink
    SELECT * FROM datagen_source;
  • 维表
    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) 
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    };
    
    CREATE TABLE adbpg_dim (
     a INT, 
     b VARCHAR, 
     c VARCHAR
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    )
    COMMENT 'blackhole sink table'
    WITH (
     'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;