Konektor Postgres CDC membaca snapshot lengkap dari database PostgreSQL, lalu menangkap perubahan data berikutnya. Proses ini memastikan bahwa setiap catatan diproses tepat satu kali (exactly once). Topik ini menjelaskan cara menggunakan konektor Postgres CDC.
Informasi latar belakang
Konektor Postgres CDC mendukung hal-hal berikut.
Kategori | Detail |
Tipe yang didukung | Tabel sumber Catatan Gunakan konektor JDBC untuk tabel sink dan tabel dimensi. |
Mode | Hanya mode stream |
Format data | Tidak berlaku |
Metrik unik | |
Tipe API | SQL dan Data Ingestion YAML |
Mendukung pembaruan atau penghapusan data tabel sink | Tidak berlaku |
Fitur
Konektor Postgres CDC terintegrasi dengan kerangka kerja snapshot inkremental untuk Change Data Capture (CDC), yang tersedia di Ververica Runtime (VVR) 8.0.6 dan versi selanjutnya. Setelah membaca data historis lengkap, konektor secara otomatis beralih ke pembacaan log perubahan dari write-ahead log (WAL). Proses ini menjamin semantik exactly-once. Tabel sumber Postgres CDC mendukung pembacaan data lengkap secara konkuren tanpa lock dan dapat dilanjutkan dari titik putus (breakpoint).
Sebagai tabel sumber, fitur dan keunggulannya adalah sebagai berikut:
Pemrosesan stream dan batch terpadu. Konektor mendukung pembacaan data lengkap maupun inkremental, sehingga menghilangkan kebutuhan untuk memelihara dua pekerjaan pemrosesan data terpisah.
Pembacaan data lengkap secara konkuren. Fitur ini memungkinkan skalabilitas horizontal untuk meningkatkan kinerja.
Peralihan mulus dari pembacaan lengkap ke inkremental. Konektor secara otomatis mengurangi skala untuk menghemat sumber daya komputasi.
Pembacaan yang dapat dilanjutkan. Konektor dapat melanjutkan dari titik putus selama fase pembacaan data lengkap, sehingga meningkatkan stabilitas Pekerjaan.
Pembacaan tanpa lock. Pembacaan data lengkap tidak memerlukan lock, sehingga mencegah dampak apa pun terhadap operasi bisnis online Anda.
Prasyarat
Konektor Postgres CDC membaca aliran change data capture (CDC) menggunakan fitur logical replication PostgreSQL. Konektor ini mendukung Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, dan self-managed PostgreSQL.
Konfigurasi yang diperlukan mungkin berbeda untuk Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, dan self-managed PostgreSQL. Untuk petunjuk konfigurasi terperinci, lihat dokumen Konfigurasi Postgres.
Setelah menyelesaikan konfigurasi, pastikan kondisi berikut terpenuhi:
Parameter wal_level harus diatur ke `logical`. Pengaturan ini menambahkan informasi yang diperlukan ke write-ahead log (WAL) untuk mendukung logical decoding.
REPLICA IDENTITY dari tabel yang berlangganan harus diatur ke `FULL`. Pengaturan ini memastikan bahwa event `INSERT` dan `UPDATE` mencakup nilai sebelumnya dari semua kolom dalam tabel, sehingga menjamin konsistensi data.
Catatan`REPLICA IDENTITY` adalah pengaturan tingkat tabel yang spesifik untuk PostgreSQL. Pengaturan ini menentukan apakah event `INSERT` dan `UPDATE` mencakup nilai sebelumnya dari kolom yang terpengaruh. Untuk informasi lebih lanjut tentang nilai-nilai `REPLICA IDENTITY`, lihat REPLICA IDENTITY.
Pastikan nilai parameter `max_wal_senders` dan `max_replication_slots` lebih besar daripada jumlah slot replikasi yang sedang digunakan ditambah jumlah slot yang dibutuhkan oleh pekerjaan Flink.
Pastikan akun memiliki hak istimewa `SUPERUSER`, atau memiliki izin `LOGIN` dan `REPLICATION`. Akun tersebut juga harus memiliki izin `SELECT` pada tabel yang berlangganan untuk mengkueri data lengkap.
Catatan penggunaan
Hanya Realtime Compute for Apache Flink V8.0.6 dan versi selanjutnya yang mendukung fitur snapshot inkremental Postgres CDC.
Slot replikasi
Pekerjaan Flink PostgreSQL CDC menggunakan slot replikasi untuk mencegah write-ahead log (WAL) dihapus sebelum waktunya dan untuk memastikan konsistensi data. Jika tidak dikelola dengan baik, slot replikasi dapat menyebabkan masalah seperti disk space terbuang atau keterlambatan pembacaan data. Kami merekomendasikan praktik terbaik berikut:
Bersihkan slot yang tidak digunakan segera
Flink tidak menghapus slot replikasi secara otomatis, bahkan setelah pekerjaan berhenti. Hal ini terutama berlaku untuk restart tanpa status (stateless). Perilaku ini mencegah kehilangan data yang dapat terjadi jika WAL dihapus.
Jika Anda memastikan bahwa pekerjaan tidak akan dijalankan ulang, Anda harus menghapus slot replikasi terkait secara manual untuk membebaskan disk space.
PentingManajemen siklus hidup: Perlakukan slot replikasi sebagai resource pekerjaan dan kelola secara sinkron dengan proses start dan stop pekerjaan.
Hindari penggunaan ulang slot lama
Pekerjaan baru harus menggunakan nama slot baru alih-alih menggunakan ulang yang lama. Penggunaan ulang slot dapat menyebabkan pekerjaan membaca banyak data WAL historis saat startup, sehingga menunda pemrosesan data terbaru.
Replikasi logis PostgreSQL mensyaratkan bahwa sebuah slot hanya dapat digunakan oleh satu koneksi. Pekerjaan yang berbeda harus menggunakan nama slot yang berbeda.
PentingKonvensi penamaan: Saat menyesuaikan `slot.name`, hindari penggunaan nama dengan akhiran numerik, seperti `my_slot_1`, untuk mencegah konflik dengan slot temporary.
Perilaku slot dengan snapshot inkremental diaktifkan
Prasyarat: Checkpoint harus diaktifkan, dan tabel sumber harus memiliki primary key yang ditentukan.
Aturan pembuatan slot:
Snapshot inkremental dinonaktifkan: Hanya mendukung konkurensi tunggal. Digunakan satu slot global.
Snapshot inkremental diaktifkan:
Fase lengkap: Setiap subtask sumber konkuren membuat slot sementara. Format penamaannya adalah
${slot.name}_${task_id}.Fase inkremental: Semua slot temporary secara otomatis diklaim kembali. Hanya satu slot global yang dipertahankan.
Jumlah maksimum slot: Konkurensi sumber + 1 (selama fase lengkap)
Sumber daya dan performa
Jika jumlah slot yang tersedia atau jumlah disk space di PostgreSQL terbatas, Anda dapat mengurangi konkurensi untuk fase lengkap agar menggunakan lebih sedikit slot sementara. Tindakan ini mengurangi kecepatan baca selama fase lengkap.
Jika sink downstream mendukung penulisan idempoten, Anda dapat mengatur
scan.incremental.snapshot.backfill.skip = true. Pengaturan ini melewatkan backfill WAL selama fase lengkap dan mempercepat startup pekerjaan.Konfigurasi ini hanya menyediakan semantik at-least-once. Konfigurasi ini tidak cocok untuk pekerjaan dengan komputasi stateful, seperti agregasi atau join tabel dimensi, karena perubahan historis yang diperlukan untuk state antara mungkin hilang.
Saat snapshot inkremental dinonaktifkan, checkpoint tidak didukung selama fase pemindaian tabel penuh.
Gunakan ulang langganan Postgres
Konektor Postgres CDC bergantung pada publication untuk menentukan perubahan tabel mana yang didorong ke sebuah slot. Jika beberapa pekerjaan berbagi publication yang sama, konfigurasi mereka akan ditimpa.
Penyebab
Nilai default dari publication.autocreate.mode adalah filtered, yang hanya mencakup tabel yang ditentukan dalam konfigurasi konektor. Mode ini mengubah tabel dalam publication saat pekerjaan dimulai, yang dapat memengaruhi operasi baca pekerjaan lain.
Solusi
Buat publication secara manual di PostgreSQL yang mencakup semua tabel yang akan dipantau. Atau, buat publication terpisah untuk setiap pekerjaan.
-- Buat publication bernama my_flink_pub yang mencakup semua tabel (atau tabel tertentu, buat satu publication per pekerjaan) CREATE PUBLICATION my_flink_pub FOR TABLE table_a, table_b; -- Atau lebih sederhana, sertakan semua tabel dalam database CREATE PUBLICATION my_flink_pub FOR ALL TABLES;CatatanBerlangganan ke semua tabel dalam database tidak disarankan. Jika database besar dan berisi banyak tabel, hal ini dapat menyebabkan network bandwidth terbuang dan konsumsi CPU tinggi pada kluster Flink.
Tambahkan konfigurasi Flink berikut:
debezium.publication.name = 'my_flink_pub'(Menentukan nama publication)debezium.publication.autocreate.mode = 'disabled'(Mencegah Flink mencoba membuat atau memodifikasi publication saat startup)
Pendekatan ini memberikan isolasi penuh dan memungkinkan Anda mengelola publication secara manual alih-alih mengandalkan Flink. Hal ini mencegah pekerjaan baru memengaruhi pekerjaan yang sudah ada dan memberikan kontrol akses yang lebih aman.
SQL
Sintaks
CREATE TABLE postgrescdc_source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>',
'decoding.plugin.name'= 'pgoutput',
'scan.incremental.snapshot.enabled' = 'true',
-- Melewatkan backfill dapat mempercepat pembacaan dan mengurangi penggunaan resource, tetapi dapat menyebabkan duplikasi data. Aktifkan ini jika sink downstream bersifat idempoten.
'scan.incremental.snapshot.backfill.skip' = 'false',
-- Di lingkungan produksi, atur ini ke 'filtered' atau 'disabled' dan kelola publication secara manual alih-alih melalui Flink.
'debezium-publication.autocreate.mode' = 'disabled'
-- Jika Anda memiliki beberapa sumber, konfigurasikan publication yang berbeda untuk setiap sumber.
--'debezium.publication.name' = 'my_flink_pub'
);Parameter WITH
Parameter | Deskripsi | Tipe data | Diperlukan | Default | Keterangan |
connector | Jenis konektor. | STRING | Ya | Tidak ada | Nilainya harus |
hostname | Alamat IP atau nama host dari database PostgreSQL. | STRING | Ya | Tidak ada | Tidak ada. |
username | Username untuk layanan database PostgreSQL. | STRING | Ya | Tidak ada | Tidak ada. |
password | Password untuk layanan database PostgreSQL. | STRING | Ya | Tidak ada | Tidak ada. |
database-name | Nama database. | STRING | Ya | Tidak ada | Nama database. |
schema-name | Nama skema PostgreSQL. | STRING | Ya | Tidak ada | Nama skema mendukung ekspresi reguler untuk membaca data dari beberapa skema. |
table-name | Nama tabel PostgreSQL. | STRING | Ya | Tidak ada | Nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel. |
port | Nomor port layanan database PostgreSQL. | INTEGER | Tidak | 5432 | Tidak ada. |
decoding.plugin.name | Nama plugin decoding logis PostgreSQL. | STRING | Tidak | decoderbufs | Hal ini ditentukan oleh plugin yang diinstal pada layanan PostgreSQL. Plugin yang didukung adalah:
|
slot.name | Nama slot decoding logis. | STRING | Wajib untuk VVR 8.0.1 dan versi selanjutnya. Opsional untuk versi sebelumnya. | Tidak ada untuk VVR 8.0.1 dan versi selanjutnya. Default-nya adalah `flink` untuk versi sebelumnya. | Atur |
debezium.* | Properti dan parameter Debezium | STRING | Tidak | Tidak ada | Memberikan kontrol lebih rinci atas perilaku klien Debezium. Misalnya, |
scan.incremental.snapshot.enabled | Menentukan apakah snapshot inkremental diaktifkan. | BOOLEAN | Tidak | false | Nilai valid:
Catatan
|
scan.startup.mode | Mode startup untuk konsumsi data. | STRING | Tidak | initial | Nilai valid:
|
changelog-mode | Mode changelog untuk encoding perubahan stream. | String | Tidak | all | Mode changelog yang didukung:
|
heartbeat.interval.ms | Interval pengiriman paket heartbeat. | Duration | Tidak | 30s | Unitnya adalah milidetik. Konektor Postgres CDC secara aktif mengirim heartbeat ke database untuk memajukan offset slot. Saat perubahan tabel jarang terjadi, pengaturan nilai ini memastikan pembersihan log WAL yang tepat waktu. |
scan.incremental.snapshot.chunk.key-column | Menentukan kolom yang digunakan sebagai kunci chunk untuk memisahkan shard selama fase snapshot. | STRING | Tidak | Tidak ada | Secara default, kolom pertama dari primary key dipilih. |
scan.incremental.close-idle-reader.enabled | Menentukan apakah reader idle ditutup setelah snapshot selesai. | Boolean | Tidak | false | Untuk mengaktifkan konfigurasi ini, atur |
scan.incremental.snapshot.backfill.skip | Menentukan apakah pembacaan log dilewati selama fase lengkap. | Boolean | Tidak | false | Nilai valid:
|
Pemetaan tipe
Tabel berikut menunjukkan pemetaan antara tipe bidang PostgreSQL dan Flink.
Tipe bidang PostgreSQL | Tipe bidang Flink |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
Contoh
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;Data ingestion
Mulai dari Realtime Compute for Apache Flink V11.4, Anda dapat menggunakan konektor PostgreSQL sebagai sumber data dalam pekerjaan Data Ingestion YAML.
Sintaksis
source:
type: postgres
name: PostgreSQL Source
hostname: localhost
port: 5432
username: pg_username
password: pg_password
tables: db.scm.tbl
slot.name: test_slot
scan.startup.mode: initial
server-time-zone: UTC
connect.timeout: 120s
decoding.plugin.name: decoderbufs
sink:
type: ...Parameter
Parameter | Deskripsi | Wajib | Tipe data | Default | Keterangan |
type | Jenis sumber data. | Ya | STRING | Tidak ada | Nilainya harus `postgres`. |
name | Nama sumber data. | Tidak | STRING | Tidak ada | Tidak ada. |
hostname | Nama domain atau alamat IP server database PostgreSQL. | Ya | STRING | (none) | Tidak ada. |
port | Port yang diekspos oleh server database PostgreSQL. | Tidak | INTEGER | 5432 | Tidak ada. |
username | Username PostgreSQL. | Ya | STRING | (none) | Tidak ada. |
password | Password PostgreSQL. | Ya | STRING | (none) | Tidak ada. |
tables | Nama tabel database PostgreSQL yang akan ditangkap. Ekspresi reguler didukung untuk memantau beberapa tabel yang sesuai dengan ekspresi tersebut. | Ya | STRING | (none) | Penting Saat ini, hanya tabel dalam database yang sama yang dapat ditangkap. Titik (.) diperlakukan sebagai pemisah untuk nama database, skema, dan tabel. Untuk menggunakan titik (.) dalam ekspresi reguler untuk mencocokkan karakter apa pun, Anda harus meng-escape-nya dengan backslash. Contoh: |
slot.name | Nama slot replikasi PostgreSQL. | Ya | STRING | (none) | Nama harus mematuhi aturan penamaan slot replikasi PostgreSQL dan dapat berisi huruf kecil, angka, dan garis bawah. |
decoding.plugin.name | Nama plugin logical decoding PostgreSQL yang diinstal di server. | Tidak | STRING |
| Nilai opsional termasuk |
tables.exclude | Nama tabel database PostgreSQL yang akan dikecualikan. Parameter ini berlaku setelah parameter `tables`. | Tidak | STRING | (none) | Nama tabel juga mendukung ekspresi reguler untuk mengecualikan beberapa tabel yang sesuai dengan ekspresi tersebut. Penggunaannya sama seperti parameter `tables`. |
server-time-zone | Zona waktu sesi server database, seperti "Asia/Shanghai". | Tidak | STRING | (none) | Jika tidak diatur, zona waktu default sistem ( |
scan.incremental.snapshot.chunk.size | Ukuran (jumlah baris) setiap chunk dalam kerangka kerja snapshot inkremental. | Tidak | INTEGER | 8096 | Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data suatu chunk di-cache di memori sebelum sepenuhnya dibaca. Jumlah baris per chunk yang lebih kecil menghasilkan jumlah total chunk yang lebih besar untuk tabel tersebut. Meskipun hal ini mengurangi granularitas pemulihan kesalahan, hal ini dapat menyebabkan error kehabisan memori (OOM) dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda perlu menyeimbangkan dan mengatur ukuran chunk yang wajar. |
scan.snapshot.fetch.size | Jumlah maksimum catatan yang diambil sekaligus saat membaca data lengkap tabel. | Tidak | INTEGER | 1024 | Tidak ada. |
scan.startup.mode | Mode startup untuk konsumsi data. | Tidak | STRING | initial | Nilai yang valid:
|
scan.incremental.close-idle-reader.enabled | Menentukan apakah reader idle ditutup setelah snapshot selesai. | Tidak | BOOLEAN | false | Untuk mengaktifkan konfigurasi ini, atur `execution.checkpointing.checkpoints-after-tasks-finish.enabled` ke `true`. |
scan.lsn-commit.checkpoints-num-delay | Jumlah checkpoint yang ditunda sebelum mulai melakukan commit offset LSN. | Tidak | INTEGER | 3 | Offset LSN checkpoint dilakukan commit secara bergilir untuk mencegah ketidakmampuan memulihkan dari state. |
connect.timeout | Waktu maksimum yang harus ditunggu konektor setelah mencoba terhubung ke server database PostgreSQL sebelum timeout. | Tidak | DURATION | 30s | Nilai ini tidak boleh kurang dari 250 milidetik. |
connect.max-retries | Jumlah maksimum upaya konektor untuk membuat koneksi ke server database PostgreSQL. | Tidak | INTEGER | 3 | Tidak ada. |
connection.pool.size | Ukuran kolam koneksi. | Tidak | INTEGER | 20 | Tidak ada. |
jdbc.properties.* | Memungkinkan pengguna meneruskan properti URL JDBC kustom. | Tidak | STRING | 20 | Pengguna dapat meneruskan properti kustom, seperti |
heartbeat.interval | Interval pengiriman event heartbeat untuk melacak offset log WAL terbaru yang tersedia. | Tidak | DURATION | 30s | Tidak ada. |
debezium.* | Meneruskan properti Debezium ke Debezium Embedded Engine, yang digunakan untuk menangkap perubahan data dari server PostgreSQL. | Tidak | STRING | (none) | Untuk informasi lebih lanjut tentang properti konektor Debezium PostgreSQL, lihat dokumentasi terkait. |
chunk-meta.group.size | Ukuran metadata chunk. | Tidak | STRING | 1000 | Jika metadata lebih besar dari nilai ini, metadata tersebut diteruskan dalam beberapa bagian. |
metadata.list | Daftar metadata yang dapat dibaca yang diteruskan ke downstream, yang dapat digunakan dalam modul transform. | Tidak | STRING | false | Gunakan koma (,) sebagai pemisah. Saat ini, metadata yang tersedia adalah: |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah chunk tak terbatas dikirim terlebih dahulu selama fase pembacaan snapshot. | Tidak | STRING | false | Nilai yang valid:
Penting Ini adalah fitur eksperimental. Mengaktifkannya dapat mengurangi risiko error kehabisan memori (OOM) saat TaskManager menyinkronkan chunk terakhir selama fase snapshot. Kami merekomendasikan menambahkan ini sebelum startup pertama pekerjaan. |
Referensi
Untuk daftar konektor yang didukung oleh Realtime Compute for Apache Flink, lihat Konektor yang didukung.
Untuk menulis data ke tabel sink PolarDB for PostgreSQL (Oracle Compatible), lihat PolarDB for PostgreSQL (Oracle Compatible).
Untuk membaca dari atau menulis ke RDS for MySQL, PolarDB for MySQL, atau database MySQL yang dikelola sendiri, gunakan konektor MySQL.