Topik ini menjelaskan cara mengonsumsi data Hologres Binlog menggunakan Java Database Connectivity (JDBC) dan Holo-Client.
Prasyarat
-
Aktifkan dan konfigurasikan Hologres Binlog. Untuk informasi selengkapnya, lihat Berlangganan data Hologres Binlog.
-
Buat ekstensi hg_binlog.
-
Untuk versi Hologres sebelum V2.0, superuser instans harus menjalankan pernyataan berikut untuk membuat ekstensi. Ekstensi ini berlaku untuk seluruh database. Anda hanya perlu menjalankan pernyataan ini sekali untuk satu database. Jika Anda membuat database baru, Anda harus menjalankan pernyataan tersebut lagi.
--Create the extension. CREATE extension hg_binlog; --Delete the extension. DROP extension hg_binlog;PentingJangan jalankan perintah
DROP EXTENSION <extension_name> CASCADE;untuk menguninstall ekstensi secara cascade. Perintah CASCADE tidak hanya menghapus ekstensi yang ditentukan, tetapi juga purge data ekstensi seperti data PostGIS, RoaringBitmap, Proxima, Binlog, dan BSI, serta objek yang bergantung pada ekstensi tersebut, termasuk metadata, tabel, view, dan data server. -
Mulai dari Hologres V2.0, Anda dapat menggunakan fitur ini tanpa perlu membuat ekstensi secara manual.
-
-
Mulai dari Hologres V2.1, Anda dapat mengonsumsi data Binlog dengan salah satu dari dua cara berikut.
-
Didukung oleh semua versi: Selesaikan Persiapan, yang mencakup pembuatan publication untuk tabel target dan pembuatan replication slot untuk publication tersebut. Setelah itu, Anda dapat mengonsumsi data Binlog dari tabel target.
CatatanMetode ini memerlukan pengguna memiliki salah satu izin berikut:
-
Izin superuser pada instans
-
Izin Pemilik pada tabel target, izin CREATE DATABASE, dan izin Replication Role pada instans.
-
-
Hanya didukung oleh Hologres V2.1 dan versi setelahnya: Berikan izin read pada pengguna untuk tabel target. Pengguna tersebut kemudian dapat mengonsumsi data Binlog dari tabel target.
-
Batasan
-
Mengonsumsi data Hologres Binlog menggunakan JDBC hanya didukung pada Hologres V1.1 dan versi setelahnya. Jika instans Anda menjalankan versi sebelum V1.1, lihat Error umum yang terjadi saat mempersiapkan upgrade atau bergabunglah ke grup DingTalk Hologres untuk memberikan masukan. Untuk informasi selengkapnya, lihat Bagaimana cara mendapatkan dukungan online lebih lanjut?.
-
Hanya tipe data berikut yang didukung untuk konsumsi Hologres Binlog: INTEGER, BIGINT, SMALLINT, TEXT, CHAR(n), VARCHAR(n), REAL, DOUBLE PRECISION, BOOLEAN, NUMERIC(38,8), DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, BYTEA, JSON, SERIAL, OID, int4[], int8[], float4[], float8[], boolean[], dan text[]. Mulai dari Hologres V1.3.36, tipe JSONB juga didukung. Jika sebuah tabel berisi kolom dengan tipe data lain, konsumsi akan gagal.
CatatanMulai dari Hologres V1.3.36, Anda dapat mengonsumsi data Hologres Binlog dengan tipe data JSONB. Sebelum konsumsi, Anda harus mengaktifkan parameter Grand Unified Configuration (GUC) berikut:
-- Enable the GUC parameter at the session level. SET hg_experimental_enable_binlog_jsonb = ON; -- Enable the GUC parameter at the database level. ALTER database <db_name> SET hg_experimental_enable_binlog_jsonb = ON; -
Seperti koneksi biasa, saat Anda menggunakan JDBC untuk mengonsumsi data Binlog, setiap shard dari setiap tabel yang dikonsumsi menggunakan satu koneksi Walsender. Koneksi Walsender bersifat independen dari koneksi biasa dan tidak saling memengaruhi.
-
Jumlah Walsender juga dibatasi. Anda dapat menjalankan perintah berikut untuk melihat jumlah maksimum Walsender untuk satu frontend node. Nilai default adalah 600 untuk V2.2 dan versi setelahnya, 1.000 untuk V2.0 dan V2.1, serta 100 untuk versi dari V1.1.26 hingga V2.0. Jumlah total Walsender adalah jumlah maksimum dikalikan dengan jumlah frontend node pada instans Anda. Untuk informasi lebih lanjut tentang jumlah frontend node untuk instans dengan spesifikasi berbeda, lihat Manajemen instans.
SHOW max_wal_senders;CatatanAnda dapat menghitung jumlah tabel yang dapat mengonsumsi data Binlog secara bersamaan dalam satu instans Hologres menggunakan rumus berikut:
Number of tables <= (max_wal_senders (100 or 1000) * Number of frontend nodes) / Table shard count.Contoh:
-
Jika Tabel A dan Tabel B masing-masing memiliki jumlah shard 20, dan Tabel C memiliki jumlah shard 30, jumlah Walsender yang digunakan untuk mengonsumsi data Binlog mereka secara bersamaan adalah
20 + 20 + 30 = 70. -
Jika Tabel A dan Tabel B masing-masing memiliki jumlah shard 20, dan dua pekerjaan mengonsumsi data Binlog dari Tabel A secara bersamaan, jumlah Walsender yang digunakan adalah
20 * 2 + 20 = 60. -
Jika sebuah instans memiliki dua frontend node, jumlah maksimum Walsender adalah
600 * 2 = 1.200. Instans tersebut dapat mendukung konsumsi data Binlog secara bersamaan hingga 60 tabel, masing-masing dengan jumlah shard 20.
Jika jumlah koneksi untuk konsumsi Binlog berbasis JDBC mencapai batas atas, pesan error
FATAL: sorry, too many wal senders alreadyakan dikembalikan. Anda dapat melakukan troubleshooting sebagai berikut:-
Periksa pekerjaan yang menggunakan JDBC untuk mengonsumsi data Binlog dan kurangi konsumsi Binlog yang tidak diperlukan.
-
Periksa apakah kelompok tabel dan jumlah shard dirancang secara wajar. Untuk informasi selengkapnya, lihat Praktik terbaik untuk mengatur kelompok tabel.
-
Jika jumlah koneksi masih melebihi batas, pertimbangkan untuk melakukan scaling out pada instans.
-
-
Sebelum Hologres V2.0.18, instans secondary read-only tidak mendukung konsumsi data Binlog menggunakan JDBC. Mulai dari V2.0.18, instans secondary read-only mendukung fitur ini tetapi tidak mendukung pencatatan progres konsumsi.
Catatan
Metode yang didukung untuk mengonsumsi data Binlog bervariasi berdasarkan versi instans Hologres dan versi mesin Flink. Rinciannya sebagai berikut:
Versi instance Hologres | Versi mesin Flink | Deskripsi |
V2.1 dan yang lebih baru |
8.0.5 dan yang lebih baru |
Anda dapat mengonsumsi data Binlog jika memiliki izin read pada tabel. Anda tidak perlu membuat replication slot. |
V2.0 |
8.0.5 dan yang lebih lama |
Mode JDBC digunakan secara default. Anda harus membuat publication untuk tabel target dan replication slot untuk publication tersebut sebelum dapat mengonsumsi data Binlog dari tabel target. |
V1.3 dan yang lebih lama |
8.0.5 dan yang lebih lama |
Mode Holohub digunakan secara default. Anda dapat mengonsumsi data Binlog jika memiliki izin read pada tabel. |
Mulai dari Hologres V2.0, mode Holohub tidak lagi didukung untuk konsumsi Binlog. Sebelum melakukan upgrade instans Hologres ke V2.0 atau versi setelahnya, kami menyarankan Anda terlebih dahulu melakukan upgrade Flink ke versi 8.0.5. Setelah upgrade, mode JDBC akan digunakan secara otomatis untuk konsumsi Binlog.
Persiapan: Buat publikasi dan slot replikasi
Sebelum Hologres V2.1, Anda harus membuat publication untuk tabel target dan replication slot untuk publication tersebut sebelum dapat mengonsumsi data Binlog.
Mulai dari Hologres V2.1, selain metode di atas, pengguna yang hanya memiliki izin read pada tabel target juga dapat mengonsumsi data Binlog. Metode ini tidak memungkinkan Anda untuk mengkueri progres konsumsi Binlog yang dicatat di sisi Hologres. Kami menyarankan Anda mencatat progres konsumsi di sisi client.
Publikasi
Pendahuluan
Publication pada dasarnya adalah kelompok tabel yang perubahan datanya dimaksudkan untuk direplikasi melalui replikasi logis. Untuk informasi selengkapnya, lihat Publication. Saat ini, sebuah publication Hologres hanya dapat diikat ke satu tabel fisik, dan fitur Binlog harus diaktifkan untuk tabel tersebut.
Buat publikasi
-
Contoh sintaksis
CREATE PUBLICATION name FOR TABLE table_name;
Parameter
Parameter | Deskripsi |
name |
Nama publikasi kustom. |
table_name | Nama tabel dalam database. |
Contoh
-- Create a publication named hg_publication_test_1 and add the table test_message_src to it.
CREATE publication hg_publication_test_1 FOR TABLE test_message_src;
Kueri publication yang telah dibuat
-
Contoh sintaksis
SELECT * FROM pg_publication;
Hasil Kueri
pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
-----------------------+----------+--------------+-----------+-----------+-----------+-------------
hg_publication_test_1 | 16728 | f | t | t | t | t
(1 row)
Parameter | Deskripsi |
pubname | Nama publikasi. |
pubowner | Pemilik publikasi. |
puballtables |
Mengikat beberapa tabel fisik. Nilai default adalah False. Fitur ini saat ini tidak didukung. |
pubinsert |
Menentukan apakah akan mempublikasikan event Binlog INSERT. Nilai default adalah True. Untuk informasi selengkapnya tentang tipe Binlog, lihat Format dan prinsip Binlog. |
pubupdate |
Menentukan apakah akan mempublikasikan event Binlog UPDATE. Nilai default adalah True. |
pubdelete |
Menentukan apakah akan mempublikasikan event Binlog DELETE. Nilai default adalah True. |
pubtruncate |
Menentukan apakah akan mempublikasikan event Binlog TRUNCATE. Nilai default adalah True. |
Kueri tabel yang terkait dengan publication
-
Contoh sintaksis
SELECT * FROM pg_publication_tables;
Hasil Kueri
pubname | schemaname | tablename
-----------------------+------------+------------------
hg_publication_test_1 | public | test_message_src
(1 row)
Parameter | Deskripsi |
pubname | Nama publikasi. |
schemaname | Nama skema tempat tabel milik. |
tablename | Nama tabel. |
Hapus publication
Sintaksis
DROP PUBLICATION name;
name adalah nama publication yang sudah ada.
Contoh
DROP PUBLICATION hg_publication_test_1;
Replication Slot
Pendahuluan
Dalam skenario replikasi logis, replication slot merepresentasikan aliran perubahan data. Replication slot juga diikat ke progres konsumsi saat ini dan digunakan untuk transmisi yang dapat dilanjutkan. Untuk informasi selengkapnya, lihat dokumentasi PostgreSQL Replication Slot. Replication slot digunakan untuk mempertahankan informasi checkpoint untuk konsumsi Binlog. Hal ini memungkinkan konsumen untuk pulih dari checkpoint terakhir yang telah dikomit setelah failover.
Izin
Hanya superuser dan pengguna dengan Replication Role yang memiliki izin untuk membuat dan menggunakan replication slot. Anda dapat menjalankan pernyataan berikut untuk memberikan atau mencabut Replication Role.
-- Use a superuser to grant the replication role to a regular user:
ALTER role <user_name> replication;
-- Use a superuser to revoke the replication role from a user:
ALTER role <user_name> noreplication;
user_name adalah ID akun Alibaba Cloud atau pengguna Resource Access Management (RAM). Untuk informasi selengkapnya, lihat Ikhtisar akun.
Buat slot replikasi
-
Contoh
CALL hg_create_logical_replication_slot('replication_slot_name', 'hgoutput', 'publication_name');
Parameter
Parameter | Deskripsi |
replication_slot_name |
Nama kustom untuk replication slot. |
hgoutput |
Plugin untuk format output Binlog. Saat ini, hanya plugin bawaan hgoutput yang didukung. |
publication_name | Nama publikasi yang terikat pada slot replikasi. |
Contoh
-- Create a replication slot named hg_replication_slot_1 and bind it to the publication named hg_publication_test_1.
CALL hg_create_logical_replication_slot('hg_replication_slot_1', 'hgoutput', 'hg_publication_test_1');
Kueri replication slot yang telah dibuat
-
Contoh sintaksis
SELECT * FROM hologres.hg_replication_slot_properties;
Hasil Kueri
slot_name | property_key | property_value
-----------------------+--------------+-----------------------
hg_replication_slot_1 | plugin | hgoutput
hg_replication_slot_1 | publication | hg_publication_test_1
hg_replication_slot_1 | parallelism | 1
(3 rows)
Parameter | Deskripsi |
slot_name | Nama slot replikasi. |
property_key |
Mencakup tiga parameter berikut.
|
property_value |
Nilai parameter yang ditentukan oleh property_key. |
Kueri jumlah koneksi bersamaan yang diperlukan untuk mengonsumsi data Binlog seluruh tabel melalui replication slot
Hologres adalah database terdistribusi. Data sebuah tabel didistribusikan di beberapa shard. Oleh karena itu, saat Anda menggunakan JDBC untuk mengonsumsi data Binlog, Anda harus memulai beberapa koneksi client untuk mengonsumsi data Binlog lengkap. Anda dapat menjalankan perintah berikut untuk mengkueri jumlah koneksi bersamaan yang diperlukan untuk mengonsumsi data dari hg_replication_slot_1.
Sintaksis
SELECT hg_get_logical_replication_slot_parallelism('hg_replication_slot_1');Hasil Kueri
hg_get_logical_replication_slot_parallelism
------------------------------------------------
20
Kueri progres konsumsi replication slot (progres konsumsi Binlog yang dicatat di sisi Hologres)
-
Contoh sintaksis
SELECT * FROM hologres.hg_replication_progress;
Hasil Kueri
slot_name | parallel_index | lsn
-----------------------+----------------+-----
hg_replication_slot_1 | 0 | 66
hg_replication_slot_1 | 1 | 122
hg_replication_slot_1 | 2 | 119
(0 rows)
Parameter | Deskripsi |
slot_name | Nama slot replikasi. |
parallel_index |
Nomor urut koneksi bersamaan. |
lsn |
Nomor urut catatan Binlog terakhir yang dikonsumsi. |
-
Tabel hologres.hg_replication_progress hanya dibuat setelah data Binlog dikonsumsi untuk pertama kalinya.
-
Tabel hologres.hg_replication_progress mencatat offset konsumen yang dikomit secara aktif oleh pengguna. Anda harus memanggil fungsi commit lsn secara manual dalam kode Anda untuk mengirimkan informasi checkpoint Binlog. Karena konten yang dicatat dalam tabel ini sepenuhnya bergantung pada komit terakhir pengguna, nilainya mungkin tidak secara akurat mencerminkan offset konsumen aktual di sisi pengguna. Oleh karena itu, kami menyarankan Anda mencatat LSN di sisi konsumen dan menggunakannya sebagai titik pemulihan saat konsumsi berhenti. Contoh kode berikut untuk konsumsi Binlog JDBC dan Holo-Client tidak menyertakan kode untuk mengkomit LSN.
-
Mengkomit informasi checkpoint Binlog secara manual hanya efektif saat Anda mengonsumsi data Binlog menggunakan replication slot. Saat Anda mengonsumsi data Binlog berdasarkan nama tabel, hasil checkpoint tidak dicatat atau dipertahankan dalam tabel hologres.hg_replication_progress.
Hapus replication slot
-
Contoh sintaksis
CALL hg_drop_logical_replication_slot('<replication_slot_name>');
replication_slot_name adalah nama replication slot yang sudah ada.
Contoh
CALL hg_drop_logical_replication_slot('hg_replication_slot_1');
Mengonsumsi data Binlog menggunakan JDBC
-
Tambahkan dependensi POM
Tambahkan dependensi POM berikut.
CatatanGunakan JDBC 42.2.18 atau versi setelahnya.
<dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.3.8</version> </dependency> <!-- Used to obtain the table schema and parse the binary log --> <dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>holo-client</artifactId> <version>2.2.10</version> </dependency> -
Contoh kode Java
import com.alibaba.hologres.client.HoloClient; import com.alibaba.hologres.client.HoloConfig; import com.alibaba.hologres.client.impl.binlog.HoloBinlogDecoder; import com.alibaba.hologres.client.model.Record; import com.alibaba.hologres.client.model.TableSchema; import org.postgresql.PGConnection; import org.postgresql.PGProperty; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.DriverManager; import java.util.Arrays; import java.util.List; import java.util.Properties; public class Test { public static void main (String[] args) throws Exception { String username = ""; String password = ""; String url = "jdbc:postgresql://Endpoint:Port/db_test"; // Create a JDBC connection. Properties properties = new Properties(); PGProperty.USER.set(properties, username); PGProperty.PASSWORD.set(properties, password); PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); // To consume Binlog data, you must add the following parameter. PGProperty.REPLICATION.set(properties, "database"); try (Connection connection = DriverManager.getConnection(url, properties)) { // Create a PGReplicationStream, bind it to a replication slot, and specify the shardId. int shardId = 0; PGConnection pgConnection = connection.unwrap(PGConnection.class); PGReplicationStream pgReplicationStream = pgConnection.getReplicationAPI().replicationStream() .logical() // Starting from V2.1, two methods are available here. // Method 1: Set the withSlotName parameter to the name of the replication slot created in the preparation phase. You do not need to specify withSlotOption("table_name","xxx"). // Method 2: Do not specify the withSlotName parameter. You must specify withSlotOption("table_name","xxx"). .withSlotName("slot_name") .withSlotOption("table_name","public.test_messsage_src") // The name of the table to consume. .withSlotOption("parallel_index", shardId) .withSlotOption("batch_size", "1024") .withSlotOption("start_time", "2021-01-01 00:00:00") .withSlotOption("start_lsn","0") .start(); // Although we do not directly use Holo-Client to consume Binlog data, we need Holo-Client to parse the consumed data. // Create a HoloClient. HoloConfig holoConfig = new HoloConfig(); holoConfig.setJdbcUrl(url); holoConfig.setUsername(username); holoConfig.setPassword(password); HoloClient client = new HoloClient(holoConfig); // Create a Binlog decoder to decode binary data. The schema must be obtained through HoloClient. TableSchema schema = client.getTableSchema("test_message_src", true); HoloBinlogDecoder decoder = new HoloBinlogDecoder(schema); // Used to record the current consumer offset for resuming consumption after an interruption. Long currentLsn = 0; // Consume data. ByteBuffer byteBuffer = pgReplicationStream.readPending(); while (true) { if (byteBuffer != null) { List<BinlogRecord> records = decoder.decode(shardId, byteBuffer); Long latestLsn = 0L; for (BinlogRecord record : records) { latestLsn = record.getBinlogLsn(); // Do Something System.out.println( "lsn: " + latestLsn + ", record: " + Arrays.toString(record.getValues())); } // Save the consumer offset. currentLsn = latestLsn; pgReplicationStream.forceUpdateStatus(); } byteBuffer = pgReplicationStream.readPending(); } } // pgReplicationStream.close(); // connection.close(); }Saat membuat PGReplicationStream, Anda harus menentukan replication slot menggunakan withSlotName:
-
Untuk versi Hologres sebelum V2.1, Anda harus menentukan nama replication slot yang sudah ada.
-
Mulai dari Hologres V2.1, Anda tidak perlu menentukan withSlotName. Anda hanya perlu menentukan nama tabel target dalam Slot Options.
Selain itu, Anda dapat menentukan parameter berikut menggunakan withSlotOption.
Parameter
Diperlukan
Deskripsi
table_name
Wajib saat withSlotName tidak ditentukan. Jika tidak, parameter ini tidak valid.
Saat withSlotName tidak ditentukan, table_name merepresentasikan nama tabel target yang ingin Anda konsumsi. Formatnya adalah schema_name.table_name atau table_name.
parallel_index
Ya
-
Saat Anda menggunakan PGReplicationStream untuk mengonsumsi data Binlog, satu PGReplicationStream membuat satu koneksi Walsender untuk mengonsumsi data Binlog dari satu shard tabel target. parallel_index merepresentasikan konsumsi data dari shard dengan indeks tertentu.
-
Asumsikan sebuah tabel memiliki tiga shard. Jumlah koneksi bersamaan yang diperlukan untuk mengonsumsi data Binlog melalui replication slot adalah 3. Anda dapat membuat hingga tiga PGReplicationStream, dan mengatur parameter parallel_index masing-masing menjadi 0, 1, dan 2.
-
Saat ini, konsumsi data Hologres Binlog berbasis JDBC tidak mendukung implementasi serupa Kafka Consumer Group. Anda perlu membuat beberapa PGReplicationStream sendiri.
start_time
Tidak
Menentukan waktu mulai mengonsumsi data Binlog. Contoh format: 2021-01-01 12:00:00+08.
Jika Anda tidak menentukan start_lsn atau start_time, aturan berikut berlaku:
-
Jika Anda mengonsumsi data Binlog dari replication slot untuk pertama kalinya, konsumsi dimulai dari awal, mirip dengan pengaturan Oldest di Kafka.
-
Jika Anda sebelumnya telah mengonsumsi data Binlog dari replication slot tersebut, konsumsi mencoba dilanjutkan dari checkpoint terakhir yang telah dikomit.
-
Jika Anda tidak menentukan withSlotName tetapi menentukan table_name, konsumsi dimulai dari awal, terlepas dari apakah Anda sebelumnya pernah mengonsumsi data Binlog dari tabel ini atau tidak.
start_lsn
Tidak
Menentukan LSN setelah mana konsumsi data Binlog dimulai. Parameter ini memiliki prioritas lebih tinggi daripada start_time.
batch_size
Tidak
Ukuran batch maksimum untuk satu pengambilan Binlog, dalam baris. Nilai default adalah 1024.
Catatan-
BinlogRecord adalah tipe catatan yang dikembalikan oleh decoder. Anda dapat menggunakan metode berikut untuk mengambil bidang sistem Binlog untuk catatan data ini. Untuk informasi selengkapnya, lihat Berlangganan data Hologres Binlog.
-
getBinlogLsn() mengambil nomor urut Binlog.
-
getBinlogTimestamp() mengambil timestamp sistem Binlog.
-
getBinlogEventType() mengambil tipe event Binlog.
-
-
Setelah mengonsumsi data Binlog, Anda harus mengkomit informasi checkpoint secara manual untuk memastikan konsumsi dapat dilanjutkan setelah failover.
-
Mengonsumsi data Binlog menggunakan Holo-Client
-
Fitur untuk mengonsumsi data Hologres Binlog telah diintegrasikan ke dalam Holo-Client. Anda dapat menentukan tabel fisik yang ingin dikonsumsi untuk dengan mudah mengonsumsi data Binlog dari semua shard-nya.
-
Saat menggunakan Holo-Client untuk mengonsumsi data Binlog, jumlah koneksi yang diperlukan sama dengan jumlah shard dalam tabel fisik (jumlah slot bersamaan). Pastikan Anda memiliki cukup koneksi.
-
Saat menggunakan Holo-Client untuk mengonsumsi data Binlog, kami menyarankan Anda menyimpan checkpoint untuk setiap shard. Jika konsumsi dihentikan karena kegagalan konektivitas jaringan atau alasan lain, Anda dapat melanjutkan dari checkpoint yang disimpan. Untuk informasi selengkapnya, lihat contoh kode di bawah.
-
Tambahkan dependensi POM
Tambahkan dependensi POM berikut.
CatatanKami menyarankan Anda menggunakan Holo-Client 2.2.10 atau versi setelahnya. Versi 2.2.9 dan sebelumnya memiliki masalah memory leak.
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>holo-client</artifactId> <version>2.2.10</version> </dependency> -
Contoh kode Java
import com.alibaba.hologres.client.BinlogShardGroupReader; import com.alibaba.hologres.client.Command; import com.alibaba.hologres.client.HoloClient; import com.alibaba.hologres.client.HoloConfig; import com.alibaba.hologres.client.Subscribe; import com.alibaba.hologres.client.exception.HoloClientException; import com.alibaba.hologres.client.impl.binlog.BinlogOffset; import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord; import com.alibaba.hologres.client.model.binlog.BinlogRecord; import java.util.HashMap; import java.util.Map; public class HoloBinlogExample { public static BinlogShardGroupReader reader; public static void main(String[] args) throws Exception { String username = ""; String password = ""; String url = "jdbc:postgresql://ip:port/database"; String tableName = "test_message_src"; String slotName = "hg_replication_slot_1"; // Parameters for creating the client. HoloConfig holoConfig = new HoloConfig(); holoConfig.setJdbcUrl(url); holoConfig.setUsername(username); holoConfig.setPassword(password); holoConfig.setBinlogReadBatchSize(128); holoConfig.setBinlogIgnoreDelete(true); holoConfig.setBinlogIgnoreBeforeUpdate(true); holoConfig.setBinlogHeartBeatIntervalMs(5000L); HoloClient client = new HoloClient(holoConfig); // Get the shard count of the table. int shardCount = Command.getShardCount(client, client.getTableSchema(tableName)); // Use a map to save the consumption progress of each shard, initialized to 0. Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount); for (int i = 0; i < shardCount; i++) { shardIdToLsn.put(i, 0L); } // A request to consume Binlog data. Before V2.1, tableName and slotName are required parameters. Starting from V2.1, you only need to pass in tableName (equivalent to the fixed slotName "hg_table_name_slot" used earlier). // Subscribe has two types: StartTimeBuilder and OffsetBuilder. This example uses the former. Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName) .setBinlogReadStartTime("2021-01-01 12:00:00") .build(); // Create a binlog reader. reader = client.binlogSubscribe(subscribe); BinlogRecord record; int retryCount = 0; long count = 0; while(true) { try { if (reader.isCanceled()) { // Re-create the reader based on the saved checkpoint. reader = client.binlogSubscribe(subscribe); } while ((record = reader.getBinlogRecord()) != null) { // Consumed to the latest. if (record instanceof BinlogHeartBeatRecord) { // do something continue; } // Process the read Binlog record. Here, we just print it. System.out.println(record); // After processing, save the checkpoint to recover from this point in case of an exception. shardIdToLsn.put(record.getShardId(), record.getBinlogLsn()); count++; // Read successfully. Reset the retry count. retryCount = 0; } } catch (HoloClientException e) { if (++retryCount > 10) { throw new RuntimeException(e); } // We recommend that you print a WARN level log when an exception occurs. System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount)); // Wait for a period of time during retry. Thread.sleep(5000L * retryCount); // Use OffsetBuilder to create a Subscribe to specify the starting consumer offset for each shard. Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName); for (int i = 0; i < shardCount; i++) { // BinlogOffset uses setSequence to specify the LSN and setTimestamp to specify the time. If both are specified, the LSN has a higher priority than the timestamp. // Recover based on the consumption progress saved in the shardIdToLsn map. subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i))); } subscribe = subscribeBuilder.build(); // Close the reader. reader.cancel(); } } } }Saat menggunakan Holo-Client untuk mengonsumsi data Binlog, Anda dapat menentukan parameter berikut.
Parameter
Diperlukan
Nilai default
Deskripsi
binlogReadBatchSize
Tidak
1024
Ukuran batch maksimum untuk satu pengambilan Binlog dari setiap shard, dalam baris.
binlogHeartBeatIntervalMs
Tidak
-1
Interval pengiriman BinlogHeartBeatRecord oleh binlogRead. Nilai
-1menunjukkan tidak dikirim.Saat tidak ada data Binlog baru, BinlogHeartBeatRecord dikirim pada interval yang ditentukan. Timestamp catatan ini menunjukkan bahwa data pada shard ini telah sepenuhnya dikonsumsi hingga waktu tersebut.
binlogIgnoreDelete
Tidak
false
Menentukan apakah akan mengabaikan event Binlog DELETE.
binlogIgnoreBeforeUpdate
Tidak
false
Menentukan apakah akan mengabaikan event Binlog BeforeUpdate.
FAQ
Setelah Anda mengonsumsi data Binlog dan mengkomit progres konsumsi, Anda menemukan bahwa tabel hologres.hg_replication_progress tidak ada atau tidak berisi data progres konsumsi. Kemungkinan penyebabnya sebagai berikut:
-
Konsumsi tidak dilakukan melalui replication slot, artinya parameter withSlotName tidak ditentukan. Dalam skenario ini, pencatatan progres konsumsi tidak didukung.
-
Instans secondary read-only digunakan, dan ini adalah pertama kalinya data Binlog dikonsumsi untuk database ini. Dalam kasus ini, pembuatan tabel hologres.hg_replication_progress gagal. Masalah ini telah diperbaiki di Hologres V2.0.18 dan versi setelahnya, yang memungkinkan instans secondary mengonsumsi data Binlog secara normal. Untuk versi sebelum Hologres V2.0.18, Anda harus terlebih dahulu mengonsumsi data Binlog sekali menggunakan instans primary. Setelah itu, instans secondary dapat mengonsumsi data Binlog secara normal.
-
Jika masalah bukan disebabkan oleh alasan di atas, bergabunglah ke grup DingTalk Hologres dan hubungi petugas jaga untuk bantuan. Untuk informasi selengkapnya, lihat Bagaimana cara mendapatkan dukungan online lebih lanjut?.