Hologres:Consume Binlog via JDBC

Last Updated: Mar 26, 2026

Hologres Binlog tracks INSERT, UPDATE, DELETE, and TRUNCATE operations as a stream of change events. This page shows how to consume that stream using Java Database Connectivity (JDBC) directly or through Holo-Client, Hologres's Java SDK.

Prerequisites

Before you begin, make sure you have:

  • Binlog enabled on the target table. See Subscribe to Hologres Binlog data.

  • The hg_binlog extension created (Hologres V1.x only). Starting from V2.0, the extension is built in—no manual setup required. For Hologres versions earlier than V2.0, a superuser must run the following statement once per database. If you create a new database, run it again.

    Important

    Never use DROP EXTENSION <extension_name> CASCADE. The CASCADE option purges all extension data—including PostGIS, RoaringBitmap, Proxima, Binlog, and BSI data—and drops all dependent objects such as tables, views, metadata, and server data.

    -- Create the extension.
    CREATE EXTENSION hg_binlog;
    
    -- Remove the extension.
    DROP EXTENSION hg_binlog;
  • Access permissions appropriate to your Hologres version, as shown in the table below. To create a publication and a replication slot (required only for versions that use that method), the user must have one of the following permission sets:

    • Superuser permissions on the instance

    • Owner permissions on the target table, CREATE DATABASE permissions, and the Replication Role on the instance

    Note

    Starting from V2.0, the Holohub mode is no longer supported. Before upgrading to V2.0 or later, upgrade Flink to 8.0.5 first. After the upgrade, JDBC mode is used automatically.

    Hologres version | Flink engine version | What's required
    V2.1 and later | 8.0.5 and later | Read permissions on the target table. No replication slot needed.
    V2.0 | 8.0.5 and earlier | A publication and a replication slot. See Create a publication and a replication slot.
    V1.3 and earlier | 8.0.5 and earlier | Read permissions on the target table. The Holohub mode is used automatically.

Limitations

  • JDBC-based Binlog consumption requires Hologres V1.1 or later.

  • Supported column data types: INTEGER, BIGINT, SMALLINT, TEXT, CHAR(n), VARCHAR(n), REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, SERIAL, OID, int4[], int8[], float4[], float8[], boolean[], and text[]. If a table contains columns of unsupported types, consumption fails. Starting from V1.3.36, JSONB columns are also supported; to consume a table with JSONB columns, first enable the following Grand Unified Configuration (GUC) parameter:

    -- Enable at the session level.
    SET hg_experimental_enable_binlog_jsonb = ON;
    
    -- Enable at the database level.
    ALTER DATABASE <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
  • Each shard of each consumed table uses one Walsender connection. Walsender connections are independent of regular connections.

  • The maximum number of Walsenders per frontend node varies by version:

    Version | Max Walsenders per frontend node
    V2.2 and later | 600
    V2.0 and V2.1 | 1,000
    V1.1.26 to V2.0 | 100

    To check the current limit on your instance, run:

    SHOW max_wal_senders;

    The total limit is max_wal_senders × number of frontend nodes. For the number of frontend nodes for a given instance specification, see Instance management. Use this formula to estimate how many tables you can consume concurrently:

    Number of tables ≤ (max_wal_senders × number of frontend nodes) / table shard count

    Example: An instance with two frontend nodes running V2.2 has a total of 600 × 2 = 1,200 Walsenders. For tables with a shard count of 20, it can support up to 60 concurrent table consumptions. If two jobs consume from the same table simultaneously, each job uses 20 Walsenders, counting as 40 toward the total. If the Walsender limit is reached, you'll see:

    FATAL: sorry, too many wal senders already

    To resolve this:

    1. Identify active JDBC Binlog consumption jobs and stop any that are unnecessary.

    2. Review whether table group and shard count settings are appropriate. See Best practices for setting table groups.

    3. If the limit is still exceeded, scale out the instance.

  • Read-only secondary instances: Binlog consumption via JDBC is not supported before V2.0.18. Starting from V2.0.18, secondary instances support Binlog consumption but do not record consumption progress.
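
As a quick sanity check, the capacity formula above can be expressed as a small helper. The class and method names here are illustrative, not part of any Hologres API:

```java
public class WalsenderCapacity {

    // Estimate the maximum number of tables that can be consumed concurrently:
    // (max_wal_senders per frontend node × frontend nodes) / table shard count.
    static int maxConcurrentTables(int maxWalSendersPerNode, int frontendNodes, int shardCount) {
        return (maxWalSendersPerNode * frontendNodes) / shardCount;
    }

    public static void main(String[] args) {
        // V2.2 instance (600 Walsenders per node) with two frontend nodes,
        // consuming tables with a shard count of 20.
        System.out.println(maxConcurrentTables(600, 2, 20)); // prints 60
    }
}
```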

Create a publication and a replication slot

Required for Hologres V2.0 and earlier. Starting from V2.1, users with only read permissions on the target table can consume Binlog data without a replication slot—skip this section if you're on V2.1 or later.

Publication

A publication defines which table's changes are made available for logical replication. In Hologres, a publication is bound to exactly one physical table, and that table must have Binlog enabled.

Create a publication

CREATE PUBLICATION <name> FOR TABLE <table_name>;
Parameter | Description
name | A custom name for the publication.
table_name | The name of the target table.

Example:

CREATE PUBLICATION hg_publication_test_1 FOR TABLE test_message_src;

Query publications

SELECT * FROM pg_publication;

Example output:

         pubname        | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
------------------------+----------+--------------+-----------+-----------+-----------+-------------
 hg_publication_test_1  |    16728 | f            | t         | t         | t         | t
(1 row)
Field | Description
pubname | The name of the publication.
pubowner | The owner of the publication.
puballtables | Whether all tables are included. Always false; binding multiple physical tables is not supported.
pubinsert | Whether INSERT events are published. Default: true.
pubupdate | Whether UPDATE events are published. Default: true.
pubdelete | Whether DELETE events are published. Default: true.
pubtruncate | Whether TRUNCATE events are published. Default: true.

For more information on Binlog event types, see Binlog format and principles.

Query tables in a publication

SELECT * FROM pg_publication_tables;

Example output:

         pubname        | schemaname |    tablename
------------------------+------------+------------------
 hg_publication_test_1  | public     | test_message_src
(1 row)

Delete a publication

DROP PUBLICATION <name>;

Example:

DROP PUBLICATION hg_publication_test_1;

Replication slot

Important

An inactive replication slot continues to retain WAL data on the server. If a slot is not consumed, WAL accumulates and can exhaust your storage. Delete any slot you no longer need.

A replication slot tracks the consumption progress for a publication and enables resumable transmission. After a failover, a consumer can recover from the last committed checkpoint recorded in the slot.

Only superusers and users with the Replication Role can create and use replication slots. To grant or revoke the Replication Role:

-- Grant the Replication Role to a user.
ALTER ROLE <user_name> REPLICATION;

-- Revoke the Replication Role from a user.
ALTER ROLE <user_name> NOREPLICATION;

user_name is an Alibaba Cloud account ID or a Resource Access Management (RAM) user. See Account overview.

Create a replication slot

CALL hg_create_logical_replication_slot('<replication_slot_name>', 'hgoutput', '<publication_name>');
Parameter | Description
replication_slot_name | A custom name for the replication slot.
hgoutput | The output plugin for the Binlog format. Only hgoutput is supported.
publication_name | The name of the publication to bind to.

Example:

CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
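
The publication and slot statements can also be run programmatically over a regular JDBC connection. The sketch below is an assumption-laden illustration: the endpoint, credentials, and object names are placeholders you must replace.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SlotSetup {

    // Build the DDL that creates a publication for one table.
    static String createPublicationSql(String publication, String table) {
        return "CREATE PUBLICATION " + publication + " FOR TABLE " + table;
    }

    // Build the CALL that creates a replication slot bound to the publication.
    static String createSlotSql(String slot, String publication) {
        return "CALL hg_create_logical_replication_slot('" + slot
                + "', 'hgoutput', '" + publication + "')";
    }

    public static void main(String[] args) throws Exception {
        // Placeholders: replace with your instance endpoint and credentials.
        String url = "jdbc:postgresql://Endpoint:Port/db_test";
        try (Connection conn = DriverManager.getConnection(url, "user", "password");
             Statement stmt = conn.createStatement()) {
            stmt.execute(createPublicationSql("hg_publication_test_1", "test_message_src"));
            stmt.execute(createSlotSql("hg_replication_slot_1", "hg_publication_test_1"));
        }
    }
}
```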

Query replication slots

SELECT * FROM hologres.hg_replication_slot_properties;

Example output:

        slot_name       | property_key |      property_value
------------------------+--------------+------------------------
 hg_replication_slot_1  | plugin       | hgoutput
 hg_replication_slot_1  | publication  | hg_publication_test_1
 hg_replication_slot_1  | parallelism  | 1
(3 rows)
Field | Description
slot_name | The name of the replication slot.
property_key | One of: plugin (output plugin), publication (bound publication), or parallelism (the number of concurrent connections required to consume the full table, equal to the table group's shard count).
property_value | The value of the corresponding property.

Query the required parallelism

Because Hologres is a distributed database, each table's data is spread across multiple shards. You need one connection per shard to consume the full table. To check how many concurrent connections hg_replication_slot_1 requires:

SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');

Example output:

 hg_get_logical_replication_slot_parallelism
---------------------------------------------
                                          20

Query consumption progress

The hologres.hg_replication_progress table records the consumer offset that you explicitly commit.

SELECT * FROM hologres.hg_replication_progress;

Example output:

        slot_name       | parallel_index | lsn
------------------------+----------------+-----
 hg_replication_slot_1  |              0 |  66
 hg_replication_slot_1  |              1 | 122
 hg_replication_slot_1  |              2 | 119
(3 rows)
Field | Description
slot_name | The name of the replication slot.
parallel_index | The index of the concurrent connection (one per shard).
lsn | The Log Sequence Number (LSN) of the last consumed Binlog record that was committed.
Important
  • The hologres.hg_replication_progress table is created only after the first Binlog consumption.

  • The table records only offsets that you explicitly commit by calling the commit LSN function in your code. The recorded value may not match the actual consumer position. Track the LSN on the client side and use it as the recovery point.

  • Checkpoint commits are effective only when consuming via a replication slot. When consuming by table name only (without withSlotName), progress is not recorded.
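
If you do commit checkpoints through a replication slot, the recorded per-shard LSNs can be read back over JDBC at startup to choose a recovery point. This is a sketch; per the caveat above, the committed values may lag the actual consumer position, so prefer a client-side record when you have one.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

public class ProgressLookup {

    // Query against the progress table described above; one row per parallel_index (shard).
    static final String QUERY =
            "SELECT parallel_index, lsn FROM hologres.hg_replication_progress WHERE slot_name = ?";

    // Read the committed LSN per shard for one replication slot.
    static Map<Integer, Long> committedLsns(Connection conn, String slotName) throws Exception {
        Map<Integer, Long> lsns = new HashMap<>();
        try (PreparedStatement stmt = conn.prepareStatement(QUERY)) {
            stmt.setString(1, slotName);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    lsns.put(rs.getInt(1), rs.getLong(2));
                }
            }
        }
        return lsns;
    }
}
```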

Delete a replication slot

Delete a slot when you no longer need it to prevent WAL accumulation. Slots do not close automatically when a consumption job stops—you must delete them manually.

CALL hg_drop_logical_replication_slot('<replication_slot_name>');

Example:

CALL hg_drop_logical_replication_slot('hg_replication_slot_1');

Consume Binlog data using JDBC

JDBC-based consumption gives you fine-grained control over each shard. You create one PGReplicationStream per shard and manage parallelism yourself.

Step 1: Add dependencies

Use the PostgreSQL JDBC driver 42.2.18 or later. Add the following dependencies to your pom.xml:

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.8</version>
</dependency>
<!-- Used to get the table schema and decode Binlog records -->
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>holo-client</artifactId>
    <version>2.2.10</version>
</dependency>

Step 2: Connect and consume

The following example connects to a replication slot, reads Binlog records from one shard, and prints each record.

import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import com.alibaba.hologres.client.model.TableSchema;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class Test {

    public static void main(String[] args) throws Exception {

        String username = "";
        String password = "";
        String url = "jdbc:postgresql://Endpoint:Port/db_test";

        // Set up the JDBC connection for replication.
        Properties properties = new Properties();
        PGProperty.USER.set(properties, username);
        PGProperty.PASSWORD.set(properties, password);
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        // Required for Binlog consumption.
        PGProperty.REPLICATION.set(properties, "database");

        try (Connection connection = DriverManager.getConnection(url, properties)) {

            // Each PGReplicationStream consumes one shard. Set shardId for each stream.
            int shardId = 0;
            PGConnection pgConnection = connection.unwrap(PGConnection.class);
            PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI()
                    .replicationStream()
                    .logical()
                    // V2.1+: two options
                    // Option 1: specify the replication slot name. table_name is ignored.
                    // Option 2: omit withSlotName and set table_name in withSlotOption instead.
                    .withSlotName("slot_name")
                    .withSlotOption("table_name", "public.test_message_src")
                    .withSlotOption("parallel_index", shardId)
                    .withSlotOption("batch_size", "1024")
                    .withSlotOption("start_time", "2021-01-01 00:00:00")
                    .withSlotOption("start_lsn", "0")
                    .start();

            // HoloClient is needed to get the table schema for decoding.
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            HoloClient client = new HoloClient(holoConfig);

            // Decode raw binary Binlog data into BinlogRecord objects.
            TableSchema schema = client.getTableSchema("test_message_src", true);
            HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);

            // Track the last consumed LSN for checkpoint recovery.
            Long currentLsn = 0L;

            ByteBuffer byteBuffer = pgReplicationStream.readPending();
            while (true) {
                if (byteBuffer != null) {
                    List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
                    long latestLsn = currentLsn;
                    for (BinlogRecord record : records) {
                        latestLsn = record.getBinlogLsn();
                        // Process the record here.
                        System.out.println("lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));
                    }
                    // Save the latest LSN as the recovery point.
                    currentLsn = latestLsn;
                    pgReplicationStream.forceUpdateStatus();
                } else {
                    // No pending data; back off briefly to avoid a busy loop.
                    Thread.sleep(10L);
                }
                byteBuffer = pgReplicationStream.readPending();
            }
        }
    }
}

Parameters

`withSlotName` behavior by version:

Version | Behavior
Earlier than V2.1 | Specify the name of an existing replication slot. Required.
V2.1 and later | Optional. Omit withSlotName and set table_name in withSlotOption instead to consume without a replication slot.

`withSlotOption` parameters:

Parameter | Required | Description
table_name | Required when withSlotName is not set | The target table to consume. Format: schema_name.table_name or table_name. Ignored if withSlotName is set.
parallel_index | Yes | The shard index to consume. One PGReplicationStream handles one shard. For a table with three shards, create three streams with parallel_index 0, 1, and 2.
start_time | No | Start consuming from this timestamp. Example: 2021-01-01 12:00:00+08.
start_lsn | No | Start consuming from the record after this LSN. Takes priority over start_time.
batch_size | No | Maximum records per retrieval. Default: 1,024.
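
Because one PGReplicationStream covers exactly one shard, a full-table consumer typically starts one worker per shard, each with its own parallel_index. A minimal fan-out skeleton follows; the ShardConsumer interface is a hypothetical stand-in for the per-shard stream setup shown above:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShardFanOut {

    // Stand-in for a per-shard consume loop; a real implementation would open
    // a PGReplicationStream with withSlotOption("parallel_index", shardId).
    interface ShardConsumer {
        void consume(int shardId) throws Exception;
    }

    // Start one worker per shard; parallel_index values run from 0 to shardCount - 1.
    static ExecutorService startAll(int shardCount, ShardConsumer consumer) {
        ExecutorService pool = Executors.newFixedThreadPool(shardCount);
        for (int i = 0; i < shardCount; i++) {
            final int shardId = i;
            pool.submit(() -> {
                consumer.consume(shardId);
                return null;
            });
        }
        return pool;
    }
}
```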

Default start behavior (when neither start_lsn nor start_time is set):

Scenario | Behavior
First consumption from a replication slot | Starts from the beginning, similar to Kafka's earliest offset.
Subsequent consumption from a replication slot | Resumes from the last committed checkpoint.
Consumption by table name only (no withSlotName) | Always starts from the beginning.

BinlogRecord fields

BinlogRecord exposes the following Binlog system fields:

Method | Returns
getBinlogLsn() | The LSN of the Binlog record.
getBinlogTimestamp() | The system timestamp when the event occurred.
getBinlogEventType() | The event type: INSERT, UPDATE, DELETE, or TRUNCATE.

For a full field reference, see Subscribe to Hologres Binlog data.

Commit the checkpoint

After consuming Binlog data, commit the checkpoint by calling the commit LSN function. This serves two purposes: it lets the server know which WAL segments are safe to archive or discard, and it provides a recovery point so consumption can resume from the correct position after a failover.

The sample code above does not include this step—add it based on your application's recovery requirements.
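
With the PostgreSQL JDBC replication API, one way to commit is to mark the LSN as flushed and applied, then push a status update to the server. This is a sketch, not part of the official sample; verify the commit semantics against your Hologres version.

```java
import java.sql.SQLException;

import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;

public class CheckpointCommit {

    // Report lsn back to the server as flushed and applied, then force an
    // immediate status update so progress is recorded without waiting for
    // the next periodic report.
    static void commitLsn(PGReplicationStream stream, long lsn) throws SQLException {
        LogSequenceNumber position = LogSequenceNumber.valueOf(lsn);
        stream.setFlushedLSN(position);
        stream.setAppliedLSN(position);
        stream.forceUpdateStatus();
    }
}
```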

Consume Binlog data using Holo-Client

Holo-Client simplifies Binlog consumption by managing shard-level connections automatically. Specify the target table, and Holo-Client handles the rest.

  • The number of connections equals the table's shard count.

  • Save the checkpoint per shard so that consumption can resume from the correct position after a network failure or other interruption.

Note

Use Holo-Client 2.2.10 or later. Versions 2.2.9 and earlier have a memory leak.

Step 1: Add dependencies

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>holo-client</artifactId>
    <version>2.2.10</version>
</dependency>

Step 2: Connect and consume

The following example subscribes to a table, processes each record, saves a per-shard checkpoint, and retries on failure.

import com.alibaba.hologres.client.BinlogShardGroupReader;
import com.alibaba.hologres.client.Command;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Subscribe;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;

import java.util.HashMap;
import java.util.Map;

public class HoloBinlogExample {

    public static BinlogShardGroupReader reader;

    public static void main(String[] args) throws Exception {
        String username = "";
        String password = "";
        String url = "jdbc:postgresql://ip:port/database";
        String tableName = "test_message_src";
        String slotName = "hg_replication_slot_1";

        HoloConfig holoConfig = new HoloConfig();
        holoConfig.setJdbcUrl(url);
        holoConfig.setUsername(username);
        holoConfig.setPassword(password);
        holoConfig.setBinlogReadBatchSize(128);
        holoConfig.setBinlogIgnoreDelete(true);
        holoConfig.setBinlogIgnoreBeforeUpdate(true);
        holoConfig.setBinlogHeartBeatIntervalMs(5000L);
        HoloClient client = new HoloClient(holoConfig);

        // Get the shard count to initialize per-shard checkpoint tracking.
        int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));

        // Initialize the checkpoint map with LSN 0 for each shard.
        Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
        for (int i = 0; i < shardCount; i++) {
            shardIdToLsn.put(i, 0L);
        }

        // Before V2.1: tableName and slotName are both required.
        // V2.1 and later: tableName is enough (slotName defaults to "hg_table_name_slot").
        Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
                .setBinlogReadStartTime("2021-01-01 12:00:00")
                .build();

        reader = client.binlogSubscribe(subscribe);

        BinlogRecord record;
        int retryCount = 0;

        while (true) {
            try {
                if (reader.isCanceled()) {
                    // Re-create the reader using the last saved checkpoint.
                    reader = client.binlogSubscribe(subscribe);
                }
                while ((record = reader.getBinlogRecord()) != null) {
                    if (record instanceof BinlogHeartBeatRecord) {
                        // All data up to this heartbeat's timestamp has been consumed on this shard.
                        continue;
                    }

                    // Process the record here.
                    System.out.println(record);

                    // Save the checkpoint so consumption can resume from here on failure.
                    shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
                    retryCount = 0;
                }
            } catch (HoloClientException e) {
                if (++retryCount > 10) {
                    throw new RuntimeException(e);
                }
                System.out.println(String.format(
                        "Binlog read failed: %s. Retrying (%d/10)...", e.getMessage(), retryCount));

                // Wait before retrying, with increasing backoff.
                Thread.sleep(5000L * retryCount);

                // Resume from the saved per-shard checkpoint.
                Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
                for (int i = 0; i < shardCount; i++) {
                    subscribeBuilder.addShardStartOffset(i,
                            new BinlogOffset().setSequence(shardIdToLsn.get(i)));
                }
                subscribe = subscribeBuilder.build();
                reader.cancel();
            }
        }
    }
}

Parameters

`Subscribe` types:

Type | When to use
Subscribe.newStartTimeBuilder(tableName, slotName) | Start consumption from a specific timestamp.
Subscribe.newOffsetBuilder(tableName, slotName) | Start consumption from a specific LSN per shard. Use this for checkpoint-based recovery.

Version note: Before V2.1, both tableName and slotName are required. Starting from V2.1, only tableName is required (equivalent to using the fixed slot name hg_table_name_slot).

`HoloConfig` Binlog parameters:

Parameter | Default | Description
binlogReadBatchSize | 1,024 | Maximum records per retrieval per shard.
binlogHeartBeatIntervalMs | -1 (disabled) | Interval in milliseconds between BinlogHeartBeatRecord messages. When no new data arrives, heartbeat records indicate that all data up to that timestamp has been consumed on that shard.
binlogIgnoreDelete | false | Skip DELETE events.
binlogIgnoreBeforeUpdate | false | Skip the before-image records for UPDATE events.

Troubleshooting

The hologres.hg_replication_progress table is missing or empty

If the table doesn't exist or shows no data after you commit consumption progress, check the following:

Consuming by table name without a replication slot. If you didn't set withSlotName (JDBC) or didn't specify a slot (Holo-Client), progress tracking is not supported. The table will not be created or updated. Switch to replication slot-based consumption to enable progress tracking.

First-time consumption on a read-only secondary instance. On secondary instances running Hologres earlier than V2.0.18, the hologres.hg_replication_progress table cannot be created during the first Binlog consumption. Consume from the primary instance once, then switch back to the secondary instance.

If neither applies, contact support through the Hologres DingTalk group. See How do I get more online support?.

What's next