全部产品
Search
文档中心

Realtime Compute for Apache Flink:Hologres

更新时间:Mar 05, 2026

Topik ini menjelaskan konektor Hologres.

Informasi latar belakang

Hologres adalah mesin terpadu untuk gudang data real-time yang mendukung penulisan, pembaruan, dan analisis dataset dalam jumlah besar secara real-time. Hologres menggunakan sintaks SQL standar dan kompatibel dengan protokol PostgreSQL, serta mendukung OLAP dan analisis ad hoc berskala petabyte. Layanan ini menyediakan akses data online berlatensi rendah dengan konkurensi tinggi dan terintegrasi secara mendalam dengan MaxCompute, Realtime Compute for Apache Flink, dan DataWorks untuk menghadirkan solusi gudang data offline dan real-time end-to-end. Tabel berikut merangkum kemampuan konektor Hologres.

Category

Details

Jenis yang didukung

Tabel sumber, tabel dimensi, dan tabel sink

Mode eksekusi

Mode stream dan mode batch

Format data

Tidak didukung

Metrik pemantauan

Metrik pemantauan

  • Tabel sumber:

    • numRecordsIn

    • numRecordsInPerSecond

  • Tabel sink:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    Catatan

    Untuk informasi selengkapnya, lihat Metrik pemantauan.

Jenis API

DataStream dan SQL

Mendukung pembaruan atau penghapusan pada tabel sink

Ya

Fitur

Fitur

Detail

Konsumsi data Hologres secara real-time

Anda dapat membaca data Hologres dengan atau tanpa binary logging (binlog). Fitur ini kompatibel dengan mode change data capture (CDC) maupun non-CDC.

Ingesti penuh dan inkremental terpadu

Anda dapat melakukan konsumsi penuh, inkremental, atau penuh dan inkremental terpadu.

Menangani konflik kunci primer

Anda dapat mengabaikan data baru, mengganti seluruh baris, atau hanya memperbarui field tertentu.

Gabung dan perbaruan parsial untuk tabel lebar dengan penulisan multi-stream

Anda dapat memperbarui hanya kolom yang dimodifikasi, bukan seluruh baris.

Baca binlog dari tabel partisi (Beta)

Anda dapat mengonsumsi binlog dari tabel partisi fisik. Satu pekerjaan dapat memantau semua partisi, termasuk partisi yang baru ditambahkan. Anda juga dapat mengonsumsi binlog dari tabel partisi logis.

Tulis ke tabel partisi

Anda dapat menulis data ke tabel induk dari tabel partisi dan membuat otomatis partisi anak yang sesuai.

Sinkronisasi real-time untuk satu tabel atau seluruh database

Anda dapat melakukan sinkronisasi data real-time pada tingkat satu tabel atau seluruh database. Fitur ini menyediakan kemampuan berikut:

  • Deteksi otomatis evolusi skema tabel sumber: Jika skema tabel database sumber berubah, Hologres dapat menyinkronkan perubahan tersebut ke tabel sink secara real-time.

  • Penanganan otomatis evolusi skema: Jika data baru mengalir ke Hologres, Flink terlebih dahulu memicu operasi modifikasi skema sebelum menulis data. Proses ini tidak memerlukan intervensi manual.

Untuk informasi selengkapnya, lihat Pernyataan CREATE TABLE AS (CTAS) dan Panduan Mulai Cepat untuk sinkronisasi database real-time.

Batasan dan saran

Batasan

  • Tabel eksternal tidak didukung: Konektor Hologres tidak mendukung akses ke tabel eksternal Hologres, seperti tabel eksternal MaxCompute.

  • Batasan tipe waktu: Konsumsi real-time data TIMESTAMP tidak didukung. Gunakan tipe TIMESTAMPTZ saat membuat tabel.

  • Pemindaian tabel sumber (VVR 8 dan sebelumnya): Data dibaca dari Hologres dalam mode batch secara default. Artinya, data inkremental tidak dikonsumsi.

  • Batasan Watermark (VVR 8 dan sebelumnya): Mode CDC tidak mendukung definisi watermark. Untuk melakukan agregasi berbasis jendela, gunakan solusi agregasi non-jendela.

Saran

  • Pemilihan Mode Penyimpanan

    • Tabel dimensi untuk pencarian titik: Kami merekomendasikan menggunakan penyimpanan berorientasi baris. Ini memerlukan pengaturan kunci primer dan kunci pengelompokan.

    • Tabel dimensi untuk kueri satu-ke-banyak: Kami merekomendasikan menggunakan penyimpanan berorientasi kolom dan mengonfigurasi kunci distribusi dan kunci segmen untuk mengoptimalkan performa.

    • Tabel yang sering diperbarui untuk kueri analitis: Kami merekomendasikan menggunakan penyimpanan hibrida baris-kolom jika tabel tersebut harus mendukung konsumsi binlog real-time dan analisis OLAP.

    Penting

    Jika Anda tidak secara eksplisit menentukan format penyimpanan saat membuat tabel Hologres, format tersebut akan menggunakan penyimpanan kolom secara default. Format penyimpanan ini bersifat tidak dapat diubah dan tidak dapat diubah setelah pembuatan. Untuk informasi selengkapnya, lihat Buat tabel di Hologres dan Format penyimpanan tabel: kolom, berorientasi baris, dan hibrida baris-kolom.

  • Paralelisme pekerjaan: Anda dapat mengatur paralelisme pekerjaan Flink agar sesuai dengan jumlah shard di tabel Hologres.

    # Jalankan perintah ini di HoloWeb untuk melihat jumlah shard dalam sebuah tabel.
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • Versi dan fitur: Anda dapat memeriksa secara berkala Catatan Rilis Konektor Hologres untuk mengetahui isu yang diketahui, pembaruan fitur, dan informasi kompatibilitas versi.

Catatan Penting

  • Kompatibilitas dan batasan versi Hologres dan VVR

    Tabel sumber

    • VVR 8 dan sebelumnya: Atur mode konsumsi menggunakan sdkMode.

    • VVR 11+: Atur mode konsumsi menggunakan source.binlog.read-mode.

    Versi VVR

    Versi Hologres

    Nilai parameter default/direkomendasikan

    Mode konsumsi aktual

    Catatan

    ≥ 6.0.7

    < 2.0

    Custom

    holohub (default)

    Kami merekomendasikan mengonfigurasi JDBC.

    6.0.7–8.0.4

    ≥ 2.0

    jdbc (alih otomatis)

    jdbc (dipaksa)

    Hologres 2.0 dan versi selanjutnya telah menghentikan HoloHub. Jika holohub diatur, konektor secara otomatis fallback ke jdbc, yang mungkin memerlukan izin tambahan pengguna. Untuk konfigurasi izin, lihat Masalah izin.

    ≥ 8.0.5

    ≥ 2.1

    jdbc (alih otomatis)

    jdbc (dipaksa)

    Tidak ada masalah izin. Untuk Hologres 2.1.27 dan versi selanjutnya, konektor beralih ke jdbc_fixed.

    ≥ 11.1

    Versi apa pun

    AUTO (default)

    Dipilih secara otomatis berdasarkan versi Hologres

    • Untuk Hologres 2.1.27 dan versi selanjutnya, konektor memilih jdbc dan mengaktifkan koneksi ringan secara default (connection.fixed.enabled diatur ke true).

    • Untuk versi 2.1.0–2.1.26, Anda dapat memilih mode JDBC.

    • Untuk Hologres 2.0 dan versi sebelumnya, konektor memilih holohub.

    Penting

    Pada VVR 11.1 dan versi selanjutnya, konektor mengaktifkan konsumsi binlog secara default. Pastikan Anda telah mengaktifkan binlog untuk menghindari error.

    Masalah izin

    Jika pengguna bukan superuser, Anda harus memberikan izin untuk membaca binlog dalam mode JDBC.

    user_name adalah ID Akun Alibaba Cloud atau Pengguna RAM. Untuk informasi selengkapnya, lihat Ikhtisar akun.

    -- Dalam model otorisasi PostgreSQL standar, berikan izin CREATE kepada pengguna dan berikan izin role replikasi pada instans kepada pengguna.
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- Jika database menggunakan model izin sederhana (SLMP), Anda tidak dapat menjalankan pernyataan GRANT. Gunakan spm_grant untuk memberikan izin Admin pada database kepada pengguna. Anda juga dapat memberikan izin di konsol HoloWeb.
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

    Tabel sink

    • Pada versi VVR 8 dan sebelumnya, mode konsumsi dipilih dengan menentukan parameter sdkMode.

    • VVR 11+: Atur mode penulisan data menggunakan sink.write-mode.

    Versi VVR

    Versi Hologres

    Apakah RPC terpengaruh?

    Mode konsumsi RPC aktual

    Nilai parameter direkomendasikan/default

    Catatan

    6.0.4–8.0.2

    < 2.0

    Tidak

    rpc

    Custom

    /

    6.0.4–8.0.2

    ≥ 2.0

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Untuk mencegah deduplikasi, atur 'jdbcWriteBatchSize'='1'.

    ≥ 8.0.3

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Jika Anda mengatur rpc, konektor secara otomatis beralih ke jdbc_fixed dan mengatur 'jdbcWriteBatchSize'='1' untuk mencegah deduplikasi.

    ≥ 8.0.5

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Jika Anda mengatur rpc, konektor secara otomatis beralih ke jdbc_fixed dan mengatur 'deduplication.enabled'='false' untuk mencegah deduplikasi.

    Penting
    • RPC telah dihentikan di Hologres 2.0 dan versi selanjutnya. Jika Anda mengatur mode penulisan data ke rpc, konektor secara otomatis beralih ke jdbc_fixed. Namun, jika Anda mengatur nilai lain, konektor akan menggunakan nilai yang Anda tentukan.

    • VVR 11.1 dan versi selanjutnya tidak lagi mendukung RPC. Kami merekomendasikan menggunakan JDBC untuk koneksi.

    • Untuk skenario penulisan konkurensi tinggi, gunakan jdbc_copy atau COPY_STREAM.

    Tabel dimensi

    Versi VVR

    Versi Hologres

    Apakah RPC terpengaruh?

    Mode konsumsi RPC aktual

    Nilai parameter direkomendasikan/default

    Catatan

    6.0.4–8.0.2

    < 2.0

    Tidak

    rpc

    Custom

    /

    6.0.4–8.0.2

    ≥ 2.0

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Jika instans Hologres Anda versi 2.0 atau lebih baru, konektor secara otomatis beralih ke jdbc_fixed karena RPC telah dihentikan. Namun, jika Anda mengatur nilai lain, konektor akan menggunakan nilai yang Anda tentukan.

    ≥ 8.0.3

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    ≥ 8.0.5

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Penting

    VVR 11.1 dan versi selanjutnya tidak lagi mendukung RPC dan menggunakan JDBC secara default. Untuk mengaktifkan koneksi ringan, atur connection.fixed.enabled.

  • Tabel sumber binlog dalam mode JDBC mendukung pembacaan data JSONB, tetapi Anda harus mengaktifkan parameter GUC di tingkat database.

    -- Aktifkan parameter GUC di tingkat database. Hanya superuser yang dapat menjalankan perintah ini. Setiap database hanya memerlukan pengaturan ini sekali.
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • Operasi UPDATE menghasilkan dua catatan binlog berturut-turut. Data lama (update_before) muncul terlebih dahulu, diikuti oleh data baru (update_after).

  • Hindari memotong atau membuat ulang tabel sumber yang telah diaktifkan binlog-nya. Untuk informasi selengkapnya, lihat FAQ.

  • Untuk menghindari error, pertahankan presisi DECIMAL yang konsisten antara Flink dan Hologres. Untuk informasi selengkapnya, lihat FAQ.

  • Saat menggunakan mode INITIAL untuk ingesti penuh dan inkremental terpadu, pengurutan global tidak dijamin. Jika aplikasi downstream Anda bergantung pada perhitungan berbasis timestamp, gunakan mode pembacaan binlog murni sebagai gantinya.

Aktifkan binlog

Tabel Belum Dibuat

Konsumsi data real-time dinonaktifkan secara default. Saat membuat tabel Hologres di HoloWeb, atur parameter binlog.level dan binlog.ttl. Contoh kode:

begin;
create table test_table(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_table', 'orientation', 'row');-- Buat tabel berorientasi baris bernama test_table.
call set_table_property('test_table', 'clustering_key', 'id');-- Buat kunci pengelompokan pada kolom id.
call set_table_property('test_table', 'binlog.level', 'replica');-- Aktifkan binlog.
call set_table_property('test_table', 'binlog.ttl', '86400');-- Atur TTL binlog, dalam detik.
commit;

Tabel yang sudah ada

Di HoloWeb, gunakan pernyataan berikut untuk mengaktifkan binlog dan mengatur TTL binlog untuk tabel yang sudah ada. Ganti table_name dengan nama tabel aktual Anda.

-- Aktifkan binlog.
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;

-- Atur TTL binlog, dalam detik.
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;

Dengan Parameter

Mulai VVR 11, opsi konektor untuk Hologres telah diperbarui untuk meningkatkan dukungan. Beberapa opsi mungkin telah diganti namanya atau dihapus. VVR 11 tetap kompatibel mundur dengan VVR 8. Konsultasikan dokumentasi parameter yang spesifik untuk versi VVR Anda untuk detailnya.

Pemetaan tipe

Lihat Pemetaan tipe data antara Flink dan Hologres.

Catatan

Hologres mendukung pendefinisian kolom yang dihasilkan menggunakan sintaks GENERATED ALWAYS AS. Contoh:

ds TIMESTAMP NOT NULL GENERATED ALWAYS AS (date_trunc('month', create_time)) STORED

Kendala NOT NULL pada kolom yang dihasilkan dipetakan ke field nullable. Ini adalah perilaku yang diharapkan: Hologres menghitung kolom yang dihasilkan secara otomatis, sehingga Flink tidak meneruskan nilai untuk kolom tersebut. Mempertahankan kendala NOT NULL akan menyebabkan validasi penulisan HologresClient gagal. Kendala NOT NULL pada kolom biasa tetap tidak terpengaruh (misalnya: ds TIMESTAMP NOT NULL).

Contoh

Tabel sumber

Tabel sumber yang diaktifkan binlog

Mode CDC

Mode ini memungkinkan sinkronisasi cermin data tabel. Sumber mengonsumsi data logging biner dan, berdasarkan hg_binlog_event_type, secara otomatis menetapkan tipe Flink RowKind yang benar (seperti INSERT, DELETE, UPDATE_BEFORE, atau UPDATE_AFTER) untuk setiap baris tanpa perlu deklarasi eksplisit. Proses ini mencerminkan perubahan, mirip dengan fungsi CDC MySQL atau PostgreSQL. Contoh kode berikut menunjukkan pernyataan DDL untuk membuat tabel sumber dalam mode ini.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='${secret_values.ak_id}',            -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.
  'password'='${secret_values.ak_secret}',        
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL',  -- Baca semua jenis changeLog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
  'retry-count'='10',                     -- Jumlah percobaan ulang setelah error pembacaan binlog.
  'retry-sleep-step-ms'='5000',           -- Waktu backoff bertahap antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, kedua 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'        -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc',
  'binlogMaxRetryTimes' = '10',     -- Jumlah percobaan ulang setelah error pembacaan binlog.
  'binlogRetryIntervalMs' = '500',  -- Interval percobaan ulang setelah error pembacaan binlog, dalam milidetik.
  'binlogBatchReadSize' = '100'     -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);

Mode non-CDC

Dalam mode ini, data binlog yang dikonsumsi oleh sumber diteruskan ke node turunan sebagai data Flink biasa. Artinya, semua data bertipe INSERT. Anda dapat menentukan cara memproses data dengan tipe hg_binlog_event_type tertentu sesuai kebutuhan. Contoh kode berikut menunjukkan pernyataan DDL untuk tabel sumber.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY',  -- Perlakukan semua jenis changelog sebagai INSERT.
  'retry-count'='10',                     -- Jumlah percobaan ulang setelah error pembacaan binlog.
  'retry-sleep-step-ms'='5000',           -- Waktu backoff bertahap antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, kedua 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'        -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',     -- Jumlah percobaan ulang setelah error pembacaan binlog.
  'binlogRetryIntervalMs' = '500',  -- Interval percobaan ulang setelah error pembacaan binlog, dalam milidetik.
  'binlogBatchReadSize' = '100'     -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);

Tabel sumber yang dinonaktifkan binlog

VVR 11+

Penting

Secara default, Ververica Runtime (VVR) versi 11.1 dan lebih baru mengonsumsi data logging biner. Untuk informasi selengkapnya, lihat Tabel sumber Binlog.

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='false'                      -- Jangan baca data binlog.
);

VVR 8+

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

Tabel sink

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;

Contoh Tabel Dimensi

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Kami merekomendasikan menggunakan variabel untuk AK/SK guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Detail fitur

Ingesti penuh dan inkremental terpadu

Skenario

  • Fitur ini hanya berlaku untuk tabel target yang memiliki kunci primer. Kami merekomendasikan menggunakannya dengan tabel sumber Hologres dalam mode CDC.

  • Hologres mendukung pengaktifan binlog sesuai permintaan. Anda dapat mengaktifkan binlog untuk tabel yang berisi data historis.

Kode contoh

VVR 11+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.startup-mode' = 'INITIAL',   -- Pertama membaca semua data historis, lalu membaca binlog secara inkremental.
  'retry-count'='10',                         -- Jumlah percobaan ulang setelah error pembacaan binlog.
  'retry-sleep-step-ms'='5000',               -- Waktu backoff bertahap antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, kedua 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'            -- Jumlah baris yang dibaca dari binlog dalam satu batch.
  );
Catatan
  • Atur source.binlog.startup-mode ke INITIAL untuk pertama membaca semua data historis lalu mulai membaca secara inkremental.

  • Jika Anda mengatur parameter startTime atau memilih waktu mulai pada antarmuka startup, binlogStartUpMode dipaksa menggunakan mode konsumsi timestamp, dan mode konsumsi lainnya diabaikan karena parameter startTime memiliki prioritas lebih tinggi.

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', -- Pertama membaca semua data historis, lalu membaca binlog secara inkremental.
  'binlogMaxRetryTimes' = '10',     -- Jumlah percobaan ulang setelah error pembacaan binlog.
  'binlogRetryIntervalMs' = '500',  -- Interval percobaan ulang setelah error pembacaan binlog, dalam milidetik.
  'binlogBatchReadSize' = '100'     -- Jumlah baris yang dibaca dari binlog dalam satu batch.
  );
Catatan
  • Atur binlogStartUpMode ke initial untuk pertama membaca semua data historis lalu mulai membaca secara inkremental.

  • startTime memiliki prioritas lebih tinggi daripada binlogStartUpMode. Jika Anda mengatur parameter startTime atau memilih waktu mulai di antarmuka startup, binlogStartUpMode secara otomatis dipaksa menggunakan mode timestamp untuk konsumsi, dan mode konsumsi lainnya tidak berlaku.

Menangani konflik kunci primer

Saat menulis ke Hologres, jika data dengan kunci primer duplikat sudah ada, konektor menyediakan tiga strategi penanganan.

VVR 11+

Tentukan opsi sink.on-conflict-action untuk menerapkan strategi berbeda.

sink.on-conflict-action value

Deskripsi

INSERT_OR_IGNORE

Menyimpan kemunculan pertama data dan mengabaikan duplikat berikutnya.

INSERT_OR_REPLACE

Mengganti baris yang ada dengan data baru.

INSERT_OR_UPDATE (default)

Hanya memperbarui kolom yang ditentukan, membiarkan kolom lain pada baris yang ada tidak berubah.

VVR 8+

Tentukan opsi mutatetype untuk menerapkan strategi berbeda.

mutatetype value

Deskripsi

insertorignore (default)

Menyimpan kemunculan pertama data dan mengabaikan duplikat berikutnya.

insertorreplace

Mengganti baris yang ada dengan data baru.

insertorupdate

Hanya memperbarui kolom yang ditentukan, membiarkan kolom lain pada baris yang ada tidak berubah.

Asumsikan sebuah tabel memiliki field a, b, c, dan d, dengan a sebagai kunci primer. Jika tabel sink hanya berisi field a dan b, maka ketika Anda mengonfigurasi INSERT_OR_UPDATE, hanya field b yang diperbarui, sedangkan field c dan d tetap tidak berubah.
Catatan

Jumlah kolom dalam tabel sink dapat lebih sedikit daripada tabel Hologres fisik, tetapi kolom yang hilang harus nullable. Jika tidak, operasi penulisan gagal.

Menulis ke tabel partisi

Secara default, sink Hologres hanya mendukung impor data ke tabel non-partisi. Untuk mengimpor data ke tabel partisi, aktifkan konfigurasi berikut:

VVR 11+

Atur sink.create-missing-partition ke true. Ini memungkinkan konektor membuat partisi anak secara otomatis jika belum ada.

Catatan
  • VVR 11.1 dan versi selanjutnya mendukung penulisan ke tabel partisi, mengarahkan data secara otomatis ke partisi anak yang sesuai.

  • Atur tablename ke nama tabel induk.

  • Jika Anda tidak membuat tabel anak terlebih dahulu dan tidak mengatur sink.create-missing-partition=true, penulisan gagal.

VVR 8+

  • Atur partitionRouter ke true untuk mengarahkan data secara otomatis ke partisi anak yang sesuai.

  • Atur createparttable ke true untuk membuat partisi anak secara otomatis jika belum ada.

Catatan
  • Atur tablename ke nama tabel induk.

  • Untuk memastikan penulisan berhasil, buat partisi anak terlebih dahulu atau atur createparttable=true.

Gabung dan Perbaruan Parsial untuk Tabel Lebar dengan Penulisan Multi-Stream

Konektor dapat secara efisien menggabungkan beberapa aliran data ke dalam satu tabel lebar Hologres. Konektor mendukung pembaruan parsial baris, menerapkan perubahan hanya pada kolom yang dimodifikasi berdasarkan kunci primer. Ini mengoptimalkan efisiensi penulisan dan memastikan konsistensi data.

Batasan

  • Tabel lebar harus memiliki kunci primer.

  • Setiap aliran data harus berisi semua field kunci primer.

  • Jika tabel lebar Hologres Anda menggunakan penyimpanan berorientasi kolom, RPS tinggi dapat menyebabkan penggunaan CPU meningkat. Pertimbangkan untuk menonaktifkan encoding kamus untuk kolom tabel guna mengurangi hal ini.

Contoh

Asumsikan ada dua aliran data. Satu aliran berisi kolom a, b, dan c, sedangkan yang lain berisi kolom a, d, dan e. Tabel lebar Hologres WIDE_TABLE berisi kolom a, b, c, d, dan e, dengan a sebagai kunci primer.

VVR 11+

// Asumsikan source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan lima kolom: a, b, c, d, e.
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- Tabel lebar Hologres, berisi kolom a, b, c, d, e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE',   -- Perbarui hanya kolom yang ditentukan berdasarkan kunci primer.
  'sink.delete-strategy'='IGNORE_DELETE',         -- Strategi untuk menangani pesan retraction. IGNORE_DELETE cocok untuk skenario yang hanya memerlukan insert atau update, bukan delete.
  'sink.partial-insert.enabled'='true'            -- Aktifkan pembaruan kolom parsial. Ini memungkinkan konektor untuk memperbarui atau memasukkan hanya kolom yang ditentukan dalam pernyataan `INSERT`.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Masukkan hanya kolom a, b, dan c.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Masukkan hanya kolom a, d, dan e.
END;

VVR 8+

// Asumsikan source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan lima kolom: a, b, c, d, e.
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- Tabel lebar Hologres, berisi kolom a, b, c, d, e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'mutatetype'='insertorupdate',    -- Perbarui hanya kolom yang ditentukan berdasarkan kunci primer.
  'ignoredelete'='true',            -- Abaikan permintaan delete yang dihasilkan oleh pesan retraction.
  'partial-insert.enabled'='true'   -- Aktifkan pembaruan kolom parsial, mendukung pembaruan hanya untuk kolom yang dideklarasikan dalam pernyataan INSERT.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Masukkan hanya kolom a, b, dan c.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Masukkan hanya kolom a, d, dan e.
END;
Catatan

ignoredelete diatur ke true untuk mengabaikan permintaan Delete yang dihasilkan oleh pesan retraction. Pada VVR 8.0.8 dan versi selanjutnya, kami merekomendasikan menggunakan sink.delete-strategy untuk mengonfigurasi berbagai strategi untuk pembaruan parsial.

Baca binlog dari tabel partisi (Beta)

Tabel partisi meningkatkan pengarsipan data dan performa kueri. Konektor Hologres mendukung pembacaan binlog dari tabel partisi fisik maupun logis. Untuk informasi selengkapnya, lihat BUAT TABEL PARTISI LOGIS.

Mengonsumsi log biner dari tabel partisi fisik

Konektor Hologres mendukung pembacaan binlog dari tabel partisi dan secara dinamis memantau partisi baru dalam satu pekerjaan, sangat meningkatkan efisiensi dan kegunaan pemrosesan data real-time.

Catatan

  • Pembacaan binlog dari tabel partisi memerlukan: VVR 8.0.11+, Hologres 2.1.27+, tabel yang diaktifkan binlog, dan konsumsi JDBC.

  • Nama partisi harus terdiri dari nama tabel induk, garis bawah (_), dan nilai partisi—yaitu, {parent_table}_{partition_value}. Partisi yang tidak mengikuti format ini tidak dapat dikonsumsi. Untuk informasi selengkapnya, lihat Manajemen partisi dinamis.

    Penting
    • Untuk mode DYNAMIC, VVR 8.0.11 tidak mendukung field partisi dengan pemisah - (seperti YYYY-MM-DD).

    • VVR 11.1+ mendukung penuh pembacaan dari partisi dengan format nama kustom.

    • Pembatasan ini hanya berlaku untuk pembacaan; penulisan ke tabel partisi tidak terpengaruh.

  • Saat mendeklarasikan tabel sumber Hologres di Flink, sertakan kolom partisi dari tabel partisi Hologres.

  • Untuk mode DYNAMIC, pastikan tabel partisi Anda telah manajemen partisi dinamis diaktifkan. Juga, parameter pra-pembuatan partisi auto_partitioning.num_precreate harus lebih besar dari 1. Jika tidak, pekerjaan akan melemparkan exception saat membaca partisi terbaru.

  • Dalam mode konsumsi binlog partisi DYNAMIC, begitu partisi baru ditambahkan, data inkremental dari partisi lama tidak dibaca.

Contoh

Jenis pola

Fitur

Deskripsi

DYNAMIC

Pembacaan partisi dinamis

Secara otomatis memantau partisi baru dan maju secara dinamis dalam urutan kronologis. Cocok untuk kasus penggunaan real-time.

STATIC

Konsumsi partisi statis

Hanya membaca partisi yang sudah ada (atau yang ditentukan secara eksplisit) dan tidak secara otomatis menemukan partisi baru. Cocok untuk memproses data historis dalam rentang tetap.

Mode DYNAMIC

VVR 11+

Asumsikan Hologres berisi tabel partisi yang didefinisikan DDL berikut, dengan logging biner dan partisi dinamis diaktifkan.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- Aktifkan partisi dinamis.
    auto_partitioning_time_unit = 'DAY',  -- Partisi dibuat harian. Contoh nama partisi: test_message_src1_20250512, test_message_src1_20250513.
    auto_partitioning_num_precreate = '2' -- Pra-buat dua partisi.
);
-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.

Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi data dari tabel partisi test_message_src1 dalam mode DYNAMIC.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Kolom partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- Tabel Hologres dengan partisi dinamis diaktifkan.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- Pantau secara dinamis partisi terbaru.
  'source.binlog.startup-mode' = 'initial'           -- Pertama baca semua data yang ada, lalu mulai membaca binlog secara inkremental.
);

VVR 8.0.11

Asumsikan Hologres berisi tabel partisi yang didefinisikan DDL berikut, dengan logging biner dan partisi dinamis diaktifkan.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- Aktifkan partisi dinamis.
    auto_partitioning_time_unit = 'DAY',  -- Partisi dibuat harian. Contoh nama partisi: test_message_src1_20241027, test_message_src1_20241028.
    auto_partitioning_num_precreate = '2' -- Pra-buat dua partisi.
);

-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.

Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi data dari tabel partisi test_message_src1 dalam mode DYNAMIC.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Kolom partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- Tabel Hologres dengan partisi dinamis diaktifkan.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'DYNAMIC',  -- Pantau secara dinamis partisi terbaru.
  'binlogstartUpMode' = 'initial',      -- Pertama baca semua data yang ada, lalu mulai membaca binlog secara inkremental.
  'sdkMode' = 'jdbc_fixed'              -- Gunakan mode ini untuk menghindari masalah batas koneksi.
);

Mode STATIC

VVR 11+

Asumsikan Hologres berisi tabel partisi berikut yang didefinisikan oleh Data Definition Language (DDL), dan logging biner diaktifkan.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi tabel partisi test_message_src2 dalam mode STATIC.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Kolom partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- Tabel partisi.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'STATIC', -- Baca set partisi tetap.
  'source.binlog.partition-values-to-read' = 'red,blue,green',  -- Baca hanya tiga partisi yang ditentukan; partisi 'black' tidak dibaca. Partisi baru juga tidak dibaca. Jika tidak diatur, semua partisi tabel induk dibaca.
  'source.binlog.startup-mode' = 'initial'  -- Pertama baca semua data yang ada, lalu mulai membaca binlog secara inkremental.
);

VVR 8.0.11

Asumsikan tabel partisi DDL berikut ada di Hologres dan logging biner diaktifkan.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Di Flink, gunakan pernyataan SQL berikut untuk membaca data dari tabel partisi test_message_src2 dalam mode STATIC.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Kolom partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- Tabel partisi.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'STATIC', -- Baca set partisi tetap.
  'partition-values-to-read' = 'red,blue,green',  -- Baca hanya tiga partisi yang ditentukan; partisi 'black' tidak dibaca. Partisi baru juga tidak dibaca. Jika tidak diatur, semua partisi tabel induk dibaca.
  'binlogstartUpMode' = 'initial',  -- Pertama baca semua data yang ada, lalu mulai membaca binlog secara inkremental.
  'sdkMode' = 'jdbc_fixed' -- Gunakan mode ini untuk menghindari masalah batas koneksi.
);

Mengonsumsi Logging Biner dari Tabel Partisi Logis

Konektor Hologres mendukung pembacaan binlog dari tabel partisi logis dan memungkinkan penentuan eksplisit partisi mana yang akan dibaca.

Perhatian

  • Pembacaan binlog dari partisi tertentu memerlukan: VVR 11.0.0+ dan Hologres V3.1+.

  • Pembacaan binlog dari semua partisi setara dengan membaca binlog dari tabel Hologres non-partisi. Untuk petunjuknya, lihat Tabel sumber.

Contoh

Nama Parameter

Deskripsi

Contoh

source.binlog.logical-partition-filter-column-names

Nama kolom partisi, diapit tanda kutip ganda. Pisahkan beberapa nama kolom dengan koma. Jika nama kolom berisi tanda kutip ganda, escape dengan tanda kutip ganda lainnya.

'source.binlog.logical-partition-filter-column-names'='"Pt","id"'

Kolom partisi adalah Pt dan id.

source.binlog.logical-partition-filter-column-values

Nilai kolom partisi. Setiap partisi ditentukan oleh satu set nilai kolom. Nilai untuk kolom berbeda dipisahkan koma dan diapit tanda kutip ganda. Jika nilai berisi tanda kutip ganda, escape dengan tanda kutip ganda lainnya. Beberapa partisi dipisahkan titik koma.

'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"'

Tentukan konsumsi dua partisi. Kunci partisi memiliki dua kolom. Nilai kunci partisi pertama adalah (20240910, 0), dan nilai kunci partisi kedua adalah (special"value, 9).

Asumsikan tabel berikut telah dibuat di Hologres:

CREATE TABLE holo_table (
    id int not null,
    name text,
    age numeric(18,4),
    "Pt" text,
    primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
    binlog_level ='replica'
);

Konsumsi logging biner tabel ini di Flink.

CREATE TEMPORARY TABLE test_src_binlog_table(
  id INTEGER,
  name VARCHAR,
  age decimal(18,4),
  `Pt` VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='holo_table',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='true',
  'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
  'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
  'source.binlog.change-log-mode'='ALL',  -- Baca semua jenis changeLog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
  'retry-count'='10',                     -- Jumlah percobaan ulang setelah error pembacaan binlog.
  'retry-sleep-step-ms'='5000',           -- Waktu backoff bertahap antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, kedua 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'        -- Jumlah baris yang dibaca dari binlog dalam satu batch.
);

API DataStream

Penting

Untuk menggunakan API DataStream, sertakan dependensi konektor DataStream Hologres dalam proyek Anda. Untuk petunjuk tentang menyiapkan konektor DataStream, lihat Integrasikan dan gunakan konektor dalam program DataStream. Konektor DataStream Hologres tersedia di Repositori Maven Central kami. Untuk debugging lokal, gunakan Uber JAR yang sesuai. Untuk informasi selengkapnya, lihat Jalankan dan debug pekerjaan yang berisi konektor secara lokal.

Tabel sumber Hologres

Tabel sumber yang diaktifkan binlog

Realtime Compute for Apache Flink menyediakan kelas HologresBinlogSource untuk membaca data binlog Hologres. Contoh berikut menunjukkan cara membuat HologresBinlogSource.

VVR 11.3+

Penting

Sejak VVR 11.1.2, parameter JDBCOptions dan startTimeMs telah dihapus dari konstruktor HologresBinlogSource. Sejak VVR 11.3, parameter List<Subscribe.BinlogFilter> telah ditambahkan. Jika Anda menggunakan VVR 11 atau lebih baru, kami merekomendasikan menggunakan VVR 11.3 atau lebih baru.

public class Sample {                                                                                                                                                                          
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
            TableSchema schema = TableSchema.builder()
                    .field("a", DataTypes.INT())
                    .field("b", DataTypes.STRING())
                    .field("c", DataTypes.TIMESTAMP())
                    .build();

            // Nama tabel yang akan dibaca.
            String sourceTableName = "sourceTableName";

            // Opsi konektor Hologres.
            Configuration config = new Configuration();
            config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
            config.setString(HologresConfigs.USERNAME, "yourUserName");
            config.setString(HologresConfigs.PASSWORD, "yourPassword");
            config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
            config.setString(HologresConfigs.TABLE, sourceTableName);
            config.set(HologresConfigs.BINLOG, true);
            config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL);
            // Bangun HologresBinlogSource.
            HologresBinlogSource source = new HologresBinlogSource(
                    new HologresConnectionParam(config),
                    schema,
                    config,
                    StartupMode.INITIAL,
                    sourceTableName,
                    "",
                    Collections.emptyList(),
                    -1,
                    Collections.emptySet(),
                    Collections.emptyList()
            );
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
            env.execute();
        }
  }

VVR 8.0.11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Opsi konektor Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Bangun HologresBinlogSource.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet(),
                new ArrayList<>()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Opsi konektor Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Bangun HologresBinlogSource.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 6.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .build();
         // Opsi konektor Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Atur atau buat nama slot default.
        config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

        boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        // Bangun JDBCBinlogRecordConverter.
        JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
                jdbcOptions.getTable(),
                schema,
                new HologresConnectionParam(config),
                cdcMode,
                Collections.emptySet());
        
        // Bangun HologresJDBCBinlogSource.
        long startTimeMs = 0;
        HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.TIMESTAMP,
                recordConverter,
                "",
                -1);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
Penting

Jika Anda menggunakan mesin Realtime Compute for Apache Flink versi sebelum 8.0.5 atau versi Hologres sebelum V2.1, pastikan pengguna adalah superuser atau memiliki izin Replication Role. Untuk informasi selengkapnya, lihat Masalah izin Hologres.

Tabel sumber yang dinonaktifkan binlog

Realtime Compute for Apache Flink menyediakan kelas HologresBulkreadInputFormat, implementasi RichInputFormat, untuk membaca data dari tabel Hologres. Contoh berikut menunjukkan cara membangun sumber Hologres untuk membaca data dari tabel Hologres yang dinonaktifkan binlog-nya.

public class Sample {
    public static void main(String[] args) throws Exception {
        // Siapkan API DataStream Java
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan dibaca. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Opsi konektor Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
                new HologresConnectionParam(config),
                jdbcOptions,
                schema,
                "",
                -1);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
        env.execute();
    }
}

Dependensi Maven

Konektor DataStream Hologres tersedia di Repositori Maven Central kami.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Tabel sink Hologres

VVR 11+

public class Sample {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // Inisialisasi skema tabel yang akan ditulis. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
          TableSchema tableSchema = TableSchema.builder()
                  .field("a", DataTypes.INT().notNull())
                  .field("b", DataTypes.STRING())
                  .primaryKey("a")
                  .build();
          // Opsi konektor Hologres.
          Configuration config = new Configuration();
          config.set(HologresConfigs.ENDPOINT, "yourEndpoint");
          config.set(HologresConfigs.USERNAME, "yourUserName");
          config.set(HologresConfigs.PASSWORD, "yourPassword");
          config.set(HologresConfigs.DATABASE, "yourDatabaseName");
          config.set(HologresConfigs.TABLE, "yourTableName");
          HologresConnectionParam connectionParam = new HologresConnectionParam(config);
          HologresTableSchema hologresTableSchema =
                  HologresTableSchema.get(connectionParam.getJDBCOptions());
          // Indeks kolom yang akan ditulis ke sink.
          Integer[] targetColumnIndexes = {0, 1};
          // Bangun sink Hologres.
          HologresSinkFunction sinkFunction =
                  new HologresSinkFunction(
                          connectionParam, tableSchema, targetColumnIndexes, hologresTableSchema);
          TypeInformation<RowData> typeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
          env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
          env.execute();
      }
  }

VVR 8+

public class Sample {
    public static void main(String[] args) throws Exception {
        // Siapkan API DataStream Java
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan ditulis. Harus sesuai dengan kolom skema tabel Hologres. Anda dapat mendefinisikan subset kolom.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .build();
        // Opsi konektor Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
        
         // Bangun writer Hologres untuk menulis data sebagai RowData.
        AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
                hologresConnectionParam, 
                schema, 
                HologresTableSchema.get(hologresConnectionParam), 
                new Integer[0]);
        // Bangun sink Hologres.
        HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

Kolom metadata

Realtime Compute for Apache Flink VVR 8.0.11 dan versi selanjutnya mendukung kolom metadata untuk tabel sumber yang diaktifkan binlog. Mulai versi ini, kami merekomendasikan mendeklarasikan field binlog, seperti hg_binlog_event_type, sebagai kolom metadata. Kolom metadata memperluas standar SQL. Kolom ini memungkinkan Anda mengakses informasi spesifik, seperti nama database dan tabel sumber, serta tipe perubahan dan timestamp data. Anda dapat menggunakan informasi ini untuk mendefinisikan logika pemrosesan khusus, seperti memfilter event DELETE.

Nama field

Tipe data

Deskripsi

db_name

STRING NOT NULL

Nama database yang berisi baris tersebut.

table_name

STRING NOT NULL

Nama tabel yang berisi baris tersebut.

hg_binlog_lsn

BIGINT NOT NULL

Kolom sistem binlog yang merepresentasikan nomor urutan binlog. Nilainya meningkat secara monoton tetapi tidak kontinu dalam satu shard. Tidak dijamin unik atau terurut di shard berbeda.

hg_binlog_timestamp_us

BIGINT NOT NULL

Timestamp event perubahan di database, dalam mikrodetik (us).

hg_binlog_event_type

BIGINT NOT NULL

Tipe event CDC untuk baris tersebut. Nilai valid:

  • 5: INSERT

  • 2: DELETE

  • 3: UPDATE_BEFORE

  • 7: UPDATE_AFTER

hg_shard_id

INT NOT NULL

Shard tempat data berada. Untuk informasi selengkapnya, lihat Grup Tabel dan Shard.

Dalam pernyataan DDL, deklarasikan kolom metadata menggunakan <meta_column_name> <datatype> METADATA VIRTUAL. Berikut contohnya:

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn bigint METADATA VIRTUAL,
  hg_binlog_event_type bigint METADATA VIRTUAL,
  hg_binlog_timestamp_us bigint METADATA VIRTUAL,
  hg_shard_id int METADATA VIRTUAL,
  db_name string METADATA VIRTUAL,
  table_name string METADATA VIRTUAL,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  ...
  );

FAQ

Referensi