全部產品
Search
文件中心

Realtime Compute for Apache Flink:Postgres CDC(公測中)

更新時間:Dec 04, 2025

Postgres CDC可用於依次讀取PostgreSQL資料庫全量快照資料和變更資料,保證不多讀一條也不少讀一條資料。即使發生故障,也能採用Exactly Once方式處理。本文為您介紹如何使用Postgres CDC連接器。

背景資訊

Postgres CDC連接器支援的資訊如下。

類別

詳情

支援類型

源表

說明

您可以使用JDBC作為結果表和維表連接器。

運行模式

僅支援流模式

資料格式

暫不適用

特有監控指標

  • currentFetchEventTimeLag:資料產生到拉取到Source Operator的間隔。

  • currentEmitEventTimeLag:資料產生到離開Source Operator的間隔。

  • sourceIdleTime:source至今有多久不產生新資料。

說明
  • currentFetchEventTimeLag與currentEmitEventTimeLag指標僅在增量階段有效,全量階段該值恒為0。

  • 指標含義詳情,請參見監控指標說明

API種類

SQL和資料攝入YAML

是否支援更新或刪除結果表資料

不涉及

特色功能

Postgres CDC連接器接入CDC增量快照框架(Realtime Compute引擎VVR 8.0.6及以上版本)。Postgres CDC讀取歷史全量資料後,自動切換到WAL變更日誌讀取,保證不多讀也不少讀資料。即使發生故障,也能保證Exactly Once語義處理資料。Postgres CDC源表提供了並發讀取全量資料,無鎖讀取和斷點續傳的能力。

作為源表,功能與優勢詳情如下:

  • 流批一體,支援讀取全量和增量資料,無需維護兩套流程。

  • 支援並發讀取全量資料,效能水平擴充。

  • 全量讀取無縫切換增量讀取,自動縮容,節省計算資源。

  • 全量階段讀取支援斷點續傳,更穩定。

  • 無鎖讀取全量資料,不影響線上業務。

前提條件

Postgres CDC連接器通過PostgreSQL資料庫的邏輯複製讀取CDC變更流資料,支援阿里雲RDS PostgreSQLAmazon RDS PostgreSQL自建PostgreSQL

重要

阿里雲RDS PostgreSQLAmazon RDS PostgreSQL或者自建PostgreSQL上相應的配置可能有差異,請您在使用之前詳細閱讀配置Postgres文檔進行相關配置

完成配置後確保有下列的條件:

  • wal_level參數的值需設定為logical,即在預寫式日誌WAL(Write-ahead logging)中增加支援邏輯編碼所需的資訊。

  • 訂閱表的REPLICA IDENTITY為FULL(發出的插入和更新操作事件包含表中所有列的舊值),以保障該表資料同步的一致性。

    說明

    REPLICA IDENTITY是PostgreSQL特有的表級設定,它決定了邏輯解碼外掛程式在發生(INSERT)和更新(UPDATE)事件時,是否包含涉及的表列的舊值。REPLICA IDENTITY取值含義詳情請參見REPLICA IDENTITY

  • 需要確保max_wal_senders和max_replication_slots的參數值均大於當前資料庫複寫槽已使用數與Flink作業所需要的slot數量。

  • 確保賬戶系統許可權為SUPERUSER或者同時擁有LOGIN和REPLICATION許可權,並且具有訂閱表的SELECT許可權用於全量資料查詢。

注意事項

  • 僅Realtime Compute引擎8.0.6及以上版本支援Postgres CDC增量快照功能。

Flink PostgreSQL CDC 作業依賴 Replication Slot 來確保 WAL(Write-Ahead Log)不被過早清理,從而保障資料一致性。但若管理不當,可能引發磁碟空間浪費資料讀取延遲等問題。請遵循以下建議:

  • 請及時清理不再使用的 Slot

    • Flink 不會自動刪除 Replication Slot,即使作業已停止(尤其無狀態重啟情境),以防止因 WAL 被清除而導致資料丟失。

    • 若確認某作業不再啟動,請手動刪除其關聯的 Replication Slot,釋放磁碟空間。

      重要

      生命週期管理:將 Replication Slot 視為作業資源的一部分,隨作業啟停同步管理。

  • 避免複用舊 Slot

    • 新作業應使用新的 Slot Name,而非複用舊 Slot。複用可能導致作業啟動後需回溯大量歷史 WAL,延遲讀取最新資料。

    • PostgreSQL的邏輯複製要求一個 Slot 僅能被一個串連使用,不同作業必須使用不同的 Slot 名稱。

      重要

      命名規範:自訂slot.name時,避免使用帶數字尾碼的名稱(如 my_slot_1),以防與臨時 Slot 衝突。

  • 啟用增量快照下的Slot行為

    • 前提條件:必須啟用checkpoint,且Source 表必須聲明主鍵。

    • Slot建立規則:

      • 未開啟增量快照:僅支援單並發,使用 1 個全域 Slot

      • 開啟增量快照

        • 全量階段每個 Source 並發子任務會建立一個臨時 Slot,命名格式為${slot.name}_${task_id}

        • 增量階段:自動回收所有臨時 Slot,僅保留 1 個全域 Slot。

    • 最大Slot數量:Source 並發數 + 1(全量階段)

  • 資源與效能

    • 若 PostgreSQL 的 Slot 數量或磁碟空間受限,應適當降低全量階段的並發度(減少臨時 Slot 數量),但會犧牲全量讀取速度。

    • 若下遊支援等冪寫入,可設定:scan.incremental.snapshot.backfill.skip = true,跳過全量階段的 Binlog 回溯,加快啟動速度。

      此配置僅提供 At-Least-Once 語義。不適用於含彙總、維表 Join 等狀態計算的作業(可能丟失中間狀態所需的歷史變更)。

  • 不開啟增量快照時,Postgres CDC連接器不支援在全表掃描階段執行Checkpoint。

    不開啟增量快照時,如果您的作業在全表掃描階段觸發Checkpoint,則可能由於Checkpoint逾時導致作業Failover。因此,建議您在其他配置中配置如下參數,具體操作請參見如何配置自訂的作業運行參數?。避免在全量同步階段由於Checkpoint逾時導致Failover。

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    相關的參數說明詳情如下表所示。

    參數

    說明

    備忘

    execution.checkpointing.interval

    Checkpoint的時間間隔。

    單位是Duration類型,例如10min或30s。

    execution.checkpointing.tolerable-failed-checkpoints

    容忍Checkpoint失敗的次數。

    該參數的取值與Checkpoint調度間隔時間的乘積就是允許的快照讀取時間。

    說明

    如果表特別大,建議將該參數值配置得大一些。

    restart-strategy

    重啟策略。

    參數取值如下:

    • fixed-delay:固定延遲重啟策略。

    • failure-rate:故障率重啟策略。

    • exponential-delay:指數延遲重啟策略。

    詳情請參見Restart Strategies

    restart-strategy.fixed-delay.attempts

    固定延遲重啟策略下,嘗試重啟的最大次數。

    無。

SQL

文法結構

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

WITH參數

參數

說明

資料類型

是否必填

預設值

備忘

connector

connector類型。

STRING

固定值為postgres-cdc

hostname

Postgres資料庫的IP地址或者Hostname。

STRING

無。

username

Postgres資料庫服務的使用者名稱。

STRING

無。

password

Postgres資料庫服務的密碼。

STRING

無。

database-name

資料庫名稱。

STRING

資料庫名稱。

schema-name

Postgres Schema名稱。

STRING

Schema名稱支援Regex以讀取多個Schema的資料。

table-name

Postgres表名。

STRING

表名支援Regex以讀取多個表的資料。

port

Postgres資料庫服務的連接埠號碼。

INTEGER

5432

無。

decoding.plugin.name

Postgres Logical Decoding外掛程式名稱。

STRING

decoderbufs

根據Postgres服務上安裝的外掛程式確定。支援的外掛程式列表如下:

  • decoderbufs(預設值):在Postgres 9.6及以上版本支援,需要安裝該外掛程式。

  • pgoutput(推薦): Postgres 10及以上版本的官方內建外掛程式。

slot.name

邏輯解碼槽的名字。

STRING

8.0.1版本之前為非必填,從8.0.1版本開始為必填

8.0.1版本之前預設值為flink,從8.0.1版本開始無預設值

建議每個表都設定slot.name參數,以避免出現PSQLException: ERROR: replication slot "debezium" is active for PID 974報錯。

debezium.*

Debezium屬性參數。

STRING

更細粒度控制Debezium用戶端的行為。例如'debezium.snapshot.mode' = 'never',詳情請參見配置屬性

scan.incremental.snapshot.enabled

是否開啟增量快照。

BOOLEAN

false

參數取值如下:

  • false(預設值):不開啟增量快照。

  • true:開啟增量快照。

說明
  • 此功能為實驗性功能。僅Realtime Compute引擎8.0.6及以上版本支援該參數。

  • 增量快照的功能優勢,前提條件和使用限制詳情請參見特色功能前提條件注意事項

scan.startup.mode

消費資料時的啟動模式。

STRING

initial

參數取值如下:

  • initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的WAL日誌資料。

  • latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從WAL日誌的末尾(最新的WAL日誌處)開始讀取,即唯讀取該連接器啟動以後的最新變更。

  • snapshot:先掃描歷史全量資料,再讀取全量階段新產生的WAL日誌,最終作業會停止。

changelog-mode

用於編碼流更改的變更日誌(Changelog)模式。

String

all

支援的Changelog模式包括:

  • ALL:支援所有類型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。

  • UPSERT:僅支援Upsert類型,包括INSERT、DELETE和UPDATE_AFTER。

heartbeat.interval.ms

發送心跳包的時間間隔。

Duration

30s

單位為毫秒。

Postgres CDC連接器主動向資料庫發送心跳包來保證推進Slot的位移量。當表變更不頻繁時,設定該值可以及時回收WAL日誌。

scan.incremental.snapshot.chunk.key-column

指定某一列作為快照階段切分分區的切分列。

STRING

預設從主鍵中選擇第一列。

scan.incremental.close-idle-reader.enabled

是否在快照結束後關閉閒置Reader。

Boolean

false

該配置生效需要設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。

scan.incremental.snapshot.backfill.skip

是否跳過全量階段的日誌讀取。

Boolean

false

參數取值如下:

  • true:跳過。

    增量階段從低水位線開始讀取日誌。

    如果下遊運算元或儲存支援等冪性,建議跳過全量階段日誌的讀取,這樣做的優點是能夠減少WAL Slot數量,缺點是僅能提供至少一次(At-Least Once)的語義保證

  • false:不跳過。

    全量階段讀取分區時,會讀取低水位線和高水位線之間的日誌來保證一致性。

    如果SQL要做彙總、關聯等操作,不建議跳過全量階段日誌的讀取。

類型映射

PostgreSQL和Flink欄位類型對應關係如下。

PostgreSQL欄位類型

Flink欄位類型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

使用樣本

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;

資料攝入

自Realtime Compute引擎11.4版本起,PostgreSQL連接器作為資料來源可以在資料攝入YAML作業中使用。

文法結構

source:
  type: postgres
  name: PostgreSQL Source
  hostname: localhost
  port: 5432
  username: pg_username
  password: pg_password
  tables: db.scm.tbl
  slot.name: test_slot
  scan.startup.mode: initial
  server-time-zone: UTC
  connect.timeout: 120s
  decoding.plugin.name: decoderbufs

sink:
  type: ...

配置項

參數

說明

是否必填

資料類型

預設值

備忘

type

資料來源類型。

STRING

固定值為postgres。

name

資料來源名稱。

STRING

無。

hostname

Postgres資料庫伺服器網域名稱或IP地址。

STRING

(none)

無。

port

Postgres資料庫伺服器暴露的連接埠。

INTEGER

5432

無。

username

Postgres使用者名稱。

STRING

(none)

無。

password

Postgres密碼。

STRING

(none)

無。

tables

需要捕獲的Postgres資料庫表名。

支援Regex,可以監控多個滿足該Regex的表。

STRING

(none)

重要

目前僅支援捕獲同一資料庫下的表。

點號 (.) 被視為database、schema和table名的分隔字元。如果需要在Regex中使用點號 (.) 來匹配任何字元,則必須使用反斜線轉義點號。例如: bdb.schema_\.*.order_\.*

slot.name

PostgreSQL複製槽名稱。

STRING

(none)

名稱必須符合 PostgreSQL 複製槽命名規則,可以包含小寫字母、數字和底線。

decoding.plugin.name

伺服器上安裝的Postgres邏輯解碼外掛程式的名稱。

STRING

pgoutput

可選值包括decoderbufspgoutput

tables.exclude

要排除的 Postgres 資料庫表名,此參數將在 tables 參數之後生效。

STRING

(none)

表名也支援Regex,可以排除多個滿足該Regex的表。用法與 tables 參數相同。

server-time-zone

資料庫伺服器的會話時區,如“Asia/Shanghai”。

STRING

(none)

如果未設定,則將使用系統預設時區 (ZoneId.systemDefault()) 來確定伺服器時區。

scan.incremental.snapshot.chunk.size

增量快照框架中每個chunk的大小(包含的行數)。

INTEGER

8096

當開啟增量快照讀取時,表會被切分成多個chunk讀取。在讀完chunk的資料之前,chunk的資料會先緩衝在記憶體中。

每個chunk包含的行數越少,則表中的chunk的總數量越大,儘管這會降低故障恢複的粒度,但可能導致記憶體OOM和整體的輸送量降低。因此,您需要進行權衡,並設定合理的chunk大小。

scan.snapshot.fetch.size

當讀取表的全量資料時,每次最多拉取的記錄數。

INTEGER

1024

無。

scan.startup.mode

消費資料時的啟動模式。

STRING

initial

參數取值如下:

  • initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。

  • latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從Binlog的末尾(最新的Binlog處)開始讀取,即唯讀取該連接器啟動以後的最新變更。

  • committed-offset:不掃描歷史全量資料,從指錨點開始消費增量資料。

  • snapshot:只消費歷史全量資料,不消費增量資料。

scan.incremental.close-idle-reader.enabled

是否在快照結束後關閉閒置 Reader。

BOOLEAN

false

該配置生效需要設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。

scan.lsn-commit.checkpoints-num-delay

在開始提交 LSN 位移量之前,延遲多少個檢查點。

INTEGER

3

檢查點 LSN 位移量將滾動提交,以避免無法從狀態恢複。

connect.timeout

連接器嘗試串連到 Postgres 資料庫伺服器後,逾時前應等待的最長時間。

DURATION

30s

此值不能小於 250 毫秒。

connect.max-retries

連接器嘗試建立 Postgres 資料庫伺服器串連的最大重試次數。

INTEGER

3

無。

connection.pool.size

串連池大小。

INTEGER

20

無。

jdbc.properties.*

允許使用者傳遞自訂 JDBC URL 屬性。

STRING

20

使用者可以傳遞自訂屬性,例如 'jdbc.properties.useSSL' = 'false'

heartbeat.interval

用於追蹤最新可用 WAL 日誌位移量的心跳事件發送間隔。

DURATION

30s

無。

debezium.*

將 Debezium 的屬性傳遞給 Debezium Embedded Engine,後者用於捕獲來自 PostgreSQL 伺服器的資料更改。

STRING

(none)

有關 Debezium PostgreSQL 連接器屬性的更多資訊,請參閱相關文檔

chunk-meta.group.size

chunk元資訊的大小。

STRING

1000

如果元資訊大於該值,元資訊會分為多份傳遞。

metadata.list

傳遞到下遊的可讀中繼資料列表,可在transform模組中使用。

STRING

false

使用逗號 (,) 分隔。目前可用的中繼資料有:op_ts

scan.incremental.snapshot.unbounded-chunk-first.enabled

快照讀取階段是否先分發無界的分區。

STRING

false

參數取值如下:

  • true:快照讀取階段優先分發無界的分區。

  • false(預設):快照讀取階段不優先分發無界的分區。

重要

實驗性功能。開啟後能夠降低TaskManager在快照階段同步最後一個分區時遇到記憶體溢出 (OOM) 的風險,建議在作業第一次啟動前添加。

相關文檔