Topik ini menjelaskan konektor Hologres.
Informasi latar belakang
Hologres adalah mesin terpadu untuk gudang data real-time yang mendukung penulisan, pembaruan, dan analisis dataset dalam jumlah besar secara real-time. Hologres menggunakan sintaks SQL standar dan kompatibel dengan protokol PostgreSQL, serta mendukung OLAP dan analisis ad hoc berskala petabyte. Layanan ini menyediakan akses data online berlatensi rendah dengan konkurensi tinggi dan terintegrasi secara mendalam dengan MaxCompute, Realtime Compute for Apache Flink, dan DataWorks untuk menghadirkan solusi gudang data offline dan real-time end-to-end. Tabel berikut merangkum kemampuan konektor Hologres.
Category | Details |
Jenis yang didukung | Tabel sumber, tabel dimensi, dan tabel sink |
Mode eksekusi | Mode stream dan mode batch |
Format data | Tidak didukung |
Metrik pemantauan | |
Jenis API | DataStream dan SQL |
Mendukung pembaruan atau penghapusan pada tabel sink | Ya |
Fitur
Fitur | Detail |
Anda dapat membaca data Hologres dengan atau tanpa binary logging (binlog). Fitur ini kompatibel dengan mode change data capture (CDC) maupun non-CDC. | |
Anda dapat melakukan konsumsi penuh, inkremental, atau penuh dan inkremental terpadu. | |
Anda dapat mengabaikan data baru, mengganti seluruh baris, atau hanya memperbarui field tertentu. | |
Gabung dan perbaruan parsial untuk tabel lebar dengan penulisan multi-stream | Anda dapat memperbarui hanya kolom yang dimodifikasi, bukan seluruh baris. |
Anda dapat mengonsumsi binlog dari tabel partisi fisik. Satu pekerjaan dapat memantau semua partisi, termasuk partisi yang baru ditambahkan. Anda juga dapat mengonsumsi binlog dari tabel partisi logis. | |
Anda dapat menulis data ke tabel induk dari tabel partisi dan membuat otomatis partisi anak yang sesuai. | |
Sinkronisasi real-time untuk satu tabel atau seluruh database | Anda dapat melakukan sinkronisasi data real-time pada tingkat satu tabel atau seluruh database. Fitur ini menyediakan kemampuan berikut:
Untuk informasi selengkapnya, lihat Pernyataan CREATE TABLE AS (CTAS) dan Panduan Mulai Cepat untuk sinkronisasi database real-time. |
Batasan dan saran
Batasan
Tabel eksternal tidak didukung: Konektor Hologres tidak mendukung akses ke tabel eksternal Hologres, seperti tabel eksternal MaxCompute.
Batasan tipe waktu: Konsumsi real-time data TIMESTAMP tidak didukung. Gunakan tipe TIMESTAMPTZ saat membuat tabel.
Pemindaian tabel sumber (VVR 8 dan sebelumnya): Data dibaca dari Hologres dalam mode batch secara default. Artinya, data inkremental tidak dikonsumsi.
Batasan Watermark (VVR 8 dan sebelumnya): Mode CDC tidak mendukung definisi watermark. Untuk melakukan agregasi berbasis jendela, gunakan solusi agregasi non-jendela.
Saran
Pemilihan Mode Penyimpanan
Tabel dimensi untuk pencarian titik: Kami merekomendasikan menggunakan penyimpanan berorientasi baris. Ini memerlukan pengaturan kunci primer dan kunci pengelompokan.
Tabel dimensi untuk kueri satu-ke-banyak: Kami merekomendasikan menggunakan penyimpanan berorientasi kolom dan mengonfigurasi kunci distribusi dan kunci segmen untuk mengoptimalkan performa.
Tabel yang sering diperbarui untuk kueri analitis: Kami merekomendasikan menggunakan penyimpanan hibrida baris-kolom jika tabel tersebut harus mendukung konsumsi binlog real-time dan analisis OLAP.
PentingJika Anda tidak secara eksplisit menentukan format penyimpanan saat membuat tabel Hologres, format tersebut akan menggunakan penyimpanan kolom secara default. Format penyimpanan ini bersifat tidak dapat diubah dan tidak dapat diubah setelah pembuatan. Untuk informasi selengkapnya, lihat Buat tabel di Hologres dan Format penyimpanan tabel: kolom, berorientasi baris, dan hibrida baris-kolom.
Paralelisme pekerjaan: Anda dapat mengatur paralelisme pekerjaan Flink agar sesuai dengan jumlah shard di tabel Hologres.
# Jalankan perintah ini di HoloWeb untuk melihat jumlah shard dalam sebuah tabel. select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';Versi dan fitur: Anda dapat memeriksa secara berkala Catatan Rilis Konektor Hologres untuk mengetahui isu yang diketahui, pembaruan fitur, dan informasi kompatibilitas versi.
Catatan Penting
Kompatibilitas dan batasan versi Hologres dan VVR
Tabel sumber
VVR 8 dan sebelumnya: Atur mode konsumsi menggunakan
sdkMode.VVR 11+: Atur mode konsumsi menggunakan
source.binlog.read-mode.
Versi VVR
Versi Hologres
Nilai parameter default/direkomendasikan
Mode konsumsi aktual
Catatan
≥ 6.0.7
< 2.0
Custom
holohub (default)
Kami merekomendasikan mengonfigurasi JDBC.
6.0.7–8.0.4
≥ 2.0
jdbc (alih otomatis)
jdbc (dipaksa)
Hologres 2.0 dan versi selanjutnya telah menghentikan HoloHub. Jika holohub diatur, konektor secara otomatis fallback ke jdbc, yang mungkin memerlukan izin tambahan pengguna. Untuk konfigurasi izin, lihat Masalah izin.
≥ 8.0.5
≥ 2.1
jdbc (alih otomatis)
jdbc (dipaksa)
Tidak ada masalah izin. Untuk Hologres 2.1.27 dan versi selanjutnya, konektor beralih ke jdbc_fixed.
≥ 11.1
Versi apa pun
AUTO (default)
Dipilih secara otomatis berdasarkan versi Hologres
Untuk Hologres 2.1.27 dan versi selanjutnya, konektor memilih jdbc dan mengaktifkan koneksi ringan secara default (connection.fixed.enabled diatur ke true).
Untuk versi 2.1.0–2.1.26, Anda dapat memilih mode JDBC.
Untuk Hologres 2.0 dan versi sebelumnya, konektor memilih holohub.
PentingPada VVR 11.1 dan versi selanjutnya, konektor mengaktifkan konsumsi binlog secara default. Pastikan Anda telah mengaktifkan binlog untuk menghindari error.
Tabel sink
Pada versi VVR 8 dan sebelumnya, mode konsumsi dipilih dengan menentukan parameter
sdkMode.VVR 11+: Atur mode penulisan data menggunakan
sink.write-mode.
Versi VVR
Versi Hologres
Apakah RPC terpengaruh?
Mode konsumsi RPC aktual
Nilai parameter direkomendasikan/default
Catatan
6.0.4–8.0.2
< 2.0
Tidak
rpc
Custom
/
6.0.4–8.0.2
≥ 2.0
Ya
jdbc_fixed (alih otomatis)
Custom
Untuk mencegah deduplikasi, atur 'jdbcWriteBatchSize'='1'.
≥ 8.0.3
Versi apa pun
Ya
jdbc_fixed (alih otomatis)
Custom
Jika Anda mengatur rpc, konektor secara otomatis beralih ke jdbc_fixed dan mengatur 'jdbcWriteBatchSize'='1' untuk mencegah deduplikasi.
≥ 8.0.5
Versi apa pun
Ya
jdbc_fixed (alih otomatis)
Custom
Jika Anda mengatur rpc, konektor secara otomatis beralih ke jdbc_fixed dan mengatur 'deduplication.enabled'='false' untuk mencegah deduplikasi.
PentingRPC telah dihentikan di Hologres 2.0 dan versi selanjutnya. Jika Anda mengatur mode penulisan data ke rpc, konektor secara otomatis beralih ke jdbc_fixed. Namun, jika Anda mengatur nilai lain, konektor akan menggunakan nilai yang Anda tentukan.
VVR 11.1 dan versi selanjutnya tidak lagi mendukung RPC. Kami merekomendasikan menggunakan JDBC untuk koneksi.
Untuk skenario penulisan konkurensi tinggi, gunakan
jdbc_copyatauCOPY_STREAM.
Tabel dimensi
Versi VVR
Versi Hologres
Apakah RPC terpengaruh?
Mode konsumsi RPC aktual
Nilai parameter direkomendasikan/default
Catatan
6.0.4–8.0.2
< 2.0
Tidak
rpc
Custom
/
6.0.4–8.0.2
≥ 2.0
Ya
jdbc_fixed (alih otomatis)
Custom
Jika instans Hologres Anda versi 2.0 atau lebih baru, konektor secara otomatis beralih ke jdbc_fixed karena RPC telah dihentikan. Namun, jika Anda mengatur nilai lain, konektor akan menggunakan nilai yang Anda tentukan.
≥ 8.0.3
Versi apa pun
Ya
jdbc_fixed (alih otomatis)
Custom
≥ 8.0.5
Versi apa pun
Ya
jdbc_fixed (alih otomatis)
Custom
PentingVVR 11.1 dan versi selanjutnya tidak lagi mendukung RPC dan menggunakan JDBC secara default. Untuk mengaktifkan koneksi ringan, atur
connection.fixed.enabled.Tabel sumber binlog dalam mode JDBC mendukung pembacaan data JSONB, tetapi Anda harus mengaktifkan parameter GUC di tingkat database.
-- Aktifkan parameter GUC di tingkat database. Hanya superuser yang dapat menjalankan perintah ini. Setiap database hanya memerlukan pengaturan ini sekali. alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;Operasi UPDATE menghasilkan dua catatan binlog berturut-turut. Data lama (update_before) muncul terlebih dahulu, diikuti oleh data baru (update_after).
Hindari memotong atau membuat ulang tabel sumber yang telah diaktifkan binlog-nya. Untuk informasi selengkapnya, lihat FAQ.
Untuk menghindari error, pertahankan presisi
DECIMALyang konsisten antara Flink dan Hologres. Untuk informasi selengkapnya, lihat FAQ.Saat menggunakan mode INITIAL untuk ingesti penuh dan inkremental terpadu, pengurutan global tidak dijamin. Jika aplikasi downstream Anda bergantung pada perhitungan berbasis timestamp, gunakan mode pembacaan binlog murni sebagai gantinya.
Aktifkan binlog
Tabel Belum Dibuat
Konsumsi data real-time dinonaktifkan secara default. Saat membuat tabel Hologres di HoloWeb, atur parameter binlog.level dan binlog.ttl. Contoh kode:
begin;
create table test_table(
id int primary key,
title text not null,
body text);
call set_table_property('test_table', 'orientation', 'row');-- Buat tabel berorientasi baris bernama test_table.
call set_table_property('test_table', 'clustering_key', 'id');-- Buat kunci pengelompokan pada kolom id.
call set_table_property('test_table', 'binlog.level', 'replica');-- Aktifkan binlog.
call set_table_property('test_table', 'binlog.ttl', '86400');-- Atur TTL binlog, dalam detik.
commit;Tabel yang sudah ada
Di HoloWeb, gunakan pernyataan berikut untuk mengaktifkan binlog dan mengatur TTL binlog untuk tabel yang sudah ada. Ganti table_name dengan nama tabel aktual Anda.
-- Aktifkan binlog.
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;
-- Atur TTL binlog, dalam detik.
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;Dengan Parameter
Mulai VVR 11, opsi konektor untuk Hologres telah diperbarui untuk meningkatkan dukungan. Beberapa opsi mungkin telah diganti namanya atau dihapus. VVR 11 tetap kompatibel mundur dengan VVR 8. Konsultasikan dokumentasi parameter yang spesifik untuk versi VVR Anda untuk detailnya.
Pemetaan tipe
Lihat Pemetaan tipe data antara Flink dan Hologres.
Hologres mendukung pendefinisian kolom yang dihasilkan menggunakan sintaks GENERATED ALWAYS AS. Contoh:
ds TIMESTAMP NOT NULL GENERATED ALWAYS AS (date_trunc('month', create_time)) STOREDKendala NOT NULL pada kolom yang dihasilkan dipetakan ke field nullable. Ini adalah perilaku yang diharapkan: Hologres menghitung kolom yang dihasilkan secara otomatis, sehingga Flink tidak meneruskan nilai untuk kolom tersebut. Mempertahankan kendala NOT NULL akan menyebabkan validasi penulisan HologresClient gagal. Kendala NOT NULL pada kolom biasa tetap tidak terpengaruh (misalnya: ds TIMESTAMP NOT NULL).
Contoh
Tabel sumber
Tabel sumber yang diaktifkan binlog
Mode CDC
Mode ini memungkinkan sinkronisasi cermin data tabel. Sumber mengonsumsi data logging biner dan, berdasarkan hg_binlog_event_type, secara otomatis menetapkan tipe Flink RowKind yang benar (seperti INSERT, DELETE, UPDATE_BEFORE, atau UPDATE_AFTER) untuk setiap baris tanpa perlu deklarasi eksplisit. Proses ini mencerminkan perubahan, mirip dengan fungsi CDC MySQL atau PostgreSQL. Contoh kode berikut menunjukkan pernyataan DDL untuk membuat tabel sumber dalam mode ini.
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='${secret_values.ak_id}', -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.
'password'='${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL', -- Baca semua jenis changeLog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
'retry-count'='10', -- Jumlah percobaan ulang setelah error pembacaan binlog.
'retry-sleep-step-ms'='5000', -- Waktu backoff bertahap antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, kedua 10 detik, dan seterusnya.
'source.binlog.batch-size'='512' -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc',
'binlogMaxRetryTimes' = '10', -- Jumlah percobaan ulang setelah error pembacaan binlog.
'binlogRetryIntervalMs' = '500', -- Interval percobaan ulang setelah error pembacaan binlog, dalam milidetik.
'binlogBatchReadSize' = '100' -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);Mode non-CDC
Dalam mode ini, data binlog yang dikonsumsi oleh sumber diteruskan ke node turunan sebagai data Flink biasa. Artinya, semua data bertipe INSERT. Anda dapat menentukan cara memproses data dengan tipe hg_binlog_event_type tertentu sesuai kebutuhan. Contoh kode berikut menunjukkan pernyataan DDL untuk tabel sumber.
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY', -- Perlakukan semua jenis changelog sebagai INSERT.
'retry-count'='10', -- Jumlah percobaan ulang setelah error pembacaan binlog.
'retry-sleep-step-ms'='5000', -- Waktu backoff bertahap antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, kedua 10 detik, dan seterusnya.
'source.binlog.batch-size'='512' -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10', -- Jumlah percobaan ulang setelah error pembacaan binlog.
'binlogRetryIntervalMs' = '500', -- Interval percobaan ulang setelah error pembacaan binlog, dalam milidetik.
'binlogBatchReadSize' = '100' -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);Tabel sumber yang dinonaktifkan binlog
VVR 11+
Secara default, Ververica Runtime (VVR) versi 11.1 dan lebih baru mengonsumsi data logging biner. Untuk informasi selengkapnya, lihat Tabel sumber Binlog.
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog'='false' -- Jangan baca data binlog.
);VVR 8+
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sdkMode' = 'jdbc'
);Tabel sink
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;Contoh Tabel Dimensi
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;Detail fitur
Ingesti penuh dan inkremental terpadu
Skenario
Fitur ini hanya berlaku untuk tabel target yang memiliki kunci primer. Kami merekomendasikan menggunakannya dengan tabel sumber Hologres dalam mode CDC.
Hologres mendukung pengaktifan binlog sesuai permintaan. Anda dapat mengaktifkan binlog untuk tabel yang berisi data historis.
Kode contoh
VVR 11+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.startup-mode' = 'INITIAL', -- Pertama membaca semua data historis, lalu membaca binlog secara inkremental.
'retry-count'='10', -- Jumlah percobaan ulang setelah error pembacaan binlog.
'retry-sleep-step-ms'='5000', -- Waktu backoff bertahap antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, kedua 10 detik, dan seterusnya.
'source.binlog.batch-size'='512' -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);Atur
source.binlog.startup-modekeINITIALuntuk pertama membaca semua data historis lalu mulai membaca secara inkremental.Jika Anda mengatur parameter
startTimeatau memilih waktu mulai pada antarmuka startup,binlogStartUpModedipaksa menggunakan mode konsumsitimestamp, dan mode konsumsi lainnya diabaikan karena parameterstartTimememiliki prioritas lebih tinggi.
VVR 8+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', -- Pertama membaca semua data historis, lalu membaca binlog secara inkremental.
'binlogMaxRetryTimes' = '10', -- Jumlah percobaan ulang setelah error pembacaan binlog.
'binlogRetryIntervalMs' = '500', -- Interval percobaan ulang setelah error pembacaan binlog, dalam milidetik.
'binlogBatchReadSize' = '100' -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);Atur
binlogStartUpModekeinitialuntuk pertama membaca semua data historis lalu mulai membaca secara inkremental.startTimememiliki prioritas lebih tinggi daripadabinlogStartUpMode. Jika Anda mengatur parameterstartTimeatau memilih waktu mulai di antarmuka startup,binlogStartUpModesecara otomatis dipaksa menggunakan modetimestampuntuk konsumsi, dan mode konsumsi lainnya tidak berlaku.
Menangani konflik kunci primer
Saat menulis ke Hologres, jika data dengan kunci primer duplikat sudah ada, konektor menyediakan tiga strategi penanganan.
VVR 11+
Tentukan opsi sink.on-conflict-action untuk menerapkan strategi berbeda.
sink.on-conflict-action value | Deskripsi |
INSERT_OR_IGNORE | Menyimpan kemunculan pertama data dan mengabaikan duplikat berikutnya. |
INSERT_OR_REPLACE | Mengganti baris yang ada dengan data baru. |
INSERT_OR_UPDATE (default) | Hanya memperbarui kolom yang ditentukan, membiarkan kolom lain pada baris yang ada tidak berubah. |
VVR 8+
Tentukan opsi mutatetype untuk menerapkan strategi berbeda.
mutatetype value | Deskripsi |
insertorignore (default) | Menyimpan kemunculan pertama data dan mengabaikan duplikat berikutnya. |
insertorreplace | Mengganti baris yang ada dengan data baru. |
insertorupdate | Hanya memperbarui kolom yang ditentukan, membiarkan kolom lain pada baris yang ada tidak berubah. |
Asumsikan sebuah tabel memiliki field a, b, c, dan d, dengan a sebagai kunci primer. Jika tabel sink hanya berisi field a dan b, maka ketika Anda mengonfigurasi INSERT_OR_UPDATE, hanya field b yang diperbarui, sedangkan field c dan d tetap tidak berubah.Jumlah kolom dalam tabel sink dapat lebih sedikit daripada tabel Hologres fisik, tetapi kolom yang hilang harus nullable. Jika tidak, operasi penulisan gagal.
Menulis ke tabel partisi
Secara default, sink Hologres hanya mendukung impor data ke tabel non-partisi. Untuk mengimpor data ke tabel partisi, aktifkan konfigurasi berikut:
VVR 11+
Atur sink.create-missing-partition ke true. Ini memungkinkan konektor membuat partisi anak secara otomatis jika belum ada.
VVR 11.1 dan versi selanjutnya mendukung penulisan ke tabel partisi, mengarahkan data secara otomatis ke partisi anak yang sesuai.
Atur
tablenameke nama tabel induk.Jika Anda tidak membuat tabel anak terlebih dahulu dan tidak mengatur
sink.create-missing-partition=true, penulisan gagal.
VVR 8+
Atur
partitionRouterketrueuntuk mengarahkan data secara otomatis ke partisi anak yang sesuai.Atur
createparttableketrueuntuk membuat partisi anak secara otomatis jika belum ada.
Atur
tablenameke nama tabel induk.Untuk memastikan penulisan berhasil, buat partisi anak terlebih dahulu atau atur
createparttable=true.
Gabung dan Perbaruan Parsial untuk Tabel Lebar dengan Penulisan Multi-Stream
Konektor dapat secara efisien menggabungkan beberapa aliran data ke dalam satu tabel lebar Hologres. Konektor mendukung pembaruan parsial baris, menerapkan perubahan hanya pada kolom yang dimodifikasi berdasarkan kunci primer. Ini mengoptimalkan efisiensi penulisan dan memastikan konsistensi data.
Batasan
Tabel lebar harus memiliki kunci primer.
Setiap aliran data harus berisi semua field kunci primer.
Jika tabel lebar Hologres Anda menggunakan penyimpanan berorientasi kolom, RPS tinggi dapat menyebabkan penggunaan CPU meningkat. Pertimbangkan untuk menonaktifkan encoding kamus untuk kolom tabel guna mengurangi hal ini.
Contoh
Asumsikan ada dua aliran data. Satu aliran berisi kolom a, b, dan c, sedangkan yang lain berisi kolom a, d, dan e. Tabel lebar Hologres WIDE_TABLE berisi kolom a, b, c, d, dan e, dengan a sebagai kunci primer.
VVR 11+
// Asumsikan source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan lima kolom: a, b, c, d, e.
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- Tabel lebar Hologres, berisi kolom a, b, c, d, e.
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sink.on-conflict-action'='INSERT_OR_UPDATE', -- Perbarui hanya kolom yang ditentukan berdasarkan kunci primer.
'sink.delete-strategy'='IGNORE_DELETE', -- Strategi untuk menangani pesan retraction. IGNORE_DELETE cocok untuk skenario yang hanya memerlukan insert atau update, bukan delete.
'sink.partial-insert.enabled'='true' -- Aktifkan pembaruan kolom parsial. Ini memungkinkan konektor untuk memperbarui atau memasukkan hanya kolom yang ditentukan dalam pernyataan `INSERT`.
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- Masukkan hanya kolom a, b, dan c.
INSERT INTO hologres_sink(a,d,e) select * from source2; -- Masukkan hanya kolom a, d, dan e.
END;VVR 8+
// Asumsikan source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan lima kolom: a, b, c, d, e.
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- Tabel lebar Hologres, berisi kolom a, b, c, d, e.
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'mutatetype'='insertorupdate', -- Perbarui hanya kolom yang ditentukan berdasarkan kunci primer.
'ignoredelete'='true', -- Abaikan permintaan delete yang dihasilkan oleh pesan retraction.
'partial-insert.enabled'='true' -- Aktifkan pembaruan kolom parsial, mendukung pembaruan hanya untuk kolom yang dideklarasikan dalam pernyataan INSERT.
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- Masukkan hanya kolom a, b, dan c.
INSERT INTO hologres_sink(a,d,e) select * from source2; -- Masukkan hanya kolom a, d, dan e.
END;ignoredelete diatur ke true untuk mengabaikan permintaan Delete yang dihasilkan oleh pesan retraction. Pada VVR 8.0.8 dan versi selanjutnya, kami merekomendasikan menggunakan sink.delete-strategy untuk mengonfigurasi berbagai strategi untuk pembaruan parsial.
Baca binlog dari tabel partisi (Beta)
Tabel partisi meningkatkan pengarsipan data dan performa kueri. Konektor Hologres mendukung pembacaan binlog dari tabel partisi fisik maupun logis. Untuk informasi selengkapnya, lihat BUAT TABEL PARTISI LOGIS.
Mengonsumsi log biner dari tabel partisi fisik
Konektor Hologres mendukung pembacaan binlog dari tabel partisi dan secara dinamis memantau partisi baru dalam satu pekerjaan, sangat meningkatkan efisiensi dan kegunaan pemrosesan data real-time.
Catatan
Pembacaan binlog dari tabel partisi memerlukan: VVR 8.0.11+, Hologres 2.1.27+, tabel yang diaktifkan binlog, dan konsumsi JDBC.
Nama partisi harus terdiri dari nama tabel induk, garis bawah (_), dan nilai partisi—yaitu,
{parent_table}_{partition_value}. Partisi yang tidak mengikuti format ini tidak dapat dikonsumsi. Untuk informasi selengkapnya, lihat Manajemen partisi dinamis.PentingUntuk mode
DYNAMIC, VVR 8.0.11 tidak mendukung field partisi dengan pemisah-(sepertiYYYY-MM-DD).VVR 11.1+ mendukung penuh pembacaan dari partisi dengan format nama kustom.
Pembatasan ini hanya berlaku untuk pembacaan; penulisan ke tabel partisi tidak terpengaruh.
Saat mendeklarasikan tabel sumber Hologres di Flink, sertakan kolom partisi dari tabel partisi Hologres.
Untuk mode DYNAMIC, pastikan tabel partisi Anda telah manajemen partisi dinamis diaktifkan. Juga, parameter pra-pembuatan partisi
auto_partitioning.num_precreateharus lebih besar dari 1. Jika tidak, pekerjaan akan melemparkan exception saat membaca partisi terbaru.Dalam mode konsumsi binlog partisi DYNAMIC, begitu partisi baru ditambahkan, data inkremental dari partisi lama tidak dibaca.
Contoh
Jenis pola | Fitur | Deskripsi |
DYNAMIC | Pembacaan partisi dinamis | Secara otomatis memantau partisi baru dan maju secara dinamis dalam urutan kronologis. Cocok untuk kasus penggunaan real-time. |
STATIC | Konsumsi partisi statis | Hanya membaca partisi yang sudah ada (atau yang ditentukan secara eksplisit) dan tidak secara otomatis menemukan partisi baru. Cocok untuk memproses data historis dalam rentang tetap. |
Mode DYNAMIC
VVR 11+
Asumsikan Hologres berisi tabel partisi yang didefinisikan DDL berikut, dengan logging biner dan partisi dinamis diaktifkan.
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- Aktifkan partisi dinamis.
auto_partitioning_time_unit = 'DAY', -- Partisi dibuat harian. Contoh nama partisi: test_message_src1_20250512, test_message_src1_20250513.
auto_partitioning_num_precreate = '2' -- Pra-buat dua partisi.
);
-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi data dari tabel partisi test_message_src1 dalam mode DYNAMIC.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- Kolom partisi dari tabel partisi Hologres.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- Tabel Hologres dengan partisi dinamis diaktifkan.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- Pantau secara dinamis partisi terbaru.
'source.binlog.startup-mode' = 'initial' -- Pertama baca semua data yang ada, lalu mulai membaca binlog secara inkremental.
);VVR 8.0.11
Asumsikan Hologres berisi tabel partisi yang didefinisikan DDL berikut, dengan logging biner dan partisi dinamis diaktifkan.
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- Aktifkan partisi dinamis.
auto_partitioning_time_unit = 'DAY', -- Partisi dibuat harian. Contoh nama partisi: test_message_src1_20241027, test_message_src1_20241028.
auto_partitioning_num_precreate = '2' -- Pra-buat dua partisi.
);
-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi data dari tabel partisi test_message_src1 dalam mode DYNAMIC.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- Kolom partisi dari tabel partisi Hologres.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- Tabel Hologres dengan partisi dinamis diaktifkan.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'DYNAMIC', -- Pantau secara dinamis partisi terbaru.
'binlogstartUpMode' = 'initial', -- Pertama baca semua data yang ada, lalu mulai membaca binlog secara inkremental.
'sdkMode' = 'jdbc_fixed' -- Gunakan mode ini untuk menghindari masalah batas koneksi.
);Mode STATIC
VVR 11+
Asumsikan Hologres berisi tabel partisi berikut yang didefinisikan oleh Data Definition Language (DDL), dan logging biner diaktifkan.
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi tabel partisi test_message_src2 dalam mode STATIC.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- Kolom partisi dari tabel partisi Hologres.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- Tabel partisi.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'STATIC', -- Baca set partisi tetap.
'source.binlog.partition-values-to-read' = 'red,blue,green', -- Baca hanya tiga partisi yang ditentukan; partisi 'black' tidak dibaca. Partisi baru juga tidak dibaca. Jika tidak diatur, semua partisi tabel induk dibaca.
'source.binlog.startup-mode' = 'initial' -- Pertama baca semua data yang ada, lalu mulai membaca binlog secara inkremental.
);VVR 8.0.11
Asumsikan tabel partisi DDL berikut ada di Hologres dan logging biner diaktifkan.
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
Di Flink, gunakan pernyataan SQL berikut untuk membaca data dari tabel partisi test_message_src2 dalam mode STATIC.
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- Kolom partisi dari tabel partisi Hologres.
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- Tabel partisi.
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'STATIC', -- Baca set partisi tetap.
'partition-values-to-read' = 'red,blue,green', -- Baca hanya tiga partisi yang ditentukan; partisi 'black' tidak dibaca. Partisi baru juga tidak dibaca. Jika tidak diatur, semua partisi tabel induk dibaca.
'binlogstartUpMode' = 'initial', -- Pertama baca semua data yang ada, lalu mulai membaca binlog secara inkremental.
'sdkMode' = 'jdbc_fixed' -- Gunakan mode ini untuk menghindari masalah batas koneksi.
);Mengonsumsi Logging Biner dari Tabel Partisi Logis
Konektor Hologres mendukung pembacaan binlog dari tabel partisi logis dan memungkinkan penentuan eksplisit partisi mana yang akan dibaca.
Perhatian
Pembacaan binlog dari partisi tertentu memerlukan: VVR 11.0.0+ dan Hologres V3.1+.
Pembacaan binlog dari semua partisi setara dengan membaca binlog dari tabel Hologres non-partisi. Untuk petunjuknya, lihat Tabel sumber.
Contoh
Nama Parameter | Deskripsi | Contoh |
source.binlog.logical-partition-filter-column-names | Nama kolom partisi, diapit tanda kutip ganda. Pisahkan beberapa nama kolom dengan koma. Jika nama kolom berisi tanda kutip ganda, escape dengan tanda kutip ganda lainnya. | 'source.binlog.logical-partition-filter-column-names'='"Pt","id"' Kolom partisi adalah Pt dan id. |
source.binlog.logical-partition-filter-column-values | Nilai kolom partisi. Setiap partisi ditentukan oleh satu set nilai kolom. Nilai untuk kolom berbeda dipisahkan koma dan diapit tanda kutip ganda. Jika nilai berisi tanda kutip ganda, escape dengan tanda kutip ganda lainnya. Beberapa partisi dipisahkan titik koma. | 'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"' Tentukan konsumsi dua partisi. Kunci partisi memiliki dua kolom. Nilai kunci partisi pertama adalah (20240910, 0), dan nilai kunci partisi kedua adalah (special"value, 9). |
Asumsikan tabel berikut telah dibuat di Hologres:
CREATE TABLE holo_table (
id int not null,
name text,
age numeric(18,4),
"Pt" text,
primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
binlog_level ='replica'
);Konsumsi logging biner tabel ini di Flink.
CREATE TEMPORARY TABLE test_src_binlog_table(
id INTEGER,
name VARCHAR,
age decimal(18,4),
`Pt` VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='holo_table',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog'='true',
'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
'source.binlog.change-log-mode'='ALL', -- Baca semua jenis changeLog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
'retry-count'='10', -- Jumlah percobaan ulang setelah error pembacaan binlog.
'retry-sleep-step-ms'='5000', -- Waktu backoff bertahap antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, kedua 10 detik, dan seterusnya.
'source.binlog.batch-size'='512' -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);API DataStream
Untuk menggunakan API DataStream, sertakan dependensi konektor DataStream Hologres dalam proyek Anda. Untuk petunjuk tentang menyiapkan konektor DataStream, lihat Integrasikan dan gunakan konektor dalam program DataStream. Konektor DataStream Hologres tersedia di Repositori Maven Central kami. Untuk debugging lokal, gunakan Uber JAR yang sesuai. Untuk informasi selengkapnya, lihat Jalankan dan debug pekerjaan yang berisi konektor secara lokal.
Tabel sumber Hologres
Tabel sumber yang diaktifkan binlog
Realtime Compute for Apache Flink menyediakan kelas HologresBinlogSource untuk membaca data binlog Hologres. Contoh berikut menunjukkan cara membuat HologresBinlogSource.
VVR 11.3+
Sejak VVR 11.1.2, parameter JDBCOptions dan startTimeMs telah dihapus dari konstruktor HologresBinlogSource. Sejak VVR 11.3, parameter List<Subscribe.BinlogFilter> telah ditambahkan. Jika Anda menggunakan VVR 11 atau lebih baru, kami merekomendasikan menggunakan VVR 11.3 atau lebih baru.
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Nama tabel yang akan dibaca.
String sourceTableName = "sourceTableName";
// Opsi konektor Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, sourceTableName);
config.set(HologresConfigs.BINLOG, true);
config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL);
// Bangun HologresBinlogSource.
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
StartupMode.INITIAL,
sourceTableName,
"",
Collections.emptyList(),
-1,
Collections.emptySet(),
Collections.emptyList()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 8.0.11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Opsi konektor Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Bangun JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Bangun HologresBinlogSource.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet(),
new ArrayList<>()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 8.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Opsi konektor Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Bangun JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Bangun HologresBinlogSource.
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 6.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Opsi konektor Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Bangun JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Atur atau buat nama slot default.
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// Bangun JDBCBinlogRecordConverter.
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// Bangun HologresJDBCBinlogSource.
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}Jika Anda menggunakan mesin Realtime Compute for Apache Flink versi sebelum 8.0.5 atau versi Hologres sebelum V2.1, pastikan pengguna adalah superuser atau memiliki izin Replication Role. Untuk informasi selengkapnya, lihat Masalah izin Hologres.
Tabel sumber yang dinonaktifkan binlog
Realtime Compute for Apache Flink menyediakan kelas HologresBulkreadInputFormat, implementasi RichInputFormat, untuk membaca data dari tabel Hologres. Contoh berikut menunjukkan cara membangun sumber Hologres untuk membaca data dari tabel Hologres yang dinonaktifkan binlog-nya.
public class Sample {
public static void main(String[] args) throws Exception {
// Siapkan API DataStream Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Opsi konektor Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Bangun JDBCOptions.
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
new HologresConnectionParam(config),
jdbcOptions,
schema,
"",
-1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
env.execute();
}
}Dependensi Maven
Konektor DataStream Hologres tersedia di Repositori Maven Central kami.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>Tabel sink Hologres
VVR 11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema tabel yang akan ditulis. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
TableSchema tableSchema = TableSchema.builder()
.field("a", DataTypes.INT().notNull())
.field("b", DataTypes.STRING())
.primaryKey("a")
.build();
// Opsi konektor Hologres.
Configuration config = new Configuration();
config.set(HologresConfigs.ENDPOINT, "yourEndpoint");
config.set(HologresConfigs.USERNAME, "yourUserName");
config.set(HologresConfigs.PASSWORD, "yourPassword");
config.set(HologresConfigs.DATABASE, "yourDatabaseName");
config.set(HologresConfigs.TABLE, "yourTableName");
HologresConnectionParam connectionParam = new HologresConnectionParam(config);
HologresTableSchema hologresTableSchema =
HologresTableSchema.get(connectionParam.getJDBCOptions());
// Indeks kolom yang akan ditulis ke sink.
Integer[] targetColumnIndexes = {0, 1};
// Bangun sink Hologres.
HologresSinkFunction sinkFunction =
new HologresSinkFunction(
connectionParam, tableSchema, targetColumnIndexes, hologresTableSchema);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}VVR 8+
public class Sample {
public static void main(String[] args) throws Exception {
// Siapkan API DataStream Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Inisialisasi skema tabel yang akan ditulis. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Opsi konektor Hologres.
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Bangun writer Hologres untuk menulis data sebagai RowData.
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam,
schema,
HologresTableSchema.get(hologresConnectionParam),
new Integer[0]);
// Bangun sink Hologres.
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}Kolom metadata
Realtime Compute for Apache Flink VVR 8.0.11 dan versi selanjutnya mendukung kolom metadata untuk tabel sumber yang diaktifkan binlog. Mulai versi ini, kami merekomendasikan mendeklarasikan field binlog, seperti hg_binlog_event_type, sebagai kolom metadata. Kolom metadata memperluas standar SQL. Kolom ini memungkinkan Anda mengakses informasi spesifik, seperti nama database dan tabel sumber, serta tipe perubahan dan timestamp data. Anda dapat menggunakan informasi ini untuk mendefinisikan logika pemrosesan khusus, seperti memfilter event DELETE.
Nama field | Tipe data | Deskripsi |
db_name | STRING NOT NULL | Nama database yang berisi baris tersebut. |
table_name | STRING NOT NULL | Nama tabel yang berisi baris tersebut. |
hg_binlog_lsn | BIGINT NOT NULL | Kolom sistem binlog yang merepresentasikan nomor urutan binlog. Nilainya meningkat secara monoton tetapi tidak kontinu dalam satu shard. Tidak dijamin unik atau terurut di shard berbeda. |
hg_binlog_timestamp_us | BIGINT NOT NULL | Timestamp event perubahan di database, dalam mikrodetik (us). |
hg_binlog_event_type | BIGINT NOT NULL | Tipe event CDC untuk baris tersebut. Nilai valid:
|
hg_shard_id | INT NOT NULL | Shard tempat data berada. Untuk informasi selengkapnya, lihat Grup Tabel dan Shard. |
Dalam pernyataan DDL, deklarasikan kolom metadata menggunakan <meta_column_name> <datatype> METADATA VIRTUAL. Berikut contohnya:
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn bigint METADATA VIRTUAL,
hg_binlog_event_type bigint METADATA VIRTUAL,
hg_binlog_timestamp_us bigint METADATA VIRTUAL,
hg_shard_id int METADATA VIRTUAL,
db_name string METADATA VIRTUAL,
table_name string METADATA VIRTUAL,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
...
);FAQ
Referensi
Untuk informasi selengkapnya tentang mengelola katalog Hologres, lihat Kelola katalog Hologres.
Untuk informasi selengkapnya tentang membangun gudang data real-time terintegrasi dengan Hologres dan Flink, lihat Bangun gudang data real-time dengan Flink dan Hologres.
Hologres secara efisien mendukung pembaruan dan koreksi data, menjadikannya cocok untuk membangun tabel lebar dalam skenario penulisan multi-stream. Untuk informasi selengkapnya, lihat Analisis Perilaku Pengguna MongoDB+Hologres.