Realtime Compute for Apache Flink can consume Hologres binary logs (binlogs) in real time, enabling change data capture (CDC) pipelines from Hologres to downstream systems. This topic covers the source table DDL syntax for both non-CDC and CDC modes, how to migrate from Holohub mode to JDBC (Java Database Connectivity) mode, and how to identify Flink jobs that need upgrading before you upgrade your Hologres instance.
Prerequisites
Before you begin, make sure you have:
A Hologres instance running V0.9 or later
Binary logging enabled on the target table. For details, see Subscribe to Hologres binary logs.
Ververica Platform (VVP) 2.4 or later for real-time binlog consumption via the Hologres connector
Limitations
Binary logging is supported at the table level for row-oriented, column-oriented, and (from Hologres V1.1) row-column hybrid storage tables.
Column-oriented tables have higher write overhead than row-oriented tables when binary logging is enabled. For tables with frequent updates, use row-oriented storage.
Binary logs cannot be consumed from parent partitioned tables.
Engine whitelists require Hologres V1.3.21 or later. If you enable a whitelist on an earlier version, binlog consumption fails.
CDC mode source tables do not support watermark definitions. If you need window aggregation, rewrite the query as a non-window aggregation instead.
Holohub mode is deprecated as of Hologres V2.1 and fully replaced by JDBC mode. See Switch from Holohub mode to JDBC mode.
If your instance is earlier than the required version, contact the Hologres team via online support.
Permissions
| Mode | Required permissions |
|---|---|
| Holohub mode | Read and write permissions on the target table. Custom accounts are not supported. |
| JDBC mode | Instance superuser or Owner on the target table + Replication Role on the instance. Custom accounts are supported. |
For JDBC mode, the hg_binlog extension must also exist on the instance. This extension is created by default in Hologres V2.0 and later. For setup details, see Consume Hologres binary logs using JDBC.
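The JDBC mode prerequisites above can be sketched as two statements run against Hologres. This is a hedged example, not the authoritative procedure: it assumes the standard PostgreSQL-compatible statement forms, and "<your-account>" is a placeholder for the account the Flink job uses. Check the exact syntax for your version in Consume Hologres binary logs using JDBC.

```sql
-- Assumption: run by a superuser; only needed if hg_binlog is missing
-- (it is created by default in Hologres V2.0 and later).
CREATE EXTENSION hg_binlog;

-- Assumption: grants the Replication Role to the account used by the Flink job.
-- "<your-account>" is a placeholder, not a real account name.
ALTER ROLE "<your-account>" REPLICATION;
```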
Binlog system fields
All binlog source tables expose three system metadata fields. Their names and types are fixed and cannot be changed.
| Field | Type | Description |
|---|---|---|
| hg_binlog_lsn | BIGINT | Log sequence number: the position of the event in the binary log |
| hg_binlog_event_type | BIGINT | The type of change event (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER). In non-CDC mode, use this field to determine how to handle each row. |
| hg_binlog_timestamp_us | BIGINT | The timestamp of the event, in microseconds |
All user-defined fields in a binlog source table must use lowercase column names.
Consume binlogs in non-CDC mode
In non-CDC mode, all binlog records arrive as Flink INSERT rows. The hg_binlog_event_type field carries the original change type, and your downstream logic decides how to handle each event.
After enabling binary logging on a Hologres table, define the source table with the following DDL:
CREATE TABLE test_message_src_binlog_table (
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector' = 'hologres',
'dbname' = '<your-database>',
'tablename' = '<your-table>',
'username' = '<your-access-key-id>',
'password' = '<your-access-key-secret>',
'endpoint' = '<your-vpc-endpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs'= '500',
'binlogBatchReadSize' = '100'
);
Replace the placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <your-database> | The Hologres database name | my_db |
| <your-table> | The Hologres table name | orders |
| <your-access-key-id> | Your AccessKey ID | LTAI5tXxx... |
| <your-access-key-secret> | Your AccessKey secret | xXxXxXx... |
| <your-vpc-endpoint> | The VPC endpoint of your Hologres instance | <instance-id>-cn-hangzhou.hologres.aliyuncs.com:80 |
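In non-CDC mode the job has to interpret hg_binlog_event_type itself. The query below is a minimal sketch of that downstream logic: it labels each record and keeps only inserts and post-update images. It assumes the numeric codes documented for Hologres binlogs (5 for INSERT, 2 for DELETE, 3 for UPDATE_BEFORE, 7 for UPDATE_AFTER); confirm them in Subscribe to Hologres binary logs for your version. The change_type alias is illustrative.

```sql
-- Assumed event type codes: 5 = INSERT, 2 = DELETE,
-- 3 = UPDATE_BEFORE, 7 = UPDATE_AFTER. Verify before relying on them.
SELECT
  id,
  title,
  body,
  hg_binlog_lsn,
  CASE hg_binlog_event_type
    WHEN 5 THEN 'INSERT'
    WHEN 7 THEN 'UPDATE_AFTER'
  END AS change_type
FROM test_message_src_binlog_table
-- Keep only the latest image of each changed row; drop deletes and pre-update images.
WHERE hg_binlog_event_type IN (5, 7);
```

Because every record arrives as a Flink INSERT row, dropping the DELETE and UPDATE_BEFORE events here means the sink never sees retractions. Handle those event types explicitly if the sink must reflect deletions.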
Connector parameters
| Parameter | Required | Description |
|---|---|---|
| connector | Yes | Must be hologres |
| dbname | Yes | Hologres database name |
| tablename | Yes | Hologres table name |
| username | Yes | AccessKey ID |
| password | Yes | AccessKey secret |
| endpoint | Yes | VPC endpoint of the Hologres instance |
| binlog | Yes | Set to true to enable binlog consumption |
| binlogMaxRetryTimes | No | Maximum retry attempts on failure. Example value: 10 |
| binlogRetryIntervalMs | No | Interval between retries, in milliseconds. Example value: 500 |
| binlogBatchReadSize | No | Number of records to read per batch. Example value: 100 |
| cdcMode | No | Set to true to enable CDC mode |
| sdkMode | No | Set to jdbc to switch to JDBC mode |
Consume binlogs in CDC mode
In CDC mode, Flink automatically maps each binlog record to the matching RowKind (INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER) based on hg_binlog_event_type. The source table then reproduces the changes of the Hologres table in the same way as a MySQL or PostgreSQL CDC source, which is useful for replicating Hologres table data to downstream sinks.
The DDL is the same as non-CDC mode, with two differences:
Omit the three binlog system fields from the schema — Flink manages them internally.
Add 'cdcMode' = 'true' to the WITH clause.
CREATE TABLE test_message_src_binlog_table (
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector' = 'hologres',
'dbname' = '<your-database>',
'tablename' = '<your-table>',
'username' = '<your-access-key-id>',
'password' = '<your-access-key-secret>',
'endpoint' = '<your-vpc-endpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs'= '500',
'binlogBatchReadSize' = '100'
);
CDC mode source tables do not support watermark definitions. For time-based window aggregation, see MySQL/Hologres CDC source tables do not support window functions.
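As a sketch of the non-window alternative, the job below runs a plain GROUP BY over the CDC source table and writes the changing counts to Flink's print connector, so the emitted change rows can be inspected in the task logs. The sink name title_counts_print and the choice of title as the grouping key are illustrative assumptions.

```sql
-- Hypothetical debugging sink; 'print' writes each change row to the task logs.
CREATE TEMPORARY TABLE title_counts_print (
  title VARCHAR,
  message_count BIGINT
) WITH (
  'connector' = 'print'
);

-- Non-window aggregation: the count per title is updated as inserts,
-- updates, and deletes arrive from the CDC source table.
INSERT INTO title_counts_print
SELECT title, COUNT(*) AS message_count
FROM test_message_src_binlog_table
GROUP BY title;
```

The result is a continuously updated changelog, not a set of closed windows, so a production sink needs to accept updates, for example through a primary key.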
Consume both full and incremental data
Starting from Ververica Runtime (VVR) engine 1.13-vvr-4.0.13 and Hologres V0.10, CDC source tables support a combined full + incremental mode. The job first reads all existing data from the table, then automatically switches to consuming incremental binlog records without any gap. For configuration details, see Hologres connector.
Use JDBC mode
Starting from VVR 6.0.3, binlog consumption is also available in JDBC mode. JDBC mode supports more data types and custom Hologres accounts, making it more flexible than Holohub mode. For configuration details, see Hologres connector.
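For illustration, a CDC source table in JDBC mode could look like the following sketch. It reuses the parameters listed under Connector parameters and adds only sdkMode. The table name test_message_src_binlog_jdbc is hypothetical, and the Hologres connector topic remains the reference for which options your VVR version accepts.

```sql
CREATE TEMPORARY TABLE test_message_src_binlog_jdbc (
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector' = 'hologres',
  'dbname' = '<your-database>',
  'tablename' = '<your-table>',
  -- JDBC mode also accepts a custom Hologres account here.
  'username' = '<your-access-key-id>',
  'password' = '<your-access-key-secret>',
  'endpoint' = '<your-vpc-endpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  -- Consume binlogs over JDBC instead of Holohub.
  'sdkMode' = 'jdbc'
);
```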
Switch from Holohub mode to JDBC mode
Hologres has been phasing out Holohub mode since V2.0, and fully replaced it with JDBC mode in V2.1. Before upgrading your Hologres instance, upgrade your Flink VVR jobs first.
Upgrade to Hologres V2.1
Choose one of the following solutions based on your current VVR version:
(Recommended) Solution 1: Upgrade VVR to 8.0.7 or later
Flink automatically switches from Holohub mode to JDBC mode. No additional configuration is needed.
Solution 2: Upgrade VVR to 6.0.7–8.0.5
Add 'sdkMode'='jdbc' to each binlog source table DDL.
Restart the Flink job.
Grant the Flink job user one of the following permission sets:
Instance superuser permission
Owner permission on the target table + CREATE DATABASE permission + Replication Role permission for the instance
Confirm that the job runs correctly.
Upgrade the Hologres instance.
(Not recommended) Solution 3: Upgrade VVR to 8.0.6
Flink automatically switches to JDBC mode. However, VVR 8.0.6 has a known issue: if a dimension table has too many fields, the VVR job may time out during deployment. For details, see Hologres Connector Release Note. Use Solution 1 instead.
Upgrade to Hologres V2.0
(Recommended) Solution 1: Upgrade VVR to 8.0.6 or later
Flink automatically switches to JDBC mode. Use VVR 8.0.7 or later to avoid the known dimension table issue in VVR 8.0.6. For details, see Hologres Connector Release Note.
Solution 2: Upgrade VVR to 8.0.4 or 8.0.5
Grant the Flink job user one of the following permission sets:
Instance superuser permission
Owner permission on the target table + CREATE DATABASE permission + Replication Role permission for the instance
Restart the Flink job and verify it runs correctly.
Upgrade the Hologres instance.
Solution 3: Upgrade VVR to 6.0.7–8.0.3
Flink continues to use Holohub mode. No mode switch is required.
Find jobs that need upgrading
If you have many Flink VVR jobs, use the following tool to identify which jobs and tables need to be upgraded before migrating.
This tool supports SQL jobs (DDL-based table definitions) and Catalog jobs (hint-based parameters). It does not support JAR jobs or Catalog tables without hint parameters.
Step 1: Download the tool
Download find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar.
Step 2: Run the scan
Requires JDK 8 or later.
java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar \
com.alibaba.hologres.FindIncompatibleFlinkJobs \
<region> <url> <AccessKeyID> <AccessKeySecret> <binlog|rpc>
Example:
java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar \
com.alibaba.hologres.FindIncompatibleFlinkJobs \
Beijing \
https://vvp.console.aliyun.com/web/xxxxxx/zh/#/workspaces/xxxx/namespaces/xxxx/operations/stream/xxxx \
my-access-key-id \
my-access-key-secret \
binlog
Parameters:
| Parameter | Description |
|---|---|
| region | The region of the Realtime Compute for Apache Flink project. See the region value table below. |
| url | The URL of any job in the target Flink project. |
| AccessKeyID | The AccessKey ID of an account with access to the Flink project. |
| AccessKeySecret | The AccessKey secret of that account. |
| binlog \| rpc | binlog: scans all binlog source tables in the project. rpc: scans all dimension tables and sink tables that use RPC mode. |
Region values:
| Region | Value |
|---|---|
| China (Beijing) | Beijing |
| China (Shanghai) | Shanghai |
| China (Hangzhou) | Hangzhou |
| China (Shenzhen) | Shenzhen |
| China (Zhangjiakou) | Zhangjiakou |
| China (Hong Kong) | Hong Kong |
| Singapore | Singapore |
| Germany (Frankfurt) | Germany |
| Indonesia (Jakarta) | Indonesia |
| Malaysia (Kuala Lumpur) | Malaysia |
| US (Silicon Valley) | US |
| Shanghai Finance Cloud | Shanghai Finance Cloud |
Step 3: Review the results
The tool outputs a list of jobs and tables that require upgrading.
