Realtime Compute for Apache Flink: Manage PostgreSQL catalogs

Last Updated: Dec 09, 2025

A PostgreSQL catalog exposes PostgreSQL metadata to Flink, so you can read from and write to PostgreSQL tables directly without redeclaring their schemas in Flink. This topic describes how to create, view, use, and delete a PostgreSQL catalog.

Version requirements

PostgreSQL catalogs require Ververica Runtime (VVR) 11.4 or later.

Create a catalog

On the Scripts page, run the following statement in the SQL editor to create a catalog:

CREATE CATALOG `postgres` WITH (
    'type' = 'postgres',
    'default-database' = 'postgres',
    'hostname' = '<yourHostname>',
    'port' = '5432',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>'
);

| Configuration option | Required | Default value | Description |
|----------------------|----------|---------------|-------------|
| type | Yes | (none) | The type of catalog. Set this to postgres. |
| hostname | Yes | (none) | The PostgreSQL database hostname. |
| port | No | 5432 | The database port number. |
| username | Yes | (none) | The username to access the database. |
| password | Yes | (none) | The password to access the database. |
| default-database | Yes | (none) | The name of the default database to connect to. |
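Because port is optional and defaults to 5432, a catalog definition can omit it. The following is a minimal sketch under that assumption; the catalog name `pg_orders` and the database name `orders_db` are hypothetical and only illustrate that the catalog name does not have to match the database name.

```sql
-- Hypothetical catalog name (pg_orders) and database name (orders_db).
-- 'port' is omitted, so the documented default of 5432 applies.
CREATE CATALOG `pg_orders` WITH (
    'type' = 'postgres',
    'default-database' = 'orders_db',
    'hostname' = '<yourHostname>',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>'
);
```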

View a catalog

After you create a catalog, run the following commands to view its databases and tables.

USE CATALOG `postgres`;
SHOW DATABASES;

USE `postgres`;
SHOW TABLES;
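To check which catalog and database the session currently points at, or to inspect the schema that the catalog derived for a table, standard Flink SQL statements can be used. A short sketch, assuming a table named `target_table` exists in the `public` schema (the same placeholder used elsewhere in this topic):

```sql
-- Confirm the session context after USE CATALOG / USE.
SHOW CURRENT CATALOG;
SHOW CURRENT DATABASE;

-- Inspect the columns and types that Flink sees for a PostgreSQL table.
-- public.target_table is a placeholder; substitute one of your own tables.
DESCRIBE `postgres`.`postgres`.`public.target_table`;
```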

Use a catalog

Read data from PostgreSQL

With a PostgreSQL catalog, you can query PostgreSQL tables directly from Flink. To set Change Data Capture (CDC) options, such as the replication slot name, use the OPTIONS SQL hint to override the configuration for that query.

SELECT * 
FROM `postgres`.`postgres`.`public.target_table` 
/*+ OPTIONS(
    'slot.name' = 'testName', 
    'debezium.publication.autocreate.mode' = 'filtered'
) */;
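For comparison, the following sketch shows roughly what you would have to declare by hand without the catalog. It assumes the catalog reads through the `postgres-cdc` connector, which this topic does not state explicitly, and the table name `target_table_manual`, the columns `id` and `name`, and their types are hypothetical.

```sql
-- Hypothetical manual declaration; the catalog derives the schema for you.
-- Column names and types are assumptions for illustration only.
CREATE TEMPORARY TABLE target_table_manual (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '<yourHostname>',
    'port' = '5432',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'table-name' = 'target_table',
    -- Publication settings only apply to the pgoutput decoding plugin.
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'testName',
    'debezium.publication.autocreate.mode' = 'filtered'
);

SELECT * FROM target_table_manual;
```

A PostgreSQL replication slot can serve only one consumer at a time, so give each job that reads through CDC its own slot name.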

Write data to PostgreSQL

INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name 
FROM `source_table`;
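The statement above assumes that `source_table` already exists. To try the write path end to end, one option is a generated source. The sketch below uses Flink's built-in `datagen` connector and assumes that `public.target_table` has an integer `id` column and a text `name` column; both the column list and the types are assumptions.

```sql
-- Hypothetical test source that emits 100 rows with sequential ids.
CREATE TEMPORARY TABLE source_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100',
    'fields.name.length' = '8'
);

-- Naming the target columns makes the mapping explicit.
INSERT INTO `postgres`.`postgres`.`public.target_table` (id, name)
SELECT id, name
FROM source_table;
```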

Lookup join

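INSERT INTO sink_table
SELECT
  o.order_id,
  o.user_id,
  d.user_name,
  o.amount
-- orders is the streaming table; it must expose a processing-time attribute (proc_time).
FROM orders AS o
-- The PostgreSQL catalog table acts as the dimension (lookup) table.
-- public.users is a placeholder; substitute your own schema and table.
JOIN `postgres`.`postgres`.`public.users` FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;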
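A lookup join requires the streaming side to carry a processing-time attribute, which is what `FOR SYSTEM_TIME AS OF o.proc_time` refers to. The following sketch shows one way to declare such a table and a matching sink for testing. The table name `orders`, the column types, and the use of the `datagen` and `print` connectors are all assumptions for illustration.

```sql
-- Hypothetical streaming source with a computed processing-time column.
CREATE TEMPORARY TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DOUBLE,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

-- Hypothetical sink that prints the joined rows to the task logs.
CREATE TEMPORARY TABLE sink_table (
    order_id BIGINT,
    user_id BIGINT,
    user_name STRING,
    amount DOUBLE
) WITH (
    'connector' = 'print'
);
```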

Delete a catalog

If you no longer need the PostgreSQL catalog, run the following command to delete it. This operation deletes only the metadata mapping in Flink; the databases, tables, and data in PostgreSQL are not affected.

DROP CATALOG `postgres`;
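If the statement may run more than once, for example in a cleanup script, the IF EXISTS form avoids an error when the catalog is already gone. A short sketch using standard Flink SQL:

```sql
-- Does nothing if the catalog has already been dropped.
DROP CATALOG IF EXISTS `postgres`;

-- Verify that the catalog no longer appears in the list.
SHOW CATALOGS;
```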