すべてのプロダクト
Search
ドキュメントセンター

Hologres:JDBC を使用した Hologres Binlog データの消費

最終更新日:Feb 04, 2026

このトピックでは、Java Database Connectivity (JDBC) および Holo-Client を使用して Hologres Binlog データを消費する方法について説明します。

前提条件

  • Hologres Binlog を有効化および設定します。詳細については、「Hologres Binlog データのサブスクライブ」をご参照ください。

  • hg_binlog 拡張機能を作成します。

    • Hologres V2.0 より前のバージョンでは、インスタンスのスーパーユーザが次の文を実行して拡張機能を作成する必要があります。この拡張機能はデータベース全体に適用され、1 つのデータベースに対しては一度だけ実行すれば十分です。新しいデータベースを作成した場合は、再度実行する必要があります。

      --拡張機能を作成します。
      CREATE extension hg_binlog;
      
      --拡張機能を削除します。
      DROP extension hg_binlog;
      重要

      拡張機能をカスケード方式でアンインストールするために、DROP EXTENSION <extension_name> CASCADE; コマンドを実行しないでください。CASCADE コマンドは指定された拡張機能だけでなく、PostGIS、RoaringBitmap、Proxima、Binlog、BSI データなどの拡張機能データや、メタデータ、テーブル、ビュー、サーバーデータなど、その拡張機能に依存するオブジェクトもすべて消去します。

    • Hologres V2.0 以降では、拡張機能を手動で作成せずにこの機能を使用できます。

  • Hologres V2.1 以降では、以下の 2 つの方法のいずれかで Binlog データを消費できます。

    • すべてのバージョンでサポート:対象テーブルのパブリケーションを作成し、そのパブリケーションのレプリケーションスロットを作成するなど、事前準備を完了してください。その後、対象テーブルの Binlog データを消費できます。

      説明

      この方法には、以下のいずれかの権限が必要です。

      • インスタンスに対するスーパーユーザ権限

      • 対象テーブルに対するオーナー権限、CREATE DATABASE 権限、およびインスタンスに対する Replication Role 権限

    • Hologres V2.1 以降でのみサポート:対象テーブルに対する読み取り権限をユーザーに付与すると、そのユーザーは対象テーブルの Binlog データを消費できます。

制限事項

  • JDBC を使用した Hologres Binlog データの消費は、Hologres V1.1 以降でのみサポートされています。ご利用のインスタンスが V1.1 より前のバージョンの場合、アップグレード準備中に発生する一般的なエラー」をご参照いただくか、Hologres DingTalk グループに参加してフィードバックをお送りください。詳細については、「オンラインサポートをさらに利用する方法」をご参照ください。

  • Hologres Binlog 消費でサポートされるデータの型は、INTEGER、BIGINT、SMALLINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、SERIAL、OID、int4[]、int8[]、float4[]、float8[]、boolean[]、および text[] のみです。Hologres V1.3.36 以降では、JSONB 型もサポートされます。テーブルにこれらのデータの型以外の列が含まれている場合、消費は失敗します。

    説明

    Hologres V1.3.36 以降では、JSONB データの型の Hologres Binlog データを消費できます。消費前に、以下の Grand Unified Configuration (GUC) パラメーターを有効にする必要があります。

    -- セッションレベルで GUC パラメーターを有効にします。
    SET hg_experimental_enable_binlog_jsonb = ON;
    
    -- データベースレベルで GUC パラメーターを有効にします。
    ALTER database <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
  • 通常の接続と同様に、JDBC を使用して Binlog データを消費する場合、消費される各テーブルの各シャードは 1 つの Walsender 接続を使用します。Walsender 接続は通常の接続とは独立しており、互いに影響しません。

  • Walsender の数にも制限があります。次のコマンドを実行して、単一のフロントノードにおける Walsender の最大数を確認できます。デフォルト値は、V2.2 以降で 600、V2.0 および V2.1 で 1000、V1.1.26 から V2.0 で 100 です。Walsender の総数は、最大数にご利用のインスタンスのフロントノード数を掛けた値になります。インスタンス仕様別のフロントノード数の詳細については、「インスタンス管理」をご参照ください。

    SHOW max_wal_senders;
    説明

    Hologres インスタンスで同時に Binlog データを消費できるテーブル数は、次の数式で計算できます。テーブル数 <= (max_wal_senders (100 または 1000) * フロントノード数) / テーブルの Shard Count

    例:

    • テーブル A およびテーブル B の Shard Count が 20、テーブル C の Shard Count が 30 の場合、それらの Binlog データを同時に消費するために使用される Walsender 数は 20 + 20 + 30 = 70 です。

    • テーブル A およびテーブル B の Shard Count が 20 で、テーブル A の Binlog データを 2 つのタスクが同時に消費している場合、使用される Walsender 数は 20 * 2 + 20 = 60 です。

    • インスタンスに 2 つのフロントノードがある場合、Walsender の最大数は 600 * 2 = 1200 です。Shard Count が 20 のテーブルを最大 60 個まで同時に消費できます。

    JDBC ベースの Binlog 消費による接続数が上限に達すると、エラーメッセージ FATAL: sorry, too many wal senders already が返されます。この問題は、以下の手順でトラブルシューティングできます。

    1. JDBC を使用して Binlog データを消費するタスクを確認し、不要な Binlog 消費を削減します。

    2. テーブルグループおよび Shard Count の設計が適切かどうかを確認します。詳細については、「テーブルグループ設定のベストプラクティス」をご参照ください。

    3. それでも接続数が上限を超える場合は、インスタンスのスケールアウトを検討してください。

  • Hologres V2.0.18 より前は、読み取り専用のセカンダリインスタンスは JDBC を使用した Binlog データの消費をサポートしていませんでした。V2.0.18 以降では、この機能がサポートされますが、消費進捗の記録はサポートされていません。

注意事項

Binlog データの消費方法は、Hologres インスタンスのバージョンおよび Flink エンジンのバージョンによって異なります。詳細は以下のとおりです。

Hologres インスタンスバージョン

Flink エンジンバージョン

説明

V2.1 以降

8.0.5 以降

テーブルに対する読み取り権限があれば、Binlog データを消費できます。レプリケーションスロットを作成する必要はありません。

V2.0

8.0.5 以前

デフォルトで JDBC モードが使用されます。対象テーブルの Binlog データを消費するには、事前に対象テーブルのパブリケーションおよびそのパブリケーションのレプリケーションスロットを作成する必要があります。

V1.3 以前

8.0.5 以前

デフォルトで Holohub モードが使用されます。テーブルに対する読み取り権限があれば、Binlog データを消費できます。

説明

Hologres V2.0 以降では、Binlog 消費に Holohub モードはサポートされなくなりました。Hologres インスタンスを V2.0 以降にアップグレードする前に、Flink をバージョン 8.0.5 にアップグレードすることを推奨します。アップグレード後、Binlog 消費には自動的に JDBC モードが使用されます。

準備: パブリケーションとレプリケーションスロットを作成する

Hologres V2.1 より前は、Binlog データを消費する前に、対象テーブルのパブリケーションおよびそのパブリケーションのレプリケーションスロットを作成する必要があります。

Hologres V2.1 以降では、上記の方法に加えて、対象テーブルに対する読み取り権限のみを持つユーザーも Binlog データを消費できます。ただし、この方法では Hologres 側に記録された Binlog 消費進捗を照会できません。クライアント側で消費進捗を記録することを推奨します。

パブリケーション

はじめに

パブリケーションは、論理レプリケーションを通じてレプリケーション対象となるデータ変更を行うテーブルのグループです。詳細については、Publication をご参照ください。現在、Hologres のパブリケーションは 1 つの物理テーブルにのみバインドでき、そのテーブルに対して Binlog 機能が有効になっている必要があります。

パブリケーションを作成する

  • 構文例

  • CREATE PUBLICATION name FOR TABLE table_name;
  • パラメーター

  • パラメーター

    説明

    name

    パブリケーションのカスタム名。

    table_name

    データベース内のテーブルの名前。

  • -- hg_publication_test_1 という名前のパブリケーションを作成し、テーブル test_message_src を追加します。
    CREATE publication hg_publication_test_1 FOR TABLE test_message_src;

作成済みのパブリケーションの照会

  • 構文例

  • SELECT * FROM pg_publication;
  • クエリ結果

  •         pubname        | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
    -----------------------+----------+--------------+-----------+-----------+-----------+-------------
     hg_publication_test_1 |    16728 | f            | t         | t         | t         | t
    (1 row)

    パラメーター

    説明

    pubname

    パブリケーションの名前。

    pubowner

    パブリケーションの所有者。

    puballtables

    複数の物理テーブルをバインドします。デフォルト値は False です。この機能は現在サポートされていません。

    pubinsert

    INSERT Binlog イベントを公開するかどうかを指定します。デフォルト値は True です。Binlog の種類の詳細については、「Binlog フォーマットおよび原理」をご参照ください。

    pubupdate

    UPDATE Binlog イベントを公開するかどうかを指定します。デフォルト値は True です。

    pubdelete

    DELETE Binlog イベントを公開するかどうかを指定します。デフォルト値は True です。

    pubtruncate

    TRUNCATE Binlog イベントを公開するかどうかを指定します。デフォルト値は True です。

パブリケーションに関連付けられたテーブルの照会

  • 構文例

  • SELECT * FROM pg_publication_tables;
  • クエリ結果

  •         pubname        | schemaname |    tablename
    -----------------------+------------+------------------
     hg_publication_test_1 | public     | test_message_src
    (1 row) 

    パラメーター

    説明

    pubname

    パブリケーションの名前。

    schemaname

    テーブルが属するスキーマの名前。

    tablename

    テーブルの名前。

パブリケーションの削除

  • 構文

  • DROP PUBLICATION name;
  • name は既存のパブリケーションの名前です。

  • DROP PUBLICATION hg_publication_test_1;

レプリケーションスロット

概要

論理レプリケーションのシナリオでは、レプリケーションスロットはデータ変更のストリームを表します。レプリケーションスロットは現在の消費進捗にもバインドされ、再開可能な転送に使用されます。詳細については、PostgreSQL ドキュメントの Replication Slot をご参照ください。レプリケーションスロットは、Binlog 消費のチェックポイント情報を維持するために使用されます。これにより、フェールオーバー後にコンシューマーが最後にコミットされたチェックポイントから復旧できます。

権限

レプリケーションスロットを作成および使用できるのは、スーパーユーザおよび Replication Role を持つユーザーのみです。次の文を実行して、Replication Role を付与または取り消すことができます。

-- スーパーユーザを使用して、一般ユーザにレプリケーションロールを付与します:
ALTER role <user_name> replication;

-- スーパーユーザを使用して、ユーザーからレプリケーションロールを取り消します:
ALTER role <user_name> noreplication;

user_name は Alibaba Cloud アカウント ID または Resource Access Management (RAM) ユーザーです。詳細については、「アカウント概要」をご参照ください。

レプリケーションスロットを作成する

  • CALL hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
  • パラメーター

  • パラメーター

    説明

    replication_slot_name

    レプリケーションスロットのカスタム名。

    hgoutput

    Binlog 出力フォーマットのプラグイン。現在、組み込みの hgoutput プラグインのみがサポートされています。

    publication_name

    レプリケーションスロットがバインドされているパブリケーションの名前。

  • -- hg_replication_slot_1 という名前のレプリケーションスロットを作成し、hg_publication_test_1 という名前のパブリケーションにバインドします。
    CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');

作成済みのレプリケーションスロットの照会

  • 構文例

  • SELECT * FROM hologres.hg_replication_slot_properties;
  • クエリ結果

  •        slot_name       | property_key |    property_value
    -----------------------+--------------+-----------------------
     hg_replication_slot_1 | plugin       | hgoutput
     hg_replication_slot_1 | publication  | hg_publication_test_1
     hg_replication_slot_1 | parallelism  | 1
    (3 rows)

    パラメーター

    説明

    slot_name

    レプリケーションスロットの名前。

    property_key

    以下の 3 つのパラメーターが含まれます。

    • plugin:レプリケーションスロットで使用されるプラグイン。現在、pgoutput のみがサポートされています。

    • publication:レプリケーションスロットに対応するパブリケーション。

    • parallelism:レプリケーションスロットを通じてテーブル全体の Binlog データを消費するために必要な同時接続数。その値は、対象テーブルが存在するテーブルグループの Shard Count と等しくなります。

    property_value

    property_key で指定されたパラメーターの値。

レプリケーションスロットを通じてテーブル全体の Binlog データを消費するために必要な同時接続数の照会

Hologres は分散データベースです。テーブルのデータは複数のシャードに分散されています。そのため、JDBC を使用して Binlog データを消費する際には、完全な Binlog データを消費するために複数のクライアント接続を開始する必要があります。次のコマンドを実行して、hg_replication_slot_1 のデータを消費するために必要な同時接続数を照会できます。

  • 構文

  • SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
  • クエリ結果

  • hg_get_logical_replication_slot_parallelism  
    ------------------------------------------------
                                            20 

レプリケーションスロットの消費進捗の照会 (Hologres 側に記録された Binlog 消費進捗)

  • 構文例

  • SELECT * FROM hologres.hg_replication_progress;
  • クエリ結果

  •        slot_name       | parallel_index | lsn 
    -----------------------+----------------+-----
     hg_replication_slot_1 |              0 |  66
     hg_replication_slot_1 |              1 | 122
     hg_replication_slot_1 |              2 | 119
    
    (0 rows)

    パラメーター

    説明

    slot_name

    レプリケーションスロットの名前。

    parallel_index

    同時接続の序数。

    lsn

    最後に消費された Binlog レコードのシーケンス番号。

    重要
    • hologres.hg_replication_progress テーブルは、Binlog データを初めて消費した後にのみ作成されます。

    • hologres.hg_replication_progress テーブルには、ユーザーがアクティブにコミットしたコンシューマオフセットが記録されます。Binlog チェックポイント情報をコミットするには、コード内で commit lsn 関数を手動で呼び出す必要があります。このテーブルに記録された内容はユーザーの最後のコミットに完全に依存するため、実際のユーザー側のコンシューマオフセットを正確に反映していない可能性があります。そのため、コンシューマー側で LSN を記録し、消費が停止した際にその値を復旧ポイントとして使用することを推奨します。JDBC および Holo-Client による Binlog 消費の以下のサンプルコードには、LSN をコミットするコードは含まれていません。

    • Binlog チェックポイント情報を手動でコミットするのは、レプリケーションスロットを使用して Binlog データを消費する場合にのみ有効です。テーブル名を使用して Binlog データを消費する場合、チェックポイントの結果は hologres.hg_replication_progress テーブルに記録または保持されません。

レプリケーションスロットの削除

  • 構文例

  • CALL hg_drop_logical_replication_slot('<replication_slot_name>');
  • replication_slot_name は既存のレプリケーションスロットの名前です。

  • CALL hg_drop_logical_replication_slot('hg_replication_slot_1');

JDBC を使用した Binlog データの消費

  1. POM 依存関係の追加

    次の POM 依存関係を追加します。

    説明

    JDBC 42.2.18 以降を使用してください。

            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.3.8</version>
            </dependency>
            <!-- テーブルスキーマの取得およびバイナリログの解析に使用 -->
            <dependency>
                <groupId>com.alibaba.hologres</groupId>
                <artifactId>holo-client</artifactId>
                <version>2.2.10</version>
    				</dependency>
  2. Java コード例

    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
    import com.alibaba.hologres.client.model.Record;
    import com.alibaba.hologres.client.model.TableSchema;
    
    import org.postgresql.PGConnection;
    import org.postgresql.PGProperty;
    import org.postgresql.replication.LogSequenceNumber;
    import org.postgresql.replication.PGReplicationStream;
    
    import java.nio.ByteBuffer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class Test {
    
        public static void main (String[] args) throws Exception {
    
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://Endpoint:Port/db_test";
    
            // JDBC 接続を作成します。
            Properties properties = new Properties();
            PGProperty.USER.set(properties, username);
            PGProperty.PASSWORD.set(properties, password);
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
            // Binlog データを消費するには、次のパラメーターを追加する必要があります。
            PGProperty.REPLICATION.set(properties, "database");
            try (Connection connection = DriverManager.getConnection(url, properties)) {
                // PGReplicationStream を作成し、レプリケーションスロットにバインドして shardId を指定します。
                int shardId = 0;
                PGConnection pgConnection = connection.unwrap(PGConnection.class);
                PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()
                        .logical()
            	  // V2.1 以降では、ここで 2 つの方法が利用可能です。
                // 方法 1:withSlotName パラメーターに準備段階で作成したレプリケーションスロットの名前を設定します。withSlotOption("table_name","xxx") を指定する必要はありません。
                // 方法 2:withSlotName パラメーターを指定しません。withSlotOption("table_name","xxx") を指定する必要があります。
                        .withSlotName("slot_name")
                        .withSlotOption("table_name","public.test_messsage_src") // 消費するテーブルの名前。
                        .withSlotOption("parallel_index", shardId)
                        .withSlotOption("batch_size", "1024")
                        .withSlotOption("start_time", "2021-01-01 00:00:00")
                        .withSlotOption("start_lsn","0")
                        .start();
    	
                // Binlog データを直接 Holo-Client で消費しませんが、消費したデータを解析するために Holo-Client が必要です。
                // HoloClient を作成します。
                HoloConfig holoConfig = new HoloConfig();
                holoConfig.setJdbcUrl(url);
                holoConfig.setUsername(username);
                holoConfig.setPassword(password);
                HoloClient client = new HoloClient(holoConfig);
    
                // バイナリデータをデコードする Binlog デコーダーを作成します。スキーマは HoloClient を通じて取得する必要があります。
                TableSchema schema = client.getTableSchema("test_message_src", true);
                HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);
    
                // 中断後の消費再開用に現在のコンシューマオフセットを記録します。
                Long currentLsn = 0;
                // データを消費します。
                ByteBuffer byteBuffer = pgReplicationStream.readPending();
                while (true) {
                    if (byteBuffer != null) {
                        List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
                        Long latestLsn = 0L;
                        for (BinlogRecord record : records) {
                            latestLsn = record.getBinlogLsn();
                            // 何らかの処理を行います
                            System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));
                        }
                        // コンシューマオフセットを保存します。
                        currentLsn = latestLsn;                    
                        pgReplicationStream.forceUpdateStatus();
                    }
                    byteBuffer = pgReplicationStream.readPending();
                }
            }
            // pgReplicationStream.close();
            // connection.close();
        }

    PGReplicationStream を作成する際には、withSlotName を使用してレプリケーションスロットを指定する必要があります。

    • Hologres V2.1 より前のバージョンでは、既存のレプリケーションスロットの名前を指定する必要があります。

    • Hologres V2.1 以降では、withSlotName を指定する必要はありません。Slot Options で対象テーブル名を指定するだけで十分です。

    また、withSlotOption を使用して以下のパラメーターを指定できます。

    パラメーター

    必須

    説明

    table_name

    withSlotName を指定しない場合に必須。それ以外の場合は無効。

    withSlotName を指定しない場合、table_name は消費したい対象テーブルの名前を表します。形式は schema_name.table_name または table_name です。

    parallel_index

    はい

    • PGReplicationStream を使用して Binlog データを消費する場合、1 つの PGReplicationStream は 1 つの Walsender 接続を確立し、対象テーブルの 1 つのシャードの Binlog データを消費します。parallel_index は、指定されたインデックスのシャードからデータを消費することを表します。

    • テーブルに 3 つのシャードがあると仮定します。レプリケーションスロットを通じて Binlog データを消費するために必要な同時接続数は 3 です。最大 3 つの PGReplicationStream を作成し、それぞれの parallel_index パラメーターを 0、1、2 に設定できます。

    • 現在、JDBC を使用した Hologres Binlog データの消費は、Kafka Consumer Group に類似した実装をサポートしていません。複数の PGReplicationStream を自分で作成する必要があります。

    start_time

    いいえ

    Binlog データの消費を開始する時刻を指定します。例:2021-01-01 12:00:00+08。

    start_lsn または start_time を指定しない場合、以下のルールが適用されます。

    • レプリケーションスロットから Binlog データを初めて消費する場合、Kafka の Oldest 設定と同様に、先頭から消費が開始されます。

    • 以前にレプリケーションスロットから Binlog データを消費したことがある場合、最後にコミットされたチェックポイントから消費を再開しようとします。

    • withSlotName を指定せず table_name を指定する場合、このテーブルから以前に Binlog データを消費したことがあるかどうかに関係なく、常に先頭から消費が開始されます。

    start_lsn

    いいえ

    Binlog データの消費を開始する LSN の直後を指定します。このパラメーターは start_time より優先度が高くなります。

    batch_size

    いいえ

    1 回の Binlog 検索における最大バッチサイズ(行数)。デフォルト値は 1024 です。

    説明
    • BinlogRecord はデコーダーが返すレコードタイプです。このデータレコードの Binlog のシステムフィールドを取得するには、以下のメソッドを使用できます。詳細については、「Hologres Binlog データのサブスクライブ」をご参照ください。

      • getBinlogLsn() は Binlog のシーケンス番号を取得します。

      • getBinlogTimestamp() は Binlog のシステムタイムスタンプを取得します。

      • getBinlogEventType() は Binlog のイベントタイプを取得します。

    • Binlog データを消費した後は、フェールオーバー後に消費を再開できるように、チェックポイント情報を手動でコミットする必要があります。

Holo-Client を使用した Binlog データの消費

  • Hologres Binlog データの消費機能は Holo-Client に統合されています。消費したい物理テーブルを指定するだけで、そのすべてのシャードの Binlog データを簡単に消費できます。

  • Holo-Client を使用して Binlog データを消費する場合、必要な接続数は物理テーブルのシャード数(同時スロット数)と同じです。十分な接続数があることを確認してください。

  • Holo-Client を使用して Binlog データを消費する場合は、各シャードのチェックポイントを保存することを推奨します。ネットワーク接続の失敗などの理由で消費が中断された場合、保存されたチェックポイントから再開できます。詳細については、以下のサンプルコードをご参照ください。

  1. POM 依存関係の追加

    次の POM 依存関係を追加します。

    説明

    Holo-Client 2.2.10 以降の使用を推奨します。2.2.9 以前のバージョンにはメモリリークの問題があります。

    <dependency>
      <groupId>com.alibaba.hologres</groupId>
      <artifactId>holo-client</artifactId>
      <version>2.2.10</version>
    </dependency>
  2. Java コード例

    import com.alibaba.hologres.client.BinlogShardGroupReader;
    import com.alibaba.hologres.client.Command;
    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.Subscribe;
    import com.alibaba.hologres.client.exception.HoloClientException;
    import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
    import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
    import com.alibaba.hologres.client.model.binlog.BinlogRecord;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class HoloBinlogExample {
    
        public static BinlogShardGroupReader reader;
    
        public static void main(String[] args) throws Exception {
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://ip:port/database";
            String tableName = "test_message_src";
            String slotName = "hg_replication_slot_1";
    
            // クライアント作成用のパラメーター。
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            holoConfig.setBinlogReadBatchSize(128);
            holoConfig.setBinlogIgnoreDelete(true);
            holoConfig.setBinlogIgnoreBeforeUpdate(true);
            holoConfig.setBinlogHeartBeatIntervalMs(5000L);
            HoloClient client = new HoloClient(holoConfig);
    
            // テーブルのシャード数を取得します。
            int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));
    
            // 各シャードの消費進捗を保存するマップ。初期値は 0 です。
            Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
            for (int i = 0; i < shardCount; i++) {
                shardIdToLsn.put(i, 0L);
            }
    
            // Binlog データの消費リクエスト。V2.1 より前では、tableName および slotName は必須パラメーターです。V2.1 以降では、tableName のみを渡せば十分です(これは以前使用されていた固定スロット名 "hg_table_name_slot" に相当します)。
            // Subscribe には StartTimeBuilder と OffsetBuilder の 2 種類があります。この例では前者を使用しています。
            Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
                    .setBinlogReadStartTime("2021-01-01 12:00:00")
                    .build();
            // binlog リーダーを作成します。
            reader = client.binlogSubscribe(subscribe);
    
            BinlogRecord record;
    
            int retryCount = 0;
            long count = 0;
            while(true) {
                try {
                    if (reader.isCanceled()) {
                        // 保存されたチェックポイントに基づいてリーダーを再作成します。
                        reader = client.binlogSubscribe(subscribe);
                    }
                    while ((record = reader.getBinlogRecord()) != null) {
                        // 最新まで消費しました。
                        if (record instanceof BinlogHeartBeatRecord) {
                            // 何らかの処理を行います
                            continue;
                        }
        
                        // 読み取った Binlog レコードを処理します。ここでは単に表示します。
                        System.out.println(record);
        
                        // 処理後、例外が発生した場合にこのポイントから復旧できるようにチェックポイントを保存します。
                        shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
                        count++;
                        
                        // 正常に読み取れたため、リトライ回数をリセットします。
                        retryCount = 0;
                    }
                } catch (HoloClientException e) {
                    if (++retryCount > 10) {
                        throw new RuntimeException(e);
                    }
                    // 例外が発生した場合は WARN レベルのログを出力することを推奨します。
                    System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount));
        
                    // リトライ中に一定時間待機します。
                    Thread.sleep(5000L * retryCount);
        
                    // OffsetBuilder を使用して Subscribe を作成し、各シャードの開始コンシューマオフセットを指定します。
                    Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
                    for (int i = 0; i < shardCount; i++) {
                        // BinlogOffset は setSequence で LSN を、setTimestamp で時刻を指定します。両方指定した場合、LSN がタイムスタンプより優先されます。
                        // shardIdToLsn マップに保存された消費進捗に基づいて復旧します。
                        subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i)));
                    }
                    subscribe = subscribeBuilder.build();
                    // リーダーを閉じます。
                    reader.cancel();
                }
            }
        }
    }

    Holo-Client を使用して Binlog データを消費する場合、以下のパラメーターを指定できます。

    パラメーター

    必須

    デフォルト値

    説明

    binlogReadBatchSize

    いいえ

    1024

    各シャードから 1 回の Binlog 検索で取得する最大バッチサイズ(行数)。

    binlogHeartBeatIntervalMs

    いいえ

    -1

    binlogRead が BinlogHeartBeatRecord を送信する間隔。-1 は送信しないことを示します。

    新しい Binlog データがない場合、指定された間隔で BinlogHeartBeatRecord が送信されます。このレコードのタイムスタンプは、このシャードのデータがこの時刻まで完全に消費されたことを示します。

    binlogIgnoreDelete

    いいえ

    false

    DELETE Binlog イベントを無視するかどうかを指定します。

    binlogIgnoreBeforeUpdate

    いいえ

    false

    BeforeUpdate Binlog イベントを無視するかどうかを指定します。

よくある質問

Binlog データを消費して消費進捗をコミットした後、hologres.hg_replication_progress テーブルが存在しない、または消費進捗データが含まれていないことが判明した場合、考えられる理由は以下のとおりです。

  • レプリケーションスロットを経由せずに消費が行われており(つまり、withSlotName パラメーターが指定されていない)、このシナリオでは消費進捗の記録がサポートされていません。

  • 読み取り専用のセカンダリインスタンスを使用しており、このデータベースで Binlog データを消費するのは初めてです。この場合、hologres.hg_replication_progress テーブルの作成が失敗します。この問題は Hologres V2.0.18 以降で修正されており、セカンダリインスタンスでも Binlog データを正常に消費できます。Hologres V2.0.18 より前のバージョンでは、まずプライマリインスタンスを使用して一度 Binlog データを消費する必要があります。その後、セカンダリインスタンスで Binlog データを正常に消費できます。

  • 上記の理由に該当しない場合、Hologres DingTalk グループに参加してオンコール担当者に連絡してください。詳細については、「オンラインサポートをさらに利用する方法」をご参照ください。