すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:PostgreSQL カタログの管理

最終更新日:Dec 04, 2025

PostgreSQL カタログは PostgreSQL のメタデータを管理します。カタログを構成すると、Flink でテーブルスキーマを再構築することなく、PostgreSQL のデータを直接読み書きできます。

制限事項

PostgreSQL カタログは、Ververica Runtime (VVR) 11.4 以降のバージョンでのみサポートされています。

PostgreSQL カタログの作成

データクエリページで、テキストエディターで次の SQL 文を実行してカタログを作成できます。

CREATE CATALOG `postgres` WITH (
    'type' = 'postgres',
    'default-database' = 'postgres',                      
    'hostname' = '<yourHostname>',
    'port' = '5432',                                       
    'username' = '<yourUserName>',                        
    'password' = '<yourPassWord>'                           
);

パラメーター名

必須

デフォルト値

説明

type

はい

なし

カタログのタイプ。これを postgres に設定します。

hostname

はい

なし

PostgreSQL のデータベースエンドポイント。

port

いいえ

5432

データベースのポート番号。

username

はい

なし

データベースにアクセスするためのユーザー名。

password

はい

なし

データベースにアクセスするためのパスワード。

default-database

はい

なし

接続先のデフォルトデータベースの名前。

PostgreSQL カタログの表示

カタログを作成した後、次のコマンドを実行して、そのデータベースとテーブルを表示できます。

USE CATALOG `postgres`;
SHOW DATABASES;

USE `postgres`;
SHOW TABLES;

PostgreSQL カタログ使用

データの読み取り

PostgreSQL テーブルから直接データを読み取ることができます。Replication Slot などの Change Data Capture (CDC) パラメーターを構成するには、SQL ヒント (OPTIONS) を使用して構成を上書きできます。

SELECT * 
FROM `postgres`.`postgres`.`public.target_table` 
/*+ OPTIONS(
    'slot.name' = 'testName', 
    'debezium.publication.autocreate.mode' = 'filtered'
) */;

データの書き込み

INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name 
FROM `source_table`;

ディメンションテーブルクエリ

INSERT INTO sink_table
SELECT 
  o.order_id,
  o.user_id,
  d.user_name,  
  o.amount
FROM pg_catalog.db.orders AS o
JOIN mysql_dim.db.users FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;

PostgreSQL カタログの削除

カタログが不要になった場合は、次のコマンドを実行して削除できます。この操作では、Flink のメタデータマッピングのみが削除されます。

DROP CATALOG `postgres`;