變更資料擷取(Change Data Capture, CDC)功能允許您即時捕獲資料庫中的資料修改(INSERT/UPDATE/DELETE),並將其以事件流的形式同步到下遊系統,如資料倉儲、分析平台(如Flink)或其他資料庫執行個體。
在PolarDB PostgreSQL分布式版叢集中,搭建CDC的關鍵在於理解資料的來源:
準備工作:檢查與配置
在開始之前,請確保資料庫叢集的相關參數已正確設定。這些參數是開啟邏輯複製功能的前提。
預設情況下,PolarDB PostgreSQL分布式版已配置好以下參數。如果檢查結果不符,請提交工單聯絡我們處理。
在主CN節點執行以下 SQL,檢查所有節點的
polar_cluster.enable_change_data_capture是否為on。SELECT success, result FROM run_command_on_all_nodes($$ SHOW polar_cluster.enable_change_data_capture $$);預期結果如下:所有節點的
result列都應為on。success | result ---------+-------- t | on t | on t | on t | on在主CN節點上執行以下SQL,檢查所有節點的
wal_level是否為logical。SELECT success, result FROM run_command_on_all_nodes($$ SHOW wal_level $$);預期結果如下:所有節點的
result列都應為logical。success | result ---------+--------- t | logical t | logical t | logical t | logical
步驟一:在發布端建立發布和複製槽
發布端(即您的PolarDB PostgreSQL分布式版叢集)的配置分為兩步:建立發布(Publication)和建立複製槽(Replication Slot)。
建立發布
發布是定義哪些表的變更需要被捕獲的集合。此操作只需在主CN上執行一次,系統會自動將其同步到所有節點。
CREATE PUBLICATION <publication_name> FOR TABLE <table_name1>, <table_name2>;請勿使用
FOR ALL TABLES選項建立發布,應明確指定需要發布的表。<publication_name>為發布名,<table_name1>/<table_name2>為要發布的分布表/複製表。發布端僅需發布分布表的邏輯表名,無需發布物理分區的表名。
建立複製槽
複製槽用於為下遊訂閱者保留WAL日誌,防止其在被消費前被清理。由於資料變更來源於主CN和所有DN,您需要在這些節點上都建立複製槽。
在主CN節點上執行以下SQL,即可在所有相關節點(主CN和所有DN)上大量建立同名的複製槽:
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT pg_create_logical_replication_slot('<publication_slot_name>', 'pgoutput', false) $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;<publication_slot_name>為複製槽名稱。
預期的結果如下,其中success列為t表示複製槽建立成功。否則複製槽建立失敗,通過result列可以查看具體原因:
node_name | node_port | success | result
----------------+-----------+---------+------------------------------------
10.xxx.xxx.xxx | 3007 | t | (<publication_slot_name>,0/C024D7D0)
10.xxx.xxx.xxx | 3006 | t | (<publication_slot_name>,0/C33B6668)
10.xxx.xxx.xxx | 3003 | t | (<publication_slot_name>,0/C33949B0)
(3 rows)步驟二:在訂閱端建立訂閱
訂閱端(如Debezium、Flink或另一個PostgreSQL執行個體)需要為每一個建立了複製槽的節點(主CN和所有DN)分別建立一個訂閱串連,以接收完整的變更資料。
每個訂閱串連的配置基本相同,只需修改主機地址和連接埠號碼。
Debezium樣本
如果使用Debezium作為訂閱端,那麼每個訂閱端僅database.hostname和database.port兩個配置項需要修改為相應節點的主地址和連接埠號碼,其餘配置完全相同。以訂閱其中一個DN節點的配置為例:
{
"name": "xxx",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "<DN1主地址>",
"database.port" : "<DN1連接埠>",
"database.user" : "<您的使用者名稱>",
"database.password" : "<您的密碼>",
"database.dbname" : "postgres",
"slot.name": "<複製槽名稱>",
"publication.name": "<發布名稱>",
...
}
}PostgreSQL樣本
如果使用PostgreSQL作為訂閱端,那麼每個訂閱端僅host和port兩個配置項需要修改為相應節點的主地址和連接埠號碼,其餘配置完全相同。以訂閱其中一個DN節點的配置為例:
CREATE SUBSCRIPTION test_subscription
CONNECTION 'dbname=postgres host=<DN1主地址> port=<DN1連接埠> user=<您的使用者名稱> password=<您的密碼>'
PUBLICATION <發布名稱>
WITH (create_slot=false, slot_name='<複製槽名稱>');步驟三:驗證CDC鏈路
在所有訂閱端配置完成後,您可以在主CN上執行以下SQL,檢查所有發布節點的複製槽是否都已啟用。
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT active FROM pg_replication_slots WHERE slot_name = '<publication_slot_name>' $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
預期得到的結果如下。如果result列值為t則表示相應節點與訂閱端的複製關係已經建立。此時,您可以在源端表中插入或修改資料,並觀察訂閱端是否能接收到變更。
node_name | node_port | success | result
----------------+-----------+---------+--------
10.xxx.xxx.xxx | 3007 | t | t
10.xxx.xxx.xxx | 3006 | t | t
10.xxx.xxx.xxx | 3003 | t | t
(3 rows)維護與清理
如果下遊不再需要訂閱資料,則應該在下遊停止訂閱後,及時清理髮布端的複製槽,以釋放WAL日誌空間。
清理複製槽將導致關聯的WAL日誌被永久刪除,未被消費的資料變更將丟失且無法恢複。如果只是臨時暫停,請勿執行此操作清理複製槽。
在主CN上執行以下SQL,即可在所有相關節點上大量刪除複製槽:
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT pg_drop_replication_slot('<publication_slot_name>') $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;