全部产品
Search
文档中心

Hologres:Mengonsumsi data Hologres Binlog menggunakan JDBC

更新时间:Feb 04, 2026

Topik ini menjelaskan cara mengonsumsi data Hologres Binlog menggunakan Java Database Connectivity (JDBC) dan Holo-Client.

Prasyarat

  • Aktifkan dan konfigurasikan Hologres Binlog. Untuk informasi selengkapnya, lihat Berlangganan data Hologres Binlog.

  • Buat ekstensi hg_binlog.

    • Untuk versi Hologres sebelum V2.0, superuser instans harus menjalankan pernyataan berikut untuk membuat ekstensi. Ekstensi ini berlaku untuk seluruh database. Anda hanya perlu menjalankan pernyataan ini sekali untuk satu database. Jika Anda membuat database baru, Anda harus menjalankan pernyataan tersebut lagi.

      --Create the extension.
      CREATE extension hg_binlog;
      
      --Delete the extension.
      DROP extension hg_binlog;
      Penting

      Jangan jalankan perintah DROP EXTENSION <extension_name> CASCADE; untuk menguninstall ekstensi secara cascade. Perintah CASCADE tidak hanya menghapus ekstensi yang ditentukan, tetapi juga purge data ekstensi seperti data PostGIS, RoaringBitmap, Proxima, Binlog, dan BSI, serta objek yang bergantung pada ekstensi tersebut, termasuk metadata, tabel, view, dan data server.

    • Mulai dari Hologres V2.0, Anda dapat menggunakan fitur ini tanpa perlu membuat ekstensi secara manual.

  • Mulai dari Hologres V2.1, Anda dapat mengonsumsi data Binlog dengan salah satu dari dua cara berikut.

    • Didukung oleh semua versi: Selesaikan Persiapan, yang mencakup pembuatan publication untuk tabel target dan pembuatan replication slot untuk publication tersebut. Setelah itu, Anda dapat mengonsumsi data Binlog dari tabel target.

      Catatan

      Metode ini memerlukan pengguna memiliki salah satu izin berikut:

      • Izin superuser pada instans

      • Izin Pemilik pada tabel target, izin CREATE DATABASE, dan izin Replication Role pada instans.

    • Hanya didukung oleh Hologres V2.1 dan versi setelahnya: Berikan izin read pada pengguna untuk tabel target. Pengguna tersebut kemudian dapat mengonsumsi data Binlog dari tabel target.

Batasan

  • Mengonsumsi data Hologres Binlog menggunakan JDBC hanya didukung pada Hologres V1.1 dan versi setelahnya. Jika instans Anda menjalankan versi sebelum V1.1, lihat Error umum yang terjadi saat mempersiapkan upgrade atau bergabunglah ke grup DingTalk Hologres untuk memberikan masukan. Untuk informasi selengkapnya, lihat Bagaimana cara mendapatkan dukungan online lebih lanjut?.

  • Hanya tipe data berikut yang didukung untuk konsumsi Hologres Binlog: INTEGER, BIGINT, SMALLINT, TEXT, CHAR(n), VARCHAR(n), REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, SERIAL, OID, int4[], int8[], float4[], float8[], boolean[], dan text[]. Mulai dari Hologres V1.3.36, tipe JSONB juga didukung. Jika sebuah tabel berisi kolom dengan tipe data lain, konsumsi akan gagal.

    Catatan

    Mulai dari Hologres V1.3.36, Anda dapat mengonsumsi data Hologres Binlog dengan tipe data JSONB. Sebelum konsumsi, Anda harus mengaktifkan parameter Grand Unified Configuration (GUC) berikut:

    -- Enable the GUC parameter at the session level.
    SET hg_experimental_enable_binlog_jsonb = ON;
    
    -- Enable the GUC parameter at the database level.
    ALTER database <db_name> SET hg_experimental_enable_binlog_jsonb = ON;
  • Seperti koneksi biasa, saat Anda menggunakan JDBC untuk mengonsumsi data Binlog, setiap shard dari setiap tabel yang dikonsumsi menggunakan satu koneksi Walsender. Koneksi Walsender bersifat independen dari koneksi biasa dan tidak saling memengaruhi.

  • Jumlah Walsender juga dibatasi. Anda dapat menjalankan perintah berikut untuk melihat jumlah maksimum Walsender untuk satu frontend node. Nilai default adalah 600 untuk V2.2 dan versi setelahnya, 1.000 untuk V2.0 dan V2.1, serta 100 untuk versi dari V1.1.26 hingga V2.0. Jumlah total Walsender adalah jumlah maksimum dikalikan dengan jumlah frontend node pada instans Anda. Untuk informasi lebih lanjut tentang jumlah frontend node untuk instans dengan spesifikasi berbeda, lihat Manajemen instans.

    SHOW max_wal_senders;
    Catatan

    Anda dapat menghitung jumlah tabel yang dapat mengonsumsi data Binlog secara bersamaan dalam satu instans Hologres menggunakan rumus berikut: Number of tables <= (max_wal_senders (100 or 1000) * Number of frontend nodes) / Table shard count .

    Contoh:

    • Jika Tabel A dan Tabel B masing-masing memiliki jumlah shard 20, dan Tabel C memiliki jumlah shard 30, jumlah Walsender yang digunakan untuk mengonsumsi data Binlog mereka secara bersamaan adalah 20 + 20 + 30 = 70.

    • Jika Tabel A dan Tabel B masing-masing memiliki jumlah shard 20, dan dua pekerjaan mengonsumsi data Binlog dari Tabel A secara bersamaan, jumlah Walsender yang digunakan adalah 20 * 2 + 20 = 60.

    • Jika sebuah instans memiliki dua frontend node, jumlah maksimum Walsender adalah 600 * 2 = 1.200. Instans tersebut dapat mendukung konsumsi data Binlog secara bersamaan hingga 60 tabel, masing-masing dengan jumlah shard 20.

    Jika jumlah koneksi untuk konsumsi Binlog berbasis JDBC mencapai batas atas, pesan error FATAL: sorry, too many wal senders already akan dikembalikan. Anda dapat melakukan troubleshooting sebagai berikut:

    1. Periksa pekerjaan yang menggunakan JDBC untuk mengonsumsi data Binlog dan kurangi konsumsi Binlog yang tidak diperlukan.

    2. Periksa apakah kelompok tabel dan jumlah shard dirancang secara wajar. Untuk informasi selengkapnya, lihat Praktik terbaik untuk mengatur kelompok tabel.

    3. Jika jumlah koneksi masih melebihi batas, pertimbangkan untuk melakukan scaling out pada instans.

  • Sebelum Hologres V2.0.18, instans secondary read-only tidak mendukung konsumsi data Binlog menggunakan JDBC. Mulai dari V2.0.18, instans secondary read-only mendukung fitur ini tetapi tidak mendukung pencatatan progres konsumsi.

Catatan

Metode yang didukung untuk mengonsumsi data Binlog bervariasi berdasarkan versi instans Hologres dan versi mesin Flink. Rinciannya sebagai berikut:

Versi instance Hologres

Versi mesin Flink

Deskripsi

V2.1 dan yang lebih baru

8.0.5 dan yang lebih baru

Anda dapat mengonsumsi data Binlog jika memiliki izin read pada tabel. Anda tidak perlu membuat replication slot.

V2.0

8.0.5 dan yang lebih lama

Mode JDBC digunakan secara default. Anda harus membuat publication untuk tabel target dan replication slot untuk publication tersebut sebelum dapat mengonsumsi data Binlog dari tabel target.

V1.3 dan yang lebih lama

8.0.5 dan yang lebih lama

Mode Holohub digunakan secara default. Anda dapat mengonsumsi data Binlog jika memiliki izin read pada tabel.

Catatan

Mulai dari Hologres V2.0, mode Holohub tidak lagi didukung untuk konsumsi Binlog. Sebelum melakukan upgrade instans Hologres ke V2.0 atau versi setelahnya, kami menyarankan Anda terlebih dahulu melakukan upgrade Flink ke versi 8.0.5. Setelah upgrade, mode JDBC akan digunakan secara otomatis untuk konsumsi Binlog.

Persiapan: Buat publikasi dan slot replikasi

Sebelum Hologres V2.1, Anda harus membuat publication untuk tabel target dan replication slot untuk publication tersebut sebelum dapat mengonsumsi data Binlog.

Mulai dari Hologres V2.1, selain metode di atas, pengguna yang hanya memiliki izin read pada tabel target juga dapat mengonsumsi data Binlog. Metode ini tidak memungkinkan Anda untuk mengkueri progres konsumsi Binlog yang dicatat di sisi Hologres. Kami menyarankan Anda mencatat progres konsumsi di sisi client.

Publikasi

Pendahuluan

Publication pada dasarnya adalah kelompok tabel yang perubahan datanya dimaksudkan untuk direplikasi melalui replikasi logis. Untuk informasi selengkapnya, lihat Publication. Saat ini, sebuah publication Hologres hanya dapat diikat ke satu tabel fisik, dan fitur Binlog harus diaktifkan untuk tabel tersebut.

Buat publikasi

  • Contoh sintaksis

  • CREATE PUBLICATION name FOR TABLE table_name;
  • Parameter

  • Parameter

    Deskripsi

    name

    Nama publikasi kustom.

    table_name

    Nama tabel dalam database.

  • Contoh

  • -- Create a publication named hg_publication_test_1 and add the table test_message_src to it.
    CREATE publication hg_publication_test_1 FOR TABLE test_message_src;

Kueri publication yang telah dibuat

  • Contoh sintaksis

  • SELECT * FROM pg_publication;
  • Hasil Kueri

  •         pubname        | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
    -----------------------+----------+--------------+-----------+-----------+-----------+-------------
     hg_publication_test_1 |    16728 | f            | t         | t         | t         | t
    (1 row)

    Parameter

    Deskripsi

    pubname

    Nama publikasi.

    pubowner

    Pemilik publikasi.

    puballtables

    Mengikat beberapa tabel fisik. Nilai default adalah False. Fitur ini saat ini tidak didukung.

    pubinsert

    Menentukan apakah akan mempublikasikan event Binlog INSERT. Nilai default adalah True. Untuk informasi selengkapnya tentang tipe Binlog, lihat Format dan prinsip Binlog.

    pubupdate

    Menentukan apakah akan mempublikasikan event Binlog UPDATE. Nilai default adalah True.

    pubdelete

    Menentukan apakah akan mempublikasikan event Binlog DELETE. Nilai default adalah True.

    pubtruncate

    Menentukan apakah akan mempublikasikan event Binlog TRUNCATE. Nilai default adalah True.

Kueri tabel yang terkait dengan publication

  • Contoh sintaksis

  • SELECT * FROM pg_publication_tables;
  • Hasil Kueri

  •         pubname        | schemaname |    tablename
    -----------------------+------------+------------------
     hg_publication_test_1 | public     | test_message_src
    (1 row) 

    Parameter

    Deskripsi

    pubname

    Nama publikasi.

    schemaname

    Nama skema tempat tabel milik.

    tablename

    Nama tabel.

Hapus publication

  • Sintaksis

  • DROP PUBLICATION name;
  • name adalah nama publication yang sudah ada.

  • Contoh

  • DROP PUBLICATION hg_publication_test_1;

Replication Slot

Pendahuluan

Dalam skenario replikasi logis, replication slot merepresentasikan aliran perubahan data. Replication slot juga diikat ke progres konsumsi saat ini dan digunakan untuk transmisi yang dapat dilanjutkan. Untuk informasi selengkapnya, lihat dokumentasi PostgreSQL Replication Slot. Replication slot digunakan untuk mempertahankan informasi checkpoint untuk konsumsi Binlog. Hal ini memungkinkan konsumen untuk pulih dari checkpoint terakhir yang telah dikomit setelah failover.

Izin

Hanya superuser dan pengguna dengan Replication Role yang memiliki izin untuk membuat dan menggunakan replication slot. Anda dapat menjalankan pernyataan berikut untuk memberikan atau mencabut Replication Role.

-- Use a superuser to grant the replication role to a regular user:
ALTER role <user_name> replication;

-- Use a superuser to revoke the replication role from a user:
ALTER role <user_name> noreplication;

user_name adalah ID akun Alibaba Cloud atau pengguna Resource Access Management (RAM). Untuk informasi selengkapnya, lihat Ikhtisar akun.

Buat slot replikasi

  • Contoh

  • CALL hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
  • Parameter

  • Parameter

    Deskripsi

    replication_slot_name

    Nama kustom untuk replication slot.

    hgoutput

    Plugin untuk format output Binlog. Saat ini, hanya plugin bawaan hgoutput yang didukung.

    publication_name

    Nama publikasi yang terikat pada slot replikasi.

  • Contoh

  • -- Create a replication slot named hg_replication_slot_1 and bind it to the publication named hg_publication_test_1.
    CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');

Kueri replication slot yang telah dibuat

  • Contoh sintaksis

  • SELECT * FROM hologres.hg_replication_slot_properties;
  • Hasil Kueri

  •        slot_name       | property_key |    property_value
    -----------------------+--------------+-----------------------
     hg_replication_slot_1 | plugin       | hgoutput
     hg_replication_slot_1 | publication  | hg_publication_test_1
     hg_replication_slot_1 | parallelism  | 1
    (3 rows)

    Parameter

    Deskripsi

    slot_name

    Nama slot replikasi.

    property_key

    Mencakup tiga parameter berikut.

    • plugin: Plugin yang digunakan oleh replication slot. Saat ini, hanya pgoutput yang didukung.

    • publication: Publication yang sesuai dengan replication slot.

    • parallelism: Jumlah koneksi bersamaan yang diperlukan untuk mengonsumsi data Binlog seluruh tabel melalui replication slot. Nilainya sama dengan jumlah shard dari kelompok tabel tempat tabel target berada.

    property_value

    Nilai parameter yang ditentukan oleh property_key.

Kueri jumlah koneksi bersamaan yang diperlukan untuk mengonsumsi data Binlog seluruh tabel melalui replication slot

Hologres adalah database terdistribusi. Data sebuah tabel didistribusikan di beberapa shard. Oleh karena itu, saat Anda menggunakan JDBC untuk mengonsumsi data Binlog, Anda harus memulai beberapa koneksi client untuk mengonsumsi data Binlog lengkap. Anda dapat menjalankan perintah berikut untuk mengkueri jumlah koneksi bersamaan yang diperlukan untuk mengonsumsi data dari hg_replication_slot_1.

  • Sintaksis

  • SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');
  • Hasil Kueri

  • hg_get_logical_replication_slot_parallelism  
    ------------------------------------------------
                                            20 

Kueri progres konsumsi replication slot (progres konsumsi Binlog yang dicatat di sisi Hologres)

  • Contoh sintaksis

  • SELECT * FROM hologres.hg_replication_progress;
  • Hasil Kueri

  •        slot_name       | parallel_index | lsn 
    -----------------------+----------------+-----
     hg_replication_slot_1 |              0 |  66
     hg_replication_slot_1 |              1 | 122
     hg_replication_slot_1 |              2 | 119
    
    (0 rows)

    Parameter

    Deskripsi

    slot_name

    Nama slot replikasi.

    parallel_index

    Nomor urut koneksi bersamaan.

    lsn

    Nomor urut catatan Binlog terakhir yang dikonsumsi.

    Penting
    • Tabel hologres.hg_replication_progress hanya dibuat setelah data Binlog dikonsumsi untuk pertama kalinya.

    • Tabel hologres.hg_replication_progress mencatat offset konsumen yang dikomit secara aktif oleh pengguna. Anda harus memanggil fungsi commit lsn secara manual dalam kode Anda untuk mengirimkan informasi checkpoint Binlog. Karena konten yang dicatat dalam tabel ini sepenuhnya bergantung pada komit terakhir pengguna, nilainya mungkin tidak secara akurat mencerminkan offset konsumen aktual di sisi pengguna. Oleh karena itu, kami menyarankan Anda mencatat LSN di sisi konsumen dan menggunakannya sebagai titik pemulihan saat konsumsi berhenti. Contoh kode berikut untuk konsumsi Binlog JDBC dan Holo-Client tidak menyertakan kode untuk mengkomit LSN.

    • Mengkomit informasi checkpoint Binlog secara manual hanya efektif saat Anda mengonsumsi data Binlog menggunakan replication slot. Saat Anda mengonsumsi data Binlog berdasarkan nama tabel, hasil checkpoint tidak dicatat atau dipertahankan dalam tabel hologres.hg_replication_progress.

Hapus replication slot

  • Contoh sintaksis

  • CALL hg_drop_logical_replication_slot('<replication_slot_name>');
  • replication_slot_name adalah nama replication slot yang sudah ada.

  • Contoh

  • CALL hg_drop_logical_replication_slot('hg_replication_slot_1');

Mengonsumsi data Binlog menggunakan JDBC

  1. Tambahkan dependensi POM

    Tambahkan dependensi POM berikut.

    Catatan

    Gunakan JDBC 42.2.18 atau versi setelahnya.

            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.3.8</version>
            </dependency>
            <!-- Used to obtain the table schema and parse the binary log -->
            <dependency>
                <groupId>com.alibaba.hologres</groupId>
                <artifactId>holo-client</artifactId>
                <version>2.2.10</version>
    				</dependency>
  2. Contoh kode Java

    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder;
    import com.alibaba.hologres.client.model.Record;
    import com.alibaba.hologres.client.model.TableSchema;
    
    import org.postgresql.PGConnection;
    import org.postgresql.PGProperty;
    import org.postgresql.replication.LogSequenceNumber;
    import org.postgresql.replication.PGReplicationStream;
    
    import java.nio.ByteBuffer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    public class Test {
    
        public static void main (String[] args) throws Exception {
    
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://Endpoint:Port/db_test";
    
            // Create a JDBC connection.
            Properties properties = new Properties();
            PGProperty.USER.set(properties, username);
            PGProperty.PASSWORD.set(properties, password);
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
            // To consume Binlog data, you must add the following parameter.
            PGProperty.REPLICATION.set(properties, "database");
            try (Connection connection = DriverManager.getConnection(url, properties)) {
                // Create a PGReplicationStream, bind it to a replication slot, and specify the shardId.
                int shardId = 0;
                PGConnection pgConnection = connection.unwrap(PGConnection.class);
                PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream()
                        .logical()
            	  // Starting from V2.1, two methods are available here.
                // Method 1: Set the withSlotName parameter to the name of the replication slot created in the preparation phase. You do not need to specify withSlotOption("table_name","xxx"). 
                // Method 2: Do not specify the withSlotName parameter. You must specify withSlotOption("table_name","xxx"). 
                        .withSlotName("slot_name")
                        .withSlotOption("table_name","public.test_messsage_src") // The name of the table to consume.
                        .withSlotOption("parallel_index", shardId)
                        .withSlotOption("batch_size", "1024")
                        .withSlotOption("start_time", "2021-01-01 00:00:00")
                        .withSlotOption("start_lsn","0")
                        .start();
    	
                // Although we do not directly use Holo-Client to consume Binlog data, we need Holo-Client to parse the consumed data.        
                // Create a HoloClient.
                HoloConfig holoConfig = new HoloConfig();
                holoConfig.setJdbcUrl(url);
                holoConfig.setUsername(username);
                holoConfig.setPassword(password);
                HoloClient client = new HoloClient(holoConfig);
    
                // Create a Binlog decoder to decode binary data. The schema must be obtained through HoloClient.
                TableSchema schema = client.getTableSchema("test_message_src", true);
                HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema);
    
                // Used to record the current consumer offset for resuming consumption after an interruption.
                Long currentLsn = 0;
                // Consume data.
                ByteBuffer byteBuffer = pgReplicationStream.readPending();
                while (true) {
                    if (byteBuffer != null) {
                        List<BinlogRecord> records = decoder.decode(shardId, byteBuffer);
                        Long latestLsn = 0L;
                        for (BinlogRecord record : records) {
                            latestLsn = record.getBinlogLsn();
                            // Do Something
                            System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues()));
                        }
                        // Save the consumer offset.
                        currentLsn = latestLsn;                    
                        pgReplicationStream.forceUpdateStatus();
                    }
                    byteBuffer = pgReplicationStream.readPending();
                }
            }
            // pgReplicationStream.close();
            // connection.close();
        }

    Saat membuat PGReplicationStream, Anda harus menentukan replication slot menggunakan withSlotName:

    • Untuk versi Hologres sebelum V2.1, Anda harus menentukan nama replication slot yang sudah ada.

    • Mulai dari Hologres V2.1, Anda tidak perlu menentukan withSlotName. Anda hanya perlu menentukan nama tabel target dalam Slot Options.

    Selain itu, Anda dapat menentukan parameter berikut menggunakan withSlotOption.

    Parameter

    Diperlukan

    Deskripsi

    table_name

    Wajib saat withSlotName tidak ditentukan. Jika tidak, parameter ini tidak valid.

    Saat withSlotName tidak ditentukan, table_name merepresentasikan nama tabel target yang ingin Anda konsumsi. Formatnya adalah schema_name.table_name atau table_name.

    parallel_index

    Ya

    • Saat Anda menggunakan PGReplicationStream untuk mengonsumsi data Binlog, satu PGReplicationStream membuat satu koneksi Walsender untuk mengonsumsi data Binlog dari satu shard tabel target. parallel_index merepresentasikan konsumsi data dari shard dengan indeks tertentu.

    • Asumsikan sebuah tabel memiliki tiga shard. Jumlah koneksi bersamaan yang diperlukan untuk mengonsumsi data Binlog melalui replication slot adalah 3. Anda dapat membuat hingga tiga PGReplicationStream, dan mengatur parameter parallel_index masing-masing menjadi 0, 1, dan 2.

    • Saat ini, konsumsi data Hologres Binlog berbasis JDBC tidak mendukung implementasi serupa Kafka Consumer Group. Anda perlu membuat beberapa PGReplicationStream sendiri.

    start_time

    Tidak

    Menentukan waktu mulai mengonsumsi data Binlog. Contoh format: 2021-01-01 12:00:00+08.

    Jika Anda tidak menentukan start_lsn atau start_time, aturan berikut berlaku:

    • Jika Anda mengonsumsi data Binlog dari replication slot untuk pertama kalinya, konsumsi dimulai dari awal, mirip dengan pengaturan Oldest di Kafka.

    • Jika Anda sebelumnya telah mengonsumsi data Binlog dari replication slot tersebut, konsumsi mencoba dilanjutkan dari checkpoint terakhir yang telah dikomit.

    • Jika Anda tidak menentukan withSlotName tetapi menentukan table_name, konsumsi dimulai dari awal, terlepas dari apakah Anda sebelumnya pernah mengonsumsi data Binlog dari tabel ini atau tidak.

    start_lsn

    Tidak

    Menentukan LSN setelah mana konsumsi data Binlog dimulai. Parameter ini memiliki prioritas lebih tinggi daripada start_time.

    batch_size

    Tidak

    Ukuran batch maksimum untuk satu pengambilan Binlog, dalam baris. Nilai default adalah 1024.

    Catatan
    • BinlogRecord adalah tipe catatan yang dikembalikan oleh decoder. Anda dapat menggunakan metode berikut untuk mengambil bidang sistem Binlog untuk catatan data ini. Untuk informasi selengkapnya, lihat Berlangganan data Hologres Binlog.

      • getBinlogLsn() mengambil nomor urut Binlog.

      • getBinlogTimestamp() mengambil timestamp sistem Binlog.

      • getBinlogEventType() mengambil tipe event Binlog.

    • Setelah mengonsumsi data Binlog, Anda harus mengkomit informasi checkpoint secara manual untuk memastikan konsumsi dapat dilanjutkan setelah failover.

Mengonsumsi data Binlog menggunakan Holo-Client

  • Fitur untuk mengonsumsi data Hologres Binlog telah diintegrasikan ke dalam Holo-Client. Anda dapat menentukan tabel fisik yang ingin dikonsumsi untuk dengan mudah mengonsumsi data Binlog dari semua shard-nya.

  • Saat menggunakan Holo-Client untuk mengonsumsi data Binlog, jumlah koneksi yang diperlukan sama dengan jumlah shard dalam tabel fisik (jumlah slot bersamaan). Pastikan Anda memiliki cukup koneksi.

  • Saat menggunakan Holo-Client untuk mengonsumsi data Binlog, kami menyarankan Anda menyimpan checkpoint untuk setiap shard. Jika konsumsi dihentikan karena kegagalan konektivitas jaringan atau alasan lain, Anda dapat melanjutkan dari checkpoint yang disimpan. Untuk informasi selengkapnya, lihat contoh kode di bawah.

  1. Tambahkan dependensi POM

    Tambahkan dependensi POM berikut.

    Catatan

    Kami menyarankan Anda menggunakan Holo-Client 2.2.10 atau versi setelahnya. Versi 2.2.9 dan sebelumnya memiliki masalah memory leak.

    <dependency>
      <groupId>com.alibaba.hologres</groupId>
      <artifactId>holo-client</artifactId>
      <version>2.2.10</version>
    </dependency>
  2. Contoh kode Java

    import com.alibaba.hologres.client.BinlogShardGroupReader;
    import com.alibaba.hologres.client.Command;
    import com.alibaba.hologres.client.HoloClient;
    import com.alibaba.hologres.client.HoloConfig;
    import com.alibaba.hologres.client.Subscribe;
    import com.alibaba.hologres.client.exception.HoloClientException;
    import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
    import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
    import com.alibaba.hologres.client.model.binlog.BinlogRecord;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class HoloBinlogExample {
    
        public static BinlogShardGroupReader reader;
    
        public static void main(String[] args) throws Exception {
            String username = "";
            String password = "";
            String url = "jdbc:postgresql://ip:port/database";
            String tableName = "test_message_src";
            String slotName = "hg_replication_slot_1";
    
            // Parameters for creating the client.
            HoloConfig holoConfig = new HoloConfig();
            holoConfig.setJdbcUrl(url);
            holoConfig.setUsername(username);
            holoConfig.setPassword(password);
            holoConfig.setBinlogReadBatchSize(128);
            holoConfig.setBinlogIgnoreDelete(true);
            holoConfig.setBinlogIgnoreBeforeUpdate(true);
            holoConfig.setBinlogHeartBeatIntervalMs(5000L);
            HoloClient client = new HoloClient(holoConfig);
    
            // Get the shard count of the table.
            int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));
    
            // Use a map to save the consumption progress of each shard, initialized to 0.
            Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
            for (int i = 0; i < shardCount; i++) {
                shardIdToLsn.put(i, 0L);
            }
    
            // A request to consume Binlog data. Before V2.1, tableName and slotName are required parameters. Starting from V2.1, you only need to pass in tableName (equivalent to the fixed slotName "hg_table_name_slot" used earlier).
            // Subscribe has two types: StartTimeBuilder and OffsetBuilder. This example uses the former.
            Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
                    .setBinlogReadStartTime("2021-01-01 12:00:00")
                    .build();
            // Create a binlog reader.
            reader = client.binlogSubscribe(subscribe);
    
            BinlogRecord record;
    
            int retryCount = 0;
            long count = 0;
            while(true) {
                try {
                    if (reader.isCanceled()) {
                        // Re-create the reader based on the saved checkpoint.
                        reader = client.binlogSubscribe(subscribe);
                    }
                    while ((record = reader.getBinlogRecord()) != null) {
                        // Consumed to the latest.
                        if (record instanceof BinlogHeartBeatRecord) {
                            // do something
                            continue;
                        }
        
                        // Process the read Binlog record. Here, we just print it.
                        System.out.println(record);
        
                        // After processing, save the checkpoint to recover from this point in case of an exception.
                        shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
                        count++;
                        
                        // Read successfully. Reset the retry count.
                        retryCount = 0;
                    }
                } catch (HoloClientException e) {
                    if (++retryCount > 10) {
                        throw new RuntimeException(e);
                    }
                    // We recommend that you print a WARN level log when an exception occurs.
                    System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount));
        
                    // Wait for a period of time during retry.
                    Thread.sleep(5000L * retryCount);
        
                    // Use OffsetBuilder to create a Subscribe to specify the starting consumer offset for each shard.
                    Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
                    for (int i = 0; i < shardCount; i++) {
                        // BinlogOffset uses setSequence to specify the LSN and setTimestamp to specify the time. If both are specified, the LSN has a higher priority than the timestamp.
                        // Recover based on the consumption progress saved in the shardIdToLsn map.
                        subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i)));
                    }
                    subscribe = subscribeBuilder.build();
                    // Close the reader.
                    reader.cancel();
                }
            }
        }
    }

    Saat menggunakan Holo-Client untuk mengonsumsi data Binlog, Anda dapat menentukan parameter berikut.

    Parameter

    Diperlukan

    Nilai default

    Deskripsi

    binlogReadBatchSize

    Tidak

    1024

    Ukuran batch maksimum untuk satu pengambilan Binlog dari setiap shard, dalam baris.

    binlogHeartBeatIntervalMs

    Tidak

    -1

    Interval pengiriman BinlogHeartBeatRecord oleh binlogRead. Nilai -1 menunjukkan tidak dikirim.

    Saat tidak ada data Binlog baru, BinlogHeartBeatRecord dikirim pada interval yang ditentukan. Timestamp catatan ini menunjukkan bahwa data pada shard ini telah sepenuhnya dikonsumsi hingga waktu tersebut.

    binlogIgnoreDelete

    Tidak

    false

    Menentukan apakah akan mengabaikan event Binlog DELETE.

    binlogIgnoreBeforeUpdate

    Tidak

    false

    Menentukan apakah akan mengabaikan event Binlog BeforeUpdate.

FAQ

Setelah Anda mengonsumsi data Binlog dan mengkomit progres konsumsi, Anda menemukan bahwa tabel hologres.hg_replication_progress tidak ada atau tidak berisi data progres konsumsi. Kemungkinan penyebabnya sebagai berikut:

  • Konsumsi tidak dilakukan melalui replication slot, artinya parameter withSlotName tidak ditentukan. Dalam skenario ini, pencatatan progres konsumsi tidak didukung.

  • Instans secondary read-only digunakan, dan ini adalah pertama kalinya data Binlog dikonsumsi untuk database ini. Dalam kasus ini, pembuatan tabel hologres.hg_replication_progress gagal. Masalah ini telah diperbaiki di Hologres V2.0.18 dan versi setelahnya, yang memungkinkan instans secondary mengonsumsi data Binlog secara normal. Untuk versi sebelum Hologres V2.0.18, Anda harus terlebih dahulu mengonsumsi data Binlog sekali menggunakan instans primary. Setelah itu, instans secondary dapat mengonsumsi data Binlog secara normal.

  • Jika masalah bukan disebabkan oleh alasan di atas, bergabunglah ke grup DingTalk Hologres dan hubungi petugas jaga untuk bantuan. Untuk informasi selengkapnya, lihat Bagaimana cara mendapatkan dukungan online lebih lanjut?.