本文为您介绍如何使用Postgres CDC连接器。

背景信息

Postgres CDC可用于依次读取PostgreSQL数据库全量快照数据和变更数据,保证不多读一条也不少读一条数据。即使发生故障,也能采用Exactly Once方式处理。

Postgres CDC连接器支持的信息如下。
类别详情
支持类型源表
运行模式仅支持流模式
数据格式暂不适用
特有监控指标
  • currentFetchEventTimeLag:数据产生到拉取到Source Operator的间隔。

    该指标仅在binlog阶段有效,snapshot阶段该值恒为0。

  • currentEmitEventTimeLag:数据产生到离开Source Operator的间隔。

    该指标仅在binlog阶段有效,snapshot阶段该值恒为0。

  • sourceIdleTime:source至今有多久不产生新数据。
说明 指标的含义及如何查看监控指标,请参见查看监控指标
API种类SQL
是否支持更新或删除结果表数据不涉及

前提条件

  • 已创建Postgres数据库和表,创建RDS Postgres数据库和表的详情请参见创建账号和数据库
  • 已在阿里云RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上进行了相应的配置,详情请参见配置Postgres

使用限制

  • 仅Flink计算引擎VVR 2.0及以上版本支持Postgres的CDC连接器。
  • Postgres CDC暂不支持在全表扫描阶段执行Checkpoint。
    如果您的作业在全表扫描阶段触发Checkpoint,则可能由于Checkpoint超时导致作业Failover。因此,建议您在作业开发页面高级配置更多Flink配置中配置如下参数,避免在全量同步阶段由于Checkpoint超时导致Failover。
    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647
    相关的参数说明详情如下表所示。
    参数说明备注
    execution.checkpointing.intervalCheckpoint的时间间隔。单位是Duration类型,例如10min或30s。
    execution.checkpointing.tolerable-failed-checkpoints容忍Checkpoint失败的次数。该参数的取值与Checkpoint调度间隔时间的乘积就是允许的快照读取时间。
    说明 如果表特别大,建议将该参数值配置得大一些。
    restart-strategy重启策略。参数取值如下:
    • fixed-delay:固定延迟重启策略。
    • failure-rate:故障率重启策略。
    • exponential-delay:指数延迟重启策略。

    详情请参见Restart Strategies

    restart-strategy.fixed-delay.attempts固定延迟重启策略下,尝试重启的最大次数。无。

语法结构

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

WITH参数

参数说明数据类型是否必填默认值备注
connector表类型。STRING固定值为postgres-cdc
hostnamePostgres数据库的IP地址或者Hostname。STRING无。
usernamePostgres数据库服务的用户名。STRINGSTRING无。
passwordPostgres数据库服务的密码。STRING无。
database-name数据库名称。STRING数据库名称支持正则表达式以读取多个数据库的数据。
schema-namePostgres Schema名称。STRINGSchema名称支持正则表达式以读取多个Schema的数据。
table-namePostgres表名。STRING表名支持正则表达式去读取多个表的数据。
portPostgres数据库服务的端口号。INTEGER5432无。
decoding.plugin.namePostgres Logical Decoding插件名称。STRINGdecoderbufs根据Postgres服务上安装的插件确定。支持的插件列表如下:
  • decoderbufs(默认值)
  • wal2json
  • wal2json_rds
  • wal2json_streaming
  • wal2json_rds_streaming
  • pgoutput
说明 如果您使用的是阿里云RDS PostgreSQL,需要开启逻辑解码(wal2json)功能,详情请参见逻辑解码(wal2json)
debezium.*Debezium属性参数。STRING更细粒度控制Debezium客户端的行为。例如'debezium.snapshot.mode' = 'never',详情请参见配置属性
slot.name逻辑解码槽的名字。STRINGflink建议每个表都设置debezium.slot.name参数,以避免出现PSQLException: ERROR: replication slot "debezium" is active for PID 974报错。

类型映射

Postgres CDC和Flink字段类型对应关系如下。
Postgres CDC字段类型Flink字段类型
SMALLINTSMALLINT
INT2
SMALLSERIAL
SERIAL2
INTEGERINT
SERIAL
BIGINTBIGINT
BIGSERIAL
REALFLOAT
FLOAT4
FLOAT8DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)DECIMAL(p, s)
DECIMAL(p, s)
BOOLEANBOOLEAN
DATEDATE
TIME [(p)] [WITHOUT TIMEZONE]TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n)STRING
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
BYTEABYTES

使用示例

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;