すべてのプロダクト
Search
ドキュメントセンター

PolarDB:CDC を設定してデータ変更を同期する

最終更新日:Nov 10, 2025

変更データキャプチャ (CDC) を使用すると、データベース内のリアルタイムのデータ変更 (INSERT/UPDATE/DELETE) をキャプチャできます。これらの変更をイベントストリームとして、データウェアハウス、Flink などの分析プラットフォーム、または他のデータベースインスタンスなどの下流システムに同期できます。

PolarDB for PostgreSQL (分散版) クラスターでは、CDC の設定はデータの発生元によって異なります:

  • 分散テーブル: データ変更は個々のデータノード (DN) で発生します。変更の完全なセットを取得するには、下流システムはすべての DN をサブスクライブする必要があります。

  • レプリケートされたテーブル: データ変更はすべてのノードにコピーされます。データの重複を防ぐために、プライマリ計算ノード (CN) は単一のイベントストリームをパブリッシュします。下流システムは、プライマリ CN のみをサブスクライブする必要があります。

準備

開始する前に、必要なクラスターパラメーターが正しく設定されていることを確認してください。これらのパラメーターは、論理レプリケーションを有効にするために必要です。

説明

デフォルトでは、これらのパラメーターは PolarDB for PostgreSQL (分散版) ですでに設定されています。構成が異なる場合は、サポートにチケットを送信してください。

  • プライマリ CN で、次の SQL 文を実行して、すべてのノードで polar_cluster.enable_change_data_captureon になっているかどうかを確認します。

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW polar_cluster.enable_change_data_capture $$);

    期待される結果は以下のとおりです。すべてのノードの result 列は on である必要があります。

     success | result 
    ---------+--------
     t       | on
     t       | on
     t       | on
     t       | on
  • プライマリ CN で、次の SQL 文を実行して、すべてのノードで wal_levellogical になっているかどうかを確認します。

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW wal_level $$);

    期待される結果は以下のとおりです。すべてのノードの result 列は logical である必要があります。

    success | result  
    ---------+---------
     t       | logical
     t       | logical
     t       | logical
     t       | logical

ステップ 1: パブリッシャーでパブリケーションとレプリケーションスロットを作成する

パブリッシャー (お使いの PolarDB for PostgreSQL (分散版) クラスター) の設定には、パブリケーションの作成とレプリケーションスロットの作成の 2 つのステップが含まれます。

パブリケーションの作成

パブリケーションは、変更がキャプチャされるテーブルのセットです。この操作はプライマリ CN で一度実行するだけで、システムが自動的にすべてのノードに同期します。

CREATE PUBLICATION <publication_name> FOR TABLE <table_name1>, <table_name2>;
説明
  • FOR ALL TABLES オプションは使用しないでください。パブリッシュするテーブルを明示的に指定する必要があります。

  • <publication_name> はパブリケーションの名前です。<table_name1>/<table_name2> は、パブリッシュする分散テーブルまたはレプリケートされたテーブルを指定します。分散テーブルの場合、物理シャードの名前ではなく、論理テーブル名のみをパブリッシュする必要があります。

レプリケーションスロットの作成

レプリケーションスロットは、下流のサブスクライバーのために先行書き込みログ (WAL) ログを保持し、消費される前にログが削除されるのを防ぎます。データ変更はプライマリ CN とすべての DN から発生するため、これらの各ノードにレプリケーションスロットを作成する必要があります。

プライマリ CN で次の SQL 文を実行すると、関連するすべてのノード (プライマリ CN とすべての DN) に同じ名前のレプリケーションスロットを一括で作成できます:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_create_logical_replication_slot('<publication_slot_name>', 'pgoutput', false) $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
説明

<publication_slot_name> はレプリケーションスロットの名前です。

期待される結果は以下のとおりです。t 列の値が success の場合、レプリケーションスロットが正常に作成されたことを示します。作成に失敗した場合、result 列に理由が表示されます。

   node_name    | node_port | success |               result               
----------------+-----------+---------+------------------------------------
 10.xxx.xxx.xxx |      3007 | t       | (<publication_slot_name>,0/C024D7D0)
 10.xxx.xxx.xxx |      3006 | t       | (<publication_slot_name>,0/C33B6668)
 10.xxx.xxx.xxx |      3003 | t       | (<publication_slot_name>,0/C33949B0)
(3 rows)

ステップ 2: サブスクライバーでサブスクリプションを作成する

Debezium、Flink、または別の PostgreSQL インスタンスなどのサブスクライバーは、レプリケーションスロットを持つ各ノード (プライマリ CN とすべての DN) に対して個別のサブスクリプションを確立する必要があります。これは、データ変更の完全なストリームを受信するために必要です。

説明

各サブスクリプションの構成はほぼ同じです。各ノードのホストとポートを変更するだけで済みます。

Debezium の例

サブスクライバーとして Debezium を使用する場合、各サブスクリプションで database.hostnamedatabase.port の設定項目を変更するだけで済みます。これらを対応するノードのプライマリエンドポイントとポートに設定します。残りの構成は同じです。次の例は、1 つの DN をサブスクライブするための構成を示しています:

{
    "name": "xxx",
    "config": {
        "connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname" : "<DN1_primary_endpoint>",
        "database.port" : "<DN1_port>",
        "database.user" : "<your_username>",
        "database.password" : "<your_password>",
        "database.dbname" : "postgres",
        "slot.name": "<replication_slot_name>",
        "publication.name": "<publication_name>",
        ...
    }
}

PostgreSQL のサンプルパターン

サブスクライバーとして PostgreSQL を使用する場合、各サブスクリプションの接続文字列で hostport パラメーターを変更するだけで済みます。これらを対応するノードのプライマリエンドポイントとポートに設定します。残りの構成は同じです。次の例は、1 つの DN をサブスクライブする方法を示しています:

CREATE SUBSCRIPTION test_subscription
    CONNECTION 'dbname=postgres host=<DN1_primary_endpoint> port=<DN1_port> user=<your_username> password=<your_password>'
    PUBLICATION <publication_name>
    WITH (create_slot=false, slot_name='<replication_slot_name>');

ステップ 3: CDC リンクを検証する

すべてのサブスクライバーを設定した後、プライマリ CN で次の SQL 文を実行して、すべてのパブリッシュ元ノードのレプリケーションスロットがアクティブであるかどうかを確認できます。

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT active FROM pg_replication_slots WHERE slot_name = '<publication_slot_name>' $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;

期待される結果は以下のとおりです。t 列の値が result の場合、ノードとそのサブスクライバー間のレプリケーションリンクがアクティブであることを示します。これで、ソーステーブルにデータを挿入または変更し、サブスクライバーが変更を受信するかどうかを確認できます。

  node_name     | node_port | success | result 
----------------+-----------+---------+--------
 10.xxx.xxx.xxx |      3007 | t       | t
 10.xxx.xxx.xxx |      3006 | t       | t
 10.xxx.xxx.xxx |      3003 | t       | t
(3 rows)

メンテナンスとクリーンアップ

下流システムがデータを必要としなくなった場合は、サブスクリプションを停止する必要があります。その後、パブリッシャーのレプリケーションスロットを速やかに削除して、WAL ログ領域を解放します。

説明

レプリケーションスロットを削除すると、関連する WAL ログが永久に削除されます。未消費のデータ変更は失われ、回復できません。サブスクリプションを一時的に一時停止するだけの場合は、レプリケーションスロットを削除しないでください。

プライマリ CN で次の SQL 文を実行すると、関連するすべてのノードからレプリケーションスロットを一括で削除できます:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_drop_replication_slot('<publication_slot_name>') $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;