Konektor Change Data Capture (CDC) PostgreSQL membaca snapshot lengkap dan data perubahan dari database PostgreSQL. Konektor ini memastikan setiap catatan data dibaca tepat satu kali dan mempertahankan semantik tepat-sekali selama pemulihan kesalahan. Topik ini menjelaskan cara menggunakan konektor PostgreSQL CDC.
Informasi latar belakang
Konektor Postgres CDC memiliki kemampuan berikut.
Item | Deskripsi |
Tipe yang didukung | Tabel sumber Catatan Anda dapat menggunakan konektor JDBC untuk membuat tabel sink atau tabel dimensi. |
Mode operasi | Hanya mode streaming |
Format data | Tidak berlaku |
Metrik pemantauan spesifik |
Catatan
|
Tipe API | SQL dan Data Ingestion YAML |
Pembaruan atau penghapusan data di tabel sink | Tidak berlaku |
Fitur
Konektor PostgreSQL CDC menggunakan kerangka kerja snapshot inkremental yang tersedia di Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) 8.0.6 atau versi yang lebih baru. Konektor ini pertama-tama membaca data historis lengkap, lalu secara otomatis beralih ke pembacaan log perubahan write-ahead logging (WAL). Proses ini memastikan tidak ada data yang terlewat atau diduplikasi. Bahkan jika terjadi kegagalan, data diproses dengan semantik tepat-sekali. Tabel sumber PostgreSQL CDC mendukung pembacaan konkuren data lengkap, pembacaan tanpa lock, serta transfer data yang dapat dilanjutkan kembali.
Sebagai tabel sumber, konektor ini memiliki fitur dan keunggulan berikut:
Menyatukan pemrosesan stream dan batch dengan mendukung pembacaan data lengkap dan inkremental, sehingga menghilangkan kebutuhan untuk memelihara dua proses terpisah.
Mendukung pembacaan konkuren data lengkap untuk penskalaan performa horizontal.
Beralih secara mulus dari pembacaan data lengkap ke pembacaan data inkremental dan secara otomatis mengecilkan skala untuk menghemat sumber daya komputasi.
Mendukung transfer data yang dapat dilanjutkan kembali selama fase pembacaan data lengkap guna meningkatkan stabilitas.
Membaca data lengkap tanpa lock untuk menghindari gangguan pada operasi bisnis online.
Prasyarat
Konektor PostgreSQL CDC membaca aliran data CDC menggunakan fitur replikasi logis dari database PostgreSQL. Konektor ini mendukung Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, dan PostgreSQL yang dikelola sendiri.
Konfigurasi untuk Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, dan PostgreSQL yang dikelola sendiri berbeda. Sebelum memulai, selesaikan konfigurasi yang diperlukan seperti yang dijelaskan dalam dokumen Konfigurasi PostgreSQL.
Setelah konfigurasi selesai, pastikan kondisi berikut terpenuhi:
Nilai parameter wal_level diatur ke logical. Ini menambahkan informasi yang diperlukan untuk encoding logis ke write-ahead logging (WAL).
REPLICA IDENTITY dari tabel yang berlangganan diatur ke FULL. Ini memastikan bahwa event INSERT dan UPDATE menyertakan nilai sebelumnya dari semua kolom dalam tabel, sehingga menjamin konsistensi sinkronisasi data.
CatatanREPLICA IDENTITY adalah pengaturan tingkat tabel yang spesifik untuk PostgreSQL. Pengaturan ini menentukan apakah plugin decoding logis menyertakan nilai sebelumnya dari kolom tabel yang terlibat selama event INSERT dan UPDATE. Untuk informasi lebih lanjut tentang nilai REPLICA IDENTITY, lihat REPLICA IDENTITY.
Nilai parameter max_wal_senders dan max_replication_slots lebih besar daripada jumlah slot replikasi yang sedang digunakan oleh database ditambah jumlah slot yang dibutuhkan oleh pekerjaan Flink.
Akun memiliki izin sistem SUPERUSER atau memiliki izin LOGIN dan REPLICATION. Akun tersebut juga harus memiliki izin SELECT pada tabel yang berlangganan untuk mengkueri data lengkap.
Perhatian
Fitur snapshot inkremental PostgreSQL CDC hanya didukung di Realtime Compute for Apache Flink V8.0.6 dan versi yang lebih baru.
Pekerjaan Flink PostgreSQL CDC bergantung pada Replication Slot untuk memastikan bahwa write-ahead log (WAL) tidak dipurge secara prematur, sehingga menjamin konsistensi data. Namun, manajemen yang tidak tepat dapat menyebabkan masalah seperti disk space terbuang atau latensi pembacaan data. Ikuti rekomendasi berikut:
Segera purge slot yang tidak lagi digunakan
Flink tidak menghapus secara otomatis slot replikasi, bahkan setelah pekerjaan dihentikan, terutama dalam skenario restart tanpa status. Perilaku ini mencegah kehilangan data yang dapat terjadi jika WAL dipurge.
Jika Anda memastikan bahwa pekerjaan tidak akan dijalankan ulang, Anda harus menghapus slot replikasi terkait secara manual untuk membebaskan disk space.
PentingManajemen siklus hidup: Perlakukan slot replikasi sebagai bagian dari sumber daya pekerjaan. Kelola secara sinkron dengan operasi start dan stop pekerjaan.
Hindari penggunaan ulang slot lama
Pekerjaan baru harus menggunakan nama slot baru alih-alih menggunakan ulang slot lama. Penggunaan ulang slot dapat menyebabkan pekerjaan membaca volume besar data WAL historis saat startup, sehingga menunda pembacaan data terbaru.
Replikasi logis PostgreSQL mensyaratkan bahwa slot hanya dapat digunakan oleh satu koneksi. Oleh karena itu, pekerjaan yang berbeda harus menggunakan nama slot yang berbeda.
PentingKonvensi penamaan: Saat menyesuaikan `slot.name`, hindari penggunaan nama dengan akhiran numerik, seperti `my_slot_1`, untuk mencegah konflik dengan slot temporary.
Perilaku slot ketika snapshot inkremental diaktifkan
Prasyarat: Anda harus mengaktifkan checkpointing, dan tabel sumber harus memiliki primary key.
Aturan pembuatan slot:
Snapshot inkremental dinonaktifkan: Hanya mendukung konkurensi tunggal, yang menggunakan satu slot global.
Snapshot inkremental diaktifkan:
Fase lengkap: Setiap subtask sumber konkuren membuat slot temporary dengan nama dalam format
${slot.name}_${task_id}.Fase inkremental: Semua slot temporary secara otomatis diklaim kembali. Hanya satu slot global yang dipertahankan.
Jumlah maksimum slot: Konkurensi sumber + 1 (selama fase snapshot lengkap)
Sumber daya dan performa
Jika jumlah slot atau kapasitas disk space di PostgreSQL terbatas, Anda dapat mengurangi konkurensi pada fase snapshot lengkap untuk menurunkan jumlah slot temporary. Hal ini akan memperlambat kecepatan pembacaan data lengkap.
Jika sistem downstream mendukung penulisan idempoten, Anda dapat mengatur
scan.incremental.snapshot.backfill.skip = trueuntuk melewati backfill WAL selama fase lengkap dan mempercepat kecepatan startup.Konfigurasi ini hanya menyediakan semantik at-least-once. Konfigurasi ini tidak cocok untuk pekerjaan dengan komputasi stateful seperti agregasi atau join tabel dimensi, karena perubahan historis yang diperlukan untuk state antara mungkin hilang.
Ketika fitur snapshot inkremental tidak diaktifkan, konektor PostgreSQL CDC tidak mendukung pelaksanaan checkpoint selama fase pemindaian tabel penuh.
Jika snapshot inkremental tidak diaktifkan, pekerjaan mungkin mengalami failover akibat timeout checkpoint jika checkpoint dipicu selama fase pemindaian tabel penuh. Oleh karena itu, Anda dapat mengonfigurasi parameter berikut di bagian Other Configurations untuk mencegah failover akibat timeout checkpoint selama fase sinkronisasi lengkap. Untuk informasi lebih lanjut, lihat Bagaimana cara mengonfigurasi parameter runtime kustom untuk pekerjaan?.
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 2147483647Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Deskripsi
Catatan
execution.checkpointing.interval
Interval di mana pengecekan titik kontrol dipicu.
Tipe data: Duration. Contoh: 10 min atau 30 s.
execution.checkpointing.tolerable-failed-checkpoints
Jumlah kegagalan checkpoint yang dapat ditoleransi.
Hasil perkalian nilai parameter ini dengan interval penjadwalan checkpoint menentukan waktu pembacaan snapshot yang diizinkan.
CatatanJika tabel sangat besar, atur parameter ini ke nilai yang lebih besar.
restart-strategy
Kebijakan restart.
Nilai valid:
fixed-delay: Kebijakan restart dengan delay tetap.
failure-rate: Kebijakan restart berdasarkan laju kegagalan.
exponential-delay: Kebijakan restart dengan delay eksponensial.
Untuk informasi lebih lanjut, lihat Strategi Restart.
restart-strategy.fixed-delay.attempts
Jumlah maksimum upaya restart untuk kebijakan restart fixed-delay.
Tidak ada.
SQL
Sintaksis
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);Parameter dalam klausa WITH
Parameter | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
connector | Tipe konektor. | STRING | Ya | Tidak ada | Nilainya harus |
hostname | Alamat IP atau nama host dari database PostgreSQL. | STRING | Ya | Tidak ada | Tidak ada. |
username | Username untuk layanan database PostgreSQL. | STRING | Ya | Tidak ada | Tidak ada. |
password | Password untuk layanan database PostgreSQL. | STRING | Ya | Tidak ada | Tidak ada. |
database-name | Nama database. | STRING | Ya | Tidak ada | Nama database. |
schema-name | Nama skema PostgreSQL. | STRING | Ya | Tidak ada | Nama skema mendukung ekspresi reguler untuk membaca data dari beberapa skema. |
table-name | Nama tabel PostgreSQL. | STRING | Ya | Tidak ada | Nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel. |
port | Nomor port layanan database PostgreSQL. | INTEGER | Tidak | 5432 | Tidak ada. |
decoding.plugin.name | Nama plugin decoding logis PostgreSQL. | STRING | Tidak | decoderbufs | Ini ditentukan oleh plugin yang diinstal pada layanan PostgreSQL. Plugin yang didukung adalah sebagai berikut:
|
slot.name | Nama slot decoding logis. | STRING | Opsional untuk versi sebelum 8.0.1. Wajib untuk versi 8.0.1 dan yang lebih baru. | Nilai default adalah flink untuk versi sebelum 8.0.1. Tidak ada nilai default untuk versi 8.0.1 dan yang lebih baru. | Atur parameter |
debezium.* | Parameter properti Debezium. | STRING | Tidak | Tidak ada | Memberikan kontrol lebih granular atas perilaku klien Debezium. Misalnya, |
scan.incremental.snapshot.enabled | Menentukan apakah snapshot inkremental diaktifkan. | BOOLEAN | Tidak | false | Nilai valid:
|
scan.startup.mode | Mode startup untuk konsumsi data. | STRING | Tidak | initial | Nilai valid:
|
changelog-mode | Mode changelog untuk encoding perubahan stream. | String | Tidak | all | Mode changelog yang didukung meliputi:
|
heartbeat.interval.ms | Interval pengiriman paket heartbeat. | Duration | Tidak | 30s | Unitnya adalah milidetik. Konektor PostgreSQL CDC secara aktif mengirim paket heartbeat ke database untuk memajukan offset slot. Ketika perubahan tabel jarang terjadi, pengaturan nilai ini dapat segera memurge log WAL. |
scan.incremental.snapshot.chunk.key-column | Menentukan kolom sebagai kolom pemisahan untuk sharding pada fase snapshot. | STRING | Tidak | Tidak ada | Secara default, kolom pertama dari primary key dipilih. |
scan.incremental.close-idle-reader.enabled | Menentukan apakah reader idle ditutup setelah snapshot berakhir. | Boolean | Tidak | false | Parameter ini hanya berlaku jika Anda mengatur parameter |
scan.incremental.snapshot.backfill.skip | Menentukan apakah pembacaan log dilewati pada fase snapshot lengkap. | Boolean | Tidak | false | Nilai valid:
|
Pemetaan tipe data
Tabel berikut menjelaskan pemetaan antara tipe data PostgreSQL dan Flink.
Tipe data PostgreSQL | Tipe data Flink |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
Kode contoh
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;Data ingestion
Di Realtime Compute for Apache Flink V11.4 dan versi yang lebih baru, Anda dapat menggunakan konektor PostgreSQL sebagai sumber data dalam pekerjaan Data Ingestion YAML.
Sintaksis
source:
type: postgres
name: PostgreSQL Source
hostname: localhost
port: 5432
username: pg_username
password: pg_password
tables: db.scm.tbl
slot.name: test_slot
scan.startup.mode: initial
server-time-zone: UTC
connect.timeout: 120s
decoding.plugin.name: decoderbufs
sink:
type: ...Parameter
Parameter | Deskripsi | Wajib | Tipe data | Nilai default | Catatan |
type | Tipe sumber data. | Ya | STRING | Tidak ada | Nilainya harus postgres. |
name | Nama sumber data. | Tidak | STRING | Tidak ada | Tidak ada. |
hostname | Nama domain atau alamat IP server database PostgreSQL. | Ya | STRING | (none) | Tidak ada. |
port | Port yang diekspos oleh server database PostgreSQL. | Tidak | INTEGER | 5432 | Tidak ada. |
username | Username untuk database PostgreSQL. | Ya | STRING | (none) | Tidak ada. |
password | Password untuk database PostgreSQL. | Ya | STRING | (none) | Tidak ada. |
tables | Nama tabel database PostgreSQL yang akan ditangkap. Anda dapat menggunakan ekspresi reguler untuk memantau beberapa tabel yang sesuai dengan ekspresi tersebut. | Ya | STRING | (none) | Penting Saat ini, Anda hanya dapat menangkap tabel dalam database yang sama. Titik (.) digunakan sebagai pemisah untuk nama database, skema, dan tabel. Untuk menggunakan titik (.) sebagai pencocokan karakter apa pun dalam ekspresi reguler, Anda harus meng-escape-nya dengan backslash (\). Contoh: |
slot.name | Nama slot replikasi PostgreSQL. | Ya | STRING | (none) | Nama harus mengikuti konvensi penamaan slot replikasi PostgreSQL dan hanya boleh berisi huruf kecil, angka, dan garis bawah. |
decoding.plugin.name | Nama plug-in decoding logis yang diinstal pada server PostgreSQL. | Tidak | STRING |
| Nilai yang valid meliputi |
tables.exclude | Nama tabel database PostgreSQL yang akan dikecualikan. Parameter ini berlaku setelah parameter tables. | Tidak | STRING | (none) | Anda juga dapat menggunakan ekspresi reguler untuk mengecualikan beberapa tabel yang sesuai dengan ekspresi tersebut. Penggunaannya sama seperti parameter tables. |
server-time-zone | Zona waktu sesi server database, seperti "Asia/Shanghai". | Tidak | STRING | (none) | Jika parameter ini tidak diatur, zona waktu default sistem ( |
scan.incremental.snapshot.chunk.size | Ukuran setiap chunk dalam kerangka kerja snapshot inkremental, ditentukan sebagai jumlah baris. | Tidak | INTEGER | 8096 | Saat Anda mengaktifkan pembacaan snapshot inkremental, tabel dibagi menjadi chunk. Data dari setiap chunk di-cache di memori sebelum sepenuhnya dibaca. Ukuran chunk yang lebih kecil menghasilkan lebih banyak chunk. Ini meningkatkan granularitas pemulihan kesalahan tetapi dapat menyebabkan error kehabisan memori (OOM) dan menurunkan throughput keseluruhan. Atur ukuran chunk yang wajar untuk menyeimbangkan faktor-faktor ini. |
scan.snapshot.fetch.size | Jumlah maksimum catatan yang diambil sekaligus saat membaca data lengkap tabel. | Tidak | INTEGER | 1024 | Tidak ada. |
scan.startup.mode | Mode startup untuk konsumsi data. | Tidak | STRING | initial | Nilai yang valid:
|
scan.incremental.close-idle-reader.enabled | Menentukan apakah reader idle ditutup setelah fase snapshot berakhir. | Tidak | BOOLEAN | false | Konfigurasi ini hanya berlaku jika Anda mengatur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true. |
scan.lsn-commit.checkpoints-num-delay | Jumlah checkpoint yang ditunda sebelum konektor mulai melakukan commit offset LSN. | Tidak | INTEGER | 3 | Offset LSN checkpoint dilakukan commit secara bergulir untuk mencegah kegagalan pemulihan dari state. |
connect.timeout | Waktu maksimum yang ditunggu konektor untuk terhubung ke server database PostgreSQL sebelum terjadi timeout. | Tidak | DURATION | 30s | Nilainya tidak boleh kurang dari 250 ms. |
connect.max-retries | Jumlah maksimum upaya retry untuk konektor agar dapat terhubung ke server database PostgreSQL. | Tidak | INTEGER | 3 | Tidak ada. |
connection.pool.size | Ukuran kolam koneksi. | Tidak | INTEGER | 20 | Tidak ada. |
jdbc.properties.* | Memungkinkan Anda meneruskan properti URL JDBC kustom. | Tidak | STRING | 20 | Anda dapat meneruskan properti kustom, seperti |
heartbeat.interval | Interval pengiriman event heartbeat untuk melacak offset WAL terbaru yang tersedia. | Tidak | DURATION | 30s | Tidak ada. |
debezium.* | Meneruskan properti Debezium ke Debezium Embedded Engine, yang digunakan untuk menangkap perubahan data dari server PostgreSQL. | Tidak | STRING | (none) | Untuk informasi lebih lanjut tentang properti konektor Debezium PostgreSQL, lihat dokumentasi terkait. |
chunk-meta.group.size | Ukuran metadata chunk. | Tidak | STRING | 1000 | Jika metadata lebih besar dari nilai ini, metadata diteruskan dalam beberapa bagian. |
metadata.list | Daftar metadata yang dapat dibaca yang diteruskan ke downstream dan dapat digunakan di modul transform. | Tidak | STRING | false | Gunakan koma (,) untuk memisahkan nilai. Metadata yang tersedia saat ini adalah: |
scan.incremental.snapshot.unbounded-chunk-first.enabled | Menentukan apakah chunk tak terbatas didistribusikan terlebih dahulu selama fase pembacaan snapshot. | Tidak | STRING | false | Nilai yang valid:
Penting Ini adalah fitur eksperimental. Jika Anda mengaktifkan fitur ini, Anda dapat mengurangi risiko error kehabisan memori (OOM) saat TaskManager menyinkronkan chunk terakhir dalam fase snapshot. Kami menyarankan Anda menambahkan konfigurasi ini sebelum pekerjaan dijalankan untuk pertama kalinya. |
Referensi
Untuk daftar konektor yang didukung oleh Realtime Compute for Apache Flink, lihat Konektor yang didukung.
Untuk menulis data ke tabel sink PolarDB for PostgreSQL (Oracle Compatible) 1.0, lihat PolarDB for PostgreSQL (Oracle Compatible) 1.0.
Untuk membaca dari atau menulis ke RDS for MySQL, PolarDB for MySQL, atau database MySQL yang dikelola sendiri, gunakan konektor MySQL.