Hologres Binlog tracks INSERT, UPDATE, DELETE, and TRUNCATE operations as a stream of change events. This page shows how to consume that stream using Java Database Connectivity (JDBC) directly or through Holo-Client, Hologres's Java SDK.
Prerequisites
Before you begin, make sure you have:
Binlog enabled on the target table. See Subscribe to Hologres Binlog data.
The `hg_binlog` extension created (Hologres V1.x only). Starting from V2.0, the extension is built in, so no manual setup is required. For Hologres versions earlier than V2.0, a superuser must run the following statement once per database. If you create a new database, run it again.

Important: Never use `DROP EXTENSION <extension_name> CASCADE`. The `CASCADE` option purges all extension data (including PostGIS, RoaringBitmap, Proxima, Binlog, and BSI data) and drops all dependent objects such as tables, views, metadata, and server data.

```sql
-- Create the extension.
CREATE EXTENSION hg_binlog;
-- Remove the extension.
DROP EXTENSION hg_binlog;
```

Permissions set up based on your Hologres version. To create a publication and a replication slot (required for all versions that use this method), the user must have one of the following permission sets:
Superuser permissions on the instance
Owner permissions on the target table, `CREATE DATABASE` permissions, and the Replication Role on the instance
Note: Starting from V2.0, the Holohub mode is no longer supported. Before upgrading to V2.0 or later, upgrade the Flink engine to 8.0.5 first. After the upgrade, JDBC mode is used automatically.
| Hologres version | Flink engine version | What's required |
|---|---|---|
| V2.1 and later | 8.0.5 and later | Read permissions on the target table. No replication slot needed. |
| V2.0 | 8.0.5 and earlier | A publication and a replication slot. See Create a publication and a replication slot. |
| V1.3 and earlier | 8.0.5 and earlier | Read permissions on the target table. The Holohub mode is used automatically. |
Limitations
JDBC-based Binlog consumption requires Hologres V1.1 or later.
Supported column data types: INTEGER, BIGINT, SMALLINT, TEXT, CHAR(n), VARCHAR(n), REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, SERIAL, OID, int4[], int8[], float4[], float8[], boolean[], and text[]. If a table contains columns of unsupported types, consumption fails. Starting from V1.3.36, JSONB columns are also supported. Enable the following Grand Unified Configuration (GUC) parameter before consuming:

```sql
-- Enable at the session level.
SET hg_experimental_enable_binlog_jsonb = ON;
-- Enable at the database level.
ALTER DATABASE <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
```

Each shard of each consumed table uses one Walsender connection. Walsender connections are independent of regular connections.
The maximum number of Walsenders per frontend node varies by version. Default values:

| Version | Max Walsenders per frontend node |
|---|---|
| V2.2 and later | 600 |
| V2.0 and V2.1 | 1,000 |
| V1.1.26 to V2.0 | 100 |

To check the current limit, run:

```sql
SHOW max_wal_senders;
```

The total limit is max_wal_senders × number of frontend nodes. For the number of frontend nodes for a given instance specification, see Instance management. Use this formula to estimate how many tables you can consume concurrently:

Number of tables ≤ (max_wal_senders × number of frontend nodes) / table shard count

Example: An instance with two frontend nodes running V2.2 has a total of 600 × 2 = 1,200 Walsenders. For tables with a shard count of 20, it can support up to 60 concurrent table consumptions. If two jobs consume from the same table simultaneously, they each use 20 Walsenders, counting as 40 toward the total. If the Walsender limit is reached, you'll see the following error:

```
FATAL: sorry, too many wal senders already
```

To resolve this:

1. Identify active JDBC Binlog consumption jobs and stop any that are unnecessary.
2. Review whether table group and shard count settings are appropriate. See Best practices for setting table groups.
3. If the limit is still exceeded, scale out the instance.
Read-only secondary instances: Binlog consumption via JDBC is not supported before V2.0.18. Starting from V2.0.18, secondary instances support Binlog consumption but do not record consumption progress.
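The capacity formula above can be sketched as a small helper. This is an illustrative calculation only; the class and method names are not part of any Hologres API.

```java
// Illustrative helper for the Walsender capacity estimate:
// Number of tables <= (max_wal_senders * frontend nodes) / table shard count
public class WalsenderCapacity {
    public static int maxConcurrentTables(int maxWalSenders, int frontendNodes, int shardCount) {
        return (maxWalSenders * frontendNodes) / shardCount;
    }

    public static void main(String[] args) {
        // V2.2 instance, 2 frontend nodes, shard count 20: 1,200 / 20 = 60 tables.
        System.out.println(maxConcurrentTables(600, 2, 20));
    }
}
```

Remember that each concurrent job consuming the same table counts its shards again toward the total, so two jobs on a 20-shard table use 40 Walsenders.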
Create a publication and a replication slot
Required for Hologres V2.0 and earlier. Starting from V2.1, users with only read permissions on the target table can consume Binlog data without a replication slot—skip this section if you're on V2.1 or later.
Publication
A publication defines which table's changes are made available for logical replication. In Hologres, a publication is bound to exactly one physical table, and that table must have Binlog enabled.
Create a publication
```sql
CREATE PUBLICATION <name> FOR TABLE <table_name>;
```

| Parameter | Description |
|---|---|
name | A custom name for the publication. |
table_name | The name of the target table. |
Example:
```sql
CREATE PUBLICATION hg_publication_test_1 FOR TABLE test_message_src;
```

Query publications
```sql
SELECT * FROM pg_publication;
```

Example output:
```
        pubname         | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
------------------------+----------+--------------+-----------+-----------+-----------+-------------
 hg_publication_test_1  |    16728 | f            | t         | t         | t         | t
(1 row)
```

| Field | Description |
|---|---|
pubname | The name of the publication. |
pubowner | The owner of the publication. |
puballtables | Whether all tables are included. Always false—binding multiple physical tables is not supported. |
pubinsert | Whether INSERT events are published. Default: true. |
pubupdate | Whether UPDATE events are published. Default: true. |
pubdelete | Whether DELETE events are published. Default: true. |
pubtruncate | Whether TRUNCATE events are published. Default: true. |
For more information on Binlog event types, see Binlog format and principles.
Query tables in a publication
```sql
SELECT * FROM pg_publication_tables;
```

Example output:
```
        pubname         | schemaname |    tablename
------------------------+------------+------------------
 hg_publication_test_1  | public     | test_message_src
(1 row)
```

Delete a publication
```sql
DROP PUBLICATION <name>;
```

Example:
```sql
DROP PUBLICATION hg_publication_test_1;
```

Replication slot
A replication slot tracks the consumption progress for a publication and enables resumable transmission. After a failover, a consumer can recover from the last committed checkpoint recorded in the slot.

Important: An inactive replication slot continues to retain WAL data on the server. If a slot is not consumed, WAL accumulates and can exhaust your storage. Delete any slot you no longer need.
Only superusers and users with the Replication Role can create and use replication slots. To grant or revoke the Replication Role:
```sql
-- Grant the Replication Role to a user.
ALTER ROLE <user_name> REPLICATION;
-- Revoke the Replication Role from a user.
ALTER ROLE <user_name> NOREPLICATION;
```

user_name is an Alibaba Cloud account ID or a Resource Access Management (RAM) user. See Account overview.
Create a replication slot
```sql
CALL hg_create_logical_replication_slot('<replication_slot_name>', 'hgoutput', '<publication_name>');
```

| Parameter | Description |
|---|---|
replication_slot_name | A custom name for the replication slot. |
hgoutput | The output plugin for the Binlog format. Only hgoutput is supported. |
publication_name | The name of the publication to bind to. |
Example:
```sql
CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
```

Query replication slots
```sql
SELECT * FROM hologres.hg_replication_slot_properties;
```

Example output:
```
       slot_name        | property_key |     property_value
------------------------+--------------+------------------------
 hg_replication_slot_1  | plugin       | hgoutput
 hg_replication_slot_1  | publication  | hg_publication_test_1
 hg_replication_slot_1  | parallelism  | 1
(3 rows)
```

| Field | Description |
|---|---|
slot_name | The name of the replication slot. |
property_key | One of: plugin (output plugin), publication (bound publication), or parallelism (number of concurrent connections required to consume the full table, equal to the table group's shard count). |
property_value | The value of the corresponding property. |
Query the required parallelism
Because Hologres is a distributed database, each table's data is spread across multiple shards. You need one connection per shard to consume the full table. To check how many concurrent connections hg_replication_slot_1 requires:
```sql
SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
```

Example output:
```
 hg_get_logical_replication_slot_parallelism
---------------------------------------------
 20
```

Query consumption progress
The hologres.hg_replication_progress table records the consumer offset that you explicitly commit.
```sql
SELECT * FROM hologres.hg_replication_progress;
```

Example output:
```
       slot_name        | parallel_index | lsn
------------------------+----------------+-----
 hg_replication_slot_1  |              0 |  66
 hg_replication_slot_1  |              1 | 122
 hg_replication_slot_1  |              2 | 119
(3 rows)
```

| Field | Description |
|---|---|
slot_name | The name of the replication slot. |
parallel_index | The index of the concurrent connection (one per shard). |
lsn | The Log Sequence Number (LSN) of the last consumed Binlog record that was committed. |
Note:
- The hologres.hg_replication_progress table is created only after the first Binlog consumption.
- The table records only offsets that you explicitly commit by calling the commit LSN function in your code. The recorded value may not match the actual consumer position. Track the LSN on the client side and use it as the recovery point.
- Checkpoint commits are effective only when consuming via a replication slot. When consuming by table name only (without withSlotName), progress is not recorded.
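Because the server-side progress table may lag the true consumer position, tracking the committed LSN per shard on the client is the safer recovery source. A minimal sketch of such a tracker follows; the class and method names are illustrative, not part of the Hologres or Holo-Client API.

```java
import java.util.concurrent.ConcurrentHashMap;

// Minimal client-side checkpoint tracker: one committed LSN per
// parallel_index (shard). Illustrative sketch only.
public class ShardCheckpointTracker {
    private final ConcurrentHashMap<Integer, Long> committed = new ConcurrentHashMap<>();

    // Record a committed LSN for a shard, keeping only forward progress.
    public void commit(int parallelIndex, long lsn) {
        committed.merge(parallelIndex, lsn, Math::max);
    }

    // The recovery point for a shard; 0 means start from the beginning.
    public long recoveryPoint(int parallelIndex) {
        return committed.getOrDefault(parallelIndex, 0L);
    }
}
```

Using `Math::max` in the merge guards against an out-of-order or duplicate commit moving a checkpoint backward.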
Delete a replication slot
Delete a slot when you no longer need it to prevent WAL accumulation. Slots do not close automatically when a consumption job stops—you must delete them manually.
```sql
CALL hg_drop_logical_replication_slot('<replication_slot_name>');
```

Example:

```sql
CALL hg_drop_logical_replication_slot('hg_replication_slot_1');
```

Consume Binlog data using JDBC
JDBC-based consumption gives you fine-grained control over each shard. You create one PGReplicationStream per shard and manage parallelism yourself.
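Managing parallelism yourself typically means running one worker per shard, each of which would open its own PGReplicationStream with parallel_index set to its shard ID. A minimal sketch of that fan-out follows, with a stub consumer interface in place of the real stream logic; all names here are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// One worker per shard. Each worker would open its own PGReplicationStream
// with parallel_index = shardId; the consumer body is left abstract here.
public class ShardFanout {
    public interface ShardConsumer {
        void consume(int shardId) throws Exception;
    }

    public static void consumeAllShards(int shardCount, ShardConsumer consumer)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(shardCount);
        for (int shardId = 0; shardId < shardCount; shardId++) {
            final int id = shardId;
            pool.submit(() -> {
                try {
                    consumer.consume(id); // e.g. open a stream with parallel_index = id
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

Get the shard count up front (for example, from the slot's parallelism property) so the pool size matches the number of streams you need.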
Step 1: Add dependencies
Use JDBC 42.2.18 or later. Add the following dependencies to your pom.xml:
```xml
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.8</version>
</dependency>
<!-- Used to get the table schema and decode Binlog records -->
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>holo-client</artifactId>
    <version>2.2.10</version>
</dependency>
```

Step 2: Connect and consume
The following example connects to a replication slot, reads Binlog records from one shard, and prints each record.
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import com.alibaba.hologres.client.model.TableSchema;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class Test {
public static void main(String[] args) throws Exception {
String username = "";
String password = "";
String url = "jdbc:postgresql://Endpoint:Port/db_test";
// Set up the JDBC connection for replication.
Properties properties = new Properties();
PGProperty.USER.set(properties, username);
PGProperty.PASSWORD.set(properties, password);
PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
// Required for Binlog consumption.
PGProperty.REPLICATION.set(properties, "database");
try (Connection connection = DriverManager.getConnection(url, properties)) {
// Each PGReplicationStream consumes one shard. Set shardId for each stream.
int shardId = 0;
PGConnection pgConnection = connection.unwrap(PGConnection.class);
PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI()
.replicationStream()
.logical()
// V2.1+: two options
// Option 1: specify the replication slot name. table_name is ignored.
// Option 2: omit withSlotName and set table_name in withSlotOption instead.
.withSlotName("slot_name")
.withSlotOption("table_name", "public.test_message_src")
.withSlotOption("parallel_index", shardId)
.withSlotOption("batch_size", "1024")
.withSlotOption("start_time", "2021-01-01 00:00:00")
.withSlotOption("start_lsn", "0")
.start();
// HoloClient is needed to get the table schema for decoding.
HoloConfig holoConfig = new HoloConfig();
holoConfig.setJdbcUrl(url);
holoConfig.setUsername(username);
holoConfig.setPassword(password);
HoloClient client = new HoloClient(holoConfig);
// Decode raw binary Binlog data into BinlogRecord objects.
TableSchema schema = client.getTableSchema("test_message_src", true);
HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);
// Track the last consumed LSN for checkpoint recovery.
Long currentLsn = 0L;
ByteBuffer byteBuffer = pgReplicationStream.readPending();
while (true) {
if (byteBuffer != null) {
List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
Long latestLsn = 0L;
for (BinlogRecord record : records) {
latestLsn = record.getBinlogLsn();
// Process the record here.
System.out.println("lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));
}
// Save the latest LSN as the recovery point (skip batches that decoded no records).
if (latestLsn > 0) {
    currentLsn = latestLsn;
}
pgReplicationStream.forceUpdateStatus();
}
byteBuffer = pgReplicationStream.readPending();
}
}
}
}

Parameters
`withSlotName` behavior by version:
| Version | Behavior |
|---|---|
| Earlier than V2.1 | Specify the name of an existing replication slot. Required. |
| V2.1 and later | Optional. Omit withSlotName and set table_name in withSlotOption instead to consume without a replication slot. |
`withSlotOption` parameters:
| Parameter | Required | Description |
|---|---|---|
table_name | Required when withSlotName is not set | The target table to consume. Format: schema_name.table_name or table_name. Ignored if withSlotName is set. |
parallel_index | Yes | The shard index to consume. One PGReplicationStream handles one shard. For a table with three shards, create three streams with parallel_index 0, 1, and 2. |
start_time | No | Start consuming from this timestamp. Example: 2021-01-01 12:00:00+08. |
start_lsn | No | Start consuming from the record after this LSN. Takes priority over start_time. |
batch_size | No | Maximum records per retrieval. Default: 1,024. |
Default start behavior (when neither start_lsn nor start_time is set):
| Scenario | Behavior |
|---|---|
| First consumption from a replication slot | Starts from the beginning, similar to Kafka's earliest offset. |
| Subsequent consumption from a replication slot | Resumes from the last committed checkpoint. |
| Consumption by table name only (no withSlotName) | Always starts from the beginning. |
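The precedence described above (start_lsn wins over start_time, then the slot's checkpoint, else the beginning) can be summarized in a small sketch. The method and return strings are illustrative, not an API.

```java
// Illustrative summary of where consumption starts, per the tables above.
public class StartPositionRule {
    public static String resolve(Long startLsn, String startTime,
                                 boolean hasSlot, boolean firstConsumption) {
        if (startLsn != null) {
            return "after LSN " + startLsn;          // start_lsn wins over start_time
        }
        if (startTime != null) {
            return "from timestamp " + startTime;
        }
        if (!hasSlot || firstConsumption) {
            return "from the beginning";             // like Kafka's earliest offset
        }
        return "from the last committed checkpoint"; // resumed slot consumption
    }
}
```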
BinlogRecord fields
BinlogRecord exposes the following Binlog system fields:
| Method | Returns |
|---|---|
getBinlogLsn() | The LSN of the Binlog record. |
getBinlogTimestamp() | The system timestamp when the event occurred. |
getBinlogEventType() | The event type: INSERT, UPDATE, DELETE, or TRUNCATE. |
For a full field reference, see Subscribe to Hologres Binlog data.
Commit the checkpoint
After consuming Binlog data, commit the checkpoint by calling the commit LSN function. This serves two purposes: it lets the server know which WAL segments are safe to archive or discard, and it provides a recovery point so consumption can resume from the correct position after a failover.
The sample code above does not include this step—add it based on your application's recovery requirements.
Consume Binlog data using Holo-Client
Holo-Client simplifies Binlog consumption by managing shard-level connections automatically. Specify the target table, and Holo-Client handles the rest.
The number of connections equals the table's shard count.
Save the checkpoint per shard so that consumption can resume from the correct position after a network failure or other interruption.
Use Holo-Client 2.2.10 or later. Versions 2.2.9 and earlier have a memory leak.
Step 1: Add dependencies
```xml
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>holo-client</artifactId>
    <version>2.2.10</version>
</dependency>
```

Step 2: Connect and consume
The following example subscribes to a table, processes each record, saves a per-shard checkpoint, and retries on failure.
import com.alibaba.hologres.client.BinlogShardGroupReader;
import com.alibaba.hologres.client.Command;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Subscribe;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import java.util.HashMap;
import java.util.Map;
public class HoloBinlogExample {
public static BinlogShardGroupReader reader;
public static void main(String[] args) throws Exception {
String username = "";
String password = "";
String url = "jdbc:postgresql://ip:port/database";
String tableName = "test_message_src";
String slotName = "hg_replication_slot_1";
HoloConfig holoConfig = new HoloConfig();
holoConfig.setJdbcUrl(url);
holoConfig.setUsername(username);
holoConfig.setPassword(password);
holoConfig.setBinlogReadBatchSize(128);
holoConfig.setBinlogIgnoreDelete(true);
holoConfig.setBinlogIgnoreBeforeUpdate(true);
holoConfig.setBinlogHeartBeatIntervalMs(5000L);
HoloClient client = new HoloClient(holoConfig);
// Get the shard count to initialize per-shard checkpoint tracking.
int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));
// Initialize the checkpoint map with LSN 0 for each shard.
Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
for (int i = 0; i < shardCount; i++) {
shardIdToLsn.put(i, 0L);
}
// Before V2.1: tableName and slotName are both required.
// V2.1 and later: tableName is enough (slotName defaults to "hg_table_name_slot").
Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
.setBinlogReadStartTime("2021-01-01 12:00:00")
.build();
reader = client.binlogSubscribe(subscribe);
BinlogRecord record;
int retryCount = 0;
while (true) {
try {
if (reader.isCanceled()) {
// Re-create the reader using the last saved checkpoint.
reader = client.binlogSubscribe(subscribe);
}
while ((record = reader.getBinlogRecord()) != null) {
if (record instanceof BinlogHeartBeatRecord) {
// All data up to this heartbeat's timestamp has been consumed on this shard.
continue;
}
// Process the record here.
System.out.println(record);
// Save the checkpoint so consumption can resume from here on failure.
shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
retryCount = 0;
}
} catch (HoloClientException e) {
if (++retryCount > 10) {
throw new RuntimeException(e);
}
System.out.println(String.format(
"Binlog read failed: %s. Retrying (%d/10)...", e.getMessage(), retryCount));
// Wait before retrying, with increasing backoff.
Thread.sleep(5000L * retryCount);
// Resume from the saved per-shard checkpoint.
Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
for (int i = 0; i < shardCount; i++) {
subscribeBuilder.addShardStartOffset(i,
new BinlogOffset().setSequence(shardIdToLsn.get(i)));
}
subscribe = subscribeBuilder.build();
reader.cancel();
}
}
}
}

Parameters
`Subscribe` types:
| Type | When to use |
|---|---|
Subscribe.newStartTimeBuilder(tableName, slotName) | Start consumption from a specific timestamp. |
Subscribe.newOffsetBuilder(tableName, slotName) | Start consumption from a specific LSN per shard—use this for checkpoint-based recovery. |
Version note: Before V2.1, both tableName and slotName are required. Starting from V2.1, only tableName is required (equivalent to using the fixed slot name hg_table_name_slot).
`HoloConfig` Binlog parameters:
| Parameter | Default | Description |
|---|---|---|
binlogReadBatchSize | 1,024 | Maximum records per retrieval per shard. |
binlogHeartBeatIntervalMs | -1 (disabled) | Interval in milliseconds between BinlogHeartBeatRecord messages. When no new data arrives, heartbeat records indicate that all data up to that timestamp has been consumed on that shard. |
binlogIgnoreDelete | false | Skip DELETE events. |
binlogIgnoreBeforeUpdate | false | Skip the before-image records for UPDATE events. |
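The two ignore flags effectively drop whole classes of events before your code sees them. A standalone sketch of that filtering behavior follows, using an illustrative event enum rather than the real Holo-Client record types.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative event kinds; real event types come from the Binlog record.
public class BinlogEventFilter {
    public enum Kind { INSERT, BEFORE_UPDATE, AFTER_UPDATE, DELETE }

    // Mirrors binlogIgnoreDelete / binlogIgnoreBeforeUpdate semantics:
    // optionally drop DELETE events and UPDATE before-image records.
    public static List<Kind> filter(List<Kind> events,
                                    boolean ignoreDelete, boolean ignoreBeforeUpdate) {
        return events.stream()
                .filter(e -> !(ignoreDelete && e == Kind.DELETE))
                .filter(e -> !(ignoreBeforeUpdate && e == Kind.BEFORE_UPDATE))
                .collect(Collectors.toList());
    }
}
```

Enabling both flags, as the Holo-Client example above does, leaves only INSERT events and UPDATE after-images, which is usually what a downstream sink needs.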
Troubleshooting
The hologres.hg_replication_progress table is missing or empty
If the table doesn't exist or shows no data after you commit consumption progress, check the following:
Consuming by table name without a replication slot. If you didn't set withSlotName (JDBC) or didn't specify a slot (Holo-Client), progress tracking is not supported. The table will not be created or updated. Switch to replication slot-based consumption to enable progress tracking.
First-time consumption on a read-only secondary instance. On secondary instances running Hologres earlier than V2.0.18, the hologres.hg_replication_progress table cannot be created during the first Binlog consumption. Consume from the primary instance once, then switch back to the secondary instance.
If neither applies, contact support through the Hologres DingTalk group. See How do I get more online support?.
What's next
Subscribe to Hologres Binlog data — Binlog format, event types, and system fields
Best practices for setting table groups — Optimize shard count to control Walsender usage
Instance management — Frontend node counts by instance specification