Pelajari cara menggunakan konektor Hologres.
Informasi latar belakang
Hologres adalah gudang data waktu nyata terpadu untuk memasukkan, memperbarui, dan menganalisis dataset dalam skala besar. Hologres mendukung SQL standar (kompatibel dengan protokol PostgreSQL), pemrosesan analitik online (OLAP) berskala petabyte, analisis ad hoc, serta penyajian data berlatensi rendah dengan konkurensi tinggi. Hologres terintegrasi secara mendalam dengan MaxCompute, Realtime Compute for Apache Flink, dan DataWorks, sehingga menyediakan solusi terpadu berbasis stack penuh untuk gudang data offline dan online. Tabel berikut menjelaskan kemampuan konektor Hologres.
|
Kategori |
Deskripsi |
|
Tipe yang didukung |
tabel source, dimension, dan sink |
|
Mode eksekusi |
mode stream dan batch |
|
Format data |
Tidak didukung |
|
Metriks pemantauan spesifik |
|
|
Tipe API |
DataStream dan SQL |
|
Mendukung pembaruan atau penghapusan data pada tabel sink |
Ya |
Fitur
|
Fitur |
Deskripsi |
|
Mendukung pembacaan data Hologres dengan atau tanpa binlog dalam mode CDC maupun non-CDC. |
|
|
Mendukung konsumsi penuh, inkremental, dan penuh serta inkremental terpadu. |
|
|
Mendukung pengabaian data baru, penggantian seluruh baris, atau pembaruan hanya pada field tertentu. |
|
|
Hanya memperbarui kolom yang dimodifikasi, bukan seluruh baris. |
|
|
Mendukung konsumsi binlog dari tabel partisi fisik maupun logis. Untuk tabel partisi fisik, satu job dapat memantau semua partisi, termasuk partisi yang baru ditambahkan. |
|
|
Mendukung penulisan ke tabel induk untuk membuat otomatis partisi anak yang sesuai. |
|
|
Sinkronisasi waktu nyata untuk satu tabel atau seluruh database |
Mendukung sinkronisasi waktu nyata untuk satu tabel atau seluruh database, dengan fitur utama berikut:
Untuk informasi lebih lanjut, lihat Pernyataan CREATE TABLE AS (CTAS) dan Panduan Cepat untuk sinkronisasi database waktu nyata. |
Batasan dan rekomendasi
Batasan
-
Tabel eksternal tidak didukung: Konektor Hologres tidak mendukung tabel eksternal Hologres, seperti tabel eksternal MaxCompute.
-
Batasan tipe waktu: Konektor Hologres saat ini tidak mendukung konsumsi waktu nyata data
TIMESTAMP. Saat membuat tabel, gunakan tipeTIMESTAMPTZsecara eksklusif. -
Mode pemindaian tabel sumber (Ververica Runtime (VVR) 8 dan sebelumnya): Secara default, konektor membaca data dalam mode batch, hanya memindai seluruh tabel sekali. Akibatnya, data yang baru ditambahkan tidak dikonsumsi.
-
Batasan Watermark (Ververica Runtime (VVR) 8 dan sebelumnya): Mode CDC tidak mendukung definisi watermark. Untuk melakukan agregasi berbasis jendela, gunakan solusi agregasi non-jendela sebagai gantinya.
-
Perilaku fungsi waktu dalam mode COPY: Saat Anda menggunakan mode tulis
COPY_STREAMatauCOPY_BULK_LOAD, nilai kolom tabel yang menggunakanCURRENT_TIMESTAMPatauNOW()sebagai nilai default akan tetap pada waktu mulai koneksi dan tidak diperbarui untuk setiap baris. Kolom metadata binloghg_binlog_timestamp_usmenyediakan waktu pemasukan data yang sebenarnya.
Rekomendasi
-
Pemilihan format penyimpanan:
-
Untuk pencarian titik pada tabel dimensi: Gunakan penyimpanan berorientasi baris. Anda harus menetapkan kunci primer dan kunci pengelompokan.
-
Untuk kueri satu-ke-banyak pada tabel dimensi: Gunakan penyimpanan berorientasi kolom. Untuk kinerja optimal, konfigurasikan dengan benar kunci distribusi dan kunci segmen.
-
Untuk tabel yang memerlukan pembaruan dan kueri analitis sering: Jika tabel harus mendukung konsumsi binlog waktu nyata dan analisis OLAP, kami sangat merekomendasikan penggunaan penyimpanan hibrida baris-kolom.
PentingJika Anda tidak menentukan format penyimpanan saat membuat tabel di Hologres, format default-nya adalah penyimpanan berorientasi kolom. Anda tidak dapat mengubah format penyimpanan setelah tabel dibuat. Untuk detail lebih lanjut, lihat Buat Tabel di Hologres dan Format Penyimpanan Tabel: Berorientasi Baris, Berorientasi Kolom, dan Hibrida Baris-Kolom.
-
-
Pengaturan paralelisme job: Atur paralelisme job Flink agar sesuai dengan jumlah shard pada tabel Hologres.
-- Di HoloWeb, jalankan pernyataan berikut untuk mengetahui jumlah shard pada suatu tabel. Ganti <tablename> dengan nama tabel Anda. select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>'; -
Versi dan fitur: Periksa secara berkala Catatan Rilis konektor Hologres untuk informasi tentang masalah yang diketahui, pembaruan fitur, dan kompatibilitas versi.
Catatan
-
Kompatibilitas dan batasan untuk mode konsumsi Hologres dan VVR
Tabel source
-
Untuk VVR 8 dan sebelumnya, pilih mode konsumsi menggunakan parameter
sdkMode. -
Untuk VVR 11 dan seterusnya, pilih mode konsumsi menggunakan parameter
source.binlog.read-mode.
Versi VVR
Versi Hologres
Nilai default/direkomendasikan
Mode konsumsi aktual
Catatan
≥ 6.0.7
< 2.0
Custom
holohub (default)
Kami merekomendasikan menyetel nilai ke
jdbc.6.0.7 hingga 8.0.4
≥ 2.0
jdbc (alih otomatis, tidak perlu konfigurasi)
jdbc (dipaksakan)
Layanan
holohubtelah ditinggalkan di Hologres 2.0 dan seterusnya. Sistem secara otomatis beralih ke modejdbc, yang dapat menyebabkan masalah izin. Untuk konfigurasi izin, lihat Masalah izin.≥ 8.0.5
≥ 2.1
jdbc (alih otomatis, tidak perlu konfigurasi)
jdbc (dipaksakan)
Tidak ada masalah izin. Untuk Hologres 2.1.27 dan seterusnya, mode beralih ke
jdbc_fixed.≥ 11.1
Versi apa pun
AUTO (default)
Dipilih secara otomatis berdasarkan versi Hologres
-
Untuk Hologres 2.1.27 dan seterusnya, mode
jdbcdipilih dengan koneksi ringan diaktifkan secara default (parameterconnection.fixed.enableddisetel ketrue). -
Untuk Hologres versi 2.1.0 hingga 2.1.26, mode
jdbcdipilih. -
Untuk Hologres 2.0 dan sebelumnya, mode
holohubdipilih.
PentingDi VVR 11.1 dan seterusnya, konektor secara default mengonsumsi data binlog. Pastikan Anda telah mengaktifkan binlog. Jika tidak, kesalahan dapat terjadi.
Tabel sink
-
Untuk VVR 8 dan sebelumnya, pilih mode penulisan data menggunakan parameter
sdkMode. -
Untuk VVR 11 dan seterusnya, pilih mode penulisan data menggunakan parameter
sink.write-mode.
Versi VVR
Versi Hologres
Mode RPC terpengaruh
Actual Write Mode
Nilai default/direkomendasikan
Catatan
6.0.4 hingga 8.0.2
< 2.0
Tidak
rpc
Custom
N/A
6.0.4 hingga 8.0.2
≥ 2.0
Ya
jdbc_fixed (alih otomatis)
Custom
Untuk mencegah deduplikasi, setel 'jdbcWriteBatchSize'='1'.
≥ 8.0.3
Versi apa pun
Ya
jdbc_fixed (alih otomatis)
Custom
Jika Anda mengonfigurasi mode sebagai
rpc, sistem secara otomatis beralih ke modejdbc_fixeddan menyetel 'jdbcWriteBatchSize'='1' untuk mencegah deduplikasi.≥ 8.0.5
Versi apa pun
Ya
jdbc_fixed (alih otomatis)
Custom
Jika Anda mengonfigurasi mode sebagai
rpc, sistem secara otomatis beralih ke modejdbc_fixeddan menyetel 'deduplication.enabled'='false' untuk mencegah deduplikasi.Penting-
Layanan
rpctelah ditinggalkan di Hologres 2.0 dan seterusnya. Jika Anda menyetel parameter ini kerpc, Flink secara otomatis beralih ke nilaijdbc_fixed. Jika Anda menyetel parameter ke nilai lain, Flink menggunakan nilai yang ditentukan. -
Mode
rpcdihapus dari VVR 11.1 dan seterusnya. Kami merekomendasikan penggunaan modejdbcuntuk koneksi. -
Untuk operasi tulis dalam skenario konkurensi tinggi, kami merekomendasikan penggunaan mode
jdbc_copyatauCOPY_STREAM.
Tabel dimensi
Versi VVR
Versi Hologres
Mode RPC terpengaruh
Mode konsumsi aktual
Nilai default/direkomendasikan
Catatan
6.0.4 hingga 8.0.2
< 2.0
Tidak
rpc
Custom
N/A
6.0.4 hingga 8.0.2
≥ 2.0
Ya
jdbc_fixed (alih otomatis)
Custom
Layanan
rpctelah ditinggalkan untuk instans Hologres versi 2.0 atau lebih baru. Jika Anda menyetel parameter ini kerpc, Flink secara otomatis beralih ke nilaijdbc_fixed. Namun, jika Anda menyetel parameter ke nilai lain, Flink menggunakan nilai yang Anda tentukan.≥ 8.0.3
Versi apa pun
Ya
jdbc_fixed (alih otomatis)
Custom
≥ 8.0.5
Versi apa pun
Ya
jdbc_fixed (alih otomatis)
Custom
PentingMode
rpcdihapus dari VVR 11.1 dan seterusnya. Secara default, modejdbcdigunakan untuk koneksi. Anda juga dapat mengaktifkan mode koneksi ringan dengan menyetel parameterconnection.fixed.enabled. -
-
Untuk membaca data
JSONBdari tabel sumber binlog dalam mode JDBC, Anda harus mengaktifkan parameter GUC di tingkat database.-- Aktifkan parameter GUC di tingkat database. Hanya superuser yang dapat menjalankan perintah ini. Anda hanya perlu menjalankan perintah ini sekali untuk setiap database. alter database <db_name> set hg_experimental_enable_binlog_jsonb = on; -
Operasi
UPDATEmenghasilkan dua catatan binlog berturut-turut: catatanupdate_beforeuntuk data lama, diikuti oleh catatanupdate_afteruntuk data baru. -
Hindari menjalankan operasi
TRUNCATEatau operasi pembuatan ulang tabel lainnya pada tabel sumber binlog. Untuk informasi lebih lanjut, lihat FAQ. -
Pastikan presisi tipe
DECIMALkonsisten antara Flink dan Hologres untuk menghindari kesalahan. Untuk informasi lebih lanjut, lihat FAQ. -
Saat Anda menggunakan mode
initialuntuk konsumsi data penuh dan inkremental terpadu dari tabel sumber, pengurutan global tidak dijamin. Jika sistem downstream bergantung pada perhitungan berbasis waktu, gunakan mode konsumsi lain yang hanya berbasis binlog.
Aktifkan binlog
Untuk tabel baru
Fitur pembacaan data waktu nyata dinonaktifkan secara default. Oleh karena itu, saat Anda membuat tabel di HoloWeb menggunakan pernyataan DDL, Anda harus menyetel parameter binlog.level dan binlog.ttl. Berikut adalah contohnya.
begin;
create table test_table(
id int primary key,
title text not null,
body text);
call set_table_property('test_table', 'orientation', 'row');--Buat tabel berorientasi baris bernama test_table.
call set_table_property('test_table', 'clustering_key', 'id');--Buat kunci pengelompokan pada kolom id.
call set_table_property('test_table', 'binlog.level', 'replica');--Aktifkan fitur binlog.
call set_table_property('test_table', 'binlog.ttl', '86400');--Setel waktu hidup (TTL) untuk binlog dalam detik.
commit;
Untuk tabel yang sudah ada
Di HoloWeb, Anda dapat menggunakan pernyataan berikut untuk mengaktifkan Binlog untuk tabel yang sudah ada dan menyetel TTL Binlog. table_name adalah nama tabel yang ingin Anda aktifkan Binlog-nya.
-- Aktifkan fitur binlog.
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;
-- Setel waktu hidup (TTL) binlog dalam detik.
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;
Dengan parameter
Mulai VVR 11, opsi konektor untuk Hologres telah diperbarui. Beberapa parameter mungkin telah diganti namanya atau dihapus. VVR 11 kompatibel mundur dengan VVR 8. Lihat dokumentasi parameter untuk versi VVR Anda.
Pemetaan tipe
Untuk pemetaan tipe data antara Flink dan Hologres, lihat Pemetaan tipe data antara Flink dan Hologres.
Hologres mendukung sintaks GENERATED ALWAYS AS untuk mendefinisikan kolom yang dihasilkan. Contohnya:
ds TIMESTAMP NOT NULL GENERATED ALWAYS AS (date_trunc('month', create_time)) STORED
Kendala NOT NULL pada kolom yang dihasilkan dipetakan ke field nullable, sebagaimana diharapkan. Karena nilai kolom yang dihasilkan dihitung oleh Hologres, Flink tidak menulis nilai untuk field ini. Jika kendala NOT NULL dipertahankan, operasi tulis akan gagal validasi di klien Hologres. Perilaku ini tidak memengaruhi kendala NOT NULL pada kolom biasa (misalnya, ds TIMESTAMP NOT NULL).
Contoh
Contoh tabel source
Tabel source binlog
CDC mode
Dalam mode ini, source mengonsumsi data binlog dan secara otomatis menetapkan tipe Flink RowKind yang sesuai untuk setiap baris berdasarkan hg_binlog_event_type, sehingga tidak perlu deklarasi eksplisit. Jenis ini mencakup INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER. Hal ini memungkinkan pencermatan data tabel, mirip dengan fungsionalitas Change Data Capture (CDC) pada database seperti MySQL atau PostgreSQL. Berikut adalah contoh DDL untuk tabel source.
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='${secret_values.ak_id}', --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.
'password'='${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL', --Membaca semua jenis changelog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
'retry-count'='10', --Jumlah percobaan ulang saat terjadi kesalahan pembacaan binlog.
'retry-sleep-step-ms'='5000', --Waktu backoff inkremental antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua menunggu 10 detik, dan seterusnya.
'source.binlog.batch-size'='512' --Ukuran batch untuk membaca data binlog.
);
VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc',
'binlogMaxRetryTimes' = '10', --Jumlah percobaan ulang saat terjadi kesalahan pembacaan binlog.
'binlogRetryIntervalMs' = '500', --Interval percobaan ulang dalam milidetik setelah kesalahan pembacaan binlog.
'binlogBatchReadSize' = '100' --Ukuran batch untuk membaca data binlog.
);
Mode non-CDC
Dalam mode ini, source meneruskan data binlog yang dikonsumsi ke node downstream sebagai data Flink biasa, memperlakukan semua catatan sebagai tipe INSERT. Anda dapat menangani catatan dengan hg_binlog_event_type tertentu berdasarkan kebutuhan bisnis Anda. Berikut adalah contoh DDL untuk tabel source.
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY', --Semua jenis changelog diperlakukan sebagai operasi INSERT.
'retry-count'='10', --Jumlah percobaan ulang saat terjadi kesalahan pembacaan binlog.
'retry-sleep-step-ms'='5000', --Waktu backoff inkremental antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua menunggu 10 detik, dan seterusnya.
'source.binlog.batch-size'='512' --Ukuran batch untuk membaca data binlog.
);
VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10', --Jumlah percobaan ulang saat terjadi kesalahan pembacaan binlog.
'binlogRetryIntervalMs' = '500', --Interval percobaan ulang dalam milidetik setelah kesalahan pembacaan binlog.
'binlogBatchReadSize' = '100' --Ukuran batch untuk membaca data binlog.
);
Tabel source non-binlog
VVR 11+
Mulai VVR 11.1, konektor secara default mengonsumsi data binlog. Untuk membaca dari tabel source non-binlog, Anda harus secara eksplisit menyetel 'source.binlog' ke 'false'. Untuk informasi lebih lanjut, lihat Tabel source binlog.
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Gunakan manajemen variabel untuk mencegah kebocoran kunci AK/SK.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog'='false' --Menentukan apakah akan mengonsumsi data binlog.
);
VVR 8+
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sdkMode' = 'jdbc'
);
Sink table
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Gunakan manajemen variabel untuk mencegah kebocoran kunci AK/SK.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
Contoh tabel dimensi
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
Fitur lanjutan
Ingesti penuh dan inkremental terpadu
Skenario
-
Fitur ini hanya berlaku untuk tabel source yang memiliki kunci primer. Fitur ini direkomendasikan untuk tabel source Hologres yang menggunakan mode CDC.
-
Hologres memungkinkan Anda mengaktifkan Binlog sesuai kebutuhan. Anda dapat mengaktifkan Binlog untuk tabel yang sudah ada dan berisi data.
Contoh kode
VVR 11+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.startup-mode' = 'INITIAL', --Membaca semua data historis, lalu mengonsumsi Binlog secara inkremental.
'retry-count'='10', --Jumlah percobaan ulang jika terjadi kesalahan saat membaca data Binlog.
'retry-sleep-step-ms'='5000', --Waktu tunggu inkremental antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua menunggu 10 detik, dan seterusnya.
'source.binlog.batch-size'='512' --Jumlah baris yang dibaca dari Binlog dalam satu batch.
);
-
Setel
source.binlog.startup-modekeINITIALuntuk melakukan pembacaan penuh awal pada tabel sebelum beralih ke konsumsi inkremental Binlog. -
Jika parameter
startTimedisetel, baik secara langsung maupun dengan memilih waktu mulai di UI startup, parameter tersebut memiliki prioritas lebih tinggi dan menyetelbinlogStartUpModeke modetimestamp, menggantikan pengaturan mode lainnya, karena parameterstartTimememiliki prioritas lebih tinggi.
VVR 8+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --Membaca semua data historis, lalu mengonsumsi Binlog secara inkremental.
'binlogMaxRetryTimes' = '10', --Jumlah percobaan ulang jika terjadi kesalahan saat membaca data Binlog.
'binlogRetryIntervalMs' = '500', --Interval percobaan ulang dalam milidetik setelah kesalahan pembacaan Binlog.
'binlogBatchReadSize' = '100' --Jumlah baris yang dibaca dari Binlog dalam satu batch.
);
-
Setel
binlogStartUpModekeinitialuntuk melakukan pembacaan penuh awal pada tabel sebelum beralih ke konsumsi inkremental Binlog. -
Jika parameter
startTimedisetel, baik secara langsung maupun dengan memilih waktu mulai di UI startup, parameter tersebut memiliki prioritas lebih tinggi dan menyetelbinlogStartUpModeke modetimestamp, menggantikan pengaturan mode lainnya, karena parameterstartTimememiliki prioritas lebih tinggi.
Resolusi konflik kunci primer
Saat menulis data ke Hologres, konektor menawarkan tiga strategi untuk menangani catatan dengan kunci primer duplikat.
VVR 11+
Anda dapat menentukan strategi dengan menyetel parameter sink.on-conflict-action.
|
Nilai |
Deskripsi |
|
INSERT_OR_IGNORE |
Menyimpan catatan yang tiba pertama dan mengabaikan duplikat berikutnya. |
|
INSERT_OR_REPLACE |
Menimpa catatan yang ada dengan yang baru. |
|
INSERT_OR_UPDATE (Default) |
Hanya memperbarui kolom yang disediakan dalam tabel sink, meninggalkan kolom lain dalam catatan yang ada tidak berubah. |
VVR 8+
Anda dapat menentukan strategi dengan menyetel parameter mutatetype.
|
Nilai |
Deskripsi |
|
insertorignore (Default) |
Menyimpan catatan yang tiba pertama dan mengabaikan duplikat berikutnya. |
|
insertorreplace |
Menimpa catatan yang ada dengan yang baru. |
|
insertorupdate |
Hanya memperbarui kolom yang disediakan dalam tabel sink, meninggalkan kolom lain dalam catatan yang ada tidak berubah. |
Misalnya, asumsikan sebuah tabel memiliki kolom a, b, c, dan d, dan a adalah kunci primer. Jika tabel sink hanya menyediakan kolom a dan b, menyetel strategi ke INSERT_OR_UPDATE hanya memperbarui kolom b. Kolom c dan d tetap tidak berubah.
Namun, kolom apa pun dalam tabel fisik yang dihilangkan dari tabel sink harus nullable. Jika tidak, operasi tulis akan gagal.
Menulis ke tabel partisi
Secara default, sink Hologres menulis data ke satu tabel non-partisi. Untuk menulis ke tabel partisi dengan menargetkan tabel induknya, Anda harus mengaktifkan opsi berikut.
VVR 11+
Untuk memungkinkan konektor membuat partisi anak secara otomatis jika belum ada, setel sink.create-missing-partition ke true.
-
VVR 11.1 dan versi yang lebih baru mendukung penulisan ke tabel partisi secara default dan secara otomatis mengarahkan data ke partisi anak yang sesuai.
-
Setel parameter
tablenameke nama tabel induk. -
Jika partisi anak yang diperlukan tidak ada dan
sink.create-missing-partition=truetidak disetel, operasi tulis akan gagal.
VVR 8+
-
Untuk mengarahkan data secara otomatis ke partisi anak yang sesuai, setel
partitionRouterketrue. -
Untuk memungkinkan konektor membuat partisi anak secara otomatis jika belum ada, setel
createparttableketrue.
-
Setel parameter
tablenameke nama tabel induk. -
Jika partisi anak yang diperlukan tidak ada dan
createparttable=truetidak disetel, operasi tulis akan gagal.
Gabungkan stream dan pembaruan parsial
Saat Anda menulis beberapa stream data ke satu tabel lebar Hologres, konektor dapat secara otomatis menggabungkan catatan yang memiliki kunci primer yang sama. Anda juga dapat melakukan pembaruan parsial, yang hanya memperbarui kolom yang dimodifikasi alih-alih mengganti seluruh baris. Hal ini meningkatkan kinerja tulis dan konsistensi data.
Batasan
-
Tabel lebar harus memiliki kunci primer.
-
Setiap stream data harus menyertakan semua kolom yang membentuk kunci primer.
-
Untuk tabel lebar yang menggunakan penyimpanan berorientasi kolom, penggabungan stream pada laju permintaan per detik (RPS) yang tinggi dapat menyebabkan penggunaan CPU tinggi. Untuk menguranginya, pertimbangkan untuk menonaktifkan encoding kamus untuk kolom tabel.
Contoh
Asumsikan Anda memiliki dua stream data Flink. Stream pertama berisi kolom a, b, dan c. Stream kedua berisi kolom a, d, dan e. Tabel lebar Hologres, WIDE_TABLE, berisi kolom a, b, c, d, dan e, dengan kolom a sebagai kunci primer.
VVR 11+
// source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan kolom a, b, c, d, dan e.
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- Tabel lebar Hologres, yang berisi kolom a, b, c, d, dan e.
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sink.on-conflict-action'='INSERT_OR_UPDATE', -- Perbarui kolom tertentu berdasarkan kunci primer.
'sink.delete-strategy'='IGNORE_DELETE', -- Strategi untuk menangani pesan penarikan. IGNORE_DELETE cocok untuk stream append-only atau upsert yang tidak memerlukan operasi hapus.
'sink.partial-insert.enabled'='true' -- Mengaktifkan pembaruan parsial. Hanya kolom yang ditentukan dalam pernyataan INSERT yang dikirim ke konektor.
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- Deklarasikan bahwa hanya kolom a, b, dan c yang dimasukkan.
INSERT INTO hologres_sink(a,d,e) select * from source2; -- Deklarasikan bahwa hanya kolom a, d, dan e yang dimasukkan.
END;
VVR 8+
// source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan kolom a, b, c, d, dan e.
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- Tabel lebar Hologres, yang berisi kolom a, b, c, d, dan e.
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'mutatetype'='insertorupdate', -- Perbarui kolom tertentu berdasarkan kunci primer.
'ignoredelete'='true', -- Abaikan permintaan HAPUS yang dihasilkan oleh pesan penarikan.
'partial-insert.enabled'='true' -- Aktifkan pembaruan parsial untuk hanya memperbarui kolom yang dideklarasikan dalam pernyataan INSERT.
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- Deklarasikan bahwa hanya kolom a, b, dan c yang dimasukkan.
INSERT INTO hologres_sink(a,d,e) select * from source2; -- Deklarasikan bahwa hanya kolom a, d, dan e yang dimasukkan.
END;
Setel ignoredelete ke true untuk mengabaikan permintaan Hapus yang dihasilkan oleh pesan penarikan. Untuk VVR 8.0.8 dan seterusnya, kami merekomendasikan penggunaan sink.delete-strategy untuk mengonfigurasi berbagai strategi untuk pembaruan parsial.
Konsumsi binlog dari tabel partisi (Beta)
Tabel partisi membantu meningkatkan pengarsipan data dan kinerja kueri. Konektor Hologres mendukung konsumsi binlog dari tabel partisi fisik maupun logis. Untuk perbedaan antara tabel partisi fisik dan logis, lihat BUAT TABEL PARTISI LOGIS.
Konsumsi binlog dari tabel partisi fisik
Konektor Hologres dapat mengonsumsi binlog dari tabel partisi dan secara dinamis memantau partisi baru dalam satu job. Hal ini secara signifikan meningkatkan efisiensi dan kegunaan pemrosesan data waktu nyata.
Catatan penggunaan
-
Fitur ini hanya tersedia untuk tabel sumber binlog dalam mode JDBC. Fitur ini memerlukan VVR 8.0.11 atau lebih baru dan instans Hologres versi 2.1.27 atau lebih baru.
-
Nama partisi harus mengikuti format
{parent_table}_{partition_value}secara ketat. Konektor mungkin tidak mengonsumsi partisi yang tidak mengikuti format ini. Untuk informasi lebih lanjut, lihat partisi dinamis.Penting-
Untuk mode DYNAMIC, VVR versi 8.0.11 tidak mendukung kolom partisi dengan pembatas
-(seperti YYYY-MM-DD). -
Mulai VVR 11.1, Anda dapat mengonsumsi data dari kolom partisi yang menggunakan format kustom.
-
Batasan ini tidak berlaku saat Anda menulis ke tabel partisi.
-
-
Saat mendeklarasikan tabel sumber Hologres di Flink, Anda harus menyertakan kolom partisinya.
-
Dalam mode DYNAMIC, tabel partisi harus memiliki partisi dinamis diaktifkan. Selain itu, parameter pra-pembuatan partisi
auto_partitioning.num_precreateharus lebih besar dari 1. Jika tidak, job akan melemparkan pengecualian saat mencoba mengonsumsi partisi terbaru. -
Dalam mode DYNAMIC, setelah partisi baru ditambahkan, konektor tidak lagi mengonsumsi perubahan data berikutnya dari partisi lama.
Contoh
|
Mode |
Fitur |
Deskripsi |
|
DYNAMIC |
konsumsi partisi dinamis |
Secara otomatis memantau dan mengonsumsi partisi baru secara kronologis. Mode ini cocok untuk skenario streaming data waktu nyata. |
|
STATIC |
konsumsi partisi statis |
Hanya mengonsumsi partisi yang sudah ada (atau yang ditentukan secara manual) dan tidak secara otomatis menemukan partisi baru. Mode ini cocok untuk memproses data historis dalam rentang tetap. |
Mode Dinamis
VVR 11+
Asumsikan tabel partisi Hologres dibuat dengan DDL berikut, dan binlog serta partisi dinamis diaktifkan.
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- Mengaktifkan partisi dinamis.
auto_partitioning_time_unit = 'DAY', -- Partisi dibuat setiap hari. Contoh nama partisi: test_message_src1_20250512, test_message_src1_20250513.
auto_partitioning_num_precreate = '2' -- Membuat dua partisi sebelumnya.
);
-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.
Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi tabel partisi test_message_src1 dalam mode DYNAMIC.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- Kolom partisi dari tabel partisi Hologres.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- Tabel induk yang diaktifkan partisi dinamisnya.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- Memantau partisi terbaru secara dinamis.
'source.binlog.startup-mode' = 'initial' -- Mengonsumsi semua data yang ada, lalu mengonsumsi data inkremental dari binlog.
);
VVR 8.0.11
Asumsikan tabel partisi Hologres dibuat dengan DDL berikut, dan binlog serta partisi dinamis diaktifkan.
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- Mengaktifkan partisi dinamis.
auto_partitioning_time_unit = 'DAY', -- Partisi dibuat setiap hari. Contoh nama partisi: test_message_src1_20241027, test_message_src1_20241028.
auto_partitioning_num_precreate = '2' -- Membuat dua partisi sebelumnya.
);
-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.
Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi data dari tabel partisi test_message_src1 dalam mode DYNAMIC.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- Kolom partisi dari tabel partisi Hologres.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- Tabel induk yang diaktifkan partisi dinamisnya.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'DYNAMIC', -- Memantau partisi terbaru secara dinamis.
'binlogstartUpMode' = 'initial', -- Mengonsumsi semua data yang ada, lalu mengonsumsi data inkremental dari binlog.
'sdkMode' = 'jdbc_fixed' -- Gunakan mode ini untuk menghindari masalah batas koneksi.
);
Mode Statis
VVR 11+
Asumsikan tabel partisi Hologres dibuat dengan DDL berikut, dan binlog diaktifkan.
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi tabel partisi test_message_src2 dalam mode STATIC.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- Kolom partisi dari tabel partisi Hologres.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- Tabel partisi.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'STATIC', -- Mengonsumsi set partisi tetap.
'source.binlog.partition-values-to-read' = 'red,blue,green', -- Hanya mengonsumsi tiga partisi yang ditentukan. Partisi 'black' tidak dikonsumsi. Partisi baru juga tidak dikonsumsi. Jika opsi ini tidak disetel, job mengonsumsi semua partisi dari tabel induk.
'source.binlog.startup-mode' = 'initial' -- Mengonsumsi semua data yang ada, lalu mengonsumsi data inkremental dari binlog.
);
VVR 8.0.11
Asumsikan tabel partisi Hologres dibuat dengan DDL berikut, dan binlog diaktifkan.
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi data dari tabel partisi test_message_src2 dalam mode STATIC.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- Kolom partisi dari tabel partisi Hologres.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- Tabel partisi.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'STATIC', -- Mengonsumsi set partisi tetap.
'partition-values-to-read' = 'red,blue,green', -- Hanya mengonsumsi tiga partisi yang ditentukan. Partisi 'black' tidak dikonsumsi. Partisi baru juga tidak dikonsumsi. Jika opsi ini tidak disetel, job mengonsumsi semua partisi dari tabel induk.
'binlogstartUpMode' = 'initial', -- Mengonsumsi semua data yang ada, lalu mengonsumsi data inkremental dari binlog.
'sdkMode' = 'jdbc_fixed' -- Gunakan mode ini untuk menghindari masalah batas koneksi.
);
Konsumsi binlog dari tabel partisi logis
Konektor Hologres mendukung konsumsi binlog dari tabel partisi logis dan memungkinkan Anda menentukan partisi mana yang akan dikonsumsi dengan opsi.
Catatan penggunaan
-
Mengonsumsi binlog dari partisi tertentu tabel partisi logis memerlukan VVR 11.0.0 atau lebih baru dan instans Hologres versi V3.1 atau lebih baru.
-
Mengonsumsi binlog dari semua partisi tabel partisi logis sama dengan mengonsumsi binlog dari tabel Hologres non-partisi. Untuk informasi lebih lanjut, lihat Tabel source.
Contoh
|
Parameter |
Deskripsi |
Contoh |
|
source.binlog.logical-partition-filter-column-names |
Nama kolom partisi yang menentukan partisi mana yang akan dikonsumsi. Tutup nama kolom dengan tanda kutip ganda ("). Pisahkan beberapa nama kolom dengan koma (,). Jika nama kolom berisi tanda kutip ganda, escape dengan tanda kutip ganda tambahan. |
'source.binlog.logical-partition-filter-column-names'='"Pt","id"' Dua kolom partisi digunakan: Pt dan id. |
|
source.binlog.logical-partition-filter-column-values |
Nilai partisi yang menentukan partisi mana yang akan dikonsumsi. Partisi ditentukan oleh satu set nilai, satu untuk setiap kolom partisi. Tutup setiap nilai dengan tanda kutip ganda ("). Pisahkan nilai untuk partisi yang sama dengan koma (,). Pisahkan partisi dengan titik koma (;). Jika nilai berisi tanda kutip ganda, escape dengan tanda kutip ganda tambahan. |
'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"' Ini menentukan dua partisi untuk dikonsumsi. Nilai partisi pertama adalah (20240910, 0), dan yang kedua adalah (special"value, 9). |
Asumsikan Anda telah membuat tabel berikut di Hologres.
CREATE TABLE holo_table (
id int not null,
name text,
age numeric(18,4),
"Pt" text,
primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
binlog_level ='replica'
);
Untuk mengonsumsi binlog tabel ini di Flink:
CREATE TEMPORARY TABLE test_src_binlog_table(
id INTEGER,
name VARCHAR,
age decimal(18,4),
`Pt` VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='holo_table',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog'='true',
'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
'source.binlog.change-log-mode'='ALL', --Membaca semua jenis changelog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
'retry-count'='10', -- Jumlah percobaan ulang untuk kesalahan pembacaan binlog.
'retry-sleep-step-ms'='5000', --Waktu backoff inkremental antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua menunggu 10 detik, dan seterusnya.
'source.binlog.batch-size'='512' --Jumlah baris yang dibaca dari binlog dalam satu batch.
);
API DataStream
Untuk membaca dari atau menulis ke Hologres menggunakan API DataStream, gunakan konektor DataStream yang sesuai untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi tentang cara menyiapkan konektor DataStream, lihat Cara menggunakan konektor DataStream. Konektor DataStream Hologres tersedia di Maven Central. Untuk debugging lokal, gunakan Uber JAR yang sesuai. Untuk informasi lebih lanjut, lihat Jalankan dan debug job yang berisi konektor secara lokal.
Tabel source Hologres
Tabel source binlog
VVR menyediakan kelas HologresBinlogSource untuk membaca data binlog Hologres. Contoh berikut menunjukkan cara membangun source binlog Hologres.
VVR 11.3+
Mulai VVR 11.1.2, parameter JDBCOptions dan startTimeMs telah dihapus dari konstruktor HologresBinlogSource. Mulai VVR 11.3, parameter List<Subscribe.BinlogFilter> telah ditambahkan. Jika Anda menggunakan VVR 11 atau lebih baru, kami merekomendasikan penggunaan VVR 11.3 atau lebih baru.
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Nama tabel yang akan dibaca.
String sourceTableName = "sourceTableName";
// Parameter untuk koneksi Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, sourceTableName);
config.set(HologresConfigs.BINLOG, true);
config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL);
// Bangun source binlog Hologres.
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
StartupMode.INITIAL,
sourceTableName,
"",
Collections.emptyList(),
-1,
Collections.emptySet(),
Collections.emptyList()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
VVR 8.0.11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Parameter untuk koneksi Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Bangun JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Bangun source binlog Hologres.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet(),
new ArrayList<>()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
VVR 8.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Parameter untuk koneksi Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Bangun JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Bangun source binlog Hologres.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
VVR 6.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Parameter untuk koneksi Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Bangun JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Setel atau buat nama slot default.
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// Bangun JDBCBinlogRecordConverter.
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// Bangun source binlog Hologres JDBC.
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}
Jika Anda menggunakan mesin Realtime Compute for Apache Flink versi lebih lama dari 8.0.5 atau versi Hologres lebih lama dari 2.1, pastikan pengguna adalah superuser atau memiliki izin Replication Role. Untuk informasi lebih lanjut, lihat Masalah izin Hologres.
Tabel source non-binlog
VVR menyediakan kelas HologresBulkreadInputFormat, implementasi RichInputFormat, untuk membaca data dari tabel Hologres. Contoh berikut menunjukkan cara membangun source Hologres.
public class Sample {
public static void main(String[] args) throws Exception {
// atur API Java DataStream
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Parameter untuk koneksi Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Bangun JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
new HologresConnectionParam(config),
jdbcOptions,
schema,
"",
-1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
env.execute();
}
}
Dependensi Maven
Konektor DataStream Hologres tersedia di Maven Central.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>
Tabel sink Hologres
VVR 11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema untuk tabel yang akan ditulis. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
TableSchema tableSchema = TableSchema.builder()
.field("a", DataTypes.INT().notNull())
.field("b", DataTypes.STRING())
.primaryKey("a")
.build();
// Parameter untuk koneksi Hologres.
Configuration config = new Configuration();
config.set(HologresConfigs.ENDPOINT, "yourEndpoint");
config.set(HologresConfigs.USERNAME, "yourUserName");
config.set(HologresConfigs.PASSWORD, "yourPassword");
config.set(HologresConfigs.DATABASE, "yourDatabaseName");
config.set(HologresConfigs.TABLE, "yourTableName");
HologresConnectionParam connectionParam = new HologresConnectionParam(config);
HologresTableSchema hologresTableSchema =
HologresTableSchema.get(connectionParam.getJDBCOptions());
// Indeks kolom yang akan ditulis ke sink.
Integer[] targetColumnIndexes = {0, 1};
// Bangun sink Hologres.
HologresSinkFunction sinkFunction =
new HologresSinkFunction(
connectionParam, tableSchema, targetColumnIndexes, hologresTableSchema);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}
VVR 8+
public class Sample {
public static void main(String[] args) throws Exception {
// atur API Java DataStream
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema untuk tabel yang akan ditulis. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Parameter untuk koneksi Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Bangun writer Hologres untuk menulis data sebagai RowData.
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam,
schema,
HologresTableSchema.get(hologresConnectionParam),
new Integer[0]);
// Bangun sink Hologres.
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}
Kolom metadata
Di Realtime Compute for Apache Flink VVR 8.0.11 dan seterusnya, tabel source binlog mendukung kolom metadata. Mulai versi ini, kami merekomendasikan Anda mendeklarasikan field binlog seperti hg_binlog_event_type sebagai kolom metadata. Kolom metadata adalah ekstensi terhadap standar SQL. Kolom ini memungkinkan Anda mengakses informasi spesifik, seperti nama database sumber, nama tabel, jenis perubahan, dan timestamp event. Anda dapat menggunakan metadata ini untuk mengimplementasikan logika kustom, misalnya, untuk memfilter event DELETE.
|
Parameter |
Tipe |
Deskripsi |
|
db_name |
STRING NOT NULL |
Nama database yang berisi baris tersebut. |
|
table_name |
STRING NOT NULL |
Nama tabel yang berisi baris tersebut. |
|
hg_binlog_lsn |
BIGINT NOT NULL |
Field sistem untuk nomor urutan binlog. Nilainya meningkat secara monoton tetapi tidak kontinu dalam satu shard. Keunikan dan urutan tidak dijamin di seluruh shard. |
|
hg_binlog_timestamp_us |
BIGINT NOT NULL |
Timestamp event perubahan di database, dalam mikrodetik (us). |
|
hg_binlog_event_type |
BIGINT NOT NULL |
Jenis perubahan baris. Nilai yang valid adalah:
|
|
hg_shard_id |
INT NOT NULL |
ID shard data yang berisi baris tersebut. Untuk informasi lebih lanjut tentang kelompok tabel dan shard, lihat kelompok tabel dan shard. |
Dalam pernyataan DDL, Anda dapat mendeklarasikan kolom metadata dengan menggunakan <meta_column_name> <datatype> METADATA VIRTUAL. Berikut adalah contohnya:
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn bigint METADATA VIRTUAL
hg_binlog_event_type bigint METADATA VIRTUAL
hg_binlog_timestamp_us bigint METADATA VIRTUAL
hg_shard_id int METADATA VIRTUAL
db_name string METADATA VIRTUAL
table_name string METADATA VIRTUAL
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
...
);
FAQ
Referensi
-
Untuk mempelajari cara membuat dan menggunakan katalog Hologres, lihat Kelola katalog Hologres.
-
Hologres terintegrasi dengan Flink untuk menyediakan solusi terpadu dalam membangun gudang data waktu nyata. Untuk detailnya, lihat Bangun gudang data waktu nyata dengan Hologres.
-
Hologres mendukung pembaruan dan koreksi data yang efisien, sehingga cocok untuk membangun tabel lebar dalam skenario penulisan multi-stream. Untuk contohnya, lihat Analisis perilaku pengguna dengan Flink, MongoDB, dan Hologres.