全部产品
Search
文档中心

实时计算Flink版:Postgres CDC(公测中)

更新时间:Apr 11, 2024

Postgres CDC可用于依次读取PostgreSQL数据库全量快照数据和变更数据,保证不多读一条也不少读一条数据。即使发生故障,也能采用Exactly Once方式处理。本文为您介绍如何使用Postgres CDC连接器。

背景信息

Postgres CDC连接器支持的信息如下。

类别

详情

支持类型

源表

说明

您可以使用JDBC作为结果表和维表连接器。

运行模式

仅支持流模式

数据格式

暂不适用

特有监控指标

  • currentFetchEventTimeLag:数据产生到拉取到Source Operator的间隔。

  • currentEmitEventTimeLag:数据产生到离开Source Operator的间隔。

  • sourceIdleTime:source至今有多久不产生新数据。

说明
  • currentFetchEventTimeLag与currentEmitEventTimeLag指标仅在增量阶段有效,全量阶段该值恒为0。

  • 指标含义详情,请参见监控指标说明

API种类

SQL

是否支持更新或删除结果表数据

不涉及

特色功能

Postgres CDC连接器接入CDC增量快照框架(实时计算引擎VVR 8.0.6及以上版本)。Postgres CDC读取历史全量数据后,自动切换到WAL变更日志读取,保证不多读也不少读数据。即使发生故障,也能保证Exactly Once语义处理数据。Postgres CDC源表提供了并发读取全量数据,无锁读取和断点续传的能力。

作为源表,功能与优势详情如下:

  • 流批一体,支持读取全量和增量数据,无需维护两套流程。

  • 支持并发读取全量数据,性能水平扩展。

  • 全量读取无缝切换增量读取,自动缩容,节省计算资源。

  • 全量阶段读取支持断点续传,更稳定。

  • 无锁读取全量数据,不影响线上业务。

前提条件

Postgres CDC连接器通过PostgreSQL数据库的逻辑复制读取CDC变更流数据,需要满足以下条件:

  • wal_level参数的值需设置为logical,即在预写式日志WAL(Write-ahead logging)中增加支持逻辑编码所需的信息。

  • 执行ALTER TABLE schema.table REPLICA IDENTITY FULL;命令设置订阅表的REPLICA IDENTITY为FULL(发出的插入和更新操作事件包含表中所有列的旧值),以保障该表数据同步的一致性。

    说明

    REPLICA IDENTITY是PostgreSQL特有的表级设置,它决定了逻辑解码插件在发生(INSERT)和更新(UPDATE)事件时,是否包含涉及的表列的旧值。REPLICA IDENTITY取值含义详情请参见REPLICA IDENTITY

  • 需要确保max_wal_senders和max_replication_slots的参数值均大于当前数据库复制槽已使用数与Flink作业所需要的slot数量。

  • 确保账户系统权限为SUPERUSER或者同时拥有LOGIN和REPLICATION权限,并且具有订阅表的SELECT权限用于全量数据查询。

说明

阿里云RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上相应的配置可能有差异,详情请参见配置Postgres

注意事项

  • 仅实时计算引擎8.0.6及以上版本支持Postgres CDC增量快照功能。

  • 请及时管理Replication Slot,以免出现磁盘空间浪费的问题。

    为了防止在Flink作业重启过程中由于Checkpoint对应的WAL(Write-Ahead Log)段被清除而引发数据丢失,Flink作业不会自动移除Replication Slot。因此,如果确认特定的Flink作业不会再次启动,应当手动删除相关的Replication Slot,以释放其占用的资源。另外,如果PostgreSQL的Replication Slot的确认位点长时间不向前推进,PostgreSQL不会清理该槽位点之后的WAL条目,这可能会导致未使用的WAL积累而占用过多的磁盘空间。

  • 开启增量快照时,Postgres CDC连接器必须开启Checkpoint,并且Source表必须声明主键。Source多并发读取全量数据时会创建多个临时的Replication Slot。

    不开启增量快照读取的PostgreSQL CDC Source仅支持单一并发,因此只需要一个全局Slot。当开启增量快照时,PostgreSQL CDC Source在全量阶段所需的最大Slot数量为Source数量 * 并发数 + 1。进入增量阶段后,系统自动回收在全量阶段创建的Slot,仅保留一个全局Slot。如果Slot数量有限,需要控制全量阶段的并发数量,这样做的缺点是会降低读取速度。如果下游算子或存储支持幂等性,可以启用scan.incremental.snapshot.backfill.skip = true以跳过全量阶段的日志读取,这样做的缺点是仅能提供至少一次(At-Least Once)的语义保证。如果SQL要做聚合、关联等操作,不建议跳过全量阶段日志的读取。

  • 不开启增量快照时,Postgres CDC连接器不支持在全表扫描阶段执行Checkpoint。

    不开启增量快照时,如果您的作业在全表扫描阶段触发Checkpoint,则可能由于Checkpoint超时导致作业Failover。因此,建议您在其他配置中配置如下参数,具体操作请参见如何配置作业运行参数?。避免在全量同步阶段由于Checkpoint超时导致Failover。

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    相关的参数说明详情如下表所示。

    参数

    说明

    备注

    execution.checkpointing.interval

    Checkpoint的时间间隔。

    单位是Duration类型,例如10min或30s。

    execution.checkpointing.tolerable-failed-checkpoints

    容忍Checkpoint失败的次数。

    该参数的取值与Checkpoint调度间隔时间的乘积就是允许的快照读取时间。

    说明

    如果表特别大,建议将该参数值配置得大一些。

    restart-strategy

    重启策略。

    参数取值如下:

    • fixed-delay:固定延迟重启策略。

    • failure-rate:故障率重启策略。

    • exponential-delay:指数延迟重启策略。

    详情请参见Restart Strategies

    restart-strategy.fixed-delay.attempts

    固定延迟重启策略下,尝试重启的最大次数。

    无。

语法结构

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

表类型。

STRING

固定值为postgres-cdc

hostname

Postgres数据库的IP地址或者Hostname。

STRING

无。

username

Postgres数据库服务的用户名。

STRING

无。

password

Postgres数据库服务的密码。

STRING

无。

database-name

数据库名称。

STRING

数据库名称支持正则表达式以读取多个数据库的数据。

schema-name

Postgres Schema名称。

STRING

Schema名称支持正则表达式以读取多个Schema的数据。

table-name

Postgres表名。

STRING

表名支持正则表达式去读取多个表的数据。

port

Postgres数据库服务的端口号。

INTEGER

5432

无。

decoding.plugin.name

Postgres Logical Decoding插件名称。

STRING

decoderbufs

根据Postgres服务上安装的插件确定。支持的插件列表如下:

  • decoderbufs(默认值)

  • pgoutput

  • wal2json

  • wal2json_rds

  • wal2json_streaming

  • wal2json_rds_streaming

slot.name

逻辑解码槽的名字。

STRING

8.0.1版本之前为非必填,从8.0.1版本开始为必填

8.0.1版本之前默认值为flink,从8.0.1版本开始无默认值

建议每个表都设置slot.name参数,以避免出现PSQLException: ERROR: replication slot "debezium" is active for PID 974报错。

debezium.*

Debezium属性参数。

STRING

更细粒度控制Debezium客户端的行为。例如'debezium.snapshot.mode' = 'never',详情请参见配置属性

scan.incremental.snapshot.enabled

是否开启增量快照。

BOOLEAN

false

参数取值如下:

  • false(默认值):不开启增量快照。

  • true:开启增量快照。

说明
  • 此功能为实验性功能。仅实时计算引擎8.0.6及以上版本支持该参数。

  • 增量快照的功能优势,前提条件和使用限制详情请参见特色功能前提条件注意事项

scan.startup.mode

消费数据时的启动模式。

STRING

initial

参数取值如下:

  • initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的WAL日志数据。

  • latest-offset:在第一次启动时,不会扫描历史全量数据,直接从WAL日志的末尾(最新的WAL日志处)开始读取,即只读取该连接器启动以后的最新变更。

  • snapshot:先扫描历史全量数据,再读取全量阶段新产生的WAL日志,最终作业会停止。

changelog-mode

用于编码流更改的变更日志(Changelog)模式。

String

all

支持的Changelog模式包括:

  • ALL:支持所有类型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。

  • UPSERT:仅支持Upsert类型,包括INSERT、DELETE和UPDATE_AFTER。

heartbeat.interval.ms

发送心跳包的时间间隔。

Duration

30s

单位为毫秒。

Postgres CDC连接器主动向数据库发送心跳包来保证推进Slot的偏移量。当表变更不频繁时,设置该值可以及时回收WAL日志。

scan.incremental.snapshot.chunk.key-column

指定某一列作为快照阶段切分分片的切分列。

STRING

默认从主键中选择第一列。

scan.incremental.close-idle-reader.enabled

是否在快照结束后关闭空闲的Reader。

Boolean

false

该配置生效需要设置execution.checkpointing.checkpoints-after-tasks-finish.enabled为true。

scan.incremental.snapshot.backfill.skip

是否跳过全量阶段的日志读取。

Boolean

false

参数取值如下:

  • true:跳过。

    增量阶段从低水位线开始读取日志。

    如果下游算子或存储支持幂等性,建议跳过全量阶段日志的读取,这样做的优点是能够减少WAL Slot数量,缺点是仅能提供至少一次(At-Least Once)的语义保证

  • false:不跳过。

    全量阶段读取分片时,会读取低水位线和高水位线之间的日志来保证一致性。

    如果SQL要做聚合、关联等操作,不建议跳过全量阶段日志的读取。

类型映射

Postgres CDC和Flink字段类型对应关系如下。

Postgres CDC字段类型

Flink字段类型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

使用示例

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;

相关文档