全部產品
Search
文件中心

Realtime Compute for Apache Flink:管理PostgreSQL Catalog

更新時間:Dec 04, 2025

PostgreSQL Catalog 用於管理 PostgreSQL 中繼資料。配置後,無需在 Flink 中手動重建表結構,即可直接讀寫 PostgreSQL 資料。

使用限制

僅VVR 11.4及以上版本支援建立PostgreSQL Catalog。

建立PostgreSQL Catalog

資料查詢文本編輯地區,運行以下 SQL 陳述式建立 Catalog。

CREATE CATALOG `postgres` WITH (
    'type' = 'postgres',
    'default-database' = 'postgres',                      
    'hostname' = '<yourHostname>',
    'port' = '5432',                                       
    'username' = '<yourUserName>',                        
    'password' = '<yourPassWord>'                           
);

參數名稱

是否必選

預設值

描述

type

Catalog 類型。固定為postgres

hostname

PostgreSQL 資料庫串連地址。

port

5432

資料庫連接埠號碼。

username

訪問資料庫的使用者名稱。

password

訪問資料庫的密碼。

default-database

預設串連的資料庫名稱。

查看PostgreSQL Catalog

建立成功後,使用以下命令查看資料庫和表。

USE CATALOG `postgres`;
SHOW DATABASES;

USE `postgres`;
SHOW TABLES;

使用PostgreSQL Catalog

讀取資料

您可以直接讀取 PostgreSQL 表中的資料。若需配置 CDC 參數(如 Replication Slot),請使用 SQL Hint (OPTIONS) 覆蓋配置。

SELECT * 
FROM `postgres`.`postgres`.`public.target_table` 
/*+ OPTIONS(
    'slot.name' = 'testName', 
    'debezium.publication.autocreate.mode' = 'filtered'
) */;

寫入資料

INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name 
FROM `source_table`;

維表查詢

INSERT INTO sink_table
SELECT 
  o.order_id,
  o.user_id,
  d.user_name,  
  o.amount
FROM pg_catalog.db.orders AS o
JOIN mysql_dim.db.users FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;

刪除PostgreSQL Catalog

若不再使用,運行以下命令刪除 Catalog。此操作僅刪除 Flink 中的中繼資料映射。

DROP CATALOG `postgres`;