A PostgreSQL catalog exposes PostgreSQL metadata to Flink, so you can read from and write to PostgreSQL tables directly without redeclaring their schemas in Flink DDL. This topic describes how to create, view, use, and delete a PostgreSQL catalog.
Version requirements
PostgreSQL catalogs require Ververica Runtime (VVR) 11.4+.
Create a catalog
On the Scripts page, run the following statement in the SQL editor to create a catalog:
CREATE CATALOG `postgres` WITH (
'type' = 'postgres',
'default-database' = 'postgres',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>'
);
Configuration option | Required | Default value | Description |
type | Yes | (none) | The type of catalog. Set this to postgres. |
hostname | Yes | (none) | The PostgreSQL database hostname. |
port | No | | The database port number. |
username | Yes | (none) | The username to access the database. |
password | Yes | (none) | The password to access the database. |
default-database | Yes | (none) | The name of the default database to connect to. |
View a catalog
After you create a catalog, run the following commands to view its databases and tables.
USE CATALOG `postgres`;
SHOW DATABASES;
USE `postgres`;
SHOW TABLES;
Use a catalog
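The statements in this section reference a table as catalog, database, and a single backtick-quoted identifier that combines the PostgreSQL schema and table name. The following is a minimal sketch of the two equivalent ways to address such a table, assuming the catalog created above and a hypothetical table public.target_table:

```sql
-- Fully qualified: `catalog`.`database`.`schema.table`
DESCRIBE `postgres`.`postgres`.`public.target_table`;

-- Equivalent, after setting the current catalog and database
USE CATALOG `postgres`;
USE `postgres`;
DESCRIBE `public.target_table`;
```

Keeping the schema and table name inside one pair of backticks prevents Flink from parsing the dot as a separator between database and table.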
Read data from PostgreSQL
With a PostgreSQL catalog, you can read data directly from PostgreSQL tables in Flink. To set Change Data Capture (CDC) options, such as the replication slot name, use SQL hints (OPTIONS) to override the default configuration.
SELECT *
FROM `postgres`.`postgres`.`public.target_table`
/*+ OPTIONS(
'slot.name' = 'testName',
'debezium.publication.autocreate.mode' = 'filtered'
) */;
Write data to PostgreSQL
INSERT INTO `postgres`.`postgres`.`public.target_table`
SELECT id, name
FROM `source_table`;
Lookup join
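A lookup join enriches each row of a stream with the matching row of a dimension table at processing time, so the streaming side needs a processing-time attribute. The following is a minimal sketch, assuming the catalog created above; the orders stream, the public.users table, and all column names are hypothetical:

```sql
-- Hypothetical order stream; proc_time is the processing-time attribute
-- that the FOR SYSTEM_TIME AS OF clause requires.
CREATE TEMPORARY TABLE orders (
  order_id BIGINT,
  user_id  BIGINT,
  amount   DECIMAL(10, 2),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Assumes public.users exists in PostgreSQL with user_id and user_name columns.
SELECT
  o.order_id,
  o.user_id,
  d.user_name,
  o.amount
FROM orders AS o
JOIN `postgres`.`postgres`.`public.users` FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;
```

The same pattern with an INSERT INTO target looks like this: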
INSERT INTO sink_table
SELECT
o.order_id,
o.user_id,
d.user_name,
o.amount
FROM pg_catalog.db.orders AS o
JOIN mysql_dim.db.users FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.user_id = d.user_id;
Delete a catalog
If you no longer need the PostgreSQL catalog, run the following statement to delete it. This operation deletes only the metadata mapping in Flink; the databases and tables in PostgreSQL are not affected.
DROP CATALOG `postgres`;
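If the session previously ran USE CATALOG `postgres`, it can help to switch to another catalog first, because open source Flink rejects dropping the catalog that is currently in use. The following is a minimal sketch, assuming a built-in catalog named default_catalog (the open source Flink default; the name in your deployment may differ):

```sql
-- Assumption: default_catalog exists; replace it with any other catalog in your workspace.
USE CATALOG `default_catalog`;

-- IF EXISTS avoids an error when the catalog has already been removed.
DROP CATALOG IF EXISTS `postgres`;
```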