全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor AnalyticDB untuk PostgreSQL

更新时间:Nov 06, 2025

Topik ini menjelaskan cara menggunakan Konektor AnalyticDB for PostgreSQL.

Informasi latar belakang

AnalyticDB for PostgreSQL adalah gudang data untuk pemrosesan paralel masif (MPP) yang menyediakan layanan analitik online untuk sejumlah besar data.

Tabel berikut menggambarkan kemampuan yang didukung oleh Konektor AnalyticDB for PostgreSQL.

Item

Deskripsi

Jenis tabel

Tabel sumber (beta), tabel dimensi, dan tabel sink

Catatan

Saat ini, untuk membaca dari sumber AnalyticDB untuk PostgreSQL, Anda perlu mengonfigurasi konektor kustom. Untuk informasi lebih lanjut, lihat Gunakan Flink CDC untuk berlangganan data penuh dan inkremental secara real-time.

Mode operasi

Mode streaming dan batch.

Format data

Tidak tersedia

Metrik

  • Metrik untuk tabel sink

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

  • Metrik untuk tabel dimensi: Tidak tersedia.

Catatan

Untuk informasi lebih lanjut tentang metrik, lihat Metrics.

Jenis API

SQL

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Prasyarat

Keterbatasan

  • Hanya VVR 8.0.1 atau versi lebih baru yang mendukung AnalyticDB for PostgreSQL V7.0.

  • Database PostgreSQL yang dikelola sendiri tidak didukung.

Sintaksis

CREATE TEMPORARY TABLE adbpg_table (
 id INT,
 len INT,
 content VARCHAR, 
 PRIMARY KEY(id)
) WITH (
 'connector'='adbpg',
 'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

Opsi konektor

Umum

Opsi

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

connector

Konektor yang digunakan.

STRING

Ya

Tidak ada nilai default

  • Tabel sumber: Atur ke adbpg-cdc.

  • Tabel dimensi dan sink: Atur ke adbpg.

url

URL Java Database Connectivity (JDBC) dari database.

STRING

Ya

Tidak ada nilai default

URL dalam format jdbc:postgresql://<Address>:<PortId>/<DatabaseName>.

tableName

Nama tabel di dalam database.

STRING

Ya

Tidak ada nilai default

Tidak tersedia.

userName

Nama pengguna yang digunakan untuk mengakses database AnalyticDB for PostgreSQL.

STRING

Ya

Tidak ada nilai default

Tidak tersedia.

password

Kata sandi yang digunakan untuk mengakses database AnalyticDB for PostgreSQL.

STRING

Ya

Tidak ada nilai default

Tidak tersedia.

maxRetryTimes

Jumlah maksimum percobaan ulang yang diizinkan untuk menulis data ke tabel jika upaya penulisan data gagal.

INTEGER

Tidak

3

Tidak tersedia.

targetSchema

Nama skema.

STRING

Tidak

public

Tidak tersedia.

caseSensitive

Menentukan apakah sensitivitas huruf besar/kecil diaktifkan.

STRING

Tidak

false

Nilai valid:

  • true: Sensitivitas huruf besar/kecil diaktifkan.

  • false: Sensitivitas huruf besar/kecil dinonaktifkan. Ini adalah nilai default.

connectionMaxActive

Jumlah maksimum koneksi dalam kumpulan koneksi.

INTEGER

Tidak

5

Sistem secara otomatis melepaskan koneksi idle ke layanan database.

Penting

Jika opsi ini disetel ke nilai yang terlalu besar, jumlah koneksi server mungkin abnormal.

Spesifik sumber (beta)

Opsi

Deskripsi

Tipe data

Diperlukan

Catatan

schema-name

Nama skema.

STRING

Ya

Opsi ini mendukung ekspresi reguler. Anda dapat berlangganan beberapa skema sekaligus.

port

Port instans AnalyticDB untuk PostgreSQL.

INTEGER

Ya

Atur ke 5432.

decoding.plugin.name

Nama Plugin Logical Decoding PostgreSQL.

STRING

Ya

Atur ke pgoutput.

slot.name

Nama slot decoding logis.

STRING

Ya

  • Dalam pekerjaan Flink yang sama: Gunakan nilai slot.name yang konsisten untuk semua tabel sumber.

  • Di seluruh pekerjaan Flink yang berbeda: Tetapkan nilai slot.name unik untuk setiap pekerjaan untuk mencegah kesalahan seperti PSQLException: ERROR: replication slot "debezium" is active for PID 974.

debezium.*

Mengontrol perilaku klien Debezium.

STRING

Ya

Sebagai contoh, atur 'debezium.snapshot.mode' = 'never' untuk menonaktifkan snapshot. Untuk informasi lebih lanjut, lihat Properti konektor.

scan.incremental.snapshot.enabled

Menentukan apakah akan mengaktifkan snapshot inkremental.

BOOLEAN

Tidak

Nilai valid:

  • false (default)

  • true

scan.startup.mode

Mode startup untuk konsumsi data.

STRING

Tidak

Nilai valid:

  • initial (default): Ketika konektor mulai pertama kali, ia melakukan pemindaian data historis penuh dan kemudian mulai membaca data Write-Ahead Logging (WAL) terbaru.

  • latest-offset: Konektor mulai membaca dari akhir WAL (posisi log terbaru) tanpa memindai data historis.

  • snapshot: Konektor melakukan pemindaian data historis penuh dan membaca data WAL baru yang dihasilkan selama pemindaian ini. Pekerjaan berhenti setelah pemindaian data penuh selesai.

changelog-mode

Menentukan bagaimana peristiwa perubahan dikodekan dalam aliran perubahan.

STRING

Tidak

Nilai valid:

  • ALL (default): Menangkap semua peristiwa perubahan historis, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.

  • UPSERT: Menangkap peristiwa UPSERT, termasuk INSERT, DELETE, dan UPDATE_AFTER.

heartbeat.interval.ms

Interval pengiriman paket heartbeat, dalam milidetik.

DURATION

Tidak

Nilai default: 30 detik.

Konektor CDC AnalyticDB untuk PostgreSQL secara aktif mengirimkan paket heartbeat ke database untuk memastikan offset slot terus maju. Jika data tabel tidak sering berubah, atur opsi ini ke nilai yang sesuai untuk membersihkan log WAL secara berkala dan menghindari pemborosan disk.

scan.incremental.snapshot.chunk.key-column

Menentukan kolom kunci chunk selama pembacaan snapshot.

STRING

Tidak

Default ke kolom pertama dari primary key.

Spesifik untuk Sink

Opsi

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

retryWaitTime

Interval antara percobaan ulang, dalam milidetik.

INTEGER

Tidak

100

batchSize

Jumlah catatan data yang dapat ditulis ke tabel sekaligus.

INTEGER

Tidak

500

Tidak tersedia.

flushIntervalMs

Interval pengosongan cache.

INTEGER

Tidak

Tidak tersedia.

Jika jumlah data yang di-cache tidak mencapai batas atas dalam periode waktu tertentu, semua data yang di-cache ditulis ke tabel sink. Unit: milidetik.

writeMode

Mode penulisan di mana sistem mencoba menulis data ke tabel untuk pertama kalinya.

STRING

Tidak

insert

Nilai valid:

  • insert: Data langsung dimasukkan ke tabel sink. Jika terjadi konflik, kebijakan penanganan ditentukan oleh opsi conflictMode. Ini adalah nilai default.

  • upsert: Data dalam tabel secara otomatis diperbarui saat terjadi konflik. Nilai ini cocok hanya untuk tabel yang memiliki primary key.

  • copy: Data dimasukkan melalui perintah COPY.

    Catatan

    Hanya VVR 11.1 atau yang lebih baru yang mendukung copy.

conflictMode

Kebijakan berdasarkan mana konflik primary key atau indeks ditangani saat data dimasukkan ke tabel.

STRING

Tidak

strict

Nilai valid:

  • strict: Jika terjadi konflik, sistem melaporkan kesalahan. Ini adalah nilai default.

  • ignore: Jika terjadi konflik, sistem mengabaikan konflik.

  • update: Jika terjadi konflik, sistem secara otomatis memperbarui data. Nilai ini cocok untuk tabel yang tidak memiliki primary key. Kebijakan ini mengurangi efisiensi pemrosesan data.

  • upsert: Jika terjadi konflik, sistem secara otomatis memperbarui data dalam tabel. Nilai ini cocok hanya untuk tabel yang memiliki primary key.

Spesifik tabel dimensi

Opsi

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

maxJoinRows

Jumlah maksimum baris untuk digabungkan dalam satu baris data.

INTEGER

Tidak

1024

Tidak tersedia.

cache

Kebijakan cache.

STRING

Tidak

ALL

Nilai valid:

  • ALL: Semua data dalam tabel dimensi di-cache. Ini adalah nilai default. Sebelum deployment berjalan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua kueri berikutnya dalam tabel dimensi. Jika sistem tidak menemukan catatan data dalam cache, kunci gabungan tidak ada. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.

  • LRU: Sebagian data dalam tabel dimensi di-cache. Sistem mencari data dalam cache setiap kali catatan data dibaca dari tabel sumber. Jika data tidak ditemukan, sistem mencari data dalam tabel dimensi fisik.

  • None: Tidak ada data yang di-cache.

cacheSize

Jumlah maksimum baris data yang dapat di-cache.

LONG

Tidak

100000

Opsi cacheSize berlaku hanya ketika Anda mengatur opsi cache ke LRU.

cacheTTLMs

Periode timeout cache.

LONG

Tidak

Long.MAX_VALUE

Konfigurasi opsi cacheTTLMs bervariasi berdasarkan opsi cache.

  • Jika opsi cache disetel ke LRU, opsi cacheTTLMs menentukan periode timeout cache. Secara default, entri cache tidak kedaluwarsa.

  • Jika Anda mengatur opsi cache ke ALL, opsi cacheTTLMs menentukan interval di mana sistem menyegarkan cache. Secara default, cache tidak disegarkan.

Unit: milidetik.

Pemetaan tipe data

Tipe data AnalyticDB for PostgreSQL

Tipe data Realtime Compute for Apache Flink

BOOLEAN

BOOLEAN

SMALLINT

INT

INT

INT

BIGINT

BIGINT

FLOAT

DOUBLE

VARCHAR

VARCHAR

TEXT

VARCHAR

TIMESTAMP

TIMESTAMP

DATE

DATE

Contoh kode

  • Tabel Sumber (beta)

    Lihat Gunakan Flink CDC untuk berlangganan data penuh dan inkremental secara real-time.

  • Tabel Sink:

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adbpg_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    INSERT INTO adbpg_sink
    SELECT * FROM datagen_source;
  • Tabel dimensi:

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) 
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    };
    
    CREATE TEMPORARY TABLE adbpg_dim (
     a INT, 
     b VARCHAR, 
     c VARCHAR
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<yourAddress>:<yourPortId>/<yourDatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    )
    COMMENT 'blackhole sink table'
    WITH (
     'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Referensi