全部产品
Search
文档中心

Realtime Compute for Apache Flink:Manage PostgreSQL catalogs

更新时间:Dec 04, 2025

Katalog PostgreSQL mengelola metadata PostgreSQL. Setelah dikonfigurasi, Anda dapat langsung membaca dan menulis data di PostgreSQL tanpa perlu membangun ulang skema tabel di Flink.

Batasan

Katalog PostgreSQL hanya didukung di Ververica Runtime (VVR) versi 11.4 dan yang lebih baru.

Buat katalog PostgreSQL

Pada halaman Data Query, Anda dapat menjalankan pernyataan SQL berikut di editor teks untuk membuat katalog.

CREATE CATALOG `postgres` WITH (
    'type' = 'postgres',
    'default-database' = 'postgres',                      
    'hostname' = '<yourHostname>',
    'port' = '5432',                                       
    'username' = '<yourUserName>',                        
    'password' = '<yourPassWord>'                           
);

Nama parameter

Wajib

Nilai default

Deskripsi

type

Ya

Tidak ada

Jenis katalog. Tetapkan nilai ini ke postgres.

hostname

Ya

Tidak ada

Titik akhir database untuk PostgreSQL.

port

Tidak

5432

Nomor port database.

username

Ya

Tidak ada

Username untuk mengakses database.

password

Ya

Tidak ada

Password untuk mengakses database.

default-database

Ya

Tidak ada

Nama database default yang akan dihubungkan.

Melihat katalog PostgreSQL

Setelah membuat katalog, Anda dapat menjalankan perintah berikut untuk melihat database dan tabelnya.

USE CATALOG `postgres`;
SHOW DATABASES;

USE `postgres`;
SHOW TABLES;

Gunakan katalog PostgreSQL

Baca data

Anda dapat membaca data langsung dari tabel PostgreSQL. Untuk mengonfigurasi parameter Change Data Capture (CDC), seperti Replication Slot, gunakan petunjuk SQL (OPTIONS) untuk menimpa konfigurasi tersebut.

SELECT * 
FROM `postgres`.`postgres`.`public.target_table` 
/*+ OPTIONS(
    'slot.name' = 'testName', 
    'debezium.publication.autocreate.mode' = 'filtered'
) */;

Tulis data

INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name 
FROM `source_table`;

Kueri tabel dimensi

INSERT INTO sink_table
SELECT 
  o.order_id,
  o.user_id,
  d.user_name,  
  o.amount
FROM pg_catalog.db.orders AS o
JOIN mysql_dim.db.users FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;

Hapus katalog PostgreSQL

Jika katalog tersebut tidak lagi diperlukan, Anda dapat menjalankan perintah berikut untuk menghapusnya. Operasi ini hanya menghapus pemetaan metadata di Flink.

DROP CATALOG `postgres`;