All Products
Search
Document Center

PolarDB:Konfigurasikan CDC untuk menyinkronkan perubahan data

Last Updated:Nov 12, 2025

Change Data Capture (CDC) memungkinkan Anda menangkap modifikasi data secara real-time (INSERT/UPDATE/DELETE) dalam sebuah database. Perubahan tersebut kemudian dapat disinkronkan sebagai aliran acara ke sistem downstream, seperti gudang data, platform analitik seperti Flink, atau instans database lainnya.

Dalam kluster PolarDB for PostgreSQL (edisi terdistribusi), konfigurasi CDC bergantung pada asal data tersebut:

  • Tabel terdistribusi: Perubahan data terjadi pada masing-masing node data (DN). Untuk mendapatkan seluruh perubahan secara lengkap, sistem downstream harus berlangganan ke setiap DN.

  • Tabel replikasi: Perubahan data direplikasi ke semua node. Untuk mencegah duplikasi data, node komputasi utama (CN) menerbitkan satu aliran acara tunggal. Sistem downstream hanya perlu berlangganan ke CN utama.

Persiapan

Sebelum memulai, pastikan parameter kluster yang diperlukan telah dikonfigurasi dengan benar. Parameter-parameter ini diperlukan untuk mengaktifkan replikasi logis.

Catatan

Secara default, parameter ini sudah dikonfigurasi di PolarDB for PostgreSQL (edisi terdistribusi). Jika konfigurasi Anda berbeda, ajukan tiket untuk mendapatkan dukungan.

  • Pada CN utama, jalankan pernyataan SQL berikut untuk memeriksa apakah polar_cluster.enable_change_data_capture bernilai on untuk semua node.

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW polar_cluster.enable_change_data_capture $$);

    Hasil yang diharapkan ditunjukkan di bawah ini. Kolom result untuk semua node harus bernilai on.

     success | result 
    ---------+--------
     t       | on
     t       | on
     t       | on
     t       | on
  • Pada CN utama, jalankan pernyataan SQL berikut untuk memeriksa apakah wal_level bernilai logical untuk semua node.

    SELECT success, result FROM run_command_on_all_nodes($$ SHOW wal_level $$);

    Hasil yang diharapkan ditunjukkan di bawah ini. Kolom result untuk semua node harus bernilai logical.

    success | result  
    ---------+---------
     t       | logical
     t       | logical
     t       | logical
     t       | logical

Langkah 1: Buat publikasi dan slot replikasi pada penerbit

Mengonfigurasi penerbit—yaitu kluster PolarDB for PostgreSQL (edisi terdistribusi)—melibatkan dua langkah: membuat publikasi dan membuat slot replikasi.

Buat publikasi

Publikasi adalah kumpulan tabel yang perubahannya akan ditangkap. Operasi ini cukup dijalankan sekali pada CN utama; sistem akan secara otomatis menyinkronkannya ke semua node.

CREATE PUBLICATION <publication_name> FOR TABLE <table_name1>, <table_name2>;
Catatan
  • Jangan gunakan opsi FOR ALL TABLES. Anda harus menentukan tabel yang akan dipublikasikan secara eksplisit.

  • <publication_name> adalah nama publikasi. <table_name1>/<table_name2> menentukan tabel terdistribusi atau tabel replikasi yang akan dipublikasikan. Untuk tabel terdistribusi, cukup publikasikan nama tabel logisnya, bukan nama shard fisiknya.

Buat slot replikasi

Slot replikasi menyimpan log Write-Ahead Logging (WAL) untuk subscriber downstream, sehingga mencegah log tersebut dihapus sebelum dikonsumsi. Karena perubahan data berasal dari CN utama dan semua DN, Anda harus membuat slot replikasi pada masing-masing node tersebut.

Anda dapat menjalankan pernyataan SQL berikut pada CN utama untuk membuat slot replikasi dengan nama yang sama pada semua node terkait (CN utama dan semua DN) secara batch:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_create_logical_replication_slot('<publication_slot_name>', 'pgoutput', false) $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;
Catatan

<publication_slot_name> adalah nama slot replikasi.

Hasil yang diharapkan ditunjukkan di bawah ini. Nilai t pada kolom success menunjukkan bahwa slot replikasi berhasil dibuat. Jika pembuatan gagal, kolom result akan menampilkan alasannya.

   node_name    | node_port | success |               result               
----------------+-----------+---------+------------------------------------
 10.xxx.xxx.xxx |      3007 | t       | (<publication_slot_name>,0/C024D7D0)
 10.xxx.xxx.xxx |      3006 | t       | (<publication_slot_name>,0/C33B6668)
 10.xxx.xxx.xxx |      3003 | t       | (<publication_slot_name>,0/C33949B0)
(3 rows)

Langkah 2: Buat langganan pada subscriber

Subscriber, seperti Debezium, Flink, atau instans PostgreSQL lainnya, harus membuat langganan terpisah untuk setiap node yang memiliki slot replikasi (CN utama dan semua DN) agar menerima aliran perubahan data secara lengkap.

Catatan

Konfigurasi untuk setiap langganan hampir identik. Anda hanya perlu mengubah host dan port untuk masing-masing node.

Contoh Debezium

Jika Anda menggunakan Debezium sebagai subscriber, cukup ubah item konfigurasi database.hostname dan database.port untuk setiap langganan. Atur nilai-nilai tersebut ke Titik akhir utama dan port dari node yang sesuai. Konfigurasi lainnya tetap sama. Contoh berikut menunjukkan konfigurasi untuk berlangganan ke satu DN:

{
    "name": "xxx",
    "config": {
        "connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname" : "<DN1_primary_endpoint>",
        "database.port" : "<DN1_port>",
        "database.user" : "<your_username>",
        "database.password" : "<your_password>",
        "database.dbname" : "postgres",
        "slot.name": "<replication_slot_name>",
        "publication.name": "<publication_name>",
        ...
    }
}

Pola contoh untuk PostgreSQL

Jika Anda menggunakan PostgreSQL sebagai subscriber, cukup ubah parameter host dan port dalam string koneksi untuk setiap langganan. Atur nilai-nilai tersebut ke Titik akhir utama dan port dari node yang sesuai. Konfigurasi lainnya tetap sama. Contoh berikut menunjukkan cara berlangganan ke satu DN:

CREATE SUBSCRIPTION test_subscription
    CONNECTION 'dbname=postgres host=<DN1_primary_endpoint> port=<DN1_port> user=<your_username> password=<your_password>'
    PUBLICATION <publication_name>
    WITH (create_slot=false, slot_name='<replication_slot_name>');

Langkah 3: Verifikasi tautan CDC

Setelah semua subscriber dikonfigurasi, jalankan pernyataan SQL berikut pada CN utama untuk memeriksa apakah slot replikasi pada semua node penerbit aktif.

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT active FROM pg_replication_slots WHERE slot_name = '<publication_slot_name>' $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;

Hasil yang diharapkan ditunjukkan di bawah ini. Nilai t pada kolom result menunjukkan bahwa tautan replikasi antara node dan subscriber-nya aktif. Anda kini dapat menyisipkan atau memodifikasi data dalam tabel sumber dan memverifikasi apakah subscriber menerima perubahan tersebut.

  node_name     | node_port | success | result 
----------------+-----------+---------+--------
 10.xxx.xxx.xxx |      3007 | t       | t
 10.xxx.xxx.xxx |      3006 | t       | t
 10.xxx.xxx.xxx |      3003 | t       | t
(3 rows)

Pemeliharaan dan pembersihan

Jika sistem downstream tidak lagi memerlukan data tersebut, hentikan langganan terlebih dahulu. Kemudian, segera hapus slot replikasi pada penerbit untuk membebaskan ruang log WAL.

Catatan

Menghapus slot replikasi akan menghapus permanen log WAL yang terkait dengannya. Semua perubahan data yang belum dikonsumsi akan hilang dan tidak dapat dipulihkan. Jika Anda hanya ingin menjeda langganan sementara, jangan hapus slot replikasi tersebut.

Anda dapat menjalankan pernyataan SQL berikut pada CN utama untuk menghapus slot replikasi dari semua node terkait secara batch:

WITH nodes AS (
    SELECT
        nodename,
        nodeport,
        $$ SELECT pg_drop_replication_slot('<publication_slot_name>') $$ AS cmd
    FROM pg_dist_node
    WHERE nodeid = 1 OR shouldhaveshards = true
)
SELECT result.*
FROM (
    SELECT
        array_agg(nodename) as nodenames,
        array_agg(nodeport) as nodeports,
        array_agg(cmd) as cmds
    FROM nodes
) params,
LATERAL master_run_on_worker(nodenames, nodeports, cmds, true) AS result;