Hologres:Use JDBC to consume Hologres binary logs

Last Updated:Apr 22, 2024

This topic describes how to use Java Database Connectivity (JDBC) and Holo Client to consume Hologres binary logs.

Prerequisites

  • The binary logging feature is enabled and configured for your Hologres instance. For more information, see Subscribe to Hologres binary logs.

  • The hg_binlog extension is created.

    • In versions earlier than Hologres V2.0, a superuser must execute the CREATE EXTENSION statement to create the hg_binlog extension before you can use the binary logging feature. The extension is created at the database level, so you need to create it only once for each database. If you create a new database and want to use the binary logging feature in it, execute the statement again in that database.

      -- Create the hg_binlog extension.
      CREATE extension hg_binlog;
      -- Drop the hg_binlog extension.
      DROP extension hg_binlog;
      Important

      We recommend that you do not execute the DROP EXTENSION <extension_name> CASCADE; statement to drop an extension. The CASCADE option drops not only the specified extension but also the extension data and the objects that depend on the extension. The extension data includes the PostGIS data, roaring bitmap data, Proxima data, binary log data, and BSI data. The objects include metadata, tables, views, and server data.

    • In Hologres V2.0 and later, you do not need to manually create the hg_binlog extension.

  • The conditions that are required for consuming binary logs are met. The conditions vary based on the version of your Hologres instance:

    • For all versions: The preparations are complete. That is, a publication is created for the desired table and a replication slot is created for the publication. After you complete the preparations, you can consume binary logs of the table in any Hologres version.

      Note

      To complete the preparations, you must be granted one set of the following permissions:

      • Permissions of the Superuser of the Hologres instance

      • Permissions of the owner of the desired table, CREATE DATABASE permission, and permissions of the Replication role of the Hologres instance

    • For Hologres V2.1 and later: You are granted the read permissions on the desired table. In Hologres V2.1 and later, if this condition is met, you can consume binary logs of the desired table without the need to complete the preparations.

Limits

  • You can use JDBC to consume Hologres binary logs only in Hologres V1.1 and later. If the version of your Hologres instance is earlier than V1.1, manually upgrade your Hologres instance in the Hologres console or join the Hologres DingTalk group to contact technical support. For more information about how to manually upgrade your Hologres instance in the Hologres console, see Instance upgrades. For more information about how to contact technical support, see Obtain online support for Hologres.

  • Only the fields of the following data types in Hologres binary logs can be consumed: INTEGER, BIGINT, SMALLINT, TEXT, CHAR(n), VARCHAR(n), REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, SERIAL, OID, int4[], int8[], float4[], float8[], boolean[], and text[]. In Hologres V1.3.36 and later, fields of the JSONB data type in binary logs can be consumed. You cannot consume Hologres binary logs if the Hologres binary logs contain fields of other data types.

    Note

    In Hologres V1.3.36 and later, Hologres binary logs that contain fields of the JSONB data type can be consumed. Before you consume these binary logs, you must configure one of the following Grand Unified Configuration (GUC) parameters:

    -- Configure a GUC parameter for a session.
    set hg_experimental_enable_binlog_jsonb = on;
    
    -- Configure a GUC parameter for a database.
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • When you use JDBC to consume Hologres binary logs, one walsender connection is established for each shard of the table. Walsender connections are similar to regular connections but are counted independently of them.

  • You can execute the following statement to view the maximum number of walsenders of a single frontend node. By default, the maximum number is 100 per frontend node in Hologres V1.1.26 and later versions earlier than V2.0, and 1,000 per frontend node in Hologres V2.0 and later. The maximum number of walsenders of a Hologres instance is calculated by using the following formula: Maximum number of walsenders for an instance = Maximum number of walsenders of a frontend node × Number of frontend nodes of an instance. For more information about the number of frontend nodes that can be created for Hologres instances of different specifications, see Instance types.

    show max_wal_senders;
    Note

    The maximum number of tables of a Hologres instance from which you consume binary logs at the same time can be calculated by using the following formula: Maximum number of tables = (Maximum number of walsenders of a frontend node (100 or 1,000) × Number of frontend nodes of an instance)/Total number of table shards.

    Examples:

    • Data in Table A is distributed to 20 shards, data in Table B is distributed to 20 shards, and data in Table C is distributed to 30 shards. If you consume binary logs from the three tables at the same time, the number of walsenders required can be calculated by using the following equation: 20 + 20 + 30 = 70.

    • Data in Table A is distributed to 20 shards, and data in Table B is distributed to 20 shards. Two jobs are run to consume binary logs from Table A at the same time. If you consume binary logs from Table A and Table B at the same time, the number of walsenders required can be calculated by using the following equation: 20 × 2 + 20 = 60.

    • If a Hologres instance has two frontend nodes, the maximum number of walsenders for this instance can be calculated by using the following equation: 100 × 2 = 200. In this case, if data in each table is distributed to 20 shards, you can consume binary logs from a maximum of 10 tables at the same time.
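The arithmetic in the three examples above can be reproduced with a few lines of Java. This is only a sketch: the class and method names are hypothetical, and the per-node limit passed in is assumed to be the value returned by show max_wal_senders;.

```java
import java.util.Arrays;

/** Sketch of the walsender arithmetic from the examples (hypothetical helper, not part of any Hologres library). */
final class WalsenderBudget {

    /** One walsender is needed per shard for every job that consumes a table. */
    static int requiredWalsenders(int... shardsPerConsumingJob) {
        return Arrays.stream(shardsPerConsumingJob).sum();
    }

    /** Instance limit = limit of one frontend node x number of frontend nodes. */
    static int instanceLimit(int maxWalSendersPerFrontendNode, int frontendNodes) {
        return maxWalSendersPerFrontendNode * frontendNodes;
    }

    /** Number of tables with the same shard count that can be consumed at once, one job per table. */
    static int maxTables(int instanceLimit, int shardsPerTable) {
        return instanceLimit / shardsPerTable;
    }

    public static void main(String[] args) {
        // Tables A, B, and C with 20, 20, and 30 shards.
        System.out.println(requiredWalsenders(20, 20, 30)); // prints 70
        // Two jobs on Table A (20 shards each) and one job on Table B (20 shards).
        System.out.println(requiredWalsenders(20, 20, 20)); // prints 60
        // Two frontend nodes with a per-node limit of 100, tables with 20 shards each.
        int limit = instanceLimit(100, 2);
        System.out.println(limit + " walsenders, " + maxTables(limit, 20) + " tables"); // prints 200 walsenders, 10 tables
    }
}
```

Comparing requiredWalsenders against instanceLimit before starting a new job is one way to anticipate the "too many wal senders" error.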

    If you use JDBC to consume binary logs and the number of walsender connections reaches the upper limit, the following error message is returned: FATAL: sorry, too many wal senders already. You can perform the following steps to resolve this issue:

    1. Check the jobs that use JDBC to consume binary logs, and reduce unnecessary consumption of binary logs.

    2. Check whether the number of table groups and the number of shards are properly configured. For more information, see Best practices for specifying table groups.

    3. If the issue persists, you can scale up the instance.

  • In versions earlier than Hologres V2.0.18, you cannot use JDBC to consume binary logs of read-only secondary instances. In Hologres V2.0.18 and later, JDBC can be used to consume binary logs of read-only secondary instances, but consumption progress is not recorded.

Usage notes

The following list describes the methods of consuming binary logs in different Hologres instance versions and Flink engine versions.

  • Hologres V2.1 and later, Flink engine 8.0.5 and later: You do not need to create a replication slot. You need only the read permission on a table to consume binary logs of the table.

  • Hologres V2.0, Flink engine 8.0.5 and earlier: By default, JDBC is used to consume binary logs. You must create a publication for the desired table and then a replication slot for the publication before you can consume binary logs.

  • Hologres V1.3 and earlier, Flink engine 8.0.5 and earlier: By default, the HoloHub mode is used. If you have the read permission on a table, you can consume binary logs.

Note

In Hologres V2.0 and later, the HoloHub mode is not supported for consuming binary logs. Before you upgrade your Hologres instance to V2.0 or later, we recommend that you upgrade the version of Flink to 8.0.5. The JDBC mode is then automatically used to consume binary logs.

Preparations: Create a publication and a replication slot

In versions earlier than Hologres V2.1, you must create a publication for the desired table and then a replication slot for the publication before you can consume binary logs.

In Hologres V2.1 and later, if you have read permissions on a table, you can consume binary logs of the table even if you do not create a publication or a replication slot. However, you cannot query the consumption progress of binary logs in this case. We recommend that the consumer record the consumption progress.
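The version rule above can be expressed as a small check that a consumer runs before it decides whether to pass a slot name. The following is a minimal sketch: the class name, the method name, and the version string format (for example "V2.0.18" or "2.1.0") are assumptions made for illustration.

```java
/** Sketch: decides whether a publication and replication slot are needed (hypothetical helper). */
final class BinlogPrerequisites {

    /**
     * Returns true for versions earlier than V2.1, in which a publication and a
     * replication slot must be created before binary logs can be consumed.
     * The accepted format ("V2.0.18", "2.1.0", "V3.0") is an assumption of this sketch.
     */
    static boolean requiresReplicationSlot(String hologresVersion) {
        String[] parts = hologresVersion.trim().replaceFirst("^[vV]", "").split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major < 2 || (major == 2 && minor < 1);
    }

    public static void main(String[] args) {
        System.out.println(requiresReplicationSlot("V2.0.18")); // prints true
        System.out.println(requiresReplicationSlot("V2.1.0"));  // prints false
    }
}
```

Even when the check returns false, creating a slot remains useful if you want Hologres to record the committed consumption progress.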

Publication

Overview

A publication is essentially a group of tables. The publication is used to replicate data changes in the tables by using logical replication. For more information, see Publication. In Hologres, only one physical table can be added to a publication, and binary logging must be enabled for the table.

Create a publication

  • Syntax

  • CREATE PUBLICATION name FOR TABLE table_name;
  • Parameters

  • Parameter

    Description

    name

    The name of the custom publication.

    table_name

    The name of the table in the database.

  • Examples

  • -- Create a publication named hg_publication_test_1 and add the test_message_src table to the publication.
    create publication hg_publication_test_1 for table test_message_src;

Query the created publication

  • Syntax

  • select * from pg_publication;
  • Query result

  •         pubname        | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
    -----------------------+----------+--------------+-----------+-----------+-----------+-------------
     hg_publication_test_1 |    16728 | f            | t         | t         | t         | t
    (1 row)

    Parameter

    Description

    pubname

    The name of the publication.

    pubowner

    The owner of the publication.

    puballtables

    Indicates whether the publication automatically includes all tables in the database. The default value is f, which indicates false. In Hologres, you can add only one physical table to a publication.

    pubinsert

    Indicates whether binary logs of the INSERT type are published. The default value is t, which indicates true. For more information about binary log types, see Binary log format and principle in the "Subscribe to Hologres binary logs" topic.

    pubupdate

    Indicates whether binary logs of the UPDATE type are published. The default value is t, which indicates that binary logs of the UPDATE type are published.

    pubdelete

    Indicates whether binary logs of the DELETE type are published. The default value is t, which indicates that binary logs of the DELETE type are published.

    pubtruncate

    Indicates whether binary logs of the TRUNCATE type are published. The default value is t, which indicates true.

Query the table that is added to a publication

  • Syntax

  • select * from pg_publication_tables;
  • Query result

  •         pubname        | schemaname |    tablename
    -----------------------+------------+------------------
     hg_publication_test_1 | public     | test_message_src
    (1 row) 

    Parameter

    Description

    pubname

    The name of the publication.

    schemaname

    The name of the schema to which the table belongs.

    tablename

    The name of the table.

Drop a publication

  • Syntax

  • DROP PUBLICATION name;
  • The name parameter specifies the name of the created publication.

  • Examples

  • DROP PUBLICATION hg_publication_test_1;

Replication Slot

Overview

In logical replication scenarios, a replication slot indicates a data change stream. The replication slot is bound to the current consumption progress for resumable consumption. For more information, see Replication Slots in PostgreSQL documentation. Replication slots are used to store offsets for binary log consumption. This way, a consumer can resume consumption from the offset that is committed after a failover.

Permissions

Only a superuser or a replication role can create and use replication slots. You can execute the following statements to create or remove a replication role:

-- As a superuser, execute the following statement to configure a regular user as a replication role:
alter role <user_name> replication;

-- As a superuser, execute the following statement to configure a replication role as a regular user:
alter role <user_name> noreplication;

The user_name parameter specifies the ID of the Alibaba Cloud account or the Resource Access Management (RAM) user. For more information, see Overview.

Create a replication slot

  • Syntax

  • call hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
  • Parameters

  • Parameter

    Description

    replication_slot_name

    The name of the custom replication slot.

    hgoutput

    The binary log output plug-in that is used by the replication slot. Only the hgoutput built-in plug-in is supported.

    publication_name

    The name of the publication to which the replication slot is bound.

  • Examples

  • -- Create a replication slot named hg_replication_slot_1 and bind the replication slot to the hg_publication_test_1 publication. 
    call hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
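If the preparation statements are issued from application code, it can help to build them in one place and validate the names first. The following sketch only assembles the two statements shown in this topic. The class name, method names, and the conservative name pattern are assumptions of the sketch, not Hologres rules.

```java
import java.util.regex.Pattern;

/** Sketch: builds the preparation statements from this topic (hypothetical helper). */
final class BinlogSetupSql {

    // Conservative name checks chosen for this sketch; Hologres itself may accept more names.
    private static final Pattern NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");
    private static final Pattern TABLE =
            Pattern.compile("([A-Za-z_][A-Za-z0-9_]*\\.)?[A-Za-z_][A-Za-z0-9_]*");

    private static String checked(Pattern pattern, String value) {
        if (value == null || !pattern.matcher(value).matches()) {
            throw new IllegalArgumentException("Unsupported name: " + value);
        }
        return value;
    }

    /** create publication <name> for table <table>; */
    static String createPublication(String publication, String table) {
        return "create publication " + checked(NAME, publication)
                + " for table " + checked(TABLE, table) + ";";
    }

    /** call hg_create_logical_replication_slot('<slot>', 'hgoutput', '<publication>'); */
    static String createReplicationSlot(String slot, String publication) {
        return "call hg_create_logical_replication_slot('" + checked(NAME, slot)
                + "', 'hgoutput', '" + checked(NAME, publication) + "');";
    }

    public static void main(String[] args) {
        System.out.println(createPublication("hg_publication_test_1", "test_message_src"));
        System.out.println(createReplicationSlot("hg_replication_slot_1", "hg_publication_test_1"));
    }
}
```

The resulting strings can be passed to java.sql.Statement.execute on a regular JDBC connection that has the permissions described above.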

Query the created replication slot

  • Syntax

  • select * from hologres.hg_replication_slot_properties;
  • Query result

  •        slot_name       | property_key |    property_value
    -----------------------+--------------+-----------------------
     hg_replication_slot_1 | plugin       | hgoutput
     hg_replication_slot_1 | publication  | hg_publication_test_1
     hg_replication_slot_1 | parallelism  | 1
    (3 rows)

    Parameter

    Description

    slot_name

    The name of the replication slot.

    property_key

    The key of the property. Valid values:

    • plugin: the plug-in that is used by the replication slot. Only the built-in hgoutput plug-in is supported.

    • publication: the publication to which the replication slot is bound.

    • parallelism: the number of parallel client connections that are required to consume the binary logs of an entire table by using a replication slot. The value of this parameter is equivalent to the number of shards in the table group to which the desired table belongs.

    property_value

    The value of the property that is specified by the property_key parameter.

Query the parallelism that is required to consume the binary logs of an entire table by using a replication slot

Hologres is a distributed data warehouse. The data of a table is distributed to different shards. When you use JDBC to consume binary logs, you must establish multiple client connections to consume complete binary logs. You can use the following syntax to query the number of concurrent client connections that are required to consume the binary logs associated with the hg_replication_slot_1 replication slot.

  • Syntax

  • select hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
  • Query result

  • hg_get_logical_replication_slot_parallelism  
    ------------------------------------------------
                                            20 
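Because each shard needs its own connection, a consumer typically starts one task per parallel index, from 0 to the returned parallelism minus 1. The following sketch shows only that fan-out. The class and method names are hypothetical, and the IntConsumer passed in stands for code that opens a PGReplicationStream for the given parallel_index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntConsumer;

/** Sketch: runs one consumer task per shard (hypothetical helper). */
final class ShardFanOut {

    /** Starts one task for each parallel index in [0, parallelism) and waits until all tasks finish. */
    static void runPerShard(int parallelism, IntConsumer consumeShard) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                final int parallelIndex = i;
                futures.add(pool.submit(() -> consumeShard.accept(parallelIndex)));
            }
            for (Future<?> future : futures) {
                future.get(); // Propagates the first failure of a shard consumer.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while consuming", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A shard consumer failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // In a real job, the lambda would open a replication stream for the given parallel_index.
        runPerShard(3, index -> System.out.println("consuming parallel_index " + index));
    }
}
```

A fixed pool sized to the parallelism keeps the number of walsender connections equal to the number of shards, which matches the connection accounting described in the Limits section.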

Query the consumption progress in a replication slot in Hologres

  • Syntax

  • select * from hologres.hg_replication_progress;
  • Query result

  •        slot_name       | parallel_index | lsn 
    -----------------------+----------------+-----
     hg_replication_slot_1 |              0 |  66
     hg_replication_slot_1 |              1 | 122
     hg_replication_slot_1 |              2 | 119
    (3 rows)

    Parameter

    Description

    slot_name

    The name of the replication slot.

    parallel_index

    The parallelism index.

    lsn

    The sequence number of the last consumed binary log.

    Important
    • The hologres.hg_replication_progress table is generated after binary logs are consumed for the first time.

    • The hologres.hg_replication_progress table records offsets that were actively committed by users. Users need to manually call the commit_lsn function in the code to commit the offsets for binary log consumption. The table records the offset that is committed for the last time. The recorded offset does not indicate the actual consumer offset of a user. We recommend that consumers record their log sequence numbers (LSNs) and resume consumption from the offsets with the recorded LSNs. The following sample code for consuming binary logs by using JDBC or Holo Client does not include the commit_lsn function.

    • The offsets that are manually committed by users are valid only when binary logs are consumed based on replication slots. If you consume binary logs based on table names, the hologres.hg_replication_progress table does not record the manually committed offsets.
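Because the table records only committed offsets, the notes above recommend that consumers keep their own LSN for each shard. The following is a minimal sketch of such a record. The class and method names are hypothetical, and the value 0 for "nothing recorded yet" mirrors the initial value used in the sample code of this topic.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Sketch: consumer-side record of the last processed LSN per shard (hypothetical helper). */
final class ShardOffsetStore {

    private final Map<Integer, Long> lsnByShard = new TreeMap<>();

    /** Remembers the LSN of the last fully processed record of a shard; never moves backwards. */
    void record(int shardId, long lsn) {
        lsnByShard.merge(shardId, lsn, Long::max);
    }

    /** LSN to resume from, or 0 if nothing has been recorded for the shard. */
    long resumeLsn(int shardId) {
        return lsnByShard.getOrDefault(shardId, 0L);
    }

    /** Converts the offsets to Properties so that they can be written with Properties.store. */
    Properties toProperties() {
        Properties props = new Properties();
        lsnByShard.forEach((shard, lsn) -> props.setProperty(String.valueOf(shard), String.valueOf(lsn)));
        return props;
    }

    /** Restores offsets that were previously read with Properties.load. */
    static ShardOffsetStore fromProperties(Properties props) {
        ShardOffsetStore store = new ShardOffsetStore();
        for (String key : props.stringPropertyNames()) {
            store.record(Integer.parseInt(key), Long.parseLong(props.getProperty(key)));
        }
        return store;
    }

    public static void main(String[] args) {
        ShardOffsetStore store = new ShardOffsetStore();
        store.record(0, 66L);
        store.record(1, 122L);
        ShardOffsetStore restored = fromProperties(store.toProperties());
        System.out.println(restored.resumeLsn(1)); // prints 122
        System.out.println(restored.resumeLsn(2)); // prints 0
    }
}
```

On restart, the restored LSN of each shard can be supplied as the start offset for that shard, for example through the start_lsn option or an OffsetBuilder.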

Drop a replication slot

  • Syntax

  • call hg_drop_logical_replication_slot('<replication_slot_name>');
  • The replication_slot_name parameter specifies the name of the created replication slot.

  • Examples

  • call hg_drop_logical_replication_slot('hg_replication_slot_1');

Use JDBC to consume binary logs

  1. Add dependencies to the pom.xml file.

    Use the following code to add dependencies to the pom.xml file.

    Note

    Use the PostgreSQL JDBC driver of V42.2.18 or later.

            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.3.8</version>
            </dependency>
            <!-- Obtain the table schema and parse binary logs. -->
            <dependency>
                <groupId>com.alibaba.hologres</groupId>
                <artifactId>holo-client</artifactId>
                <version>2.2.10</version>
            </dependency>
  2. Sample code in Java

    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
    import com.alibaba.hologres.client.model.TableSchema;
    import com.alibaba.hologres.client.model.binlog.BinlogRecord;
    
    import org.postgresql.PGConnection;
    import org.postgresql.PGProperty;
    import org.postgresql.replication.LogSequenceNumber;
    import org.postgresql.replication.PGReplicationStream;
    
    import java.nio.ByteBuffer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class Test {
    
        public static void main (String[] args) throws Exception {
    
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://Endpoint:Port/db_test";
    
            // Create a JDBC connection.
            Properties properties = new Properties();
            PGProperty.USER.set(properties, username);
            PGProperty.PASSWORD.set(properties, password);
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
            // Add the following properties that are required to consume binary logs.
            PGProperty.REPLICATION.set(properties, "database");
            try (Connection connection = DriverManager.getConnection(url, properties)) {
                // Specify the shard ID, create a PGReplicationStream interface, and associate the interface with a replication slot.
                int shardId = 0;
                PGConnection pgConnection = connection.unwrap(PGConnection.class);
                PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()
                        .logical()
                        // In Hologres V2.1 and later, the following methods are available:
                        // Method 1: Set the withSlotName parameter to the name of the replication slot created in the preparation phase. You do not need to specify withSlotOption("table_name","xxx").
                        // Method 2: Specify withSlotOption("table_name","xxx"). You do not need to configure the withSlotName parameter.
                        .withSlotName("slot_name")
                        .withSlotOption("table_name","public.test_message_src") // The name of the table whose binary logs you want to consume.
                        .withSlotOption("parallel_index", shardId)
                        .withSlotOption("batch_size", "1024")
                        .withSlotOption("start_time", "2021-01-01 00:00:00")
                        .withSlotOption("start_lsn","0")
                        .start();
    	
                // The Holo Client interface is not used to consume binary logs but used to parse consumed data.         
                // Create a Holo Client object.
                HoloConfig holoConfig = new HoloConfig();
                holoConfig.setJdbcUrl(url);
                holoConfig.setUsername(username);
                holoConfig.setPassword(password);
                HoloClient client = new HoloClient(holoConfig);
    
                // Create a binary log decoder to decode binary logs and obtain the table schema by using Holo Client.
                TableSchema schema = client.getTableSchema("test_message_src", true);
                HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);
    
                // Record the current consumer offset, from which you can resume consumption.
                Long currentLsn = 0L;
                // Consume data.
                ByteBuffer byteBuffer = pgReplicationStream.readPending();
                while (true) {
                    if (byteBuffer != null) {
                        List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
                        Long latestLsn = 0L;
                        for (BinlogRecord record : records) {
                            latestLsn = record.getBinlogLsn();
                            // Do Something
                            System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));
                        }
                        // Save the consumer offset.
                        currentLsn = latestLsn;                    
                        pgReplicationStream.forceUpdateStatus();
                    }
                    byteBuffer = pgReplicationStream.readPending();
                }
            }
            // pgReplicationStream.close();
            // connection.close();
        }
    }

    When you create a PGReplicationStream interface, you must specify what to consume: either a replication slot by using withSlotName, or a table by using withSlotOption("table_name", ...).

    • In versions earlier than Hologres V2.1, you must enter the name of the replication slot in withSlotName.

    • In Hologres V2.1 and later, you do not need to specify withSlotName. You only need to specify the table name in withSlotOption.

    The following table describes the parameters that you can specify in withSlotOption.

    Parameter

    Required

    Description

    table_name

    Required if withSlotName is not specified. If withSlotName is specified, this parameter is ignored.

    If withSlotName is not specified, table_name indicates the name of the table from which you want to consume binary logs. The value of this parameter is in the schema_name.table_name or table_name format.

    parallel_index

    Yes.

    • When you use a PGReplicationStream interface to consume binary logs, a walsender connection is established for the PGReplicationStream interface for consuming binary logs of a shard of the table. The parallel_index parameter specifies the Nth shard of the table from which you want to consume the data.

    • For example, if data in a table is distributed to three shards, three parallel connections are required to consume binary logs by using the replication slot. In this example, you can create a maximum of three PGReplicationStream interfaces and set the parallel_index parameter to 0, 1, and 2 for these PGReplicationStream interfaces.

    • Hologres does not provide an implementation similar to Kafka consumer groups to consume binary logs. Therefore, you must manually create multiple PGReplicationStream interfaces.

    start_time

    No.

    The point in time from which binary logs are consumed. Example: 2021-01-01 12:00:00+08.

    If neither start_lsn nor start_time is specified, binary logs are consumed based on the following rules:

    • If a consumer consumes the binary logs associated with the replication slot for the first time, the consumer consumes the binary logs from the beginning. This is similar to the Oldest option in Kafka.

    • If a consumer has consumed the binary logs associated with the replication slot before, the consumer attempts to consume the binary logs from the offset that was committed.

    • If the withSlotName parameter is not specified but the table_name parameter is specified, a consumer consumes the binary logs from the beginning regardless of whether the consumer consumes the binary logs for the first time.

    start_lsn

    No.

    The log sequence number (LSN) from which the binary logs are consumed. The value of this parameter takes precedence over the value of the start_time parameter.

    batch_size

    No.

    The maximum number of binary logs that can be consumed at a time. Unit: rows. Default value: 1024.

    Note
    • BinlogRecord is the type of the records that are returned by the binary log decoder. You can call the following methods to obtain the binary log system fields of a record. For more information, see Subscribe to Hologres binary logs.

      • getBinlogLsn(): queries the LSN of the binary log.

      • getBinlogTimestamp(): queries the system timestamp of the binary log. The timestamp indicates the time at which the binary log is generated.

      • getBinlogEventType(): queries the event type recorded in the binary log.

    • After you consume the binary logs, you must manually commit the offset for binary log consumption. This way, you can resume consumption from the offset that is committed after a failover.
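The rules for start_lsn and start_time in the preceding table can be summarized as a decision function. The following sketch only models the documented rules on the client side for illustration. The class, enum, and parameter names are hypothetical, and the actual decision is made by Hologres.

```java
/** Sketch: models where consumption starts according to the documented rules (hypothetical helper). */
final class StartPositionRules {

    enum Start { FROM_LSN, FROM_TIME, FROM_COMMITTED_OFFSET, FROM_BEGINNING }

    /**
     * @param startLsn               value of the start_lsn option, or null if not specified
     * @param startTime              value of the start_time option, or null if not specified
     * @param usesSlotName           true if withSlotName is specified, false if only table_name is specified
     * @param slotHasCommittedOffset true if an offset was committed for the slot before
     */
    static Start resolve(String startLsn, String startTime,
                         boolean usesSlotName, boolean slotHasCommittedOffset) {
        if (startLsn != null) {
            return Start.FROM_LSN; // start_lsn takes precedence over start_time.
        }
        if (startTime != null) {
            return Start.FROM_TIME;
        }
        if (usesSlotName && slotHasCommittedOffset) {
            return Start.FROM_COMMITTED_OFFSET;
        }
        // First consumption on a slot, or consumption by table name only.
        return Start.FROM_BEGINNING;
    }

    public static void main(String[] args) {
        System.out.println(resolve("0", "2021-01-01 00:00:00", true, true)); // prints FROM_LSN
        System.out.println(resolve(null, null, false, true));                // prints FROM_BEGINNING
    }
}
```

The second call shows why consumers that use only table_name should store their own offsets: without a slot, an unspecified start position always means the beginning.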

Use Holo Client to consume binary logs

  • You can use Holo Client to consume Hologres binary logs. You can specify a physical table to consume the binary logs of all shards of the table.

  • If you use Holo Client to consume the binary logs of a physical table, Holo Client occupies the same number of connections as the number of shards to which the physical table is distributed. This number is also the parallelism of the replication slot that is used to consume the binary logs. Make sure that the connections are sufficient.

  • When you use Holo Client to consume binary logs, we recommend that you save consumption offsets by shard. If the consumption is suspended because of a failure such as a network failure, you can resume consumption from the offset. For more information, see the following sample code.

  1. Add dependencies to the pom.xml file.

    Add the following dependency to the pom.xml file.

    Note

    Use Holo Client V2.2.10 or later. Memory leaks may occur in Holo Client V2.2.9 and earlier.

    <dependency>
      <groupId>com.alibaba.hologres</groupId>
      <artifactId>holo-client</artifactId>
      <version>2.2.10</version>
    </dependency>
  2. Sample code in Java.

    import com.alibaba.hologres.client.BinlogShardGroupReader;
    import com.alibaba.hologres.client.Command;
    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.Subscribe;
    import com.alibaba.hologres.client.exception.HoloClientException;
    import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
    import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
    import com.alibaba.hologres.client.model.binlog.BinlogRecord;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class HoloBinlogExample {
    
        public static BinlogShardGroupReader reader;
    
        public static void main(String[] args) throws Exception {
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://ip:port/database";
            String tableName = "test_message_src";
            String slotName = "hg_replication_slot_1";
    
            // Create a client object.
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            holoConfig.setBinlogReadBatchSize(128);
            holoConfig.setBinlogIgnoreDelete(true);
            holoConfig.setBinlogIgnoreBeforeUpdate(true);
            holoConfig.setBinlogHeartBeatIntervalMs(5000L);
            HoloClient client = new HoloClient(holoConfig);
    
            // Obtain the number of shards of the table.
            int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));
    
            // Use a map to save the data consumption progress of each shard. The initial value is 0.
            Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
            for (int i = 0; i < shardCount; i++) {
                shardIdToLsn.put(i, 0L);
            }
    
            // Initiate a request to consume binary logs. In versions earlier than V2.1, the tableName parameter and the slotName parameter are required. In Hologres V2.1 and later, you need to specify only the tableName parameter. 
            // The StartTimeBuilder method and the OffsetBuilder method are available for the Subscribe class. In the following example, StartTimeBuilder is used.
            Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
                    .setBinlogReadStartTime("2021-01-01 12:00:00")
                    .build();
            // Create a binary log reader.
            reader = client.binlogSubscribe(subscribe);
    
            BinlogRecord record;
    
            int retryCount = 0;
            long count = 0;
            while(true) {
                try {
                    if (reader.isCanceled()) {
                        // Create a binary log reader again based on the saved consumption offset.
                        reader = client.binlogSubscribe(subscribe);
                    }
                    while ((record = reader.getBinlogRecord()) != null) {
                        // Consume the latest binary logs.
                        if (record instanceof BinlogHeartBeatRecord) {
                            // do something
                            continue;
                        }
        
                        // Display the BinlogRecord field.
                        System.out.println(record);
        
                        // Save the consumption offset. If an exception occurs, you can resume the consumption from the offset.
                        shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
                        count++;
                        
                        // Data reading is successful. Reset the number of retries.
                        retryCount = 0;
                    }
                } catch (HoloClientException e) {
                    if (++retryCount > 10) {
                        throw new RuntimeException(e);
                    }
                    // We recommend that you log exceptions at the WARN level.
                    System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount));
        
                    // Wait for a period of time during the retry.
                    Thread.sleep(5000L * retryCount);
        
                    // You can use an OffsetBuilder object to subscribe to the consumption progress and specify the start offset for consuming binary logs for each shard.
                    Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
                    for (int i = 0; i < shardCount; i++) {
                        // The start offset of each shard is specified by calling setSequence or setTimestamp. setSequence specifies the LSN of the binary logs, and setTimestamp specifies the system timestamp in microseconds. If you call both methods, the value of setSequence takes precedence.
                        // Resume the consumption based on the consumption progress that is stored in the shardIdToLsn map.
                        subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i)));
                    }
                    subscribe = subscribeBuilder.build();
                    // Close the reader.
                    reader.cancel();
                }
            }
        }
    }
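The retry logic in the sample code waits 5000 ms multiplied by the retry count and gives up after more than 10 consecutive failures. The following sketch works out the resulting delays. The class and constant names are hypothetical and only restate the values used in the sample.

```java
/** Sketch: the linear backoff used by the sample retry loop (hypothetical helper). */
final class RetryBackoff {

    static final int MAX_RETRIES = 10;       // The sample throws when retryCount exceeds 10.
    static final long BASE_DELAY_MS = 5000L; // The sample sleeps for 5000L * retryCount.

    /** Delay before the given retry (1-based). */
    static long delayMs(int retryCount) {
        return BASE_DELAY_MS * retryCount;
    }

    /** Total time spent sleeping if every retry up to MAX_RETRIES fails. */
    static long worstCaseTotalMs() {
        long total = 0;
        for (int retry = 1; retry <= MAX_RETRIES; retry++) {
            total += delayMs(retry);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(delayMs(1));         // prints 5000
        System.out.println(delayMs(10));        // prints 50000
        System.out.println(worstCaseTotalMs()); // prints 275000
    }
}
```

In the worst case, the consumer therefore sleeps for 275 seconds in total before the exception is rethrown, which is worth considering when you set alerting thresholds for consumption delay.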

    You can specify the parameters that are described in the following table when you use Holo Client to consume binary logs.

    Parameter

    Required

    Default value

    Description

    binlogReadBatchSize

    No

    1024

    The maximum number of binary logs to consume in each batch for each shard. Unit: rows.

    binlogHeartBeatIntervalMs

    No

    -1

    The interval at which the binary log reader sends heartbeat records. If you set this parameter to -1, the binary log reader does not send heartbeat records.

    Heartbeat records are sent at the specified interval if no incremental logs are generated. The timestamp of each heartbeat record indicates that binary logs generated before this timestamp are all consumed.

    binlogIgnoreDelete

    No

    false

    Specifies whether to ignore binary logs of the DELETE type.

    binlogIgnoreBeforeUpdate

    No

    false

    Specifies whether to ignore binary logs of the BEFORE_UPDATE type.

FAQ

After I consume binary logs and submit the consumption progress, the hologres.hg_replication_progress table does not exist, or the table does not contain consumption progress data. What do I do?

Possible causes:

  • The consumption is not performed by using the replication slot. The withSlotName parameter is not specified. In this case, the consumption progress cannot be recorded.

  • A read-only secondary instance is used and binary logs of databases in the read-only secondary instance are consumed for the first time. In this case, the hologres.hg_replication_progress table fails to be created. This issue is fixed in Hologres V2.0.18 and later. Binary logs of databases in secondary instances can be normally consumed. In versions earlier than Hologres V2.0.18, binary logs of databases in secondary instances can be consumed only after binary logs of databases in the primary instance are consumed.

If this issue is not caused by the preceding reasons, join the Hologres DingTalk group for technical support. For more information, see Obtain online support for Hologres.