PostgreSQL Catalog 用於管理 PostgreSQL 中繼資料。配置後,無需在 Flink 中手動重建表結構,即可直接讀寫 PostgreSQL 資料。
使用限制
僅VVR 11.4及以上版本支援建立PostgreSQL Catalog。
建立PostgreSQL Catalog
在資料查詢文本編輯地區,運行以下 SQL 陳述式建立 Catalog。
CREATE CATALOG `postgres` WITH (
'type' = 'postgres',
'default-database' = 'postgres',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>'
);參數名稱 | 是否必選 | 預設值 | 描述 |
type | 是 | 無 | Catalog 類型。固定為 |
hostname | 是 | 無 | PostgreSQL 資料庫串連地址。 |
port | 否 | 5432 | 資料庫連接埠號碼。 |
username | 是 | 無 | 訪問資料庫的使用者名稱。 |
password | 是 | 無 | 訪問資料庫的密碼。 |
default-database | 是 | 無 | 預設串連的資料庫名稱。 |
查看PostgreSQL Catalog
建立成功後,使用以下命令查看資料庫和表。
USE CATALOG `postgres`;
SHOW DATABASES;
USE `postgres`;
SHOW TABLES;使用PostgreSQL Catalog
讀取資料
您可以直接讀取 PostgreSQL 表中的資料。若需配置 CDC 參數(如 Replication Slot),請使用 SQL Hint (OPTIONS) 覆蓋配置。
SELECT *
FROM `postgres`.`postgres`.`public.target_table`
/*+ OPTIONS(
'slot.name' = 'testName',
'debezium.publication.autocreate.mode' = 'filtered'
) */;寫入資料
INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name
FROM `source_table`;維表查詢
INSERT INTO sink_table
SELECT
o.order_id,
o.user_id,
d.user_name,
o.amount
FROM pg_catalog.db.orders AS o
JOIN mysql_dim.db.users FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;刪除PostgreSQL Catalog
若不再使用,運行以下命令刪除 Catalog。此操作僅刪除 Flink 中的中繼資料映射。
DROP CATALOG `postgres`;