Katalog PostgreSQL mengelola metadata PostgreSQL. Setelah dikonfigurasi, Anda dapat langsung membaca dan menulis data di PostgreSQL tanpa perlu membangun ulang skema tabel di Flink.
Batasan
Katalog PostgreSQL hanya didukung di Ververica Runtime (VVR) versi 11.4 dan yang lebih baru.
Buat katalog PostgreSQL
Pada halaman Data Query, Anda dapat menjalankan pernyataan SQL berikut di editor teks untuk membuat katalog.
CREATE CATALOG `postgres` WITH (
'type' = 'postgres',
'default-database' = 'postgres',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>'
);Nama parameter | Wajib | Nilai default | Deskripsi |
type | Ya | Tidak ada | Jenis katalog. Tetapkan nilai ini ke |
hostname | Ya | Tidak ada | Titik akhir database untuk PostgreSQL. |
port | Tidak | 5432 | Nomor port database. |
username | Ya | Tidak ada | Username untuk mengakses database. |
password | Ya | Tidak ada | Password untuk mengakses database. |
default-database | Ya | Tidak ada | Nama database default yang akan dihubungkan. |
Melihat katalog PostgreSQL
Setelah membuat katalog, Anda dapat menjalankan perintah berikut untuk melihat database dan tabelnya.
USE CATALOG `postgres`;
SHOW DATABASES;
USE `postgres`;
SHOW TABLES;Gunakan katalog PostgreSQL
Baca data
Anda dapat membaca data langsung dari tabel PostgreSQL. Untuk mengonfigurasi parameter Change Data Capture (CDC), seperti Replication Slot, gunakan petunjuk SQL (OPTIONS) untuk menimpa konfigurasi tersebut.
SELECT *
FROM `postgres`.`postgres`.`public.target_table`
/*+ OPTIONS(
'slot.name' = 'testName',
'debezium.publication.autocreate.mode' = 'filtered'
) */;Tulis data
INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name
FROM `source_table`;Kueri tabel dimensi
INSERT INTO sink_table
SELECT
o.order_id,
o.user_id,
d.user_name,
o.amount
FROM pg_catalog.db.orders AS o
JOIN mysql_dim.db.users FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;Hapus katalog PostgreSQL
Jika katalog tersebut tidak lagi diperlukan, Anda dapat menjalankan perintah berikut untuk menghapusnya. Operasi ini hanya menghapus pemetaan metadata di Flink.
DROP CATALOG `postgres`;