Postgres CDC可用於依次讀取PostgreSQL資料庫全量快照資料和變更資料,保證不多讀一條也不少讀一條資料。即使發生故障,也能採用Exactly Once方式處理。本文為您介紹如何使用Postgres CDC連接器。
背景資訊
Postgres CDC連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表 說明 您可以使用JDBC作為結果表和維表連接器。 |
運行模式 | 僅支援流模式 |
資料格式 | 暫不適用 |
特有監控指標 |
說明
|
API種類 | SQL和資料攝入YAML |
是否支援更新或刪除結果表資料 | 不涉及 |
特色功能
Postgres CDC連接器接入CDC增量快照框架(Realtime Compute引擎VVR 8.0.6及以上版本)。Postgres CDC讀取歷史全量資料後,自動切換到WAL變更日誌讀取,保證不多讀也不少讀資料。即使發生故障,也能保證Exactly Once語義處理資料。Postgres CDC源表提供了並發讀取全量資料,無鎖讀取和斷點續傳的能力。
作為源表,功能與優勢詳情如下:
流批一體,支援讀取全量和增量資料,無需維護兩套流程。
支援並發讀取全量資料,效能水平擴充。
全量讀取無縫切換增量讀取,自動縮容,節省計算資源。
全量階段讀取支援斷點續傳,更穩定。
無鎖讀取全量資料,不影響線上業務。
前提條件
Postgres CDC連接器通過PostgreSQL資料庫的邏輯複製讀取CDC變更流資料,支援阿里雲RDS PostgreSQL、Amazon RDS PostgreSQL和自建PostgreSQL。
阿里雲RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上相應的配置可能有差異,請您在使用之前詳細閱讀配置Postgres文檔進行相關配置。
完成配置後確保有下列的條件:
wal_level參數的值需設定為logical,即在預寫式日誌WAL(Write-ahead logging)中增加支援邏輯編碼所需的資訊。
訂閱表的REPLICA IDENTITY為FULL(發出的插入和更新操作事件包含表中所有列的舊值),以保障該表資料同步的一致性。
說明REPLICA IDENTITY是PostgreSQL特有的表級設定,它決定了邏輯解碼外掛程式在發生(INSERT)和更新(UPDATE)事件時,是否包含涉及的表列的舊值。REPLICA IDENTITY取值含義詳情請參見REPLICA IDENTITY。
需要確保max_wal_senders和max_replication_slots的參數值均大於當前資料庫複寫槽已使用數與Flink作業所需要的slot數量。
確保賬戶系統許可權為SUPERUSER或者同時擁有LOGIN和REPLICATION許可權,並且具有訂閱表的SELECT許可權用於全量資料查詢。
注意事項
僅Realtime Compute引擎8.0.6及以上版本支援Postgres CDC增量快照功能。
Flink PostgreSQL CDC 作業依賴 Replication Slot 來確保 WAL(Write-Ahead Log)不被過早清理,從而保障資料一致性。但若管理不當,可能引發磁碟空間浪費或資料讀取延遲等問題。請遵循以下建議:
請及時清理不再使用的 Slot
Flink 不會自動刪除 Replication Slot,即使作業已停止(尤其無狀態重啟情境),以防止因 WAL 被清除而導致資料丟失。
若確認某作業不再啟動,請手動刪除其關聯的 Replication Slot,釋放磁碟空間。
重要生命週期管理:將 Replication Slot 視為作業資源的一部分,隨作業啟停同步管理。
避免複用舊 Slot
新作業應使用新的 Slot Name,而非複用舊 Slot。複用可能導致作業啟動後需回溯大量歷史 WAL,延遲讀取最新資料。
PostgreSQL的邏輯複製要求一個 Slot 僅能被一個串連使用,不同作業必須使用不同的 Slot 名稱。
重要命名規範:自訂slot.name時,避免使用帶數字尾碼的名稱(如 my_slot_1),以防與臨時 Slot 衝突。
啟用增量快照下的Slot行為
前提條件:必須啟用checkpoint,且Source 表必須聲明主鍵。
Slot建立規則:
未開啟增量快照:僅支援單並發,使用 1 個全域 Slot。
開啟增量快照:
全量階段:每個 Source 並發子任務會建立一個臨時 Slot,命名格式為
${slot.name}_${task_id}。增量階段:自動回收所有臨時 Slot,僅保留 1 個全域 Slot。
最大Slot數量:Source 並發數 + 1(全量階段)
資源與效能
若 PostgreSQL 的 Slot 數量或磁碟空間受限,應適當降低全量階段的並發度(減少臨時 Slot 數量),但會犧牲全量讀取速度。
若下遊支援等冪寫入,可設定:
scan.incremental.snapshot.backfill.skip = true,跳過全量階段的 Binlog 回溯,加快啟動速度。此配置僅提供 At-Least-Once 語義。不適用於含彙總、維表 Join 等狀態計算的作業(可能丟失中間狀態所需的歷史變更)。
不開啟增量快照時,Postgres CDC連接器不支援在全表掃描階段執行Checkpoint。
不開啟增量快照時,如果您的作業在全表掃描階段觸發Checkpoint,則可能由於Checkpoint逾時導致作業Failover。因此,建議您在其他配置中配置如下參數,具體操作請參見如何配置自訂的作業運行參數?。避免在全量同步階段由於Checkpoint逾時導致Failover。
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 2147483647相關的參數說明詳情如下表所示。
參數
說明
備忘
execution.checkpointing.interval
Checkpoint的時間間隔。
單位是Duration類型,例如10min或30s。
execution.checkpointing.tolerable-failed-checkpoints
容忍Checkpoint失敗的次數。
該參數的取值與Checkpoint調度間隔時間的乘積就是允許的快照讀取時間。
說明如果表特別大,建議將該參數值配置得大一些。
restart-strategy
重啟策略。
參數取值如下:
fixed-delay:固定延遲重啟策略。
failure-rate:故障率重啟策略。
exponential-delay:指數延遲重啟策略。
詳情請參見Restart Strategies。
restart-strategy.fixed-delay.attempts
固定延遲重啟策略下,嘗試重啟的最大次數。
無。
SQL
文法結構
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);WITH參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | connector類型。 | STRING | 是 | 無 | 固定值為 |
hostname | Postgres資料庫的IP地址或者Hostname。 | STRING | 是 | 無 | 無。 |
username | Postgres資料庫服務的使用者名稱。 | STRING | 是 | 無 | 無。 |
password | Postgres資料庫服務的密碼。 | STRING | 是 | 無 | 無。 |
database-name | 資料庫名稱。 | STRING | 是 | 無 | 資料庫名稱。 |
schema-name | Postgres Schema名稱。 | STRING | 是 | 無 | Schema名稱支援Regex以讀取多個Schema的資料。 |
table-name | Postgres表名。 | STRING | 是 | 無 | 表名支援Regex以讀取多個表的資料。 |
port | Postgres資料庫服務的連接埠號碼。 | INTEGER | 否 | 5432 | 無。 |
decoding.plugin.name | Postgres Logical Decoding外掛程式名稱。 | STRING | 否 | decoderbufs | 根據Postgres服務上安裝的外掛程式確定。支援的外掛程式列表如下:
|
slot.name | 邏輯解碼槽的名字。 | STRING | 8.0.1版本之前為非必填,從8.0.1版本開始為必填 | 8.0.1版本之前預設值為flink,從8.0.1版本開始無預設值 | 建議每個表都設定 |
debezium.* | Debezium屬性參數。 | STRING | 否 | 無 | 更細粒度控制Debezium用戶端的行為。例如 |
scan.incremental.snapshot.enabled | 是否開啟增量快照。 | BOOLEAN | 否 | false | 參數取值如下:
|
scan.startup.mode | 消費資料時的啟動模式。 | STRING | 否 | initial | 參數取值如下:
|
changelog-mode | 用於編碼流更改的變更日誌(Changelog)模式。 | String | 否 | all | 支援的Changelog模式包括:
|
heartbeat.interval.ms | 發送心跳包的時間間隔。 | Duration | 否 | 30s | 單位為毫秒。 Postgres CDC連接器主動向資料庫發送心跳包來保證推進Slot的位移量。當表變更不頻繁時,設定該值可以及時回收WAL日誌。 |
scan.incremental.snapshot.chunk.key-column | 指定某一列作為快照階段切分分區的切分列。 | STRING | 否 | 無 | 預設從主鍵中選擇第一列。 |
scan.incremental.close-idle-reader.enabled | 是否在快照結束後關閉閒置Reader。 | Boolean | 否 | false | 該配置生效需要設定 |
scan.incremental.snapshot.backfill.skip | 是否跳過全量階段的日誌讀取。 | Boolean | 否 | false | 參數取值如下:
|
類型映射
PostgreSQL和Flink欄位類型對應關係如下。
PostgreSQL欄位類型 | Flink欄位類型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
使用樣本
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;資料攝入
自Realtime Compute引擎11.4版本起,PostgreSQL連接器作為資料來源可以在資料攝入YAML作業中使用。
文法結構
source:
type: postgres
name: PostgreSQL Source
hostname: localhost
port: 5432
username: pg_username
password: pg_password
tables: db.scm.tbl
slot.name: test_slot
scan.startup.mode: initial
server-time-zone: UTC
connect.timeout: 120s
decoding.plugin.name: decoderbufs
sink:
type: ...配置項
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
type | 資料來源類型。 | 是 | STRING | 無 | 固定值為postgres。 |
name | 資料來源名稱。 | 否 | STRING | 無 | 無。 |
hostname | Postgres資料庫伺服器網域名稱或IP地址。 | 是 | STRING | (none) | 無。 |
port | Postgres資料庫伺服器暴露的連接埠。 | 否 | INTEGER | 5432 | 無。 |
username | Postgres使用者名稱。 | 是 | STRING | (none) | 無。 |
password | Postgres密碼。 | 是 | STRING | (none) | 無。 |
tables | 需要捕獲的Postgres資料庫表名。 支援Regex,可以監控多個滿足該Regex的表。 | 是 | STRING | (none) | 重要 目前僅支援捕獲同一資料庫下的表。 點號 (.) 被視為database、schema和table名的分隔字元。如果需要在Regex中使用點號 (.) 來匹配任何字元,則必須使用反斜線轉義點號。例如: |
slot.name | PostgreSQL複製槽名稱。 | 是 | STRING | (none) | 名稱必須符合 PostgreSQL 複製槽命名規則,可以包含小寫字母、數字和底線。 |
decoding.plugin.name | 伺服器上安裝的Postgres邏輯解碼外掛程式的名稱。 | 否 | STRING |
| 可選值包括 |
tables.exclude | 要排除的 Postgres 資料庫表名,此參數將在 tables 參數之後生效。 | 否 | STRING | (none) | 表名也支援Regex,可以排除多個滿足該Regex的表。用法與 tables 參數相同。 |
server-time-zone | 資料庫伺服器的會話時區,如“Asia/Shanghai”。 | 否 | STRING | (none) | 如果未設定,則將使用系統預設時區 ( |
scan.incremental.snapshot.chunk.size | 增量快照框架中每個chunk的大小(包含的行數)。 | 否 | INTEGER | 8096 | 當開啟增量快照讀取時,表會被切分成多個chunk讀取。在讀完chunk的資料之前,chunk的資料會先緩衝在記憶體中。 每個chunk包含的行數越少,則表中的chunk的總數量越大,儘管這會降低故障恢複的粒度,但可能導致記憶體OOM和整體的輸送量降低。因此,您需要進行權衡,並設定合理的chunk大小。 |
scan.snapshot.fetch.size | 當讀取表的全量資料時,每次最多拉取的記錄數。 | 否 | INTEGER | 1024 | 無。 |
scan.startup.mode | 消費資料時的啟動模式。 | 否 | STRING | initial | 參數取值如下:
|
scan.incremental.close-idle-reader.enabled | 是否在快照結束後關閉閒置 Reader。 | 否 | BOOLEAN | false | 該配置生效需要設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。 |
scan.lsn-commit.checkpoints-num-delay | 在開始提交 LSN 位移量之前,延遲多少個檢查點。 | 否 | INTEGER | 3 | 檢查點 LSN 位移量將滾動提交,以避免無法從狀態恢複。 |
connect.timeout | 連接器嘗試串連到 Postgres 資料庫伺服器後,逾時前應等待的最長時間。 | 否 | DURATION | 30s | 此值不能小於 250 毫秒。 |
connect.max-retries | 連接器嘗試建立 Postgres 資料庫伺服器串連的最大重試次數。 | 否 | INTEGER | 3 | 無。 |
connection.pool.size | 串連池大小。 | 否 | INTEGER | 20 | 無。 |
jdbc.properties.* | 允許使用者傳遞自訂 JDBC URL 屬性。 | 否 | STRING | 20 | 使用者可以傳遞自訂屬性,例如 |
heartbeat.interval | 用於追蹤最新可用 WAL 日誌位移量的心跳事件發送間隔。 | 否 | DURATION | 30s | 無。 |
debezium.* | 將 Debezium 的屬性傳遞給 Debezium Embedded Engine,後者用於捕獲來自 PostgreSQL 伺服器的資料更改。 | 否 | STRING | (none) | 有關 Debezium PostgreSQL 連接器屬性的更多資訊,請參閱相關文檔。 |
chunk-meta.group.size | chunk元資訊的大小。 | 否 | STRING | 1000 | 如果元資訊大於該值,元資訊會分為多份傳遞。 |
metadata.list | 傳遞到下遊的可讀中繼資料列表,可在transform模組中使用。 | 否 | STRING | false | 使用逗號 (,) 分隔。目前可用的中繼資料有: |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 快照讀取階段是否先分發無界的分區。 | 否 | STRING | false | 參數取值如下:
重要 實驗性功能。開啟後能夠降低TaskManager在快照階段同步最後一個分區時遇到記憶體溢出 (OOM) 的風險,建議在作業第一次啟動前添加。 |
相關文檔
Realtime ComputeFlink版支援的連接器列表,請參見支援的連接器。
將資料寫入PolarDB PostgreSQL版(Oracle文法相容1.0)結果表,請參見PolarDB PostgreSQL版(Oracle文法相容1.0)。
如果您需要讀寫RDS MySQL、PolarDB for MySQL或者自建MySQL資料庫,請使用MySQL連接器。