PostgreSQL カタログは PostgreSQL のメタデータを管理します。カタログを構成すると、Flink でテーブルスキーマを再構築することなく、PostgreSQL のデータを直接読み書きできます。
制限事項
PostgreSQL カタログは、Ververica Runtime (VVR) 11.4 以降のバージョンでのみサポートされています。
PostgreSQL カタログの作成
データクエリページで、テキストエディターで次の SQL 文を実行してカタログを作成できます。
CREATE CATALOG `postgres` WITH (
'type' = 'postgres',
'default-database' = 'postgres',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>'
);パラメーター名 | 必須 | デフォルト値 | 説明 |
type | はい | なし | カタログのタイプ。これを |
hostname | はい | なし | PostgreSQL のデータベースエンドポイント。 |
port | いいえ | 5432 | データベースのポート番号。 |
username | はい | なし | データベースにアクセスするためのユーザー名。 |
password | はい | なし | データベースにアクセスするためのパスワード。 |
default-database | はい | なし | 接続先のデフォルトデータベースの名前。 |
PostgreSQL カタログの表示
カタログを作成した後、次のコマンドを実行して、そのデータベースとテーブルを表示できます。
USE CATALOG `postgres`;
SHOW DATABASES;
USE `postgres`;
SHOW TABLES;PostgreSQL カタログの使用
データの読み取り
PostgreSQL テーブルから直接データを読み取ることができます。Replication Slot などの Change Data Capture (CDC) パラメーターを構成するには、SQL ヒント (OPTIONS) を使用して構成を上書きできます。
SELECT *
FROM `postgres`.`postgres`.`public.target_table`
/*+ OPTIONS(
'slot.name' = 'testName',
'debezium.publication.autocreate.mode' = 'filtered'
) */;データの書き込み
INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name
FROM `source_table`;ディメンションテーブルクエリ
INSERT INTO sink_table
SELECT
o.order_id,
o.user_id,
d.user_name,
o.amount
FROM pg_catalog.db.orders AS o
JOIN mysql_dim.db.users FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;PostgreSQL カタログの削除
カタログが不要になった場合は、次のコマンドを実行して削除できます。この操作では、Flink のメタデータマッピングのみが削除されます。
DROP CATALOG `postgres`;