Realtime Compute for Apache Flink:Hologres connector

Last Updated: Nov 24, 2025

This topic describes the Hologres connector.

Overview

Hologres is Alibaba Cloud's unified real-time data warehousing service, engineered for high-performance data operations. It enables real-time writing, updating, processing, and analysis of massive datasets. Supporting standard SQL syntax and most PostgreSQL functions, Hologres excels at petabyte-scale OLAP and ad hoc analysis, delivering high-concurrency, low-latency data services. Its capabilities include fine-grained workload isolation and enterprise-grade security. Deeply integrated with MaxCompute, Realtime Compute for Apache Flink, and DataWorks, Hologres provides comprehensive end-to-end data warehousing solutions for enterprises. The following table briefly describes the Hologres connector of Realtime Compute for Apache Flink:

Category | Details
Supported types | Source, dimension, and sink tables
Execution modes | Stream and batch modes
Data format | Not supported
Monitoring metrics | Source table: numRecordsIn, numRecordsInPerSecond. Sink table: numRecordsOut, numRecordsOutPerSecond, currentSendTime. For more information, see Metrics.
API types | DataStream and SQL
Supports data updates or deletions in sink tables | Yes

Features

  • Real-time consumption of Hologres data: Read Hologres data with or without binary logging (binlog). This feature is compatible with both change data capture (CDC) and non-CDC modes.

  • Unified full and incremental ingestion: Perform full, incremental, or unified full and incremental consumption.

  • Handle primary key conflicts: Ignore new data, replace entire rows, or update only specific fields.

  • Merge streams and partial updates: Update only modified columns instead of the entire row.

  • Read binlogs from partitioned tables (Beta): Consume binlogs from physical partitioned tables. A single job can monitor all partitions, including newly added ones. You can also consume binlogs from logical partitioned tables.

  • Write to partitioned tables: Write data to the parent table of a partitioned table and automatically create the corresponding child partitions.

  • Real-time synchronization for a single table or an entire database: Synchronize data in real time at the single-table or entire-database level. This feature provides the following capabilities:

    • Automatic detection of source table schema evolution: If the schema of the source database table changes, Hologres synchronizes these changes to the sink table in real time.

    • Automatic handling of schema evolution: When new data flows into Hologres, Flink first triggers a schema modification operation before it writes the data. This process requires no manual intervention.

For more information, see CREATE TABLE AS (CTAS) statement and Quick Start for real-time database synchronization.

Limitations and suggestions

Limitations

  • Foreign tables not supported: The Hologres connector does not support access to Hologres foreign tables, such as MaxCompute foreign tables.

  • Time type limit: Real-time consumption of TIMESTAMP data is not supported. Use the TIMESTAMPTZ type when you create tables.

  • Source table scan: In Ververica Runtime (VVR) 8 and earlier, data is read from Hologres in batch mode by default. This means incremental data is not consumed.

  • Watermark limit: In VVR 8 and earlier, CDC mode does not support defining watermarks. To perform windowed aggregations, use a non-windowed aggregation solution.
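
    For example, a minute-level aggregation can be emulated with a plain GROUP BY on a formatted time string instead of a window. The following is a minimal sketch, assuming the CDC source declares a TIMESTAMP column named ts (hypothetical name):

    -- Non-windowed aggregation: bucket rows by a formatted time string instead of a window.
    SELECT
      DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') AS window_minute,
      COUNT(*) AS cnt
    FROM hologres_cdc_source
    GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm');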

Suggestions

  • Table storage format:

    • Dimension tables for point lookups: Recommended to use row-oriented storage. Requires setting both a primary key and a clustering key.

    • Dimension tables for one-to-many queries: Recommended to use column-oriented storage and configure the distribution key and segment key to optimize performance.

    • Frequently updated tables for analytical queries: Recommended to use row-column hybrid storage if such tables must support both real-time binlog consumption and OLAP analysis.

    Important

    If you do not explicitly specify a storage format when creating a Hologres table, it defaults to columnar storage. The storage format cannot be changed after the table is created. For more information, see Create a table in Hologres and Table storage formats.

  • Job parallelism: Set the Flink job parallelism to match the number of shards in the Hologres table.

    -- Run this command in HoloWeb to view the number of shards in a table.
    SELECT tg.property_value
    FROM hologres.hg_table_properties tb
    JOIN hologres.hg_table_group_properties tg
      ON tb.property_value = tg.tablegroup_name
    WHERE tb.property_key = 'table_group'
      AND tg.property_key = 'shard_count'
      AND table_name = '<tablename>';
  • Hologres version: Regularly check the Hologres connector release notes for known issues, feature updates, and version compatibility information.

Compatibility notes

  • Hologres and VVR version compatibility and limitations

    Source table

    • VVR 8 and earlier: Set the consumption mode using sdkMode.

    • VVR 11+: Set the consumption mode using source.binlog.read-mode.

    VVR version | Hologres version | Default/Recommended consumption mode | Actual consumption mode | Notes
    ≥ 6.0.7 | < 2.0 | holohub | holohub | /
    6.0.7–8.0.4 | ≥ 2.0 | jdbc (automatic switchover) | jdbc (forced) | Hologres 2.0 and later has deprecated HoloHub. If holohub is set, the connector automatically falls back to jdbc, which may require additional user permissions. For permission configurations, see Permission issues.
    ≥ 8.0.5 | ≥ 2.1 | jdbc (automatic switchover) | jdbc (forced) | For Hologres 2.1.27 and later, use jdbc_fixed.
    ≥ 11.1 | Any version | AUTO (default) | Automatically selected based on the Hologres version | Hologres 2.1.27 and later default to JDBC, with connection.fixed.enabled defaulting to true to enable lightweight connection; Hologres 2.1.0 to 2.1.26 use JDBC; Hologres 2.0 and earlier use HOLOHUB.

    Important

    In VVR 11.1 and later, the connector enables binlog consumption by default. Ensure that binlog is enabled on the source table to avoid errors.

    Permission issues

    If a user is not a superuser, you must grant permissions to read binlogs in JDBC mode.

    Replace user_name with the ID of an Alibaba Cloud account or a RAM user. For more information, see Account overview.

    -- In the standard PostgreSQL authorization model, grant the CREATE permission on the database and the replication role to the user.
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    ALTER ROLE <user_name> REPLICATION;
    
    -- If the database uses the simple permission model (SPM), you cannot run the GRANT statement. Use spm_grant to grant the admin permission on the database to the user. You can also grant the permission in the HoloWeb console.
    CALL spm_grant('<db_name>_admin', '<user_name>');
    ALTER ROLE <user_name> REPLICATION;

    Sink table

    • VVR 8 and earlier: Set the data writing mode using sdkMode.

    • VVR 11+: Set the data writing mode using sink.write-mode.

    VVR version | Hologres version | Is RPC affected? | Actual data writing mode | Recommended/Default parameter value | Notes
    6.0.4–8.0.2 | < 2.0 | No | rpc | Custom | /
    6.0.4–8.0.2 | ≥ 2.0 | Yes | jdbc_fixed (automatic switchover) | Custom | To prevent deduplication, set 'jdbcWriteBatchSize'='1'.
    ≥ 8.0.3 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | If you set rpc, the connector automatically switches to jdbc_fixed and sets 'jdbcWriteBatchSize'='1' to prevent deduplication.
    ≥ 8.0.5 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | If you set rpc, the connector automatically switches to jdbc_fixed and sets 'deduplication.enabled'='false' to prevent deduplication.

    Important
    • RPC is deprecated in Hologres 2.0 and later. If you set the data writing mode to rpc, the connector automatically switches to jdbc_fixed.

    • Ververica Runtime (VVR) 11.1 and later versions no longer support RPC; use jdbc for connections.

    • For high-concurrency write scenarios, use jdbc_copy or COPY_STREAM.
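
    For example, a high-concurrency VVR 11+ sink can select the copy-based write mode through sink.write-mode. The following is a minimal sketch; treat the COPY_STREAM value as an assumption to verify against your VVR version (on VVR 8, the counterpart is 'sdkMode'='jdbc_copy'):

    CREATE TEMPORARY TABLE hologres_copy_sink (
      id INT,
      body STRING
    ) WITH (
      'connector' = 'hologres',
      'dbname' = '<yourDbname>',
      'tablename' = '<yourTablename>',
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint' = '<yourEndpoint>',
      'sink.write-mode' = 'COPY_STREAM'  -- Copy-based write mode for high-concurrency writes (assumed value).
    );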

    Dimension table

    VVR version | Hologres version | Is RPC affected? | Actual consumption mode | Recommended/Default parameter value | Notes
    6.0.4–8.0.2 | < 2.0 | No | rpc | Custom | /
    6.0.4–8.0.2 | ≥ 2.0 | Yes | jdbc_fixed (automatic switchover) | Custom | RPC is deprecated in Hologres 2.0 and later. If you set rpc, the connector automatically switches to jdbc_fixed.
    ≥ 8.0.3 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | /
    ≥ 8.0.5 | Any version | Yes | jdbc_fixed (automatic switchover) | Custom | /

    Important

    VVR 11.1 and later no longer support RPC and use JDBC by default. To enable lightweight connection, set connection.fixed.enabled to true, as shown in the sketch below.
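
    The following is a minimal sketch of a VVR 11+ dimension table declaration with lightweight connection enabled; apart from connection.fixed.enabled, the options mirror the dimension table example later in this topic:

    CREATE TEMPORARY TABLE hologres_dim (
      a INT,
      b VARCHAR
    ) WITH (
      'connector' = 'hologres',
      'dbname' = '<yourDbname>',
      'tablename' = '<yourTablename>',
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint' = '<yourEndpoint>',
      'connection.fixed.enabled' = 'true'  -- Enable lightweight connection.
    );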

  • To consume JSONB data from source tables with binlog enabled in JDBC mode, enable the GUC parameter hg_experimental_enable_binlog_jsonb at the database level:

    -- Only a superuser can execute this command. This needs to be set only once per DB.
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • An UPDATE operation generates two consecutive binlogs: the old data (update_before), followed by the new data (update_after).

  • Avoid truncating or recreating a source table with binlog enabled. If you run into errors, see FAQ for solutions.

  • To avoid errors, maintain consistent DECIMAL precision between Flink and Hologres.

  • Stateless unified full and incremental synchronization does not guarantee global ordering. For downstream applications relying on timestamp-based processing, use pure binlog reading mode.

Enable binlog

For new tables

Binlog is disabled by default, so real-time data reads are unavailable on new tables. When creating a Hologres table in HoloWeb, set the binlog.level and binlog.ttl properties. Sample code:

begin;
create table test_table(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_table', 'orientation', 'row');      -- Create a row-oriented table named test_table.
call set_table_property('test_table', 'clustering_key', 'id');    -- Create a clustering key on the id column.
call set_table_property('test_table', 'binlog.level', 'replica'); -- Enable binlog.
call set_table_property('test_table', 'binlog.ttl', '86400');     -- Set the binlog TTL, in seconds.
commit;

For existing tables

In HoloWeb, use the following statements to enable binlog and set binlog TTL for an existing table. Replace table_name with your actual table name.

-- Enable binlog.
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;

-- Set binlog TTL, in seconds.
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;

Connector options

Starting with VVR 11, the Hologres connector options have been updated to enhance support; some options have been renamed or removed. VVR 11 remains backward compatible with VVR 8 option names. Consult the option documentation specific to your VVR version for details.

Type mapping

See Data type mappings between Realtime Compute for Apache Flink or Blink and Hologres.

Examples

Source tables

Binlog enabled

Hologres CDC mode

This mode enables mirror synchronization of table data. The source consumes binlogs and, based on hg_binlog_event_type, automatically assigns the correct Flink RowKind (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER) to each row without explicit declaration. This mirrors changes in the same way as MySQL or PostgreSQL CDC.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='${secret_values.ak_id}',            -- We recommend using variables for AK/SK to prevent key leakage.
  'password'='${secret_values.ak_secret}',        
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL',  -- Read all changeLog types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.
  'retry-count'='10',                     -- Number of retries after a binlog read error.
  'retry-sleep-step-ms'='5000',           -- Incremental backoff time between retries. The first retry waits 5s, the second 10s, and so on.
  'source.binlog.batch-size'='512'        -- Number of rows to read from binlog in a batch.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- We recommend using variables for AK/SK to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc',
  'binlogMaxRetryTimes' = '10',     -- Number of retries after a Binlog read error.
  'binlogRetryIntervalMs' = '500',  -- Retry interval after a Binlog read error, in milliseconds.
  'binlogBatchReadSize' = '100'     -- Number of rows to read from Binlog in a batch.
);

Hologres non-CDC mode

This mode treats binlogs as regular Flink data, with all incoming events defaulting to the INSERT RowKind. To handle specific binlog event types (indicated by hg_binlog_event_type), you may need to implement custom logic, as shown in the filtering sketch after the examples below.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- We recommend using variables for AK/SK to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY',  -- All changelog types are treated as INSERT.
  'retry-count'='10',                     -- Number of retries after a binlog read error.
  'retry-sleep-step-ms'='5000',           -- Incremental backoff time between retries. The first retry waits 5s, the second 10s, and so on.
  'source.binlog.batch-size'='512'        -- Number of rows to read from binlog in a batch.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- We recommend using variables for AK/SK to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',     -- Number of retries after a binlog read error.
  'binlogRetryIntervalMs' = '500',  -- Retry interval after a binlog read error, in milliseconds.
  'binlogBatchReadSize' = '100'     -- Number of rows to read from Binlog in a batch.
);
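
As an example of custom handling in non-CDC mode, the event-type column can drive filtering. The following is a minimal sketch against the VVR 8+ table above; the event-type values (5 = INSERT, 2 = DELETE, 3 = UPDATE_BEFORE, 7 = UPDATE_AFTER) are listed in the Metadata columns section:

-- Keep only INSERT and UPDATE_AFTER events; drop DELETE and UPDATE_BEFORE.
SELECT id, title, body
FROM test_message_src_binlog_table
WHERE hg_binlog_event_type IN (5, 7);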

Binlog disabled

VVR 11+

Important

Starting with VVR 11.1, the connector defaults to binlog reading mode. To read from a source that does not have binlog enabled, explicitly set source.binlog='false', as demonstrated below. For more information, see Binlog enabled.

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- We recommend using variables for AK/SK to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='false'                      -- Do not read binlog data.
);

VVR 8+

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- We recommend using variables for AK/SK to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

Sink tables

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- We recommend using variables for AK/SK to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;

Dimension tables

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- We recommend using variables for AK/SK to prevent key leakage.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Hologres connector features

Unified full and incremental ingestion

Limitations

  • The source table must have a primary key. We recommend reading data in Hologres CDC mode.

  • The source table must have binlog enabled. You can enable binlog for existing tables (see Enable binlog).

Sample code

VVR 11+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.startup-mode' = 'INITIAL',   -- First reads all historical data, then reads Binlog incrementally.
  'retry-count'='10',                         -- Number of retries after a Binlog read error.
  'retry-sleep-step-ms'='5000',               -- Incremental backoff time between retries. The first retry waits 5s, the second 10s, and so on.
  'source.binlog.batch-size'='512'            -- Number of rows to read from Binlog in a batch.
  );
Note
  • Set source.binlog.startup-mode to INITIAL to first read all historical data and then start incremental reading.

  • startTime takes precedence over the startup mode. If you set startTime (either directly or when starting a job deployment in the development console), the startup mode is automatically forced to timestamp mode, regardless of the configured source.binlog.startup-mode.

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', -- First reads all historical data, then reads Binlog incrementally.
  'binlogMaxRetryTimes' = '10',     -- Number of retries after a Binlog read error.
  'binlogRetryIntervalMs' = '500',  -- Retry interval after a Binlog read error, in milliseconds.
  'binlogBatchReadSize' = '100'     -- Number of rows to read from Binlog in a batch.
  );
Note
  • Set binlogStartUpMode to initial to first read all historical data and then start incremental reading.

  • startTime takes precedence over binlogStartUpMode. If you set startTime (either directly or when starting a job deployment in the Development Console), binlogStartUpMode will be automatically forced to timestamp, regardless of its original setting.

Handle primary key conflicts

When writing to Hologres, if a record arrives whose primary key duplicates that of an existing row, the connector provides three handling strategies.

VVR 11+

Specify the sink.on-conflict-action option to implement different strategies.

sink.on-conflict-action value | Description
INSERT_OR_IGNORE | Keeps the first occurrence of the data and ignores subsequent duplicates.
INSERT_OR_REPLACE | Replaces the existing row with the new data.
INSERT_OR_UPDATE (default) | Updates only the specified columns, leaving other columns in the existing row unchanged.

VVR 8+

Specify the mutatetype option to implement different strategies.

mutatetype value | Meaning
insertorignore (default) | Keeps the first occurrence of the data and ignores subsequent duplicates.
insertorreplace | Replaces the existing row with the new data.
insertorupdate | Updates only the specified columns, leaving other columns in the existing row unchanged.

For example, assume a table has columns a, b, c, and d, where a is the primary key. If the sink table only provides a and b, setting the strategy to INSERT_OR_UPDATE will only update column b, while columns c and d will remain unchanged.
Note

The number of columns in the sink table can be fewer than in the physical Hologres table, but any missing columns must be nullable. Otherwise, the write operation fails.
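
The following is a minimal sketch of this scenario with VVR 11+ option names, assuming the physical Hologres table has columns a, b, c, d with primary key a:

CREATE TEMPORARY TABLE hologres_update_sink (
  a BIGINT,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',                -- Physical table with columns a, b, c, d.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sink.on-conflict-action' = 'INSERT_OR_UPDATE'  -- On conflict, update only column b; c and d stay unchanged.
);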

Write to partitioned tables

By default, the Hologres sink only supports importing data into a non-partitioned table. To import data into a partitioned table, enable the following configuration:

VVR 11+

Set sink.create-missing-partition to true. This allows the connector to automatically create child partitions if they do not exist.

Note
  • VVR 11.1 and later support writing to partitioned tables, automatically routing data to the corresponding child partitions.

  • Set tablename to the name of the partitioned table.

  • To ensure successful writes, either create child partitions in advance or set sink.create-missing-partition=true, as shown in the sketch after this list.
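
A minimal sketch of a VVR 11+ sink that writes to the parent partitioned table and creates missing child partitions, assuming a text partition column named dt (hypothetical):

CREATE TEMPORARY TABLE hologres_partitioned_sink (
  id INT,
  body STRING,
  dt STRING,  -- The partition column of the Hologres partitioned table.
  PRIMARY KEY (id, dt) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourParentTablename>',    -- The parent partitioned table.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sink.create-missing-partition' = 'true'  -- Automatically create child partitions if they do not exist.
);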

VVR 8+

  • Set partitionRouter to true to automatically route data to the corresponding child partitions.

  • Set createparttable to true to automatically create child partitions if they do not exist.

Note
  • Set tablename to the name of the partitioned table.

  • To ensure successful writes, either create child partitions in advance or set createparttable=true.

Merge streams and partial updates

The connector can efficiently merge multiple data streams into a single Hologres wide table. It supports partial row updates, applying changes only to modified columns based on the primary key. This optimizes write efficiency and ensures data consistency.

Limitations and notes

  • The Hologres wide table must have a primary key.

  • Each source data stream must include all of the primary key columns for the Hologres wide table.

  • If your Hologres wide table uses column-oriented storage, high RPS can lead to elevated CPU usage. Consider disabling dictionary encoding for the table's columns to mitigate this; see the sketch after this list.
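
The following is a minimal sketch of disabling dictionary encoding in Hologres. It assumes the dictionary_encoding_columns table property accepts an empty list to turn encoding off for all columns; verify this against your Hologres version:

begin;
call set_table_property('<table_name>', 'dictionary_encoding_columns', '');  -- Disable dictionary encoding for all columns (assumed behavior of an empty list).
commit;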

Example

Assume there are two data streams. One stream contains columns a, b, and c, and the other contains columns a, d, and e. The Hologres wide table WIDE_TABLE contains columns a, b, c, d, and e, where a is the primary key.

VVR 11+

-- Assume source1 and source2 are already defined.
CREATE TEMPORARY TABLE hologres_sink (
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- The Hologres wide table, containing columns a, b, c, d, e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE',   -- Handle primary key conflicts.
  'sink.delete-strategy'='IGNORE_DELETE',         -- Strategy for handling retraction messages. IGNORE_DELETE is suitable for scenarios that only require inserts or updates, not deletes.
  'sink.partial-insert.enabled'='true'            -- Enable partial column updates. This allows the connector to update or insert only the columns specified in the `INSERT` statement.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Declare only columns a, b, and c are inserted.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Declare only columns a, d, and e are inserted.
END;

VVR 8+

-- Assume source1 and source2 are already defined.
CREATE TEMPORARY TABLE hologres_sink ( 
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- The Hologres wide table, containing columns a, b, c, d, e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'mutatetype'='insertorupdate',    -- Handle primary key conflicts.
  'ignoredelete'='true',            -- Ignore delete requests generated by retraction messages.
  'partial-insert.enabled'='true'   -- Enable partial column updates, supporting updates for only the columns declared in the INSERT statement.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Declare only columns a, b, and c are inserted.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Declare only columns a, d, and e are inserted.
END;
Note

Set ignoredelete to true to ignore delete requests generated by retraction messages. In VVR 8.0.8 and later, use sink.delete-strategy to configure desired strategies for partial updates.

Read binlogs from partitioned tables (Beta)

Partitioned tables improve data archiving and query performance. The Hologres connector supports reading binlogs from both physical and logical partitioned tables. For more information, see CREATE LOGICAL PARTITION TABLE.

Physical partitioned tables

The Hologres connector supports reading binlogs from partitioned tables and dynamically monitors for new partitions within a single job, greatly enhancing real-time data processing efficiency and usability.

Notes

  • Partitioned table binlog reading requires: VVR 8.0.11+, Hologres 2.1.27+, binlog-enabled tables, and JDBC consumption.

  • To ensure successful data reads, ensure partition names strictly follow the format: {parent_table}_{partition_value}. For more information, see Naming conventions for child partitioned tables.

    Important
    • In DYNAMIC partition binlog consumption mode, VVR 8.0.11 has limitations with partition column names containing hyphens (e.g., YYYY-MM-DD).

    • VVR 11.1+ fully supports reading from partitions with custom name formats.

    • This restriction is only for reading; writing to partitioned tables is unaffected.

  • When declaring a Hologres source table in Flink, include the partition columns of the Hologres partitioned table.

  • When using DYNAMIC partition binlog consumption mode, ensure dynamic partition management is enabled for your Hologres partitioned table. Set auto_partitioning.num_precreate to a value greater than 1 to prevent the job from throwing an exception when reading the latest partition.

  • In DYNAMIC partition binlog consumption mode, once a new partition is added, incremental data from older partitions will not be read.

Examples

Partition binlog consumption mode | Feature | Description
DYNAMIC | Dynamic partition reading | Automatically monitors new partitions and dynamically advances reading progress in chronological order. Suitable for real-time use cases.
STATIC | Static partition reading | Reads only existing partitions (or explicitly specified ones) and does not automatically discover new partitions. Suitable for processing historical data within a fixed range.

DYNAMIC mode

VVR 11+

Create a Hologres partitioned table with both binlog and dynamic partitioning enabled.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- Enable dynamic partitioning.
    auto_partitioning_time_unit = 'DAY',  -- Partitions are created daily. Example partition names: test_message_src1_20250512, test_message_src1_20250513.
    auto_partitioning_num_precreate = '2' -- Pre-create two partitions.
);
-- For existing partitioned tables, you can also enable dynamic partitioning using ALTER TABLE.

In Flink, use the Hologres connector to read data from test_message_src1 in DYNAMIC mode.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- The partition column of the Hologres partitioned table.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- The Hologres table with dynamic partitioning enabled.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- Dynamically monitor the latest partitions.
  'source.binlog.startup-mode' = 'initial'           -- First read all existing data, then start reading Binlog incrementally.
);

VVR 8.0.11

Create a Hologres partitioned table with both binlog and dynamic partitioning enabled.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- Enable dynamic partitioning.
    auto_partitioning_time_unit = 'DAY',  -- Partitions are created daily. Example partition names: test_message_src1_20241027, test_message_src1_20241028.
    auto_partitioning_num_precreate = '2' -- Pre-create two partitions.
);

-- For existing partitioned tables, you can also enable dynamic partitioning using ALTER TABLE.

In Flink, use the Hologres connector to read data from test_message_src1 in DYNAMIC mode.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- The partition column of the Hologres partitioned table.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- The Hologres table with dynamic partitioning enabled.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'DYNAMIC',  -- Dynamically monitor the latest partitions.
  'binlogstartUpMode' = 'initial',      -- First read all existing data, then start reading Binlog incrementally.
  'sdkMode' = 'jdbc_fixed'              -- Use this mode to avoid connection limit issues.
);

STATIC mode

VVR 11+

Create a Hologres partitioned table with binlog enabled.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

In Flink, use the Hologres connector to read data from the partitioned table test_message_src2 in STATIC mode.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- The partition column of the Hologres partitioned table.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- The partitioned table.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'STATIC', -- Read a fixed set of partitions.
  'source.binlog.partition-values-to-read' = 'red,blue,green',  -- Read only the three specified partitions; the 'black' partition is not read. New partitions are also not read. If not set, all partitions of the parent table are read.
  'source.binlog.startup-mode' = 'initial'  -- First read all existing data, then start reading Binlog incrementally.
);

VVR 8.0.11

Create a Hologres partitioned table with binlog enabled.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

In Flink, use the Hologres connector to read data from the partitioned table test_message_src2 in STATIC mode.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- The partition column of the Hologres partitioned table.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- The partitioned table.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'STATIC', -- Read a fixed set of partitions.
  'partition-values-to-read' = 'red,blue,green',  -- Read only the three specified partitions; the 'black' partition is not read. New partitions are also not read. If not set, all partitions of the parent table are read.
  'binlogstartUpMode' = 'initial',  -- First read all existing data, then start reading Binlog incrementally.
  'sdkMode' = 'jdbc_fixed' -- Use this mode to avoid connection limit issues.
);

Logical partitioned tables

The Hologres connector supports reading binlogs from logical partitioned tables and allows explicitly specifying which partitions to read.

Limitations

  • Reading binlog from specific partitions requires: VVR 11.0.0+ and Hologres V3.1+.

  • Reading binlog from all partitions is equivalent to reading binlog from a non-partitioned Hologres table. For instructions, see Source tables.

Example

  • source.binlog.logical-partition-filter-column-names: The partition column names, enclosed in double quotes. Separate multiple column names with commas. If a column name contains a double quote, escape it with another double quote.

    Example: 'source.binlog.logical-partition-filter-column-names'='"Pt","id"' specifies Pt and id as the partition columns.

  • source.binlog.logical-partition-filter-column-values: The partition column values. Each partition is specified by a set of column values. Values for different columns are separated by commas and enclosed in double quotes. If a value contains a double quote, escape it with another double quote. Multiple partitions are separated by semicolons.

    Example: 'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"' specifies two partitions to read: the first partition's values are (20240910, 0), and the second's are (special"value, 9).

Assume the following table has been created in Hologres:

CREATE TABLE holo_table (
    id int not null,
    name text,
    age numeric(18,4),
    "Pt" text,
    primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
    binlog_level ='replica'
);

Read this table's binlogs using the Hologres connector:

CREATE TEMPORARY TABLE test_src_binlog_table(
  id INTEGER,
  name VARCHAR,
  age decimal(18,4),
  `Pt` VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='holo_table',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='true',
  'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
  'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
  'source.binlog.change-log-mode'='ALL',  -- Read all changeLog types, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.
  'retry-count'='10',                     -- Number of retries after a Binlog read error.
  'retry-sleep-step-ms'='5000',           -- Incremental retry sleep time. The first retry waits 5s, the second 10s, and so on.
  'source.binlog.batch-size'='512'        -- Number of rows to read from binlog in a batch.
);

DataStream API

Important

To use the DataStream API, include the Hologres DataStream connector dependency in your project. For instructions on setting up DataStream connectors, see Integrate and use connectors in DataStream programs. The Hologres DataStream connector is available in our Maven Central Repository. For local debugging, use the corresponding Uber JAR. For more information, see Run and debug connectors locally.

Hologres source table

Binlog-enabled

Realtime Compute for Apache Flink provides the HologresBinlogSource class to read Hologres binlog data. The following example shows how to create a HologresBinlogSource.

VVR 8.0.11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres connector options.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Build JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Build the HologresBinlogSource.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet(),
                new ArrayList<>()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres connector options.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Build JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Build the HologresBinlogSource.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 6.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .build();
         // Hologres connector options.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Build JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Set or create the default slot name.
        config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

        boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        // Build the JDBCBinlogRecordConverter.
        JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
                jdbcOptions.getTable(),
                schema,
                new HologresConnectionParam(config),
                cdcMode,
                Collections.emptySet());
        
        // Build the HologresBinlogSource.
        long startTimeMs = 0;
        HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.TIMESTAMP,
                recordConverter,
                "",
                -1);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
Important

If you are using a Realtime Compute for Apache Flink engine version earlier than 8.0.5 or a Hologres version earlier than V2.1, ensure the user is a superuser or has the Replication Role permission. For more information, see Hologres permission issues.

Binlog-disabled

Realtime Compute for Apache Flink provides the HologresBulkreadInputFormat class, an implementation of RichInputFormat, to read data from Hologres tables. The following example shows how to build a Hologres source to read data from a binlog-disabled Hologres table.

public class Sample {
    public static void main(String[] args) throws Exception {
        // Set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres connector options.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        // Build JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
                new HologresConnectionParam(config),
                jdbcOptions,
                schema,
                "",
                -1);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
        env.execute();
    }
}

POM.xml

Get the Hologres DataStream connector from the Maven Central Repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Hologres sink table

Realtime Compute for Apache Flink provides the HologresSinkFunction class, an implementation of OutputFormatSinkFunction, to write data. The following example shows how to build a Hologres sink.

public class Sample {
    public static void main(String[] args) throws Exception {
        // Set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Initialize the schema of the table to be read. It must match the columns of the Hologres table schema. You can define a subset of columns.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .build();
        // Hologres connector options.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
        
         // Build a Hologres writer to write data as RowData.
        AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
                hologresConnectionParam, 
                schema, 
                HologresTableSchema.get(hologresConnectionParam), 
                new Integer[0]);
        // Build the Hologres sink.
        HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

Metadata columns

Realtime Compute for Apache Flink VVR 8.0.11 and later support metadata columns for tables with binlog enabled. Starting from this version, we recommend declaring binlog fields such as hg_binlog_event_type as metadata columns. Metadata columns are an extension of the SQL standard that provide access to specific information, such as the database and table name of the source and the change type and timestamp of the data. You can use this information to define custom processing logic, such as filtering out DELETE events.

Metadata column name | Data type | Description
db_name | STRING NOT NULL | The name of the database containing the row.
table_name | STRING NOT NULL | The name of the table containing the row.
hg_binlog_lsn | BIGINT NOT NULL | A binlog system column representing the binlog sequence number. It is monotonically increasing but not continuous within a shard. It is not guaranteed to be unique or ordered across different shards.
hg_binlog_timestamp_us | BIGINT NOT NULL | The timestamp of the change event in the database, in microseconds.
hg_binlog_event_type | BIGINT NOT NULL | The CDC event type for the row. Valid values: 5 (INSERT), 2 (DELETE), 3 (UPDATE_BEFORE), 7 (UPDATE_AFTER).
hg_shard_id | INT NOT NULL | The ID of the data shard where the row is located. For more information, see Table group and shard count.

When creating a Flink source table, declare a metadata column using <meta_column_name> <datatype> METADATA VIRTUAL. The following example shows how to do this.

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT METADATA VIRTUAL,
  hg_binlog_event_type BIGINT METADATA VIRTUAL,
  hg_binlog_timestamp_us BIGINT METADATA VIRTUAL,
  hg_shard_id INT METADATA VIRTUAL,
  db_name STRING METADATA VIRTUAL,
  table_name STRING METADATA VIRTUAL,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  ...
);
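
For example, the declared metadata columns can be used to filter out DELETE events before writing downstream. The following is a minimal sketch, assuming a sink table named some_sink with matching columns (hypothetical name):

-- Drop DELETE events (hg_binlog_event_type = 2) using the metadata column.
INSERT INTO some_sink
SELECT id, title, body
FROM test_message_src_binlog_table
WHERE hg_binlog_event_type <> 2;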
