A Hologres catalog lets Flink read Hologres metadata directly from the Realtime Compute for Apache Flink console, so you do not need to register tables manually. This topic covers how to create, view, use, and delete a Hologres catalog.
Supported operations
| Operation | UI | Flink SQL |
|---|---|---|
| Create a catalog | Yes | Yes |
| View a catalog | Yes | — |
| Create a table in a catalog | Yes | Yes |
| Modify a table in a catalog | — | Yes |
| Read from and write to catalog tables | — | Yes |
| Use a catalog as a CTAS destination | — | Yes |
| Use a catalog as a CDAS destination | — | Yes |
| Delete a catalog | Yes | Yes |
Prerequisites
Before you begin, make sure you have:
A dedicated Hologres instance (shared cluster instances are not supported)
A database created in the instance. See Create a database.
Limitations
Catalogs cannot be modified after creation. To change a catalog's configuration, delete it and create a new one.
Only dedicated Hologres instances are supported. Realtime Compute for Apache Flink can only read from and write to internal tables in Hologres, so shared cluster instances are not supported.
Create a Hologres catalog
Catalog configurations cannot be modified after creation. To change them, drop the existing catalog and create a new one.
UI
Go to the Catalogs page.
Log on to the Realtime Compute for Apache Flink console. In the Actions column of your workspace, click Console.
In the left navigation pane, click Catalogs.
Click Create Catalog, select Hologres, and click Next.
Configure the catalog parameters.
| Parameter | Required | Description |
|---|---|---|
| catalog name | Yes | Lowercase letters (a–z) and digits (0–9) only. Uppercase letters, hyphens (-), underscores (_), and other special characters are not supported. |
| endpoint | Yes | For same-VPC connections: go to the Hologres console, open the target instance, and copy the Dedicated VPC endpoint from the Network Information section. For other network types, see Network connectivity. |
| username | Yes | Either the AccessKey ID of an Alibaba Cloud account or RAM user, or a custom account in the format `BASIC$<user_name>`. The account must have permission to access the Hologres database. See Hologres permission models and User management. A catalog created with a custom account shows only the databases that account can access; one created with an AccessKey pair shows all databases in the instance. |
| password | Yes | The AccessKey secret or the custom account password. To prevent credential leaks, store credentials as project variables instead of entering them directly. |
| dbname | Yes | The name of an existing Hologres database. If the database does not exist, catalog creation fails. |
Click OK. The catalog appears under Catalogs in the left navigation pane.
Flink SQL
In the editor on the Data Query page, run the `CREATE CATALOG` statement.
Syntax:
CREATE CATALOG <catalogname> WITH (
  'type' = 'hologres',
  'endpoint' = '<endpoint>',
  'username' = '<AccessKey ID>',
  'password' = '<AccessKey secret>',
  'dbname' = '<dbname>'
);
Parameters:
| Parameter | Required | Default | Description |
|---|---|---|---|
| `catalogname` | Yes | — | Catalog name. Lowercase letters (a–z) and digits (0–9) only. |
| `type` | Yes | — | Fixed value: `hologres`. |
| `endpoint` | Yes | — | The VPC endpoint of the Hologres instance. For same-VPC connections, go to the Hologres console and copy the VPC endpoint from Network Information. For other network types, see Network connectivity. |
| `username` | Yes | — | The AccessKey ID of the Alibaba Cloud account. To avoid leaks, store it as a project variable. The account must have permission to access the Hologres database. See Hologres permission model. |
| `password` | Yes | — | The AccessKey secret. |
| `dbname` | Yes | — | The name of an existing Hologres database. |
| `ignore-non-persisted-options` | No | `true` | Whether to ignore non-persisted options when creating a table with non-persisted options. `true`: the table is created and non-persisted options are ignored. `false`: table creation fails. Only `endpoint`, `username`, `password`, and `dbname` are persisted. |
| `catalog.table.metadata-columns` | No | None | Metadata columns to add from the Hologres binary logging source table. Separate multiple columns with semicolons (`;`), for example, `hg_binlog_event_type;hg_binlog_timestamp_us`. Six types of meta-columns are supported. Tables with this parameter can only be used as source tables, not as sink or dimension tables. Supported in Ververica Runtime (VVR) 8.0.11 and later. See Hologres binary logging fields. |
| Other Hologres connector parameters | No | — | WITH parameters supported by the Hologres connector. When set at the catalog level, they apply to all tables by default. Requires `ignore-non-persisted-options=true`. See Connector options. |
Examples:
Simple example:
CREATE CATALOG holocatalog WITH (
  'type' = 'hologres',
  'endpoint' = 'hgpostcn-cn-******-cn-hangzhou-vpc-st.hologres.aliyuncs.com:80',
  'username' = 'LTAI********************',
  'password' = '${secret_values.ak_holo}',
  'dbname' = 'holo_test'
);
Real-time consumption example (binary logging must be enabled on the Hologres table):
CREATE CATALOG holocatalog WITH (
  'type' = 'hologres',
  'endpoint' = 'hgpostcn-cn-******-cn-hangzhou-vpc-st.hologres.aliyuncs.com:80',
  'username' = 'LTAI********************',
  'password' = '${secret_values.ak_holo}',
  'dbname' = 'holo_test',
  'binlog' = 'true', -- Enable binary logging consumption
  'cdcmode' = 'true',
  'connectionpoolname' = 'the_conn_pool',
  'table_property.binlog.level' = 'replica', -- Enable binary logging on tables created through this catalog
  'table_property.binlog.ttl' = '259200'
);
Click Run in the upper-right corner. Verify that the catalog appears under Catalogs in the left navigation pane.
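The `catalog.table.metadata-columns` parameter described in the parameters table can be sketched in the same way. The following is a minimal illustration, not a verified configuration: the catalog name `holobinlogcatalog` is hypothetical, the connection values are the placeholders used in the examples above, and it assumes that binary logging consumption is enabled with `'binlog' = 'true'` and that the job runs on VVR 8.0.11 or later.

```sql
-- Sketch: a catalog whose tables expose two binary logging metadata columns.
-- holobinlogcatalog is a hypothetical name; connection values are placeholders.
CREATE CATALOG holobinlogcatalog WITH (
  'type' = 'hologres',
  'endpoint' = 'hgpostcn-cn-******-cn-hangzhou-vpc-st.hologres.aliyuncs.com:80',
  'username' = 'LTAI********************',
  'password' = '${secret_values.ak_holo}',
  'dbname' = 'holo_test',
  'binlog' = 'true', -- Assumption: metadata columns come from binary logging source tables
  'catalog.table.metadata-columns' = 'hg_binlog_event_type;hg_binlog_timestamp_us' -- Semicolon-separated list
);
```

Because tables that carry this parameter can only be used as source tables, a separate catalog without it is likely needed for sink or dimension table access to the same database.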
View a Hologres catalog
Log on to the Realtime Compute for Apache Flink console. In the Actions column of your workspace, click Console, then click Catalogs in the left navigation pane.
On the Catalog List page, click View in the Actions column of the target catalog to browse its databases and tables. If the schema is `public`, the schema prefix is omitted from the table name.
Use a Hologres catalog
Usage notes:
If the schema is `public`, you can reference a table as `${table_name}` instead of `${schema_name.table_name}`.
Tables in a Hologres catalog support updated data consumption. By default, `ignoredelete` is `false` and `mutatetype` is `insertorupdate`. See Merge data into a wide table.
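To illustrate the schema rule in the usage notes, the following sketch queries one table in the `public` schema and one in another schema. The schema name `sales` is hypothetical, and the catalog, database, and table names are the sample names used elsewhere in this topic.

```sql
-- Table in the public schema: the schema prefix can be omitted.
SELECT id, name FROM `holocatalog`.`holodb`.`holotable`;

-- Table in a non-public schema (sales is a hypothetical schema name):
-- the schema name and table name together form the table identifier.
SELECT id, name FROM `holocatalog`.`holodb`.`sales.holotable`;
```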
Create a Hologres table
This example creates a table named holotable in the holodb database within the holocatalog catalog.
Before you create a table:
The `connector` parameter is required in the WITH clause, and its value must be `hologres`. Other parameters such as `endpoint` can be omitted.
You cannot directly add or modify WITH parameters in a Hologres table definition. Use SQL hints in INSERT statements instead.
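Once a table exists, a connector option can be supplied per statement through a SQL hint, as the second note suggests. The sketch below uses the standard Flink `OPTIONS` hint on the sink table; the option `mutatetype` and the value `insertorupdate` come from the usage notes, and `source_table` is a placeholder for any readable source.

```sql
-- Sketch: pass a connector option to the sink with a SQL hint
-- instead of adding it to the table definition.
INSERT INTO `holocatalog`.`holodb`.`holotable` /*+ OPTIONS('mutatetype' = 'insertorupdate') */
SELECT id, name
FROM source_table; -- Placeholder source table
```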
UI
Log on to the Realtime Compute for Apache Flink console. In the Actions column of your workspace, click Console, then click Catalogs.
In the Actions column of the target catalog, click View. Then click View again for the target database.
Click Create Table.
On the Built-in Connector tab, select the Hologres connector and click Next.
Enter the `CREATE TABLE` statement.
| Syntax | Example |
|---|---|
| CREATE TABLE \`${catalog_name}\`.\`${db_name}\`.\`${table_name}\` (...) WITH ('connector' = 'hologres'); | CREATE TABLE \`holocatalog\`.\`holo_test\`.\`product\` (id INT, name STRING) WITH ('connector' = 'hologres'); |
Click Confirm.
Flink SQL
In the editor on the Scripts page, enter a CREATE TABLE statement using one of the following approaches.
Option 1: Use `USE CATALOG` (recommended)
Switch to the catalog first, then create the table without a fully qualified name.
| Syntax | Example |
|---|---|
| USE CATALOG ${catalog_name};<br>CREATE TABLE \`${db_name}\`.\`${schema_name.table_name}\` (...) WITH ('connector' = 'hologres'); | USE CATALOG holocatalog;<br>CREATE TABLE \`holodb\`.\`holotable\` (id INT, name STRING) WITH ('connector' = 'hologres'); |
Option 2: Use a fully qualified name in the DDL
Reference the catalog directly in the CREATE TABLE statement.
| Syntax | Example |
|---|---|
| CREATE TABLE \`${catalog_name}\`.\`${db_name}\`.\`${schema_name.table_name}\` (...) WITH ('connector' = 'hologres'); | CREATE TABLE \`holocatalog\`.\`holodb\`.\`holotable\` (id INT, name STRING) WITH ('connector' = 'hologres'); |
Set physical table properties
Add table_property.* parameters to the WITH clause to control how Hologres stores the table. These properties are set when the table is created and some cannot be changed later.
-- a, b, and c are placeholder column names; replace them with columns defined in your table.
CREATE TABLE `holocatalog`.`holodb`.`holotable` (
  id INT,
  name STRING
) WITH (
  'connector' = 'hologres',
  'table_property.orientation' = 'column',
  'table_property.distribution_key' = 'a',
  'table_property.clustering_key' = 'b:desc',
  'table_property.bitmap_columns' = 'a,b',
  'table_property.segment_key' = 'c',
  'table_property.time_to_live_in_seconds' = '86400',
  'table_property.binlog.level' = 'replica',
  'table_property.binlog.ttl' = '86400'
);
Supported properties (all use the `table_property.` prefix):
| Property | Description | Example | Modifiable |
|---|---|---|---|
| `table_property.orientation` | Storage format | 'row,column' | No |
| `table_property.table_group` | Table group | 'table_group_xxx' | — |
| `table_property.distribution_key` | Distribution key | 'a,b' | — |
| `table_property.clustering_key` | Clustering key | 'a,b:desc' | — |
| `table_property.event_time_column` (formerly `table_property.segment_key`) | Segment key | 'c,d' | — |
| `table_property.bitmap_columns` | Bitmap index | 'a:on,b:off' | Yes |
| `table_property.dictionary_encoding_columns` | Dictionary encoding | 'a:on,b:off,c:auto' | Yes |
| `table_property.time_to_live_in_seconds` | TTL (time to live) of table data | '864000' | Yes |
| `table_property.binlog.level` | Enable binary logging | 'replica' | Yes |
| `table_property.binlog.ttl` | TTL of binary logs | '86400' | Yes |
For more information, see Overview of table creation and Subscribe to Hologres binary logs.
Enable lenient mode (`enableTypeNormalization`)
Lenient mode normalizes Flink data types to broader Hologres-compatible types, which is useful in Create Table As Select (CTAS) scenarios where upstream type precision may vary.
| Value | Behavior |
|---|---|
| `false` (default) | Creates the Hologres table using the exact type mapping. |
| `true` | Normalizes types: TINYINT/SMALLINT/INT/BIGINT → BIGINT; CHAR/VARCHAR/STRING → STRING; FLOAT/DOUBLE → DOUBLE. Other types follow the standard type mapping. |
Enable lenient mode before the first run of the CTAS job. If you enable it after the first run, delete the downstream table and restart the job without state for the change to take effect.
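As a minimal sketch of how lenient mode might be enabled, the statement below sets `enableTypeNormalization` in the WITH clause of a CTAS statement. Passing the option this way is an assumption based on how other connector options are supplied in this topic, and `source_table` is a placeholder.

```sql
-- Sketch: enable lenient mode when the destination table is first created by CTAS.
-- Assumption: enableTypeNormalization is passed as a WITH option of the CTAS statement.
CREATE TABLE IF NOT EXISTS `holocatalog`.`holodb`.`holotable`
WITH (
  'connector' = 'hologres',
  'enableTypeNormalization' = 'true' -- For example, INT and SMALLINT source columns map to BIGINT
) AS TABLE source_table; -- Placeholder source table
```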
Modify a Hologres table
| Operation | Syntax and example |
|---|---|
| Modify table properties | ALTER TABLE \`${catalog_name}\`.\`${db_name}\`.\`${table_name}\` SET ('table_property.binlog.level' = 'replica', 'table_property.binlog.ttl' = '64700');<br>Example: ALTER TABLE \`holocatalog\`.\`holodb\`.\`holotable\` SET ('table_property.binlog.level' = 'replica', 'table_property.binlog.ttl' = '64700'); |
| Rename a table | ALTER TABLE \`${catalog_name}\`.\`${db_name}\`.\`${table_name}\` RENAME TO \`${catalog_name}\`.\`${db_name}\`.\`${new_table_name}\`;<br>Example: ALTER TABLE \`holocatalog\`.\`holodb\`.\`holotable\` RENAME TO \`holocatalog\`.\`holodb\`.\`new_holotable\`; |
| Add a column | ALTER TABLE \`${catalog_name}\`.\`${db_name}\`.\`${table_name}\` ADD <column_name> <column_datatype> COMMENT '<column_comment>';<br>Example: ALTER TABLE \`holocatalog\`.\`holodb\`.\`holotable\` ADD address STRING COMMENT 'Address information'; |
| Rename a column | ALTER TABLE \`${catalog_name}\`.\`${db_name}\`.\`${table_name}\` RENAME <old_column_name> TO <new_column_name>;<br>Example: ALTER TABLE \`holocatalog\`.\`holodb\`.\`holotable\` RENAME address TO new_address; |
| Modify a column comment | ALTER TABLE \`${catalog_name}\`.\`${db_name}\`.\`${table_name}\` MODIFY <column_name> <original_column_type> COMMENT '<new_column_comment>';<br>Example: ALTER TABLE \`holocatalog\`.\`holodb\`.\`holotable\` MODIFY new_address STRING COMMENT 'New address information'; |
Read from and write to Hologres tables
Read data from Hologres
By default, Flink reads Hologres source tables in batch mode and does not consume newly written data in real time. To read in real time, use one of the following methods:
Configure binary logging at catalog creation (recommended for persistent settings): Enable binary logging when creating the catalog using `CREATE CATALOG`. See the real-time consumption example above. Then query the table normally:
| Syntax | Example |
|---|---|
| INSERT INTO ${other_sink_table} SELECT ... FROM \`${catalog_name}\`.\`${db_name}\`.\`${table_name}\`; | INSERT INTO sink_table SELECT id, name FROM \`holocatalog\`.\`holodb\`.\`holotable\`; |
Use a SQL hint (for per-query overrides): Add the `/*+ OPTIONS('binlog'='true') */` hint to the query.
INSERT INTO sinktable
SELECT id, name
FROM `holocatalog`.`holodb`.`holotable` /*+ OPTIONS ('binlog' = 'true') */;
Write data to Hologres
| Syntax | Example |
|---|---|
| INSERT INTO \`${catalog_name}\`.\`${db_name}\`.\`${table_name}\` SELECT ... FROM ${other_source_table}; | INSERT INTO \`holocatalog\`.\`holodb\`.\`holotable\` SELECT id, name FROM source_table; |
Use as a CTAS destination
During CTAS synchronization, the Hologres catalog may automatically rewrite the destination schema to ensure data can be written to Hologres: a DECIMAL primary key is rewritten to BIGINT by default; TIME, TIMESTAMP, or TIMESTAMP_LTZ columns with precision greater than 6 are implicitly truncated to precision 6. If the DECIMAL rewrite does not meet your requirements, use CTAS syntax to convert the type to STRING and re-establish the primary key.
CTAS (Create Table As Select) creates a Hologres table and populates it from a source table in one statement. Set physical table properties in the WITH clause—they are applied when the destination table is created.
CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}`
WITH (
  'connector' = 'hologres'
) AS TABLE ${other_source_table};
Example:
CREATE TABLE IF NOT EXISTS `holocatalog`.`holodb`.`holotable`
WITH (
  'connector' = 'hologres'
) AS TABLE source_table;
Use as a CDAS destination
CDAS WITH parameters apply globally—you cannot set different properties for each destination table. To set per-table properties, create the destination tables manually before starting the CDAS job. See Create a Hologres table for supported physical table properties.
CDAS (Create Database As Database) synchronizes an entire source database to Hologres.
CREATE DATABASE IF NOT EXISTS `${catalog_name}`.`${db_name}`
WITH (
  'sink.parallelism' = '5' -- Set the parallelism for each sink table
) AS DATABASE ${other_source_database};
Example:
CREATE DATABASE IF NOT EXISTS `holocatalog`.`holodb`
WITH (
  'sink.parallelism' = '5'
) AS DATABASE source_database;
WITH parameters in CDAS apply to all downstream sink tables. For details on supported parameters, see Hologres sink table.
Additional parameters:
| Parameter | Required | Default | Description |
|---|---|---|---|
| `schemaname` | No | `public` | The schema in the destination Hologres database to sync data into. |
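The `schemaname` parameter can be sketched as follows. This is an illustration only: it assumes the parameter is passed in the CDAS WITH clause alongside other sink parameters, the schema name `ods` is hypothetical, and `source_database` is the same placeholder used in the CDAS example.

```sql
-- Sketch: synchronize a source database into a non-default schema of the destination database.
-- Assumption: schemaname is supplied in the CDAS WITH clause; ods is a hypothetical schema name.
CREATE DATABASE IF NOT EXISTS `holocatalog`.`holodb`
WITH (
  'sink.parallelism' = '5',
  'schemaname' = 'ods' -- Defaults to public when omitted
) AS DATABASE source_database;
```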
Delete a Hologres catalog
Deleting a catalog does not affect running jobs, but it does affect jobs that are not yet published and jobs that need to be paused and resumed. Proceed with caution.
UI
Log on to the Realtime Compute for Apache Flink console. In the Actions column of your workspace, click Console, then click Catalogs in the left navigation pane.
On the Catalog List page, find the catalog and click Delete in the Actions column.
In the confirmation dialog, click Delete.
Verify that the catalog no longer appears in the Catalogs section on the left.
Flink SQL
In the editor on the Data Query page, run the following command:
DROP CATALOG ${catalog_name}
Replace `${catalog_name}` with the name of the catalog as shown in the Realtime Compute for Apache Flink console.
Right-click the command and select Run.
Confirm that the catalog no longer appears in the Metadata section on the left.
FAQ
Real-time consumption errors: See FAQ about catalog errors.
Network connectivity issues: See Network connectivity.
CTAS error about CatalogTableProvider: See What do I do if the error message "CREATE TABLE ... AS TABLE ... statement requires target catalog ... implements org.apache.flink.table.catalog.CatalogTableProvider interface." appears?
What's next
Explore connector options for Hologres: Connector options
Build end-to-end pipelines with Hologres catalogs: