本文为您介绍如何通过JDBC和Holo-Client这两种方式消费Hologres Binlog。

使用限制

  • 仅Hologres V1.1及以上版本支持通过JDBC消费Hologres Binlog,如果您的实例是V1.1以下版本,请您提交工单或加入在线支持钉钉群申请升级实例。
  • 需提前开启和配置Binlog,详情请参见订阅Hologres Binlog
  • 仅以下数据类型支持消费Hologres Binlog:INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、int4[]、int8[]、float4[]、float8[]、boolean[]、text[]。
  • 消费Hologres Binlog之前,需要实例的Superuser执行以下语句创建extension,extension针对整个数据库生效,一个数据库只需执行一次,新建数据库需要再次执行。
    create extension hg_binlog;
  • 同时消费多个表的Binlog会导致性能变差,建议同一时间只消费一张表的Binlog。

Publication和Replication Slot

  • Publication
    • 简介:

      本质上是一组表,这些表的数据更改旨在通过逻辑复制进行表中数据复制,详细内容请参见Publication。当前Hologres支持的Publication只支持绑定一张物理表,且该表需要开启Binlog功能。

    • 创建Publication
      • 语法示例
        CREATE PUBLICATION name FOR TABLE table_name;
      • 参数说明
        参数 说明
        name 自定义Publication名称。
        table_name 数据库中表名称。
      • 使用示例
        --示例创建一个名为hg_publication_test_1的Publication,且将表test_message_src添加至该Publication下
        create publication hg_publication_test_1 for table test_message_src;
    • 查询已经创建的Publication
      • 语法示例
        select * from pg_publication;
      • 查询结果
                pubname        | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
        -----------------------+----------+--------------+-----------+-----------+-----------+-------------
         hg_publication_test_1 |    16728 | f            | t         | t         | t         | t
        (1 row)
        参数 说明
        pubname Publication名称。
        pubowner Publication拥有者。
        puballtables 绑定多个物理表,默认为False,目前暂不支持。
        pubinsert 是否发布INSERT类型的Binlog,默认为True,Binlog类型请参考Binlog格式说明
        pubupdate 是否发布UPDATE类型的Binlog,默认为True。
        pubdelete 是否发布DELETE类型的Binlog,默认为True。
        pubtruncate 是否发布TRUNCATE类型的Binlog,默认为True。
    • 查询Publication关联的表
      • 语法示例
        select * from pg_publication_tables;
      • 查询结果
                pubname        | schemaname |    tablename
        -----------------------+------------+------------------
         hg_publication_test_1 | public     | test_message_src
        (1 row) 
        参数 说明
        pubname Publication名称。
        schemaname 表所属schema的名称。
        tablename 表名称。
  • Replication Slot
    • 简介:

      在逻辑复制场景下,一个Replication Slot表示一个数据的更改流,该Replication Slot也与当前消费进度绑定,用于断点续传,详细内容可以参见Postgres文档Replication Slot。Replicaiton Slot用于维护Binlog消费的点位信息,使得消费端Failover之后可以从之前已经Commit的点位进行恢复。

    • 创建Replication Slot
      • 语法示例
        call hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
      • 参数说明
        参数 说明
        replication_slot_name 自定义Replication Slot的名称。
        hgoutput Binlog输出格式的插件,当前仅支持hgoutput内置插件。
        publication_name Replication Slot所绑定的Publication名称。
      • 使用示例
        --创建一个名称为hg_replication_slot_1的Replication Slot,并且绑定名称为hg_publication_test_1的Publication。
        call hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
    • 查询已经创建的Replication Slot
      • 语法示例
        select * from hologres.hg_replication_slot_properties;
      • 查询结果
               slot_name       | property_key |    property_value
        -----------------------+--------------+-----------------------
         hg_replication_slot_1 | plugin       | hgoutput
         hg_replication_slot_1 | publication  | hg_publication_test_1
         hg_replication_slot_1 | parallelism  | 1
        (3 rows)
        参数 说明
        slot_name Replication Slot名称。
        property_key 包含如下三个参数。
        • plugin:Replication Slot使用的插件,目前只支持hgoutput。
        • publication:Replication Slot对应的Publication。
        • parallelism:Replication Slot的并发数。
        property_value property_key包含参数对应的值。
    • 查询Replication Slot的并发数

      Hologres是一个分布式数仓,所以一张表的数据会分布在多个Shard上,所以使用JDBC消费Binlog的时候,需要启动多个客户端连接,才能消费到完整的Binlog数据。通过以下命令可以查询消费hg_replication_slot_1所需要的并发数。

      • 语法示例
        select hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
      • 查询结果
        hg_get_logical_replication_slot_parallelism  
        ------------------------------------------------
                                                20 
        参数 说明
        pubname Publication名称。
        schemaname 表所属schema的名称。
        tablename 表名称。
    • 查询Replication Slot的消费进度
      • 语法示例
        select * from hologres.hg_replication_progress;
      • 查询结果
               slot_name       | parallel_index | lsn 
        -----------------------+----------------+-----
         hg_replication_slot_1 |              0 |  66
         hg_replication_slot_1 |              1 | 122
         hg_replication_slot_1 |              2 | 119
        
        (0 rows)
        参数 说明
        slot_name Replication Slot名称。
        parallel_index 并发序号。
        lsn 当前消费到最后的Binlog序号。

使用JDBC消费Binlog

  1. 添加POM依赖
    使用如下语句添加POM依赖。
    说明 添加POM依赖,请使用42.2.18及以上版本的JDBC。
            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.2.23</version>
            </dependency>
  2. Java代码示例
    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
    import com.alibaba.hologres.client.model.Record;
    
    import org.postgresql.PGConnection;
    import org.postgresql.PGProperty;
    import org.postgresql.replication.LogSequenceNumber;
    import org.postgresql.replication.PGReplicationStream;
    import org.postgresql.model.TableSchema;
    
    import java.nio.ByteBuffer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.List;
    import java.util.Properties;
    
    import com.alibaba.hologres.client.model.Record;
    
    public static void main (String[] args) throws Exception {
        
        String username = "";
        String password = "";
        String url = "jdbc:postgresql://ip:port/database";
        
        // 创建JDBC连接
        Properties properties = new Properties();
        PGProperty.USER.set(properties, username);
        PGProperty.PASSWORD.set(properties, password);
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        // 消费Binlog,务必加上以下参数
        PGProperty.REPLICATION.set(properties, "database");
        Connection connection = DriverManager.getConnection(url, properties);
    
        // 创建PGReplicationStream并绑定Replicaiton slot
        PGConnection pgConnection = connection.unwrap(PGConnection.class);
        PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()
                .logical()
                .withSlotName("hg_replication_slot_1")
                .withSlotOption("parallel_index", "0")
                .withSlotOption("batch_size", "1024")
                .start();
        
        // 创建holo-client
        HoloConfig holoConfig = new HoloConfig();
        holoConfig.setJdbcUrl(url);
        holoConfig.setUsername(username);
        holoConfig.setPassword(password);
        HoloClient client = new HoloClient(holoConfig);
        
        // 创建Binlog decoder用于Decode binary数据
        TableSchema schema = client.getTableSchema("test_message_src", true);
        HoloBinlogDecoder decoder = new HoloBinlogDecoder(client, schema);
        
        // 消费数据
        ByteBuffer byteBuffer = pgReplicationStream.readPending();
        while (true) {
          if (byteBuffer != null) {
              List<Record> records = decoder.decode(byteBuffer);
              Long latestLsn = 0L;
              for (Record record: records) {
                  // Do Something
                  latestLsn = (Long) record.getObject("hg_binlog_lsn");
              }
              // Commit Binlog 点位信息
              pgReplicationStream.setFlushedLSN(LogSequenceNumber.valueOf(latestLsn));
              pgReplicationStream.forceUpdateStatus();
          }
          byteBuffer = pgReplicationStream.readPending();
        }
        
        pgReplicationStream.close();
        connection.close();
    }
    创建PGReplicationStream时的withSlotOption可以指定如下参数。
    参数 是否必须 说明
    parallel_index Replication Slot的并发数序号,表示PGReplicationStream消费的是第几个并发的数据。假设某个Replication Slot的并发数是3,则用户最多可以创建3个PGReplicationStream,每个PGReplicationStream的parallel_index参数分别是0、1、2。当前Hologres Binlog并不支持类似Kafka Consumer Group的实现,所以需要用户自己创建多个PGReplicationStream。
    start_time 表示从某个时间点位开始消费Binlog,示例参数格式为:2021-01-01 12:00:00+08。
    如果未指定此参数,分为如下两种情况:
    • 第一次开始消费Replication Slot的Binlog,则从头开始消费,类似Kafka的Oldest。
    • 曾经消费过Replication Slot的Binlog,则尝试从之前Commit过的点位开始消费。
    batch_size 单次获取的Binlog最大批大小,单位为行,默认值为1024。

使用Holo-Client消费Binlog

消费Hologres Binlog功能已经集成至Holo-Client中,您可以通过指定需要消费的物理表,方便的消费所有parallel_index的Binlog数据。使用Holo-Client消费Binlog,需要占用与物理表shard数(slot并发数)相同的连接数,请保证连接数充足。

  1. 添加POM依赖
    使用如下语句添加POM依赖。
    说明 添加POM依赖,请使用1.2.16.1及以上版本的Holo-Client。
    <dependency>
      <groupId>com.alibaba.hologres</groupId>
      <artifactId>holo-client</artifactId>
      <version>1.2.16.1</version>
    </dependency>
  2. Java代码示例
    import com.alibaba.hologres.client.BinlogShardGroupReader;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.model.Record;
    import org.postgresql.model.TableSchema;
    
    import java.util.Arrays;
    
    public class HoloBinlogExample {
    
        public static void main(String[] args) throws Exception {
    
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://ip:port/database";
            String tableName = "test_message_src";
    
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            holoConfig.setBinlogReadBatchSize(128);
            holoConfig.setBinlogIgnoreDelete(true);
            holoConfig.setBinlogIgnoreBeforeUpdate(true);
            holoConfig.setBinlogReadStartTime("2021-01-01 12:00:00+08");
            holoConfig.setBinlogReadTimeoutSeconds(20);
    
            HoloClient client = new HoloClient(holoConfig);
            TableSchema schema = client.getTableSchema(tableName);
    
            BinlogShardGroupReader reader = client.binlogSubscribe(schema);
            Record record;
    
            while ((record = reader.getRecord()) != null) {
                //handle record
            }
        }
    }
                            
    使用Holo-Client消费Binlog时可以指定如下参数。
    参数 是否必须 说明
    setBinlogReadBatchSize 单次获取的Binlog最大批大小,单位为行,默认值为1024。
    binlogIgnoreDelete 是否忽略Delete类型的Binlog,默认为false。
    binlogIgnoreBeforeUpdate 是否忽略BeforeUpdate类型的Binlog,默认值为false。
    start_time 表示从某个时间点位开始消费Binlog,示例参数格式为:2021-01-01 12:00:00+08。
    如果未指定此参数,分为如下两种情况:
    • 第一次开始消费Replication Slot的Binlog,则从头开始消费,类似Kafka的Oldest。
    • 曾经消费过Replication Slot的Binlog,则尝试从之前Commit过的点位开始消费。
    binlogReadTimeoutSeconds 源端停止传输Binlog数据的超时时间。
    说明 此处的超时时间指的是读取不到源端的Binlog数据,到停止读取源端的Binlog数据的时间间隔,可以理解为初始化或读取卡顿可等待的最长时长。需要注意,消费到了数据但是没有可返回的Binlog并不会导致超时。