All Products
Search
Document Center

Realtime Compute for Apache Flink:Hologres: Gudang data waktu nyata

Last Updated:May 16, 2026

Pelajari cara menggunakan konektor Hologres.

Informasi latar belakang

Hologres adalah gudang data waktu nyata terpadu untuk memasukkan, memperbarui, dan menganalisis dataset dalam skala besar. Hologres mendukung SQL standar (kompatibel dengan protokol PostgreSQL), pemrosesan analitik online (OLAP) berskala petabyte, analisis ad hoc, serta penyajian data berlatensi rendah dengan konkurensi tinggi. Hologres terintegrasi secara mendalam dengan MaxCompute, Realtime Compute for Apache Flink, dan DataWorks, sehingga menyediakan solusi terpadu berbasis stack penuh untuk gudang data offline dan online. Tabel berikut menjelaskan kemampuan konektor Hologres.

Kategori

Deskripsi

Tipe yang didukung

tabel source, dimension, dan sink

Mode eksekusi

mode stream dan batch

Format data

Tidak didukung

Metriks pemantauan spesifik

Metriks pemantauan

  • Tabel source:

    • numRecordsIn

    • numRecordsInPerSecond

  • Tabel sink:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    Catatan

    Untuk penjelasan mengenai metrik ini, lihat Metrik Pemantauan.

Tipe API

DataStream dan SQL

Mendukung pembaruan atau penghapusan data pada tabel sink

Ya

Fitur

Fitur

Deskripsi

Konsumsi data Hologres waktu nyata

Mendukung pembacaan data Hologres dengan atau tanpa binlog dalam mode CDC maupun non-CDC.

Konsumsi penuh dan inkremental terpadu

Mendukung konsumsi penuh, inkremental, dan penuh serta inkremental terpadu.

Penanganan konflik kunci primer

Mendukung pengabaian data baru, penggantian seluruh baris, atau pembaruan hanya pada field tertentu.

Penggabungan multi-stream dan pembaruan parsial

Hanya memperbarui kolom yang dimodifikasi, bukan seluruh baris.

Konsumsi binlog dari tabel partisi (Beta)

Mendukung konsumsi binlog dari tabel partisi fisik maupun logis. Untuk tabel partisi fisik, satu job dapat memantau semua partisi, termasuk partisi yang baru ditambahkan.

Menulis ke tabel partisi

Mendukung penulisan ke tabel induk untuk membuat otomatis partisi anak yang sesuai.

Sinkronisasi waktu nyata untuk satu tabel atau seluruh database

Mendukung sinkronisasi waktu nyata untuk satu tabel atau seluruh database, dengan fitur utama berikut:

  • Deteksi otomatis evolusi skema tabel sumber: Ketika skema tabel sumber berubah, Hologres menyinkronkan perubahan tersebut ke tabel sink secara waktu nyata.

  • Penanganan otomatis perubahan skema: Saat data baru dimasukkan, Flink terlebih dahulu memodifikasi skema tabel sink sebelum menulis data.

Untuk informasi lebih lanjut, lihat Pernyataan CREATE TABLE AS (CTAS) dan Panduan Cepat untuk sinkronisasi database waktu nyata.

Batasan dan rekomendasi

Batasan

  • Tabel eksternal tidak didukung: Konektor Hologres tidak mendukung tabel eksternal Hologres, seperti tabel eksternal MaxCompute.

  • Batasan tipe waktu: Konektor Hologres saat ini tidak mendukung konsumsi waktu nyata data TIMESTAMP. Saat membuat tabel, gunakan tipe TIMESTAMPTZ secara eksklusif.

  • Mode pemindaian tabel sumber (Ververica Runtime (VVR) 8 dan sebelumnya): Secara default, konektor membaca data dalam mode batch, hanya memindai seluruh tabel sekali. Akibatnya, data yang baru ditambahkan tidak dikonsumsi.

  • Batasan Watermark (Ververica Runtime (VVR) 8 dan sebelumnya): Mode CDC tidak mendukung definisi watermark. Untuk melakukan agregasi berbasis jendela, gunakan solusi agregasi non-jendela sebagai gantinya.

  • Perilaku fungsi waktu dalam mode COPY: Saat Anda menggunakan mode tulis COPY_STREAM atau COPY_BULK_LOAD, nilai kolom tabel yang menggunakan CURRENT_TIMESTAMP atau NOW() sebagai nilai default akan tetap pada waktu mulai koneksi dan tidak diperbarui untuk setiap baris. Kolom metadata binlog hg_binlog_timestamp_us menyediakan waktu pemasukan data yang sebenarnya.

Rekomendasi

  • Pemilihan format penyimpanan:

    • Untuk pencarian titik pada tabel dimensi: Gunakan penyimpanan berorientasi baris. Anda harus menetapkan kunci primer dan kunci pengelompokan.

    • Untuk kueri satu-ke-banyak pada tabel dimensi: Gunakan penyimpanan berorientasi kolom. Untuk kinerja optimal, konfigurasikan dengan benar kunci distribusi dan kunci segmen.

    • Untuk tabel yang memerlukan pembaruan dan kueri analitis sering: Jika tabel harus mendukung konsumsi binlog waktu nyata dan analisis OLAP, kami sangat merekomendasikan penggunaan penyimpanan hibrida baris-kolom.

    Penting

    Jika Anda tidak menentukan format penyimpanan saat membuat tabel di Hologres, format default-nya adalah penyimpanan berorientasi kolom. Anda tidak dapat mengubah format penyimpanan setelah tabel dibuat. Untuk detail lebih lanjut, lihat Buat Tabel di Hologres dan Format Penyimpanan Tabel: Berorientasi Baris, Berorientasi Kolom, dan Hibrida Baris-Kolom.

  • Pengaturan paralelisme job: Atur paralelisme job Flink agar sesuai dengan jumlah shard pada tabel Hologres.

    -- Di HoloWeb, jalankan pernyataan berikut untuk mengetahui jumlah shard pada suatu tabel. Ganti <tablename> dengan nama tabel Anda.
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • Versi dan fitur: Periksa secara berkala Catatan Rilis konektor Hologres untuk informasi tentang masalah yang diketahui, pembaruan fitur, dan kompatibilitas versi.

Catatan

  • Kompatibilitas dan batasan untuk mode konsumsi Hologres dan VVR

    Tabel source

    • Untuk VVR 8 dan sebelumnya, pilih mode konsumsi menggunakan parameter sdkMode.

    • Untuk VVR 11 dan seterusnya, pilih mode konsumsi menggunakan parameter source.binlog.read-mode.

    Versi VVR

    Versi Hologres

    Nilai default/direkomendasikan

    Mode konsumsi aktual

    Catatan

    ≥ 6.0.7

    < 2.0

    Custom

    holohub (default)

    Kami merekomendasikan menyetel nilai ke jdbc.

    6.0.7 hingga 8.0.4

    ≥ 2.0

    jdbc (alih otomatis, tidak perlu konfigurasi)

    jdbc (dipaksakan)

    Layanan holohub telah ditinggalkan di Hologres 2.0 dan seterusnya. Sistem secara otomatis beralih ke mode jdbc, yang dapat menyebabkan masalah izin. Untuk konfigurasi izin, lihat Masalah izin.

    ≥ 8.0.5

    ≥ 2.1

    jdbc (alih otomatis, tidak perlu konfigurasi)

    jdbc (dipaksakan)

    Tidak ada masalah izin. Untuk Hologres 2.1.27 dan seterusnya, mode beralih ke jdbc_fixed.

    ≥ 11.1

    Versi apa pun

    AUTO (default)

    Dipilih secara otomatis berdasarkan versi Hologres

    • Untuk Hologres 2.1.27 dan seterusnya, mode jdbc dipilih dengan koneksi ringan diaktifkan secara default (parameter connection.fixed.enabled disetel ke true).

    • Untuk Hologres versi 2.1.0 hingga 2.1.26, mode jdbc dipilih.

    • Untuk Hologres 2.0 dan sebelumnya, mode holohub dipilih.

    Penting

    Di VVR 11.1 dan seterusnya, konektor secara default mengonsumsi data binlog. Pastikan Anda telah mengaktifkan binlog. Jika tidak, kesalahan dapat terjadi.

    Masalah izin

    Jika Anda bukan superuser, Anda harus memberikan izin yang diperlukan untuk mengonsumsi binlog dalam mode JDBC.

    user_name adalah ID Akun Alibaba Cloud atau Pengguna RAM Anda. Untuk informasi lebih lanjut, lihat Ikhtisar akun.

    -- Dalam model otorisasi PostgreSQL standar, berikan izin CREATE kepada pengguna dan berikan role replikasi pada instans kepada pengguna.
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- Jika database menggunakan model izin sederhana (SLMP), Anda tidak dapat menjalankan pernyataan GRANT. Gunakan spm_grant untuk memberikan izin Admin pada database kepada pengguna. Anda juga dapat memberikan izin di konsol HoloWeb.
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

    Tabel sink

    • Untuk VVR 8 dan sebelumnya, pilih mode penulisan data menggunakan parameter sdkMode.

    • Untuk VVR 11 dan seterusnya, pilih mode penulisan data menggunakan parameter sink.write-mode.

    Versi VVR

    Versi Hologres

    Mode RPC terpengaruh

    Actual Write Mode

    Nilai default/direkomendasikan

    Catatan

    6.0.4 hingga 8.0.2

    < 2.0

    Tidak

    rpc

    Custom

    N/A

    6.0.4 hingga 8.0.2

    ≥ 2.0

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Untuk mencegah deduplikasi, setel 'jdbcWriteBatchSize'='1'.

    ≥ 8.0.3

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Jika Anda mengonfigurasi mode sebagai rpc, sistem secara otomatis beralih ke mode jdbc_fixed dan menyetel 'jdbcWriteBatchSize'='1' untuk mencegah deduplikasi.

    ≥ 8.0.5

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Jika Anda mengonfigurasi mode sebagai rpc, sistem secara otomatis beralih ke mode jdbc_fixed dan menyetel 'deduplication.enabled'='false' untuk mencegah deduplikasi.

    Penting
    • Layanan rpc telah ditinggalkan di Hologres 2.0 dan seterusnya. Jika Anda menyetel parameter ini ke rpc, Flink secara otomatis beralih ke nilai jdbc_fixed. Jika Anda menyetel parameter ke nilai lain, Flink menggunakan nilai yang ditentukan.

    • Mode rpc dihapus dari VVR 11.1 dan seterusnya. Kami merekomendasikan penggunaan mode jdbc untuk koneksi.

    • Untuk operasi tulis dalam skenario konkurensi tinggi, kami merekomendasikan penggunaan mode jdbc_copy atau COPY_STREAM.

    Tabel dimensi

    Versi VVR

    Versi Hologres

    Mode RPC terpengaruh

    Mode konsumsi aktual

    Nilai default/direkomendasikan

    Catatan

    6.0.4 hingga 8.0.2

    < 2.0

    Tidak

    rpc

    Custom

    N/A

    6.0.4 hingga 8.0.2

    ≥ 2.0

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Layanan rpc telah ditinggalkan untuk instans Hologres versi 2.0 atau lebih baru. Jika Anda menyetel parameter ini ke rpc, Flink secara otomatis beralih ke nilai jdbc_fixed. Namun, jika Anda menyetel parameter ke nilai lain, Flink menggunakan nilai yang Anda tentukan.

    ≥ 8.0.3

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    ≥ 8.0.5

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Custom

    Penting

    Mode rpc dihapus dari VVR 11.1 dan seterusnya. Secara default, mode jdbc digunakan untuk koneksi. Anda juga dapat mengaktifkan mode koneksi ringan dengan menyetel parameter connection.fixed.enabled.

  • Untuk membaca data JSONB dari tabel sumber binlog dalam mode JDBC, Anda harus mengaktifkan parameter GUC di tingkat database.

    -- Aktifkan parameter GUC di tingkat database. Hanya superuser yang dapat menjalankan perintah ini. Anda hanya perlu menjalankan perintah ini sekali untuk setiap database.
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • Operasi UPDATE menghasilkan dua catatan binlog berturut-turut: catatan update_before untuk data lama, diikuti oleh catatan update_after untuk data baru.

  • Hindari menjalankan operasi TRUNCATE atau operasi pembuatan ulang tabel lainnya pada tabel sumber binlog. Untuk informasi lebih lanjut, lihat FAQ.

  • Pastikan presisi tipe DECIMAL konsisten antara Flink dan Hologres untuk menghindari kesalahan. Untuk informasi lebih lanjut, lihat FAQ.

  • Saat Anda menggunakan mode initial untuk konsumsi data penuh dan inkremental terpadu dari tabel sumber, pengurutan global tidak dijamin. Jika sistem downstream bergantung pada perhitungan berbasis waktu, gunakan mode konsumsi lain yang hanya berbasis binlog.

Aktifkan binlog

Untuk tabel baru

Fitur pembacaan data waktu nyata dinonaktifkan secara default. Oleh karena itu, saat Anda membuat tabel di HoloWeb menggunakan pernyataan DDL, Anda harus menyetel parameter binlog.level dan binlog.ttl. Berikut adalah contohnya.

begin;
create table test_table(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_table', 'orientation', 'row');--Buat tabel berorientasi baris bernama test_table.
call set_table_property('test_table', 'clustering_key', 'id');--Buat kunci pengelompokan pada kolom id.
call set_table_property('test_table', 'binlog.level', 'replica');--Aktifkan fitur binlog.
call set_table_property('test_table', 'binlog.ttl', '86400');--Setel waktu hidup (TTL) untuk binlog dalam detik.
commit;

Untuk tabel yang sudah ada

Di HoloWeb, Anda dapat menggunakan pernyataan berikut untuk mengaktifkan Binlog untuk tabel yang sudah ada dan menyetel TTL Binlog. table_name adalah nama tabel yang ingin Anda aktifkan Binlog-nya.

-- Aktifkan fitur binlog.
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;

-- Setel waktu hidup (TTL) binlog dalam detik.
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;

Dengan parameter

Mulai VVR 11, opsi konektor untuk Hologres telah diperbarui. Beberapa parameter mungkin telah diganti namanya atau dihapus. VVR 11 kompatibel mundur dengan VVR 8. Lihat dokumentasi parameter untuk versi VVR Anda.

Pemetaan tipe

Untuk pemetaan tipe data antara Flink dan Hologres, lihat Pemetaan tipe data antara Flink dan Hologres.

Catatan

Hologres mendukung sintaks GENERATED ALWAYS AS untuk mendefinisikan kolom yang dihasilkan. Contohnya:

ds TIMESTAMP NOT NULL GENERATED ALWAYS AS (date_trunc('month', create_time)) STORED

Kendala NOT NULL pada kolom yang dihasilkan dipetakan ke field nullable, sebagaimana diharapkan. Karena nilai kolom yang dihasilkan dihitung oleh Hologres, Flink tidak menulis nilai untuk field ini. Jika kendala NOT NULL dipertahankan, operasi tulis akan gagal validasi di klien Hologres. Perilaku ini tidak memengaruhi kendala NOT NULL pada kolom biasa (misalnya, ds TIMESTAMP NOT NULL).

Contoh

Contoh tabel source

Tabel source binlog

CDC mode

Dalam mode ini, source mengonsumsi data binlog dan secara otomatis menetapkan tipe Flink RowKind yang sesuai untuk setiap baris berdasarkan hg_binlog_event_type, sehingga tidak perlu deklarasi eksplisit. Jenis ini mencakup INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER. Hal ini memungkinkan pencermatan data tabel, mirip dengan fungsionalitas Change Data Capture (CDC) pada database seperti MySQL atau PostgreSQL. Berikut adalah contoh DDL untuk tabel source.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='${secret_values.ak_id}',            --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci. 
  'password'='${secret_values.ak_secret}',        
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL',  --Membaca semua jenis changelog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
  'retry-count'='10',                     --Jumlah percobaan ulang saat terjadi kesalahan pembacaan binlog.
  'retry-sleep-step-ms'='5000',           --Waktu backoff inkremental antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua menunggu 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'        --Ukuran batch untuk membaca data binlog.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc',
  'binlogMaxRetryTimes' = '10',     --Jumlah percobaan ulang saat terjadi kesalahan pembacaan binlog.
  'binlogRetryIntervalMs' = '500',  --Interval percobaan ulang dalam milidetik setelah kesalahan pembacaan binlog.
  'binlogBatchReadSize' = '100'     --Ukuran batch untuk membaca data binlog.
);

Mode non-CDC

Dalam mode ini, source meneruskan data binlog yang dikonsumsi ke node downstream sebagai data Flink biasa, memperlakukan semua catatan sebagai tipe INSERT. Anda dapat menangani catatan dengan hg_binlog_event_type tertentu berdasarkan kebutuhan bisnis Anda. Berikut adalah contoh DDL untuk tabel source.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY',  --Semua jenis changelog diperlakukan sebagai operasi INSERT.
  'retry-count'='10',                     --Jumlah percobaan ulang saat terjadi kesalahan pembacaan binlog.
  'retry-sleep-step-ms'='5000',           --Waktu backoff inkremental antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua menunggu 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'        --Ukuran batch untuk membaca data binlog.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',     --Jumlah percobaan ulang saat terjadi kesalahan pembacaan binlog.
  'binlogRetryIntervalMs' = '500',  --Interval percobaan ulang dalam milidetik setelah kesalahan pembacaan binlog.
  'binlogBatchReadSize' = '100'     --Ukuran batch untuk membaca data binlog.
);

Tabel source non-binlog

VVR 11+

Penting

Mulai VVR 11.1, konektor secara default mengonsumsi data binlog. Untuk membaca dari tabel source non-binlog, Anda harus secara eksplisit menyetel 'source.binlog' ke 'false'. Untuk informasi lebih lanjut, lihat Tabel source binlog.

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan manajemen variabel untuk mencegah kebocoran kunci AK/SK.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='false'                      --Menentukan apakah akan mengonsumsi data binlog.
);

VVR 8+

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

Sink table

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Gunakan manajemen variabel untuk mencegah kebocoran kunci AK/SK.
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;

Contoh tabel dimensi

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- Gunakan variabel untuk Pasangan Kunci Akses Anda untuk mencegah kebocoran kunci.
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Fitur lanjutan

Ingesti penuh dan inkremental terpadu

Skenario

  • Fitur ini hanya berlaku untuk tabel source yang memiliki kunci primer. Fitur ini direkomendasikan untuk tabel source Hologres yang menggunakan mode CDC.

  • Hologres memungkinkan Anda mengaktifkan Binlog sesuai kebutuhan. Anda dapat mengaktifkan Binlog untuk tabel yang sudah ada dan berisi data.

Contoh kode

VVR 11+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.startup-mode' = 'INITIAL',   --Membaca semua data historis, lalu mengonsumsi Binlog secara inkremental.
  'retry-count'='10',                         --Jumlah percobaan ulang jika terjadi kesalahan saat membaca data Binlog.
  'retry-sleep-step-ms'='5000',               --Waktu tunggu inkremental antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua menunggu 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'            --Jumlah baris yang dibaca dari Binlog dalam satu batch.
  );
Catatan
  • Setel source.binlog.startup-mode ke INITIAL untuk melakukan pembacaan penuh awal pada tabel sebelum beralih ke konsumsi inkremental Binlog.

  • Jika parameter startTime disetel, baik secara langsung maupun dengan memilih waktu mulai di UI startup, parameter tersebut memiliki prioritas lebih tinggi dan menyetel binlogStartUpMode ke mode timestamp, menggantikan pengaturan mode lainnya, karena parameter startTime memiliki prioritas lebih tinggi.

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --Membaca semua data historis, lalu mengonsumsi Binlog secara inkremental.
  'binlogMaxRetryTimes' = '10',     --Jumlah percobaan ulang jika terjadi kesalahan saat membaca data Binlog.
  'binlogRetryIntervalMs' = '500',  --Interval percobaan ulang dalam milidetik setelah kesalahan pembacaan Binlog.
  'binlogBatchReadSize' = '100'     --Jumlah baris yang dibaca dari Binlog dalam satu batch.
  );
Catatan
  • Setel binlogStartUpMode ke initial untuk melakukan pembacaan penuh awal pada tabel sebelum beralih ke konsumsi inkremental Binlog.

  • Jika parameter startTime disetel, baik secara langsung maupun dengan memilih waktu mulai di UI startup, parameter tersebut memiliki prioritas lebih tinggi dan menyetel binlogStartUpMode ke mode timestamp, menggantikan pengaturan mode lainnya, karena parameter startTime memiliki prioritas lebih tinggi.

Resolusi konflik kunci primer

Saat menulis data ke Hologres, konektor menawarkan tiga strategi untuk menangani catatan dengan kunci primer duplikat.

VVR 11+

Anda dapat menentukan strategi dengan menyetel parameter sink.on-conflict-action.

Nilai

Deskripsi

INSERT_OR_IGNORE

Menyimpan catatan yang tiba pertama dan mengabaikan duplikat berikutnya.

INSERT_OR_REPLACE

Menimpa catatan yang ada dengan yang baru.

INSERT_OR_UPDATE (Default)

Hanya memperbarui kolom yang disediakan dalam tabel sink, meninggalkan kolom lain dalam catatan yang ada tidak berubah.

VVR 8+

Anda dapat menentukan strategi dengan menyetel parameter mutatetype.

Nilai

Deskripsi

insertorignore (Default)

Menyimpan catatan yang tiba pertama dan mengabaikan duplikat berikutnya.

insertorreplace

Menimpa catatan yang ada dengan yang baru.

insertorupdate

Hanya memperbarui kolom yang disediakan dalam tabel sink, meninggalkan kolom lain dalam catatan yang ada tidak berubah.

Misalnya, asumsikan sebuah tabel memiliki kolom a, b, c, dan d, dan a adalah kunci primer. Jika tabel sink hanya menyediakan kolom a dan b, menyetel strategi ke INSERT_OR_UPDATE hanya memperbarui kolom b. Kolom c dan d tetap tidak berubah.
Catatan

Namun, kolom apa pun dalam tabel fisik yang dihilangkan dari tabel sink harus nullable. Jika tidak, operasi tulis akan gagal.

Menulis ke tabel partisi

Secara default, sink Hologres menulis data ke satu tabel non-partisi. Untuk menulis ke tabel partisi dengan menargetkan tabel induknya, Anda harus mengaktifkan opsi berikut.

VVR 11+

Untuk memungkinkan konektor membuat partisi anak secara otomatis jika belum ada, setel sink.create-missing-partition ke true.

Catatan
  • VVR 11.1 dan versi yang lebih baru mendukung penulisan ke tabel partisi secara default dan secara otomatis mengarahkan data ke partisi anak yang sesuai.

  • Setel parameter tablename ke nama tabel induk.

  • Jika partisi anak yang diperlukan tidak ada dan sink.create-missing-partition=true tidak disetel, operasi tulis akan gagal.

VVR 8+

  • Untuk mengarahkan data secara otomatis ke partisi anak yang sesuai, setel partitionRouter ke true.

  • Untuk memungkinkan konektor membuat partisi anak secara otomatis jika belum ada, setel createparttable ke true.

Catatan
  • Setel parameter tablename ke nama tabel induk.

  • Jika partisi anak yang diperlukan tidak ada dan createparttable=true tidak disetel, operasi tulis akan gagal.

Gabungkan stream dan pembaruan parsial

Saat Anda menulis beberapa stream data ke satu tabel lebar Hologres, konektor dapat secara otomatis menggabungkan catatan yang memiliki kunci primer yang sama. Anda juga dapat melakukan pembaruan parsial, yang hanya memperbarui kolom yang dimodifikasi alih-alih mengganti seluruh baris. Hal ini meningkatkan kinerja tulis dan konsistensi data.

Batasan

  • Tabel lebar harus memiliki kunci primer.

  • Setiap stream data harus menyertakan semua kolom yang membentuk kunci primer.

  • Untuk tabel lebar yang menggunakan penyimpanan berorientasi kolom, penggabungan stream pada laju permintaan per detik (RPS) yang tinggi dapat menyebabkan penggunaan CPU tinggi. Untuk menguranginya, pertimbangkan untuk menonaktifkan encoding kamus untuk kolom tabel.

Contoh

Asumsikan Anda memiliki dua stream data Flink. Stream pertama berisi kolom a, b, dan c. Stream kedua berisi kolom a, d, dan e. Tabel lebar Hologres, WIDE_TABLE, berisi kolom a, b, c, d, dan e, dengan kolom a sebagai kunci primer.

VVR 11+

// source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan kolom a, b, c, d, dan e.
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- Tabel lebar Hologres, yang berisi kolom a, b, c, d, dan e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE',   -- Perbarui kolom tertentu berdasarkan kunci primer.
  'sink.delete-strategy'='IGNORE_DELETE',         -- Strategi untuk menangani pesan penarikan. IGNORE_DELETE cocok untuk stream append-only atau upsert yang tidak memerlukan operasi hapus.
  'sink.partial-insert.enabled'='true'            -- Mengaktifkan pembaruan parsial. Hanya kolom yang ditentukan dalam pernyataan INSERT yang dikirim ke konektor.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Deklarasikan bahwa hanya kolom a, b, dan c yang dimasukkan.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Deklarasikan bahwa hanya kolom a, d, dan e yang dimasukkan.
END;

VVR 8+

// source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan kolom a, b, c, d, dan e.
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- Tabel lebar Hologres, yang berisi kolom a, b, c, d, dan e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'mutatetype'='insertorupdate',    -- Perbarui kolom tertentu berdasarkan kunci primer.
  'ignoredelete'='true',            -- Abaikan permintaan HAPUS yang dihasilkan oleh pesan penarikan.
  'partial-insert.enabled'='true'   -- Aktifkan pembaruan parsial untuk hanya memperbarui kolom yang dideklarasikan dalam pernyataan INSERT.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Deklarasikan bahwa hanya kolom a, b, dan c yang dimasukkan.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Deklarasikan bahwa hanya kolom a, d, dan e yang dimasukkan.
END;
Catatan

Setel ignoredelete ke true untuk mengabaikan permintaan Hapus yang dihasilkan oleh pesan penarikan. Untuk VVR 8.0.8 dan seterusnya, kami merekomendasikan penggunaan sink.delete-strategy untuk mengonfigurasi berbagai strategi untuk pembaruan parsial.

Konsumsi binlog dari tabel partisi (Beta)

Tabel partisi membantu meningkatkan pengarsipan data dan kinerja kueri. Konektor Hologres mendukung konsumsi binlog dari tabel partisi fisik maupun logis. Untuk perbedaan antara tabel partisi fisik dan logis, lihat BUAT TABEL PARTISI LOGIS.

Konsumsi binlog dari tabel partisi fisik

Konektor Hologres dapat mengonsumsi binlog dari tabel partisi dan secara dinamis memantau partisi baru dalam satu job. Hal ini secara signifikan meningkatkan efisiensi dan kegunaan pemrosesan data waktu nyata.

Catatan penggunaan

  • Fitur ini hanya tersedia untuk tabel sumber binlog dalam mode JDBC. Fitur ini memerlukan VVR 8.0.11 atau lebih baru dan instans Hologres versi 2.1.27 atau lebih baru.

  • Nama partisi harus mengikuti format {parent_table}_{partition_value} secara ketat. Konektor mungkin tidak mengonsumsi partisi yang tidak mengikuti format ini. Untuk informasi lebih lanjut, lihat partisi dinamis.

    Penting
    • Untuk mode DYNAMIC, VVR versi 8.0.11 tidak mendukung kolom partisi dengan pembatas - (seperti YYYY-MM-DD).

    • Mulai VVR 11.1, Anda dapat mengonsumsi data dari kolom partisi yang menggunakan format kustom.

    • Batasan ini tidak berlaku saat Anda menulis ke tabel partisi.

  • Saat mendeklarasikan tabel sumber Hologres di Flink, Anda harus menyertakan kolom partisinya.

  • Dalam mode DYNAMIC, tabel partisi harus memiliki partisi dinamis diaktifkan. Selain itu, parameter pra-pembuatan partisi auto_partitioning.num_precreate harus lebih besar dari 1. Jika tidak, job akan melemparkan pengecualian saat mencoba mengonsumsi partisi terbaru.

  • Dalam mode DYNAMIC, setelah partisi baru ditambahkan, konektor tidak lagi mengonsumsi perubahan data berikutnya dari partisi lama.

Contoh

Mode

Fitur

Deskripsi

DYNAMIC

konsumsi partisi dinamis

Secara otomatis memantau dan mengonsumsi partisi baru secara kronologis. Mode ini cocok untuk skenario streaming data waktu nyata.

STATIC

konsumsi partisi statis

Hanya mengonsumsi partisi yang sudah ada (atau yang ditentukan secara manual) dan tidak secara otomatis menemukan partisi baru. Mode ini cocok untuk memproses data historis dalam rentang tetap.

Mode Dinamis

VVR 11+

Asumsikan tabel partisi Hologres dibuat dengan DDL berikut, dan binlog serta partisi dinamis diaktifkan.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- Mengaktifkan partisi dinamis.
    auto_partitioning_time_unit = 'DAY',  -- Partisi dibuat setiap hari. Contoh nama partisi: test_message_src1_20250512, test_message_src1_20250513.
    auto_partitioning_num_precreate = '2' -- Membuat dua partisi sebelumnya.
);
-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.

Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi tabel partisi test_message_src1 dalam mode DYNAMIC.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Kolom partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- Tabel induk yang diaktifkan partisi dinamisnya.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- Memantau partisi terbaru secara dinamis.
  'source.binlog.startup-mode' = 'initial'           -- Mengonsumsi semua data yang ada, lalu mengonsumsi data inkremental dari binlog.
);

VVR 8.0.11

Asumsikan tabel partisi Hologres dibuat dengan DDL berikut, dan binlog serta partisi dinamis diaktifkan.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- Mengaktifkan partisi dinamis.
    auto_partitioning_time_unit = 'DAY',  -- Partisi dibuat setiap hari. Contoh nama partisi: test_message_src1_20241027, test_message_src1_20241028.
    auto_partitioning_num_precreate = '2' -- Membuat dua partisi sebelumnya.
);

-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.

Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi data dari tabel partisi test_message_src1 dalam mode DYNAMIC.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Kolom partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- Tabel induk yang diaktifkan partisi dinamisnya.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'DYNAMIC',  -- Memantau partisi terbaru secara dinamis.
  'binlogstartUpMode' = 'initial',      -- Mengonsumsi semua data yang ada, lalu mengonsumsi data inkremental dari binlog.
  'sdkMode' = 'jdbc_fixed'              -- Gunakan mode ini untuk menghindari masalah batas koneksi.
);

Mode Statis

VVR 11+

Asumsikan tabel partisi Hologres dibuat dengan DDL berikut, dan binlog diaktifkan.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi tabel partisi test_message_src2 dalam mode STATIC.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Kolom partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- Tabel partisi.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'STATIC', -- Mengonsumsi set partisi tetap.
  'source.binlog.partition-values-to-read' = 'red,blue,green',  -- Hanya mengonsumsi tiga partisi yang ditentukan. Partisi 'black' tidak dikonsumsi. Partisi baru juga tidak dikonsumsi. Jika opsi ini tidak disetel, job mengonsumsi semua partisi dari tabel induk.
  'source.binlog.startup-mode' = 'initial'  -- Mengonsumsi semua data yang ada, lalu mengonsumsi data inkremental dari binlog.
);

VVR 8.0.11

Asumsikan tabel partisi Hologres dibuat dengan DDL berikut, dan binlog diaktifkan.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Di Flink, gunakan pernyataan SQL berikut untuk mengonsumsi data dari tabel partisi test_message_src2 dalam mode STATIC.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Kolom partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- Tabel partisi.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'STATIC', -- Mengonsumsi set partisi tetap.
  'partition-values-to-read' = 'red,blue,green',  -- Hanya mengonsumsi tiga partisi yang ditentukan. Partisi 'black' tidak dikonsumsi. Partisi baru juga tidak dikonsumsi. Jika opsi ini tidak disetel, job mengonsumsi semua partisi dari tabel induk.
  'binlogstartUpMode' = 'initial',  -- Mengonsumsi semua data yang ada, lalu mengonsumsi data inkremental dari binlog.
  'sdkMode' = 'jdbc_fixed' -- Gunakan mode ini untuk menghindari masalah batas koneksi.
);

Konsumsi binlog dari tabel partisi logis

Konektor Hologres mendukung konsumsi binlog dari tabel partisi logis dan memungkinkan Anda menentukan partisi mana yang akan dikonsumsi dengan opsi.

Catatan penggunaan

  • Mengonsumsi binlog dari partisi tertentu tabel partisi logis memerlukan VVR 11.0.0 atau lebih baru dan instans Hologres versi V3.1 atau lebih baru.

  • Mengonsumsi binlog dari semua partisi tabel partisi logis sama dengan mengonsumsi binlog dari tabel Hologres non-partisi. Untuk informasi lebih lanjut, lihat Tabel source.

Contoh

Parameter

Deskripsi

Contoh

source.binlog.logical-partition-filter-column-names

Nama kolom partisi yang menentukan partisi mana yang akan dikonsumsi. Tutup nama kolom dengan tanda kutip ganda ("). Pisahkan beberapa nama kolom dengan koma (,). Jika nama kolom berisi tanda kutip ganda, escape dengan tanda kutip ganda tambahan.

'source.binlog.logical-partition-filter-column-names'='"Pt","id"'

Dua kolom partisi digunakan: Pt dan id.

source.binlog.logical-partition-filter-column-values

Nilai partisi yang menentukan partisi mana yang akan dikonsumsi. Partisi ditentukan oleh satu set nilai, satu untuk setiap kolom partisi. Tutup setiap nilai dengan tanda kutip ganda ("). Pisahkan nilai untuk partisi yang sama dengan koma (,). Pisahkan partisi dengan titik koma (;). Jika nilai berisi tanda kutip ganda, escape dengan tanda kutip ganda tambahan.

'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"'

Ini menentukan dua partisi untuk dikonsumsi. Nilai partisi pertama adalah (20240910, 0), dan yang kedua adalah (special"value, 9).

Asumsikan Anda telah membuat tabel berikut di Hologres.

CREATE TABLE holo_table (
    id int not null,
    name text,
    age numeric(18,4),
    "Pt" text,
    primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
    binlog_level ='replica'
);

Untuk mengonsumsi binlog tabel ini di Flink:

CREATE TEMPORARY TABLE test_src_binlog_table(
  id INTEGER,
  name VARCHAR,
  age decimal(18,4),
  `Pt` VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='holo_table',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='true',
  'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
  'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
  'source.binlog.change-log-mode'='ALL',  --Membaca semua jenis changelog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
  'retry-count'='10',                     -- Jumlah percobaan ulang untuk kesalahan pembacaan binlog.
  'retry-sleep-step-ms'='5000',           --Waktu backoff inkremental antar percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua menunggu 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'        --Jumlah baris yang dibaca dari binlog dalam satu batch.
);

API DataStream

Penting

Untuk membaca dari atau menulis ke Hologres menggunakan API DataStream, gunakan konektor DataStream yang sesuai untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi tentang cara menyiapkan konektor DataStream, lihat Cara menggunakan konektor DataStream. Konektor DataStream Hologres tersedia di Maven Central. Untuk debugging lokal, gunakan Uber JAR yang sesuai. Untuk informasi lebih lanjut, lihat Jalankan dan debug job yang berisi konektor secara lokal.

Tabel source Hologres

Tabel source binlog

VVR menyediakan kelas HologresBinlogSource untuk membaca data binlog Hologres. Contoh berikut menunjukkan cara membangun source binlog Hologres.

VVR 11.3+

Penting

Mulai VVR 11.1.2, parameter JDBCOptions dan startTimeMs telah dihapus dari konstruktor HologresBinlogSource. Mulai VVR 11.3, parameter List<Subscribe.BinlogFilter> telah ditambahkan. Jika Anda menggunakan VVR 11 atau lebih baru, kami merekomendasikan penggunaan VVR 11.3 atau lebih baru.

public class Sample {                                                                                                                                                                          
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
            TableSchema schema = TableSchema.builder()
                    .field("a", DataTypes.INT())
                    .field("b", DataTypes.STRING())
                    .field("c", DataTypes.TIMESTAMP())
                    .build();

            // Nama tabel yang akan dibaca.
            String sourceTableName = "sourceTableName";

            // Parameter untuk koneksi Hologres.
            Configuration config = new Configuration();
            config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
            config.setString(HologresConfigs.USERNAME, "yourUserName");
            config.setString(HologresConfigs.PASSWORD, "yourPassword");
            config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
            config.setString(HologresConfigs.TABLE, sourceTableName);
            config.set(HologresConfigs.BINLOG, true);
            config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL);
            // Bangun source binlog Hologres.
            HologresBinlogSource source = new HologresBinlogSource(
                    new HologresConnectionParam(config),
                    schema,
                    config,
                    StartupMode.INITIAL,
                    sourceTableName,
                    "",
                    Collections.emptyList(),
                    -1,
                    Collections.emptySet(),
                    Collections.emptyList()
            );
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
            env.execute();
        }
  }

VVR 8.0.11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Parameter untuk koneksi Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Bangun source binlog Hologres.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet(),
                new ArrayList<>()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Parameter untuk koneksi Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Bangun source binlog Hologres.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 6.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .build();
         // Parameter untuk koneksi Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Setel atau buat nama slot default.
        config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

        boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        // Bangun JDBCBinlogRecordConverter.
        JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
                jdbcOptions.getTable(),
                schema,
                new HologresConnectionParam(config),
                cdcMode,
                Collections.emptySet());
        
        // Bangun source binlog Hologres JDBC.
        long startTimeMs = 0;
        HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.TIMESTAMP,
                recordConverter,
                "",
                -1);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
Penting

Jika Anda menggunakan mesin Realtime Compute for Apache Flink versi lebih lama dari 8.0.5 atau versi Hologres lebih lama dari 2.1, pastikan pengguna adalah superuser atau memiliki izin Replication Role. Untuk informasi lebih lanjut, lihat Masalah izin Hologres.

Tabel source non-binlog

VVR menyediakan kelas HologresBulkreadInputFormat, implementasi RichInputFormat, untuk membaca data dari tabel Hologres. Contoh berikut menunjukkan cara membangun source Hologres.

public class Sample {
    public static void main(String[] args) throws Exception {
        // atur API Java DataStream
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema untuk tabel yang akan dibaca. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Parameter untuk koneksi Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
                new HologresConnectionParam(config),
                jdbcOptions,
                schema,
                "",
                -1);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
        env.execute();
    }
}

Dependensi Maven

Konektor DataStream Hologres tersedia di Maven Central.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Tabel sink Hologres

VVR 11+

public class Sample {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // Inisialisasi skema untuk tabel yang akan ditulis. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
          TableSchema tableSchema = TableSchema.builder()
                  .field("a", DataTypes.INT().notNull())
                  .field("b", DataTypes.STRING())
                  .primaryKey("a")
                  .build();
          // Parameter untuk koneksi Hologres.
          Configuration config = new Configuration();
          config.set(HologresConfigs.ENDPOINT, "yourEndpoint");
          config.set(HologresConfigs.USERNAME, "yourUserName");
          config.set(HologresConfigs.PASSWORD, "yourPassword");
          config.set(HologresConfigs.DATABASE, "yourDatabaseName");
          config.set(HologresConfigs.TABLE, "yourTableName");
          HologresConnectionParam connectionParam = new HologresConnectionParam(config);
          HologresTableSchema hologresTableSchema =
                  HologresTableSchema.get(connectionParam.getJDBCOptions());
          // Indeks kolom yang akan ditulis ke sink.
          Integer[] targetColumnIndexes = {0, 1};
          // Bangun sink Hologres.
          HologresSinkFunction sinkFunction =
                  new HologresSinkFunction(
                          connectionParam, tableSchema, targetColumnIndexes, hologresTableSchema);
          TypeInformation<RowData> typeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
          env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
          env.execute();
      }
  }

VVR 8+

public class Sample {
    public static void main(String[] args) throws Exception {
        // atur API Java DataStream
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema untuk tabel yang akan ditulis. Skema harus sesuai dengan field skema tabel Hologres. Anda dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .build();
        // Parameter untuk koneksi Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
        
         // Bangun writer Hologres untuk menulis data sebagai RowData.
        AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
                hologresConnectionParam, 
                schema, 
                HologresTableSchema.get(hologresConnectionParam), 
                new Integer[0]);
        // Bangun sink Hologres.
        HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

Kolom metadata

Di Realtime Compute for Apache Flink VVR 8.0.11 dan seterusnya, tabel source binlog mendukung kolom metadata. Mulai versi ini, kami merekomendasikan Anda mendeklarasikan field binlog seperti hg_binlog_event_type sebagai kolom metadata. Kolom metadata adalah ekstensi terhadap standar SQL. Kolom ini memungkinkan Anda mengakses informasi spesifik, seperti nama database sumber, nama tabel, jenis perubahan, dan timestamp event. Anda dapat menggunakan metadata ini untuk mengimplementasikan logika kustom, misalnya, untuk memfilter event DELETE.

Parameter

Tipe

Deskripsi

db_name

STRING NOT NULL

Nama database yang berisi baris tersebut.

table_name

STRING NOT NULL

Nama tabel yang berisi baris tersebut.

hg_binlog_lsn

BIGINT NOT NULL

Field sistem untuk nomor urutan binlog. Nilainya meningkat secara monoton tetapi tidak kontinu dalam satu shard. Keunikan dan urutan tidak dijamin di seluruh shard.

hg_binlog_timestamp_us

BIGINT NOT NULL

Timestamp event perubahan di database, dalam mikrodetik (us).

hg_binlog_event_type

BIGINT NOT NULL

Jenis perubahan baris. Nilai yang valid adalah:

  • 5: Pesan INSERT.

  • 2: Pesan DELETE.

  • 3: Gambar baris sebelum operasi UPDATE.

  • 7: Gambar baris setelah operasi UPDATE.

hg_shard_id

INT NOT NULL

ID shard data yang berisi baris tersebut. Untuk informasi lebih lanjut tentang kelompok tabel dan shard, lihat kelompok tabel dan shard.

Dalam pernyataan DDL, Anda dapat mendeklarasikan kolom metadata dengan menggunakan <meta_column_name> <datatype> METADATA VIRTUAL. Berikut adalah contohnya:

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn bigint METADATA VIRTUAL
  hg_binlog_event_type bigint METADATA VIRTUAL
  hg_binlog_timestamp_us bigint METADATA VIRTUAL
  hg_shard_id int METADATA VIRTUAL
  db_name string METADATA VIRTUAL
  table_name string METADATA VIRTUAL
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  ...
  );

FAQ

Referensi