全部产品
Search
文档中心

Realtime Compute for Apache Flink:Postgres CDC (Public Preview)

更新时间:Jan 22, 2026

Konektor Postgres CDC membaca snapshot lengkap dari database PostgreSQL, lalu menangkap perubahan data berikutnya. Proses ini memastikan bahwa setiap catatan diproses tepat satu kali (exactly once). Topik ini menjelaskan cara menggunakan konektor Postgres CDC.

Informasi latar belakang

Konektor Postgres CDC mendukung hal-hal berikut.

Kategori

Detail

Tipe yang didukung

Tabel sumber

Catatan

Gunakan konektor JDBC untuk tabel sink dan tabel dimensi.

Mode

Hanya mode stream

Format data

Tidak berlaku

Metrik unik

Metrik pemantauan unik

  • currentFetchEventTimeLag: Interval dari saat data dihasilkan hingga ditarik oleh operator Source.

  • currentEmitEventTimeLag: Interval dari saat data dihasilkan hingga meninggalkan operator Source.

  • sourceIdleTime: Durasi sumber tidak menghasilkan data baru.

Catatan
  • Metrik currentFetchEventTimeLag dan currentEmitEventTimeLag hanya berlaku pada fase inkremental. Pada fase lengkap, nilainya selalu 0.

  • Untuk informasi lebih lanjut tentang metrik tersebut, lihat Deskripsi metrik.

Tipe API

SQL dan Data Ingestion YAML

Mendukung pembaruan atau penghapusan data tabel sink

Tidak berlaku

Fitur

Konektor Postgres CDC terintegrasi dengan kerangka kerja snapshot inkremental untuk Change Data Capture (CDC), yang tersedia di Ververica Runtime (VVR) 8.0.6 dan versi selanjutnya. Setelah membaca data historis lengkap, konektor secara otomatis beralih ke pembacaan log perubahan dari write-ahead log (WAL). Proses ini menjamin semantik exactly-once. Tabel sumber Postgres CDC mendukung pembacaan data lengkap secara konkuren tanpa lock dan dapat dilanjutkan dari titik putus (breakpoint).

Sebagai tabel sumber, fitur dan keunggulannya adalah sebagai berikut:

  • Pemrosesan stream dan batch terpadu. Konektor mendukung pembacaan data lengkap maupun inkremental, sehingga menghilangkan kebutuhan untuk memelihara dua pekerjaan pemrosesan data terpisah.

  • Pembacaan data lengkap secara konkuren. Fitur ini memungkinkan skalabilitas horizontal untuk meningkatkan kinerja.

  • Peralihan mulus dari pembacaan lengkap ke inkremental. Konektor secara otomatis mengurangi skala untuk menghemat sumber daya komputasi.

  • Pembacaan yang dapat dilanjutkan. Konektor dapat melanjutkan dari titik putus selama fase pembacaan data lengkap, sehingga meningkatkan stabilitas Pekerjaan.

  • Pembacaan tanpa lock. Pembacaan data lengkap tidak memerlukan lock, sehingga mencegah dampak apa pun terhadap operasi bisnis online Anda.

Prasyarat

Konektor Postgres CDC membaca aliran change data capture (CDC) menggunakan fitur logical replication PostgreSQL. Konektor ini mendukung Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, dan self-managed PostgreSQL.

Penting

Konfigurasi yang diperlukan mungkin berbeda untuk Alibaba Cloud RDS for PostgreSQL, Amazon RDS for PostgreSQL, dan self-managed PostgreSQL. Untuk petunjuk konfigurasi terperinci, lihat dokumen Konfigurasi Postgres.

Setelah menyelesaikan konfigurasi, pastikan kondisi berikut terpenuhi:

  • Parameter wal_level harus diatur ke `logical`. Pengaturan ini menambahkan informasi yang diperlukan ke write-ahead log (WAL) untuk mendukung logical decoding.

  • REPLICA IDENTITY dari tabel yang berlangganan harus diatur ke `FULL`. Pengaturan ini memastikan bahwa event `INSERT` dan `UPDATE` mencakup nilai sebelumnya dari semua kolom dalam tabel, sehingga menjamin konsistensi data.

    Catatan

    `REPLICA IDENTITY` adalah pengaturan tingkat tabel yang spesifik untuk PostgreSQL. Pengaturan ini menentukan apakah event `INSERT` dan `UPDATE` mencakup nilai sebelumnya dari kolom yang terpengaruh. Untuk informasi lebih lanjut tentang nilai-nilai `REPLICA IDENTITY`, lihat REPLICA IDENTITY.

  • Pastikan nilai parameter `max_wal_senders` dan `max_replication_slots` lebih besar daripada jumlah slot replikasi yang sedang digunakan ditambah jumlah slot yang dibutuhkan oleh pekerjaan Flink.

  • Pastikan akun memiliki hak istimewa `SUPERUSER`, atau memiliki izin `LOGIN` dan `REPLICATION`. Akun tersebut juga harus memiliki izin `SELECT` pada tabel yang berlangganan untuk mengkueri data lengkap.

Catatan penggunaan

Hanya Realtime Compute for Apache Flink V8.0.6 dan versi selanjutnya yang mendukung fitur snapshot inkremental Postgres CDC.

Slot replikasi

Pekerjaan Flink PostgreSQL CDC menggunakan slot replikasi untuk mencegah write-ahead log (WAL) dihapus sebelum waktunya dan untuk memastikan konsistensi data. Jika tidak dikelola dengan baik, slot replikasi dapat menyebabkan masalah seperti disk space terbuang atau keterlambatan pembacaan data. Kami merekomendasikan praktik terbaik berikut:

  • Bersihkan slot yang tidak digunakan segera

    • Flink tidak menghapus slot replikasi secara otomatis, bahkan setelah pekerjaan berhenti. Hal ini terutama berlaku untuk restart tanpa status (stateless). Perilaku ini mencegah kehilangan data yang dapat terjadi jika WAL dihapus.

    • Jika Anda memastikan bahwa pekerjaan tidak akan dijalankan ulang, Anda harus menghapus slot replikasi terkait secara manual untuk membebaskan disk space.

      Penting

      Manajemen siklus hidup: Perlakukan slot replikasi sebagai resource pekerjaan dan kelola secara sinkron dengan proses start dan stop pekerjaan.

  • Hindari penggunaan ulang slot lama

    • Pekerjaan baru harus menggunakan nama slot baru alih-alih menggunakan ulang yang lama. Penggunaan ulang slot dapat menyebabkan pekerjaan membaca banyak data WAL historis saat startup, sehingga menunda pemrosesan data terbaru.

    • Replikasi logis PostgreSQL mensyaratkan bahwa sebuah slot hanya dapat digunakan oleh satu koneksi. Pekerjaan yang berbeda harus menggunakan nama slot yang berbeda.

      Penting

      Konvensi penamaan: Saat menyesuaikan `slot.name`, hindari penggunaan nama dengan akhiran numerik, seperti `my_slot_1`, untuk mencegah konflik dengan slot temporary.

  • Perilaku slot dengan snapshot inkremental diaktifkan

    • Prasyarat: Checkpoint harus diaktifkan, dan tabel sumber harus memiliki primary key yang ditentukan.

    • Aturan pembuatan slot:

      • Snapshot inkremental dinonaktifkan: Hanya mendukung konkurensi tunggal. Digunakan satu slot global.

      • Snapshot inkremental diaktifkan:

        • Fase lengkap: Setiap subtask sumber konkuren membuat slot sementara. Format penamaannya adalah ${slot.name}_${task_id}.

        • Fase inkremental: Semua slot temporary secara otomatis diklaim kembali. Hanya satu slot global yang dipertahankan.

    • Jumlah maksimum slot: Konkurensi sumber + 1 (selama fase lengkap)

  • Sumber daya dan performa

    • Jika jumlah slot yang tersedia atau jumlah disk space di PostgreSQL terbatas, Anda dapat mengurangi konkurensi untuk fase lengkap agar menggunakan lebih sedikit slot sementara. Tindakan ini mengurangi kecepatan baca selama fase lengkap.

    • Jika sink downstream mendukung penulisan idempoten, Anda dapat mengatur scan.incremental.snapshot.backfill.skip = true. Pengaturan ini melewatkan backfill WAL selama fase lengkap dan mempercepat startup pekerjaan.

      Konfigurasi ini hanya menyediakan semantik at-least-once. Konfigurasi ini tidak cocok untuk pekerjaan dengan komputasi stateful, seperti agregasi atau join tabel dimensi, karena perubahan historis yang diperlukan untuk state antara mungkin hilang.

  • Saat snapshot inkremental dinonaktifkan, checkpoint tidak didukung selama fase pemindaian tabel penuh.

    Konfigurasi untuk menghindari timeout selama fase sinkronisasi penuh

    Saat snapshot inkremental dinonaktifkan, jika pekerjaan Anda memicu checkpoint selama fase pemindaian tabel penuh, pekerjaan tersebut mungkin mengalami failover karena timeout checkpoint. Untuk mencegah hal ini, Anda dapat mengonfigurasi parameter berikut di Other Configurations. Untuk informasi lebih lanjut, lihat Bagaimana cara mengonfigurasi parameter runtime kustom untuk pekerjaan?. Konfigurasi ini mencegah failover yang disebabkan oleh timeout checkpoint selama fase sinkronisasi penuh.

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    Tabel berikut menjelaskan parameter-parameter tersebut.

    Parameter

    Deskripsi

    Keterangan

    execution.checkpointing.interval

    Interval antar checkpoint.

    Unitnya adalah nilai Duration, seperti 10min atau 30s.

    execution.checkpointing.tolerable-failed-checkpoints

    Jumlah kegagalan checkpoint yang dapat ditoleransi sebelum pekerjaan gagal.

    Hasil perkalian parameter ini dengan interval penjadwalan checkpoint merupakan waktu baca snapshot yang diizinkan.

    Catatan

    Jika tabel sangat besar, atur parameter ini ke nilai yang lebih besar.

    restart-strategy

    Strategi restart pekerjaan.

    Nilai valid:

    • fixed-delay: Strategi restart dengan delay tetap.

    • failure-rate: Strategi restart berdasarkan tingkat kegagalan.

    • exponential-delay: Strategi restart dengan delay eksponensial.

    Untuk informasi lebih lanjut, lihat Strategi Restart.

    restart-strategy.fixed-delay.attempts

    Jumlah maksimum upaya restart untuk strategi restart fixed-delay.

    Tidak ada.

Gunakan ulang langganan Postgres

Konektor Postgres CDC bergantung pada publication untuk menentukan perubahan tabel mana yang didorong ke sebuah slot. Jika beberapa pekerjaan berbagi publication yang sama, konfigurasi mereka akan ditimpa.

Penyebab

Nilai default dari publication.autocreate.mode adalah filtered, yang hanya mencakup tabel yang ditentukan dalam konfigurasi konektor. Mode ini mengubah tabel dalam publication saat pekerjaan dimulai, yang dapat memengaruhi operasi baca pekerjaan lain.

Solusi

  1. Buat publication secara manual di PostgreSQL yang mencakup semua tabel yang akan dipantau. Atau, buat publication terpisah untuk setiap pekerjaan.

    -- Buat publication bernama my_flink_pub yang mencakup semua tabel (atau tabel tertentu, buat satu publication per pekerjaan)
    CREATE PUBLICATION my_flink_pub FOR TABLE table_a, table_b;
    -- Atau lebih sederhana, sertakan semua tabel dalam database
    CREATE PUBLICATION my_flink_pub FOR ALL TABLES;
    Catatan

    Berlangganan ke semua tabel dalam database tidak disarankan. Jika database besar dan berisi banyak tabel, hal ini dapat menyebabkan network bandwidth terbuang dan konsumsi CPU tinggi pada kluster Flink.

  2. Tambahkan konfigurasi Flink berikut:

    • debezium.publication.name = 'my_flink_pub' (Menentukan nama publication)

    • debezium.publication.autocreate.mode = 'disabled' (Mencegah Flink mencoba membuat atau memodifikasi publication saat startup)

Pendekatan ini memberikan isolasi penuh dan memungkinkan Anda mengelola publication secara manual alih-alih mengandalkan Flink. Hal ini mencegah pekerjaan baru memengaruhi pekerjaan yang sudah ada dan memberikan kontrol akses yang lebih aman.

SQL

Sintaks

CREATE TABLE postgrescdc_source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>',
  'decoding.plugin.name'= 'pgoutput',
  'scan.incremental.snapshot.enabled' = 'true',
  -- Melewatkan backfill dapat mempercepat pembacaan dan mengurangi penggunaan resource, tetapi dapat menyebabkan duplikasi data. Aktifkan ini jika sink downstream bersifat idempoten.
  'scan.incremental.snapshot.backfill.skip' = 'false',
  -- Di lingkungan produksi, atur ini ke 'filtered' atau 'disabled' dan kelola publication secara manual alih-alih melalui Flink.
  'debezium-publication.autocreate.mode' = 'disabled'
  -- Jika Anda memiliki beberapa sumber, konfigurasikan publication yang berbeda untuk setiap sumber.
  --'debezium.publication.name' = 'my_flink_pub'
);

Parameter WITH

Parameter

Deskripsi

Tipe data

Diperlukan

Default

Keterangan

connector

Jenis konektor.

STRING

Ya

Tidak ada

Nilainya harus postgres-cdc.

hostname

Alamat IP atau nama host dari database PostgreSQL.

STRING

Ya

Tidak ada

Tidak ada.

username

Username untuk layanan database PostgreSQL.

STRING

Ya

Tidak ada

Tidak ada.

password

Password untuk layanan database PostgreSQL.

STRING

Ya

Tidak ada

Tidak ada.

database-name

Nama database.

STRING

Ya

Tidak ada

Nama database.

schema-name

Nama skema PostgreSQL.

STRING

Ya

Tidak ada

Nama skema mendukung ekspresi reguler untuk membaca data dari beberapa skema.

table-name

Nama tabel PostgreSQL.

STRING

Ya

Tidak ada

Nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

port

Nomor port layanan database PostgreSQL.

INTEGER

Tidak

5432

Tidak ada.

decoding.plugin.name

Nama plugin decoding logis PostgreSQL.

STRING

Tidak

decoderbufs

Hal ini ditentukan oleh plugin yang diinstal pada layanan PostgreSQL. Plugin yang didukung adalah:

  • decoderbufs (default): Didukung pada PostgreSQL 9.6 dan versi selanjutnya. Plugin ini harus diinstal.

  • pgoutput (disarankan): Plugin bawaan resmi untuk PostgreSQL 10 dan versi yang lebih baru.

slot.name

Nama slot decoding logis.

STRING

Wajib untuk VVR 8.0.1 dan versi selanjutnya. Opsional untuk versi sebelumnya.

Tidak ada untuk VVR 8.0.1 dan versi selanjutnya. Default-nya adalah `flink` untuk versi sebelumnya.

Atur slot.name yang unik untuk setiap tabel guna menghindari error PSQLException: ERROR: replication slot "debezium" is active for PID 974. Untuk informasi lebih lanjut, lihat Slot replikasi.

debezium.*

Properti dan parameter Debezium

STRING

Tidak

Tidak ada

Memberikan kontrol lebih rinci atas perilaku klien Debezium. Misalnya, 'debezium.snapshot.mode' = 'never'. Untuk informasi lebih lanjut, lihat Properti konfigurasi.

scan.incremental.snapshot.enabled

Menentukan apakah snapshot inkremental diaktifkan.

BOOLEAN

Tidak

false

Nilai valid:

  • false (default): Menonaktifkan snapshot inkremental.

  • true: Mengaktifkan snapshot inkremental.

Catatan
  • Ini adalah fitur eksperimental. Parameter ini hanya didukung di mesin komputasi waktu nyata V8.0.6 dan versi selanjutnya.

  • Untuk informasi lebih lanjut tentang manfaat, prasyarat, dan batasan snapshot inkremental, lihat Fitur, Prasyarat, dan Catatan penggunaan.

scan.startup.mode

Mode startup untuk konsumsi data.

STRING

Tidak

initial

Nilai valid:

  • initial (default): Memindai data historis lengkap saat startup pertama, lalu membaca data WAL terbaru.

  • latest-offset: Tidak memindai data historis lengkap saat startup pertama. Mulai membaca dari akhir WAL, artinya hanya membaca perubahan terbaru setelah konektor dimulai.

  • snapshot: Memindai data historis lengkap, membaca data WAL baru yang dihasilkan selama fase lengkap, lalu pekerjaan berhenti.

changelog-mode

Mode changelog untuk encoding perubahan stream.

String

Tidak

all

Mode changelog yang didukung:

  • ALL: Mendukung semua tipe, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.

  • UPSERT: Hanya mendukung tipe upsert, yang mencakup INSERT, DELETE, dan UPDATE_AFTER.

heartbeat.interval.ms

Interval pengiriman paket heartbeat.

Duration

Tidak

30s

Unitnya adalah milidetik.

Konektor Postgres CDC secara aktif mengirim heartbeat ke database untuk memajukan offset slot. Saat perubahan tabel jarang terjadi, pengaturan nilai ini memastikan pembersihan log WAL yang tepat waktu.

scan.incremental.snapshot.chunk.key-column

Menentukan kolom yang digunakan sebagai kunci chunk untuk memisahkan shard selama fase snapshot.

STRING

Tidak

Tidak ada

Secara default, kolom pertama dari primary key dipilih.

scan.incremental.close-idle-reader.enabled

Menentukan apakah reader idle ditutup setelah snapshot selesai.

Boolean

Tidak

false

Untuk mengaktifkan konfigurasi ini, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

scan.incremental.snapshot.backfill.skip

Menentukan apakah pembacaan log dilewati selama fase lengkap.

Boolean

Tidak

false

Nilai valid:

  • true: Operasi dilewati.

    Fase inkremental mulai membaca log dari watermark rendah.

    Jika operator atau penyimpanan downstream mendukung idempotensi, melewatkan pembacaan log untuk fase lengkap direkomendasikan. Hal ini mengurangi jumlah slot WAL tetapi hanya menyediakan semantik at-least-once.

  • false: Jangan lewati.

    Selama fase lengkap, log antara watermark rendah dan tinggi dibaca untuk memastikan konsistensi.

    Jika SQL Anda melibatkan operasi seperti agregasi atau join, jangan lewati pembacaan log untuk fase lengkap.

Pemetaan tipe

Tabel berikut menunjukkan pemetaan antara tipe bidang PostgreSQL dan Flink.

Tipe bidang PostgreSQL

Tipe bidang Flink

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

Contoh

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;

Data ingestion

Mulai dari Realtime Compute for Apache Flink V11.4, Anda dapat menggunakan konektor PostgreSQL sebagai sumber data dalam pekerjaan Data Ingestion YAML.

Sintaksis

source:
  type: postgres
  name: PostgreSQL Source
  hostname: localhost
  port: 5432
  username: pg_username
  password: pg_password
  tables: db.scm.tbl
  slot.name: test_slot
  scan.startup.mode: initial
  server-time-zone: UTC
  connect.timeout: 120s
  decoding.plugin.name: decoderbufs

sink:
  type: ...

Parameter

Parameter

Deskripsi

Wajib

Tipe data

Default

Keterangan

type

Jenis sumber data.

Ya

STRING

Tidak ada

Nilainya harus `postgres`.

name

Nama sumber data.

Tidak

STRING

Tidak ada

Tidak ada.

hostname

Nama domain atau alamat IP server database PostgreSQL.

Ya

STRING

(none)

Tidak ada.

port

Port yang diekspos oleh server database PostgreSQL.

Tidak

INTEGER

5432

Tidak ada.

username

Username PostgreSQL.

Ya

STRING

(none)

Tidak ada.

password

Password PostgreSQL.

Ya

STRING

(none)

Tidak ada.

tables

Nama tabel database PostgreSQL yang akan ditangkap.

Ekspresi reguler didukung untuk memantau beberapa tabel yang sesuai dengan ekspresi tersebut.

Ya

STRING

(none)

Penting

Saat ini, hanya tabel dalam database yang sama yang dapat ditangkap.

Titik (.) diperlakukan sebagai pemisah untuk nama database, skema, dan tabel. Untuk menggunakan titik (.) dalam ekspresi reguler untuk mencocokkan karakter apa pun, Anda harus meng-escape-nya dengan backslash. Contoh: bdb.schema_\.*.order_\.*.

slot.name

Nama slot replikasi PostgreSQL.

Ya

STRING

(none)

Nama harus mematuhi aturan penamaan slot replikasi PostgreSQL dan dapat berisi huruf kecil, angka, dan garis bawah.

decoding.plugin.name

Nama plugin logical decoding PostgreSQL yang diinstal di server.

Tidak

STRING

pgoutput

Nilai opsional termasuk decoderbufs dan pgoutput.

tables.exclude

Nama tabel database PostgreSQL yang akan dikecualikan. Parameter ini berlaku setelah parameter `tables`.

Tidak

STRING

(none)

Nama tabel juga mendukung ekspresi reguler untuk mengecualikan beberapa tabel yang sesuai dengan ekspresi tersebut. Penggunaannya sama seperti parameter `tables`.

server-time-zone

Zona waktu sesi server database, seperti "Asia/Shanghai".

Tidak

STRING

(none)

Jika tidak diatur, zona waktu default sistem (ZoneId.systemDefault()) digunakan.

scan.incremental.snapshot.chunk.size

Ukuran (jumlah baris) setiap chunk dalam kerangka kerja snapshot inkremental.

Tidak

INTEGER

8096

Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data suatu chunk di-cache di memori sebelum sepenuhnya dibaca.

Jumlah baris per chunk yang lebih kecil menghasilkan jumlah total chunk yang lebih besar untuk tabel tersebut. Meskipun hal ini mengurangi granularitas pemulihan kesalahan, hal ini dapat menyebabkan error kehabisan memori (OOM) dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda perlu menyeimbangkan dan mengatur ukuran chunk yang wajar.

scan.snapshot.fetch.size

Jumlah maksimum catatan yang diambil sekaligus saat membaca data lengkap tabel.

Tidak

INTEGER

1024

Tidak ada.

scan.startup.mode

Mode startup untuk konsumsi data.

Tidak

STRING

initial

Nilai yang valid:

  • initial (default): Memindai data historis lengkap saat startup pertama, lalu membaca data binary logging terbaru.

  • latest-offset: Tidak memindai data historis lengkap saat startup pertama. Mulai membaca dari akhir log biner, artinya hanya membaca perubahan terbaru setelah konektor dimulai.

  • committed-offset: Tidak memindai data historis lengkap. Mengonsumsi data inkremental dari offset tertentu.

  • snapshot: Hanya mengonsumsi data historis lengkap dan tidak mengonsumsi data inkremental.

scan.incremental.close-idle-reader.enabled

Menentukan apakah reader idle ditutup setelah snapshot selesai.

Tidak

BOOLEAN

false

Untuk mengaktifkan konfigurasi ini, atur `execution.checkpointing.checkpoints-after-tasks-finish.enabled` ke `true`.

scan.lsn-commit.checkpoints-num-delay

Jumlah checkpoint yang ditunda sebelum mulai melakukan commit offset LSN.

Tidak

INTEGER

3

Offset LSN checkpoint dilakukan commit secara bergilir untuk mencegah ketidakmampuan memulihkan dari state.

connect.timeout

Waktu maksimum yang harus ditunggu konektor setelah mencoba terhubung ke server database PostgreSQL sebelum timeout.

Tidak

DURATION

30s

Nilai ini tidak boleh kurang dari 250 milidetik.

connect.max-retries

Jumlah maksimum upaya konektor untuk membuat koneksi ke server database PostgreSQL.

Tidak

INTEGER

3

Tidak ada.

connection.pool.size

Ukuran kolam koneksi.

Tidak

INTEGER

20

Tidak ada.

jdbc.properties.*

Memungkinkan pengguna meneruskan properti URL JDBC kustom.

Tidak

STRING

20

Pengguna dapat meneruskan properti kustom, seperti 'jdbc.properties.useSSL' = 'false'.

heartbeat.interval

Interval pengiriman event heartbeat untuk melacak offset log WAL terbaru yang tersedia.

Tidak

DURATION

30s

Tidak ada.

debezium.*

Meneruskan properti Debezium ke Debezium Embedded Engine, yang digunakan untuk menangkap perubahan data dari server PostgreSQL.

Tidak

STRING

(none)

Untuk informasi lebih lanjut tentang properti konektor Debezium PostgreSQL, lihat dokumentasi terkait.

chunk-meta.group.size

Ukuran metadata chunk.

Tidak

STRING

1000

Jika metadata lebih besar dari nilai ini, metadata tersebut diteruskan dalam beberapa bagian.

metadata.list

Daftar metadata yang dapat dibaca yang diteruskan ke downstream, yang dapat digunakan dalam modul transform.

Tidak

STRING

false

Gunakan koma (,) sebagai pemisah. Saat ini, metadata yang tersedia adalah: op_ts.

scan.incremental.snapshot.unbounded-chunk-first.enabled

Menentukan apakah chunk tak terbatas dikirim terlebih dahulu selama fase pembacaan snapshot.

Tidak

STRING

false

Nilai yang valid:

  • true: Chunk tak terbatas dikirim terlebih dahulu selama fase pembacaan snapshot.

  • false (default): Chunk tak terbatas tidak dikirim terlebih dahulu selama fase pembacaan snapshot.

Penting

Ini adalah fitur eksperimental. Mengaktifkannya dapat mengurangi risiko error kehabisan memori (OOM) saat TaskManager menyinkronkan chunk terakhir selama fase snapshot. Kami merekomendasikan menambahkan ini sebelum startup pertama pekerjaan.

Referensi

  • Untuk daftar konektor yang didukung oleh Realtime Compute for Apache Flink, lihat Konektor yang didukung.

  • Untuk menulis data ke tabel sink PolarDB for PostgreSQL (Oracle Compatible), lihat PolarDB for PostgreSQL (Oracle Compatible).

  • Untuk membaca dari atau menulis ke RDS for MySQL, PolarDB for MySQL, atau database MySQL yang dikelola sendiri, gunakan konektor MySQL.