This topic describes the Hologres connector.
Overview
Hologres is Alibaba Cloud's unified real-time data warehousing service, engineered for high-performance data operations. It enables real-time writing, updating, processing, and analysis of massive datasets. Supporting standard SQL syntax and most PostgreSQL functions, Hologres excels at petabyte-scale OLAP and ad hoc analysis, delivering high-concurrency, low-latency data services. Its capabilities include fine-grained workload isolation and enterprise-grade security. Deeply integrated with MaxCompute, Realtime Compute for Apache Flink, and DataWorks, Hologres provides comprehensive end-to-end data warehousing solutions for enterprises. The following table briefly describes the Hologres connector of Realtime Compute for Apache Flink:
Category | Details |
Supported types | Source, dimension, and sink tables |
Execution modes | Stream and batch modes |
Data format | Not supported |
Monitoring metrics | |
API types | DataStream and SQL |
Supports data updates or deletions in sink tables | Yes |
Features
The connector provides the following features:

- Read Hologres data with or without binary logging (binlog), in both change data capture (CDC) and non-CDC modes.
- Perform full, incremental, or unified full and incremental consumption.
- On primary key conflicts, ignore new data, replace entire rows, or update only specific fields.
- Update only modified columns instead of the entire row.
- Consume binlogs from physical partitioned tables. A single job can monitor all partitions, including newly added ones. You can also consume binlogs from logical partitioned tables.
- Write data to the parent table of a partitioned table and have the corresponding child partitions created automatically.
- Perform real-time synchronization of data at the single-table or entire-database level. For more information, see CREATE TABLE AS (CTAS) statement and Quick Start for real-time database synchronization; a minimal CTAS sketch follows this list.
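The following is a minimal CTAS sketch, assuming a Hologres catalog named holo and a MySQL catalog named mysql_catalog have already been registered; all catalog, database, and table names are illustrative placeholders:

CREATE TABLE IF NOT EXISTS holo.tpcds.web_sales
AS TABLE mysql_catalog.tpcds.web_sales;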
Limitations and suggestions
Limitations
Foreign tables not supported: The Hologres connector does not support access to Hologres foreign tables, such as MaxCompute foreign tables.
Time type limit: Real-time consumption of TIMESTAMP data is not supported. Use the TIMESTAMPTZ type when you create tables.
Source table scan: In Ververica Runtime (VVR) 8 and earlier, data is read from Hologres in batch mode by default. This means incremental data is not consumed.
Watermark limit: In VVR 8 and earlier, CDC mode does not support defining watermarks. To perform windowed aggregations, use a non-windowed aggregation approach instead, such as the sketch below.
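For example, a per-minute count can be expressed without a window by grouping on a truncated timestamp. This is a sketch; hologres_cdc_source and event_time are illustrative names:

-- Aggregate per minute without a window, so no watermark is required.
SELECT DATE_FORMAT(event_time, 'yyyy-MM-dd HH:mm') AS window_minute, COUNT(*) AS cnt
FROM hologres_cdc_source
GROUP BY DATE_FORMAT(event_time, 'yyyy-MM-dd HH:mm');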
Suggestions
Table storage format:
Dimension tables for point lookups: Use row-oriented storage. Set both a primary key and a clustering key.
Dimension tables for one-to-many queries: Use column-oriented storage, and configure the distribution key and segment key to optimize performance.
Frequently updated tables for analytical queries: Use row-column hybrid storage if such tables must support both real-time binlog consumption and OLAP analysis.
Important: If you do not explicitly specify a storage format when creating a Hologres table, it defaults to column-oriented storage. The storage format cannot be changed after the table is created. For more information, see Create a table in Hologres and Table storage formats.
Job parallelism: You can set the Flink job parallelism equal to the number of shards in the Hologres table.

-- Run this command in HoloWeb to view the number of shards in a table.
select tg.property_value
from hologres.hg_table_properties tb
join hologres.hg_table_group_properties tg
  on tb.property_value = tg.tablegroup_name
where tb.property_key = 'table_group'
  and tg.property_key = 'shard_count'
  and table_name = '<tablename>';

Hologres version: Regularly check the Hologres connector release notes for known issues, feature updates, and version compatibility information.
Compatibility notes
Hologres and VVR version compatibility and limitations
Source table
VVR 8 and earlier: Set the consumption mode using sdkMode.
VVR 11+: Set the consumption mode using source.binlog.read-mode.
VVR version | Hologres version | Default/Recommended consumption mode | Actual consumption mode | Notes |
≥ 6.0.7 | < 2.0 | holohub | holohub | |
6.0.7–8.0.4 | ≥ 2.0 | jdbc (automatic switchover) | jdbc (forced) | Hologres 2.0 and later has deprecated HoloHub. If holohub is set, the connector automatically falls back to jdbc, which may require additional user permissions. For permission configurations, see Permission issues. |
≥ 8.0.5 | ≥ 2.1 | jdbc (automatic switchover) | jdbc (forced) | For Hologres 2.1.27 and later, use jdbc_fixed. |
≥ 11.1 | Any version | AUTO (default) | Automatically selected based on the Hologres version | For Hologres 2.1.27 and later, JDBC is selected and connection.fixed.enabled defaults to true to enable lightweight connection. For Hologres 2.1.0 to 2.1.26, JDBC is selected. For Hologres 2.0 and earlier, HOLOHUB is selected. |
Important: In VVR 11.1 and later, the connector enables binlog consumption by default. Ensure that binlog is enabled on the source table to avoid errors.
Sink table
VVR 8 and earlier: Set the data writing mode using sdkMode.
VVR 11+: Set the data writing mode using sink.write-mode.
VVR version | Hologres version | Is RPC affected? | Actual data writing mode | Recommended/Default parameter value | Notes |
6.0.4–8.0.2 | < 2.0 | No | rpc | Custom | |
6.0.4–8.0.2 | ≥ 2.0 | Yes | jdbc_fixed (automatic switchover) | Custom | To prevent deduplication, set 'jdbcWriteBatchSize'='1'. |
≥ 8.0.3 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | If you set rpc, the connector automatically switches to jdbc_fixed and sets 'jdbcWriteBatchSize'='1' to prevent deduplication. |
≥ 8.0.5 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | If you set rpc, the connector automatically switches to jdbc_fixed and sets 'deduplication.enabled'='false' to prevent deduplication. |

Important:
- RPC has been deprecated in Hologres 2.0 and later. If you set the data writing mode to rpc, the connector automatically switches to jdbc_fixed.
- Ververica Runtime (VVR) 11.1 and later no longer support RPC; use jdbc for connections.
- For high-concurrency write scenarios, use jdbc_copy or COPY_STREAM, as shown in the sketch below.
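The following sketch shows a VVR 8 sink that opts into the copy-based write path. The schema and connection values are placeholders; in VVR 11+, the equivalent is assumed to be setting sink.write-mode to COPY_STREAM.

CREATE TEMPORARY TABLE hologres_copy_sink (
  id INT,
  name VARCHAR
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkMode' = 'jdbc_copy' -- Copy-based write path for high-concurrency writes.
);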
Dimension table
VVR version | Hologres version | Is RPC affected? | Actual consumption mode | Recommended/Default parameter value | Notes |
6.0.4–8.0.2 | < 2.0 | No | rpc | Custom | / |
6.0.4–8.0.2 | ≥ 2.0 | Yes | jdbc_fixed (automatic switchover) | Custom | RPC has been deprecated in Hologres 2.0 and later. If you set rpc, the connector automatically switches to jdbc_fixed. |
≥ 8.0.3 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | |
≥ 8.0.5 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | |

Important:
- VVR 11.1 and later no longer support RPC and use JDBC by default. To enable lightweight connection, enable connection.fixed.enabled.
- To consume JSONB data from source tables with binlog enabled in JDBC mode, enable the GUC parameter hg_experimental_enable_binlog_jsonb at the database level:

  -- Only a superuser can execute this command. It needs to be set only once per database.
  alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;

- An UPDATE operation generates two consecutive binlog records: the old data (update_before), followed by the new data (update_after).
- Avoid truncating or recreating a source table with binlog enabled. If you run into errors, see FAQ for solutions.
- To avoid errors, maintain consistent DECIMAL precision between Flink and Hologres.
- Stateless unified full and incremental synchronization does not guarantee global ordering. For downstream applications that rely on timestamp-based processing, use pure binlog reading mode.
Enable binlog
For new tables
Real-time data read is disabled by default. When creating a Hologres table in HoloWeb, set the binlog.level and binlog.ttl parameters. Sample code:
begin;
create table test_table(
id int primary key,
title text not null,
body text);
call set_table_property('test_table', 'orientation', 'row'); -- Create a row-oriented table named test_table.
call set_table_property('test_table', 'clustering_key', 'id'); -- Create a clustering key on the id column.
call set_table_property('test_table', 'binlog.level', 'replica'); -- Enable binlog.
call set_table_property('test_table', 'binlog.ttl', '86400'); -- Set the binlog TTL, in seconds.
commit;

For existing tables
In HoloWeb, use the following statements to enable binlog and set binlog TTL for an existing table. Replace table_name with your actual table name.
-- Enable binlog.
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;
-- Set binlog TTL, in seconds.
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;

Connector options
Connector options for Hologres have been updated starting with VVR 11. Some options have been renamed or removed, but VVR 11 remains backward compatible with VVR 8. Consult the parameter documentation for your VVR version for details.
Type mapping
See Data type mappings between Realtime Compute for Apache Flink or Blink and Hologres.
Examples
Source tables
Binlog enabled
Hologres CDC mode
This mode enables mirror synchronization of table data. The source consumes binlogs and, based on hg_binlog_event_type, automatically assigns the correct Flink RowKind (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER) to each row without explicit declaration. This mirrors changes end to end, similar to MySQL or PostgreSQL CDC.
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='${secret_values.ak_id}', -- We recommend using variables for AK/SK to prevent key leakage.
'password'='${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL', -- Read all changeLog types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.
'retry-count'='10', -- Number of retries after a binlog read error.
'retry-sleep-step-ms'='5000', -- Incremental backoff time between retries. The first retry waits 5s, the second 10s, and so on.
'source.binlog.batch-size'='512' -- Number of rows to read from binlog in a batch.
);

VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- We recommend using variables for AK/SK to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc',
'binlogMaxRetryTimes' = '10', -- Number of retries after a Binlog read error.
'binlogRetryIntervalMs' = '500', -- Retry interval after a Binlog read error, in milliseconds.
'binlogBatchReadSize' = '100' -- Number of rows to read from Binlog in a batch.
);

Hologres non-CDC mode
This mode treats binlogs as regular Flink data, with all incoming events defaulting to the INSERT RowKind. To handle specific binlog event types (indicated by hg_binlog_event_type), you may need to implement custom logic.
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- We recommend using variables for AK/SK to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY', -- All changelog types are treated as INSERT.
'retry-count'='10', -- Number of retries after a binlog read error.
'retry-sleep-step-ms'='5000', -- Incremental backoff time between retries. The first retry waits 5s, the second 10s, and so on.
'source.binlog.batch-size'='512' -- Number of rows to read from binlog in a batch.
);

VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- We recommend using variables for AK/SK to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10', -- Number of retries after a binlog read error.
'binlogRetryIntervalMs' = '500', -- Retry interval after a binlog read error, in milliseconds.
'binlogBatchReadSize' = '100' -- Number of rows to read from Binlog in a batch.
);

Binlog disabled
VVR 11+
Starting with VVR 11.1, the connector defaults to binlog reading mode. To read from a source that does not have binlog enabled, explicitly set source.binlog='false', as demonstrated below. For more information, see Binlog enabled.
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- We recommend using variables for AK/SK to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog'='false' -- Do not read binlog data.
);

VVR 8+
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- We recommend using variables for AK/SK to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sdkMode' = 'jdbc'
);

Sink tables
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- We recommend using variables for AK/SK to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * FROM datagen_source;

Dimension tables
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- We recommend using variables for AK/SK to prevent key leakage.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Hologres connector features
Unified full and incremental ingestion
Limitations
The source table must have a primary key. We recommend reading data in Hologres CDC mode.
The source table must have binlog enabled. You can also enable binlog for existing tables; see Enable binlog.
Sample code
VVR 11+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.startup-mode' = 'INITIAL', -- First reads all historical data, then reads Binlog incrementally.
'retry-count'='10', -- Number of retries after a Binlog read error.
'retry-sleep-step-ms'='5000', -- Incremental backoff time between retries. The first retry waits 5s, the second 10s, and so on.
'source.binlog.batch-size'='512' -- Number of rows to read from Binlog in a batch.
);

Set source.binlog.startup-mode to INITIAL to first read all historical data and then start incremental reading.

startTime takes precedence over binlogStartUpMode. If you set startTime (either directly or when starting a job deployment in the Development Console), binlogStartUpMode is automatically forced to timestamp mode, regardless of its original setting.
VVR 8+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', -- First reads all historical data, then reads Binlog incrementally.
'binlogMaxRetryTimes' = '10', -- Number of retries after a Binlog read error.
'binlogRetryIntervalMs' = '500', -- Retry interval after a Binlog read error, in milliseconds.
'binlogBatchReadSize' = '100' -- Number of rows to read from Binlog in a batch.
);

Set binlogStartUpMode to initial to first read all historical data and then start incremental reading.

startTime takes precedence over binlogStartUpMode. If you set startTime (either directly or when starting a job deployment in the Development Console), binlogStartUpMode is automatically forced to timestamp mode, regardless of its original setting, as in the sketch below.
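The following sketch (the timestamp value is illustrative) starts binlog consumption from a fixed point in time; because startTime is set, the consumption mode is forced to timestamp:

CREATE TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'startTime' = '2025-01-01 00:00:00' -- Consume binlog from this time onward; overrides binlogStartUpMode.
);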
Handle primary key conflicts
When writing to Hologres, if data with a duplicate primary key exists, the connector provides three handling strategies.
VVR 11+
Specify the sink.on-conflict-action option to implement different strategies.
Value | Description |
INSERT_OR_IGNORE | Keeps the first occurrence of the data and ignores subsequent duplicates. |
INSERT_OR_REPLACE | Replaces the existing row with the new data. |
INSERT_OR_UPDATE | Updates only the specified columns, leaving other columns in the existing row unchanged. |
VVR 8+
Specify the mutatetype option to implement different strategies.
Value | Meaning |
insertorignore | Keeps the first occurrence of the data and ignores subsequent duplicates. |
insertorreplace | Replaces the existing row with the new data. |
insertorupdate | Updates only the specified columns, leaving other columns in the existing row unchanged. |

For example, assume a table has columns a, b, c, and d, where a is the primary key. If the sink table only provides a and b, setting the strategy to INSERT_OR_UPDATE only updates column b, while columns c and d remain unchanged.
The number of columns in the sink table can be fewer than in the physical Hologres table, but any missing columns must be nullable. Otherwise, the write operation fails.
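As a concrete VVR 11+ sketch (connection values are placeholders), the following sink updates only the written columns when a row with the same primary key already exists:

CREATE TEMPORARY TABLE hologres_upsert_sink (
  a BIGINT,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sink.on-conflict-action' = 'INSERT_OR_UPDATE' -- Update only the written columns on a primary key conflict.
);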
Write to partitioned tables
By default, the Hologres sink only supports importing data into a non-partitioned table. To import data into a partitioned table, enable the following configuration:
VVR 11+
Set sink.create-missing-partition to true to automatically create child partitions if they do not exist. VVR 11.1 and later support writing to partitioned tables and automatically route data to the corresponding child partitions.
Set tablename to the name of the partitioned (parent) table.
To ensure successful writes, either create child partitions in advance or set sink.create-missing-partition=true, as in the sketch below.
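A minimal VVR 11+ sketch (names are placeholders; dt is assumed to be the partition column of the Hologres parent table):

CREATE TEMPORARY TABLE hologres_partitioned_sink (
  id INT,
  dt VARCHAR, -- The partition column of the Hologres partitioned table.
  PRIMARY KEY (id, dt) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourParentTablename>', -- Write to the parent partitioned table.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sink.create-missing-partition' = 'true' -- Create missing child partitions automatically.
);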
VVR 8+
Set partitionRouter to true to automatically route data to the corresponding child partitions.
Set createparttable to true to automatically create child partitions if they do not exist.
Set tablename to the name of the partitioned (parent) table.
To ensure successful writes, either create child partitions in advance or set createparttable=true.
Merge streams and partial updates
The connector can efficiently merge multiple data streams into a single Hologres wide table. It supports partial row updates, applying changes only to modified columns based on the primary key. This optimizes write efficiency and ensures data consistency.
Limitations and notes
The Hologres wide table must have a primary key.
Each source data stream must include all of the primary key columns for the Hologres wide table.
If your Hologres wide table uses column-oriented storage, high RPS can lead to elevated CPU usage. Consider disabling dictionary encoding for the table's columns to mitigate this, as shown in the sketch below.
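One way to disable dictionary encoding is through the dictionary_encoding_columns table property; the following is a sketch to run in HoloWeb, and you should verify the property against your Hologres version's documentation:

begin;
call set_table_property('<table_name>', 'dictionary_encoding_columns', ''); -- Disable dictionary encoding for all columns.
commit;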
Example
Assume there are two data streams. One stream contains columns a, b, and c, and the other contains columns a, d, and e. The Hologres wide table WIDE_TABLE contains columns a, b, c, d, and e, where a is the primary key.
VVR 11+
-- Assume source1 and source2 are already defined.
CREATE TEMPORARY TABLE hologres_sink (
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- The Hologres wide table, containing columns a, b, c, d, e.
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sink.on-conflict-action'='INSERT_OR_UPDATE', -- Handle primary key conflicts.
'sink.delete-strategy'='IGNORE_DELETE', -- Strategy for handling retraction messages. IGNORE_DELETE is suitable for scenarios that only require inserts or updates, not deletes.
'sink.partial-insert.enabled'='true' -- Enable partial column updates. This allows the connector to update or insert only the columns specified in the `INSERT` statement.
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- Declare only columns a, b, and c are inserted.
INSERT INTO hologres_sink(a,d,e) select * from source2; -- Declare only columns a, d, and e are inserted.
END;

VVR 8+
-- Assume source1 and source2 are already defined.
CREATE TEMPORARY TABLE hologres_sink (
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- The Hologres wide table, containing columns a, b, c, d, e.
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'mutatetype'='insertorupdate', -- Handle primary key conflicts.
'ignoredelete'='true', -- Ignore delete requests generated by retraction messages.
'partial-insert.enabled'='true' -- Enable partial column updates, supporting updates for only the columns declared in the INSERT statement.
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- Declare only columns a, b, and c are inserted.
INSERT INTO hologres_sink(a,d,e) select * from source2; -- Declare only columns a, d, and e are inserted.
END;

Set ignoredelete to true to ignore delete requests generated by retraction messages. In VVR 8.0.8 and later, use sink.delete-strategy to configure the desired strategy for partial updates.
Read binlogs from partitioned tables (Beta)
Partitioned tables improve data archiving and query performance. The Hologres connector supports reading binlogs from both physical and logical partitioned tables. For more information, see CREATE LOGICAL PARTITION TABLE.
Physical partitioned tables
The Hologres connector supports reading binlogs from partitioned tables and dynamically monitors for new partitions within a single job, greatly enhancing real-time data processing efficiency and usability.
Notes
Partitioned table binlog reading requires: VVR 8.0.11+, Hologres 2.1.27+, binlog-enabled tables, and JDBC consumption.
To ensure successful data reads, partition names must strictly follow the format {parent_table}_{partition_value}. For more information, see Naming conventions for child partitioned tables.

Important: In DYNAMIC partition binlog consumption mode, VVR 8.0.11 has limitations with partition column names containing hyphens (e.g., YYYY-MM-DD). VVR 11.1 and later fully support reading from partitions with custom name formats. This restriction applies only to reading; writing to partitioned tables is unaffected.

When declaring a Hologres source table in Flink, include the partition columns of the Hologres partitioned table.

When using DYNAMIC partition binlog consumption mode, ensure dynamic partition management is enabled for your Hologres partitioned table. Set auto_partitioning.num_precreate to a value greater than 1 to prevent the job from throwing an exception when reading the latest partition.

In DYNAMIC partition binlog consumption mode, once a new partition is added, incremental data from older partitions is no longer read.
Examples
Partition binlog consumption mode | Feature | Description |
DYNAMIC | Dynamic partition reading | Automatically monitors new partitions and dynamically advances reading progress in chronological order. Suitable for real-time use cases. |
STATIC | Static partition reading | Reads only existing partitions (or explicitly specified ones) and does not automatically discover new partitions. Suitable for processing historical data within a fixed range. |
DYNAMIC mode
VVR 11+
Create a Hologres partitioned table with both binlog and dynamic partitioning enabled.
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- Enable dynamic partitioning.
auto_partitioning_time_unit = 'DAY', -- Partitions are created daily. Example partition names: test_message_src1_20250512, test_message_src1_20250513.
auto_partitioning_num_precreate = '2' -- Pre-create two partitions.
);
-- For existing partitioned tables, you can also enable dynamic partitioning using ALTER TABLE.

In Flink, use the Hologres connector to read data from test_message_src1 in DYNAMIC mode.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- The partition column of the Hologres partitioned table.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- The Hologres table with dynamic partitioning enabled.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- Dynamically monitor the latest partitions.
'source.binlog.startup-mode' = 'initial' -- First read all existing data, then start reading Binlog incrementally.
);

VVR 8.0.11

Create a Hologres partitioned table with both binlog and dynamic partitioning enabled.
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- Enable dynamic partitioning.
auto_partitioning_time_unit = 'DAY', -- Partitions are created daily. Example partition names: test_message_src1_20241027, test_message_src1_20241028.
auto_partitioning_num_precreate = '2' -- Pre-create two partitions.
);
-- For existing partitioned tables, you can also enable dynamic partitioning using ALTER TABLE.

In Flink, use the Hologres connector to read data from test_message_src1 in DYNAMIC mode.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- The partition column of the Hologres partitioned table.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- The Hologres table with dynamic partitioning enabled.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'DYNAMIC', -- Dynamically monitor the latest partitions.
'binlogstartUpMode' = 'initial', -- First read all existing data, then start reading Binlog incrementally.
'sdkMode' = 'jdbc_fixed' -- Use this mode to avoid connection limit issues.
);

STATIC mode
VVR 11+
Create a Hologres partitioned table with binlog enabled.
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
In Flink, use the Hologres connector to read data from the partitioned table test_message_src2 in STATIC mode.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- The partition column of the Hologres partitioned table.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- The partitioned table.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'STATIC', -- Read a fixed set of partitions.
'source.binlog.partition-values-to-read' = 'red,blue,green', -- Read only the three specified partitions; the 'black' partition is not read. New partitions are also not read. If not set, all partitions of the parent table are read.
'source.binlog.startup-mode' = 'initial' -- First read all existing data, then start reading Binlog incrementally.
);

VVR 8.0.11

Create a Hologres partitioned table with binlog enabled.
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
In Flink, use the Hologres connector to read data from the partitioned table test_message_src2 in STATIC mode.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- The partition column of the Hologres partitioned table.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- The partitioned table.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'STATIC', -- Read a fixed set of partitions.
'partition-values-to-read' = 'red,blue,green', -- Read only the three specified partitions; the 'black' partition is not read. New partitions are also not read. If not set, all partitions of the parent table are read.
'binlogstartUpMode' = 'initial', -- First read all existing data, then start reading Binlog incrementally.
'sdkMode' = 'jdbc_fixed' -- Use this mode to avoid connection limit issues.
);

Logical partitioned tables
The Hologres connector supports reading binlogs from logical partitioned tables and allows explicitly specifying which partitions to read.
Limitations
Reading binlog from specific partitions requires: VVR 11.0.0+ and Hologres V3.1+.
Reading binlog from all partitions is equivalent to reading binlog from a non-partitioned Hologres table. For instructions, see Source tables.
Example
Option | Description | Example |
source.binlog.logical-partition-filter-column-names | The partition column names, enclosed in double quotes. Separate multiple column names with commas. If a column name contains a double quote, escape it with another double quote. | '"Pt","id"' specifies that the partition columns are Pt and id. |
source.binlog.logical-partition-filter-column-values | The partition column values. Each partition is specified by a set of column values. Values for different columns are separated by commas and enclosed in double quotes. If a value contains a double quote, escape it with another double quote. Multiple partitions are separated by semicolons. | Two semicolon-separated value groups specify two partitions to read. |
Assume the following table has been created in Hologres:
CREATE TABLE holo_table (
id int not null,
name text,
age numeric(18,4),
"Pt" text,
primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
binlog_level ='replica'
);

Read this table's binlogs using the Hologres connector:
CREATE TEMPORARY TABLE test_src_binlog_table(
id INTEGER,
name VARCHAR,
age decimal(18,4),
`Pt` VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='holo_table',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog'='true',
'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
'source.binlog.change-log-mode'='ALL', -- Read all changeLog types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.
'retry-count'='10', -- Number of retries after a Binlog read error.
'retry-sleep-step-ms'='5000', -- Incremental retry sleep time. The first retry waits 5s, the second 10s, and so on.
'source.binlog.batch-size'='512' -- Number of rows to read from binlog in a batch.
);

DataStream API
To use the DataStream API, include the Hologres DataStream connector dependency in your project. For instructions on setting up DataStream connectors, see Integrate and use connectors in DataStream programs. The Hologres DataStream connector is available in our Maven Central Repository. For local debugging, use the corresponding Uber JAR. For more information, see Run and debug connectors locally.
Hologres source table
Binlog-enabled
Realtime Compute for Apache Flink provides the HologresBinlogSource class to read Hologres binlog data. The following example shows how to create a HologresBinlogSource.
VVR 8.0.11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres connector options.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Build the HologresBinlogSource.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet(),
new ArrayList<>()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}

VVR 8.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres connector options.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Build the HologresBinlogSource.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}

VVR 6.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres connector options.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Set or create the default slot name.
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// Build the JDBCBinlogRecordConverter.
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// Build the HologresBinlogSource.
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}

If you are using a Realtime Compute for Apache Flink engine version earlier than 8.0.5 or a Hologres version earlier than V2.1, ensure the user is a superuser or has the Replication Role permission. For more information, see Hologres permission issues.
Binlog-disabled
Realtime Compute for Apache Flink provides the HologresBulkreadInputFormat class, an implementation of RichInputFormat, to read data from Hologres tables. The following example shows how to build a Hologres source to read data from a binlog-disabled Hologres table.
public class Sample {
public static void main(String[] args) throws Exception {
// Set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres connector options.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Build JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
new HologresConnectionParam(config),
jdbcOptions,
schema,
"",
-1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
env.execute();
}
}

POM.xml
Get the Hologres DataStream connector in our Maven central repository.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>

Hologres sink table
Realtime Compute for Apache Flink provides the HologresSinkFunction class, an implementation of OutputFormatSinkFunction, to write data. The following example shows how to build a Hologres sink.
public class Sample {
public static void main(String[] args) throws Exception {
// Set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres connector options.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Build a Hologres writer to write data as RowData.
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam,
schema,
HologresTableSchema.get(hologresConnectionParam),
new Integer[0]);
// Build the Hologres sink.
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}

Metadata columns
Realtime Compute for Apache Flink VVR 8.0.11 and later support metadata columns for tables with binlog enabled. Starting from this version, we recommend declaring binlog fields such as hg_binlog_event_type as metadata columns. Metadata columns are an extension of the SQL standard that provide access to specific information such as the database and table name of the source, and the change type and timestamp of the data. You can use this information to define custom processing logic, such as filtering out DELETE events.
Metadata column name | Data type | Description |
db_name | STRING NOT NULL | The name of the database containing the row. |
table_name | STRING NOT NULL | The name of the table containing the row. |
hg_binlog_lsn | BIGINT NOT NULL | A binlog system column representing the binlog sequence number. It is monotonically increasing but not continuous within a shard. It is not guaranteed to be unique or ordered across different shards. |
hg_binlog_timestamp_us | BIGINT NOT NULL | The timestamp of the change event in the database, in microseconds (us). |
hg_binlog_event_type | BIGINT NOT NULL | The CDC event type for the row. Valid values: 5 (INSERT), 2 (DELETE), 3 (BEFORE_UPDATE), 7 (AFTER_UPDATE). |
hg_shard_id | INT NOT NULL | The ID of the data shard where the row is located. For more information, see Table group and shard count. |
When creating a Flink source table, declare a metadata column using <meta_column_name> <datatype> METADATA VIRTUAL. The following example shows how to do this.
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn bigint METADATA VIRTUAL,
hg_binlog_event_type bigint METADATA VIRTUAL,
hg_binlog_timestamp_us bigint METADATA VIRTUAL,
hg_shard_id int METADATA VIRTUAL,
db_name string METADATA VIRTUAL,
table_name string METADATA VIRTUAL,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
...
);

FAQ
How do I fix the "permission denied for database" exception when launching a job?
How do I deal with data precision mismatch when reading binary logs in JDBC mode?
What do I do if the "BackPressure Exceed reject Limit" error message appears?
What do I do if the "no table is defined in publication" error message appears?