変更データキャプチャ (CDC) を使用すると、データベース内のリアルタイムのデータ変更 (INSERT/UPDATE/DELETE) をキャプチャできます。これらの変更をイベントストリームとして、データウェアハウス、Flink などの分析プラットフォーム、または他のデータベースインスタンスなどの下流システムに同期できます。
PolarDB for PostgreSQL (分散版) クラスターでは、CDC の設定はデータの発生元によって異なります:
分散テーブル: データ変更は個々のデータノード (DN) で発生します。変更の完全なセットを取得するには、下流システムはすべての DN をサブスクライブする必要があります。
レプリケートされたテーブル: データ変更はすべてのノードにコピーされます。データの重複を防ぐために、プライマリ計算ノード (CN) は単一のイベントストリームをパブリッシュします。下流システムは、プライマリ CN のみをサブスクライブする必要があります。
準備
開始する前に、必要なクラスターパラメーターが正しく設定されていることを確認してください。これらのパラメーターは、論理レプリケーションを有効にするために必要です。
デフォルトでは、これらのパラメーターは PolarDB for PostgreSQL (分散版) ですでに設定されています。構成が異なる場合は、サポートにチケットを送信してください。
プライマリ CN で、次の SQL 文を実行して、すべてのノードで
polar_cluster.enable_change_data_captureがonになっているかどうかを確認します。SELECT success, result FROM run_command_on_all_nodes($$ SHOW polar_cluster.enable_change_data_capture $$);期待される結果は以下のとおりです。すべてのノードの
result列はonである必要があります。success | result ---------+-------- t | on t | on t | on t | onプライマリ CN で、次の SQL 文を実行して、すべてのノードで
wal_levelがlogicalになっているかどうかを確認します。SELECT success, result FROM run_command_on_all_nodes($$ SHOW wal_level $$);期待される結果は以下のとおりです。すべてのノードの
result列はlogicalである必要があります。success | result ---------+--------- t | logical t | logical t | logical t | logical
ステップ 1: パブリッシャーでパブリケーションとレプリケーションスロットを作成する
パブリッシャー (お使いの PolarDB for PostgreSQL (分散版) クラスター) の設定には、パブリケーションの作成とレプリケーションスロットの作成の 2 つのステップが含まれます。
パブリケーションの作成
パブリケーションは、変更がキャプチャされるテーブルのセットです。この操作はプライマリ CN で一度実行するだけで、システムが自動的にすべてのノードに同期します。
CREATE PUBLICATION <publication_name> FOR TABLE <table_name1>, <table_name2>;FOR ALL TABLESオプションは使用しないでください。パブリッシュするテーブルを明示的に指定する必要があります。<publication_name>はパブリケーションの名前です。<table_name1>/<table_name2>は、パブリッシュする分散テーブルまたはレプリケートされたテーブルを指定します。分散テーブルの場合、物理シャードの名前ではなく、論理テーブル名のみをパブリッシュする必要があります。
レプリケーションスロットの作成
レプリケーションスロットは、下流のサブスクライバーのために先行書き込みログ (WAL) ログを保持し、消費される前にログが削除されるのを防ぎます。データ変更はプライマリ CN とすべての DN から発生するため、これらの各ノードにレプリケーションスロットを作成する必要があります。
プライマリ CN で次の SQL 文を実行すると、関連するすべてのノード (プライマリ CN とすべての DN) に同じ名前のレプリケーションスロットを一括で作成できます:
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT pg_create_logical_replication_slot('<publication_slot_name>', 'pgoutput', false) $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;<publication_slot_name> はレプリケーションスロットの名前です。
期待される結果は以下のとおりです。t 列の値が success の場合、レプリケーションスロットが正常に作成されたことを示します。作成に失敗した場合、result 列に理由が表示されます。
node_name | node_port | success | result
----------------+-----------+---------+------------------------------------
10.xxx.xxx.xxx | 3007 | t | (<publication_slot_name>,0/C024D7D0)
10.xxx.xxx.xxx | 3006 | t | (<publication_slot_name>,0/C33B6668)
10.xxx.xxx.xxx | 3003 | t | (<publication_slot_name>,0/C33949B0)
(3 rows)ステップ 2: サブスクライバーでサブスクリプションを作成する
Debezium、Flink、または別の PostgreSQL インスタンスなどのサブスクライバーは、レプリケーションスロットを持つ各ノード (プライマリ CN とすべての DN) に対して個別のサブスクリプションを確立する必要があります。これは、データ変更の完全なストリームを受信するために必要です。
各サブスクリプションの構成はほぼ同じです。各ノードのホストとポートを変更するだけで済みます。
Debezium の例
サブスクライバーとして Debezium を使用する場合、各サブスクリプションで database.hostname と database.port の設定項目を変更するだけで済みます。これらを対応するノードのプライマリエンドポイントとポートに設定します。残りの構成は同じです。次の例は、1 つの DN をサブスクライブするための構成を示しています:
{
"name": "xxx",
"config": {
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname" : "<DN1_primary_endpoint>",
"database.port" : "<DN1_port>",
"database.user" : "<your_username>",
"database.password" : "<your_password>",
"database.dbname" : "postgres",
"slot.name": "<replication_slot_name>",
"publication.name": "<publication_name>",
...
}
}PostgreSQL のサンプルパターン
サブスクライバーとして PostgreSQL を使用する場合、各サブスクリプションの接続文字列で host と port パラメーターを変更するだけで済みます。これらを対応するノードのプライマリエンドポイントとポートに設定します。残りの構成は同じです。次の例は、1 つの DN をサブスクライブする方法を示しています:
CREATE SUBSCRIPTION test_subscription
CONNECTION 'dbname=postgres host=<DN1_primary_endpoint> port=<DN1_port> user=<your_username> password=<your_password>'
PUBLICATION <publication_name>
WITH (create_slot=false, slot_name='<replication_slot_name>');ステップ 3: CDC リンクを検証する
すべてのサブスクライバーを設定した後、プライマリ CN で次の SQL 文を実行して、すべてのパブリッシュ元ノードのレプリケーションスロットがアクティブであるかどうかを確認できます。
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT active FROM pg_replication_slots WHERE slot_name = '<publication_slot_name>' $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
期待される結果は以下のとおりです。t 列の値が result の場合、ノードとそのサブスクライバー間のレプリケーションリンクがアクティブであることを示します。これで、ソーステーブルにデータを挿入または変更し、サブスクライバーが変更を受信するかどうかを確認できます。
node_name | node_port | success | result
----------------+-----------+---------+--------
10.xxx.xxx.xxx | 3007 | t | t
10.xxx.xxx.xxx | 3006 | t | t
10.xxx.xxx.xxx | 3003 | t | t
(3 rows)メンテナンスとクリーンアップ
下流システムがデータを必要としなくなった場合は、サブスクリプションを停止する必要があります。その後、パブリッシャーのレプリケーションスロットを速やかに削除して、WAL ログ領域を解放します。
レプリケーションスロットを削除すると、関連する WAL ログが永久に削除されます。未消費のデータ変更は失われ、回復できません。サブスクリプションを一時的に一時停止するだけの場合は、レプリケーションスロットを削除しないでください。
プライマリ CN で次の SQL 文を実行すると、関連するすべてのノードからレプリケーションスロットを一括で削除できます:
WITH nodes AS (
SELECT
nodename,
nodeport,
$$ SELECT pg_drop_replication_slot('<publication_slot_name>') $$ AS cmd
FROM pg_dist_node
WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
SELECT
array_agg(nodename) as nodenames,
array_agg(nodeport) as nodeports,
array_agg(cmd) as cmds
FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;