全部产品
Search
文档中心

Realtime Compute for Apache Flink:Hologres

更新时间:Nov 10, 2025

Topik ini menjelaskan cara menggunakan konektor Hologres.

Latar Belakang

Hologres adalah mesin gudang data waktu nyata terpadu yang mendukung penulisan, pembaruan, dan analisis data dalam jumlah besar secara real-time. Hologres mendukung SQL standar dan kompatibel dengan protokol PostgreSQL. Hologres juga mendukung pemrosesan analitik online multidimensi (OLAP), analisis ad hoc data petabyte, serta penyajian data online dengan konkurensi tinggi dan latensi rendah. Hologres terintegrasi erat dengan MaxCompute, Flink, dan DataWorks untuk menyediakan solusi terpadu bagi gudang data offline maupun online. Konektor Hologres mendukung fitur-fitur berikut.

Kategori

Rincian

Jenis yang didukung

Tabel sumber, dimensi, dan sink

Mode eksekusi

Mode stream dan batch

Format data

Tidak didukung

Metrik pemantauan khusus

Metrik pemantauan

  • Tabel sumber:

    • numRecordsIn

    • numRecordsInPerSecond

  • Tabel sink:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    Catatan

    Untuk informasi lebih lanjut mengenai metrik tersebut, lihat Metrik pemantauan.

Jenis API

DataStream dan SQL

Mendukung pembaruan atau penghapusan data pada tabel sink

Ya

Fitur

Fitur

Rincian

Konsumsi data Hologres secara real-time

Anda dapat membaca data Hologres dengan atau tanpa binary logging. Fitur ini kompatibel dengan mode change data capture (CDC) maupun non-CDC.

Konsumsi penuh dan inkremental terpadu

Anda dapat melakukan konsumsi penuh, inkremental, atau konsumsi penuh dan inkremental terpadu.

Kebijakan penanganan konflik kunci primer

Anda dapat mengabaikan data baru, mengganti seluruh baris, atau hanya memperbarui field tertentu.

Penggabungan tabel lebar dan pembaruan parsial untuk penulisan multi-stream

Anda dapat memperbarui hanya kolom yang dimodifikasi alih-alih seluruh baris.

Mengonsumsi log biner tabel partisi (pratinjau publik)

Anda dapat mengonsumsi log biner dari tabel partisi fisik. Satu pekerjaan dapat memantau semua partisi, termasuk partisi yang baru ditambahkan. Anda juga dapat mengonsumsi log biner dari tabel partisi logis.

Menulis ke tabel partisi

Anda dapat menulis data ke tabel induk dari tabel partisi dan membuat otomatis partisi anak yang sesuai.

Sinkronisasi real-time untuk satu tabel atau seluruh database

Anda dapat melakukan sinkronisasi data real-time pada tingkat satu tabel atau seluruh database. Fitur ini menyediakan kemampuan berikut:

  • Deteksi otomatis evolusi skema tabel leluhur: Jika skema tabel database sumber berubah, Hologres dapat menyinkronkan perubahan tersebut ke tabel sink secara real-time.

  • Penanganan otomatis evolusi skema: Jika data baru mengalir ke Hologres, Flink pertama-tama memicu operasi modifikasi skema sebelum menulis data. Proses ini tidak memerlukan intervensi manual.

Untuk informasi lebih lanjut, lihat Pernyataan CREATE TABLE AS (CTAS) dan Mulai Cepat untuk sinkronisasi database real-time.

Batasan dan saran

Batasan

  • Tabel eksternal tidak didukung: Konektor Hologres tidak mendukung akses ke tabel eksternal Hologres, seperti tabel eksternal MaxCompute.

  • Batasan tipe waktu: Konsumsi real-time data TIMESTAMP tidak didukung. Anda harus menggunakan tipe TIMESTAMPTZ saat membuat tabel.

  • Mode pemindaian tabel sumber (Ververica Runtime (VVR) 8 dan sebelumnya): Secara default, data dibaca dalam mode batch. Seluruh tabel dipindai hanya sekali, dan data baru tidak dikonsumsi.

  • Batasan Watermark (VVR 8 dan sebelumnya): Mode CDC tidak mendukung definisi watermark. Untuk agregasi berbasis jendela, Anda harus menggunakan solusi agregasi non-jendela.

Saran

  • Pemilihan mode penyimpanan:

    • Kueri titik pada tabel dimensi: Gunakan penyimpanan berorientasi baris. Anda harus menetapkan kunci primer dan kunci pengelompokan.

    • Kueri satu-ke-banyak pada tabel dimensi: Gunakan penyimpanan berorientasi kolom. Anda dapat mengonfigurasi kunci distribusi dan kunci segmen (kolom waktu event) untuk mengoptimalkan kinerja.

    • Tabel untuk pembaruan frekuensi tinggi dan kueri analitik: Jika sebuah tabel harus mendukung konsumsi log biner dan analisis OLAP secara real-time, gunakan mode penyimpanan hibrida baris-kolom.

    Penting

    Saat membuat tabel di Hologres, mode penyimpanan default adalah berorientasi kolom jika Anda tidak menentukan mode tertentu. Mode penyimpanan tidak dapat diubah setelah tabel dibuat. Untuk informasi lebih lanjut, lihat Buat tabel di Hologres dan Format penyimpanan tabel: Berorientasi kolom, berorientasi baris, dan hibrida baris-kolom.

  • Pengaturan konkurensi pekerjaan: Anda dapat mengatur konkurensi pekerjaan Flink agar sama dengan jumlah shard pada tabel Hologres.

    # Di Konsol Hologres, jalankan pernyataan berikut untuk melihat jumlah shard pada suatu tabel. Ganti tablename dengan nama tabel Anda.
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • Versi dan fitur: Kami menyarankan Anda secara rutin memeriksa Catatan Rilis Konektor Hologres untuk informasi bug yang diketahui, pembaruan fitur, dan kompatibilitas versi.

Catatan

  • Kompatibilitas dan batasan versi Hologres dan VVR

    Tabel sumber

    • Untuk VVR 8 dan sebelumnya, Anda dapat memilih mode konsumsi dengan menentukan parameter sdkMode.

    • Untuk VVR 11 dan seterusnya, Anda dapat memilih mode konsumsi dengan menentukan parameter source.binlog.read-mode.

    Versi VVR

    Versi Hologres

    Nilai parameter default/direkomendasikan

    Mode konsumsi aktual

    Catatan

    ≥ 6.0.7

    < 2.0

    Kustom

    holohub (default)

    Konfigurasikan jdbc.

    6.0.7 hingga 8.0.4

    ≥ 2.0

    jdbc (alih otomatis, tidak perlu konfigurasi)

    jdbc (dipaksakan)

    Karena layanan holohub tidak dipublikasikan di Hologres 2.0 dan seterusnya, mode secara otomatis dialihkan ke jdbc. Hal ini dapat menyebabkan masalah izin. Untuk informasi lebih lanjut tentang cara mengonfigurasi izin, lihat Masalah izin.

    8.0.5 atau seterusnya

    Lebih besar dari atau sama dengan 2.1

    jdbc (alih otomatis, tidak perlu konfigurasi)

    jdbc (dipaksakan)

    Tidak ada masalah izin. Untuk Hologres 2.1.27 dan seterusnya, mode dialihkan ke jdbc_fixed.

    Lebih besar dari atau sama dengan 11.1

    Versi apa pun

    AUTO (default)

    Dipilih secara otomatis berdasarkan versi Hologres

    • Untuk versi 2.1.27 dan seterusnya, mode jdbc dipilih, dan koneksi ringan diaktifkan secara default. Artinya parameter connection.fixed.enabled diatur ke true secara default.

    • Untuk versi 2.1.0 hingga 2.1.26, mode jdbc dipilih.

    • Untuk versi 2.0 dan sebelumnya, mode holohub dipilih.

    Penting

    Untuk VVR 11.1 dan seterusnya, data log biner dikonsumsi secara default. Pastikan Anda telah mengaktifkan binary logging. Jika tidak, kesalahan mungkin terjadi.

    Masalah izin

    Jika pengguna bukan superuser, Anda harus mengonfigurasi izin untuk mengonsumsi log biner dalam mode JDBC.

    user_name menentukan ID akun Alibaba Cloud atau Pengguna RAM. Untuk informasi lebih lanjut, lihat Ikhtisar akun.

    -- Dalam model otorisasi PostgreSQL standar, berikan izin CREATE kepada pengguna dan berikan izin Replication Role pada instans kepada pengguna.
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- Jika database menggunakan model izin sederhana (SLMP), Anda tidak dapat menjalankan pernyataan GRANT. Gunakan spm_grant untuk memberikan izin Admin pada database kepada pengguna. Anda juga dapat memberikan izin di konsol HoloWeb.
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

    Tabel sink

    • Untuk VVR 8 dan sebelumnya, Anda dapat memilih mode konsumsi dengan menentukan parameter sdkMode.

    • Untuk VVR 11 dan seterusnya, Anda dapat memilih mode konsumsi dengan menentukan parameter sink.write-mode.

    Versi VVR

    Versi Hologres

    Apakah mode rpc terpengaruh?

    Mode konsumsi rpc aktual

    Nilai parameter direkomendasikan/Default

    Catatan

    6.0.4 hingga 8.0.2

    < 2.0

    Tidak

    rpc

    Kustom

    /

    6.0.4 hingga 8.0.2

    ≥ 2.0

    Ya

    jdbc_fixed (alih otomatis)

    Kustom

    Anda dapat mengatur 'jdbcWriteBatchSize' ke '1' untuk mencegah deduplikasi.

    8.0.3 atau seterusnya

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Kustom

    Jika Anda mengonfigurasi mode rpc, nilai parameter secara otomatis dialihkan ke jdbc_fixed dan 'jdbcWriteBatchSize' diatur ke '1' untuk mencegah deduplikasi.

    8.0.5 atau seterusnya

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Kustom

    Jika Anda mengonfigurasi mode rpc, nilai parameter secara otomatis dialihkan ke jdbc_fixed dan 'deduplication.enabled' diatur ke 'false' untuk mencegah deduplikasi.

    Penting
    • Layanan rpc tidak dipublikasikan di Hologres 2.0 dan seterusnya. Jika Anda mengatur parameter ini ke rpc, sistem Flink secara otomatis mengalihkan nilai parameter ke jdbc_fixed. Namun, jika Anda mengaturnya ke nilai lain, sistem Flink menggunakan nilai yang Anda konfigurasi.

    • Mode rpc dihapus di VVR 11.1 dan seterusnya. Kami menyarankan Anda menggunakan mode jdbc untuk koneksi.

    • Untuk penulisan dalam skenario konkurensi tinggi, Anda dapat menggunakan mode jdbc_copy/COPY_STREAM.

    Tabel dimensi

    Versi VVR

    Versi Hologres

    Apakah mode rpc terpengaruh?

    Mode konsumsi rpc aktual

    Nilai parameter direkomendasikan/Default

    Catatan

    6.0.4 hingga 8.0.2

    < 2.0

    Tidak

    rpc

    Kustom

    /

    6.0.4 hingga 8.0.2

    ≥ 2.0

    Ya

    jdbc_fixed (alih otomatis)

    Kustom

    Jika instans Hologres Anda versi 2.0 atau seterusnya, layanan rpc tidak dipublikasikan. Jika Anda mengatur parameter ini ke rpc, sistem Flink secara otomatis mengalihkan nilai parameter ke jdbc_fixed. Namun, jika Anda mengaturnya ke nilai lain, sistem Flink menggunakan nilai yang Anda konfigurasi.

    8.0.3 atau seterusnya

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Kustom

    8.0.5 atau seterusnya

    Versi apa pun

    Ya

    jdbc_fixed (alih otomatis)

    Kustom

    Penting

    Mode rpc dihapus di VVR 11.1 dan seterusnya. Mode jdbc digunakan untuk koneksi secara default. Anda dapat mengaktifkan parameter connection.fixed.enabled untuk menggunakan mode koneksi ringan jika diperlukan.

  • Tabel sumber log biner dalam mode JDBC mendukung pembacaan tipe JSONB. Anda harus mengaktifkan parameter Grand Unified Configuration (GUC) di tingkat database.

    -- Aktifkan parameter GUC di tingkat database. Hanya superuser yang dapat menjalankan pernyataan ini. Anda hanya perlu mengatur parameter ini sekali untuk setiap database.
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • Operasi UPDATE menghasilkan dua catatan log biner berturut-turut. Catatan untuk data lama (update_before) diikuti oleh catatan untuk data baru (update_after).

  • Jangan menjalankan operasi TRUNCATE atau operasi pembuatan ulang tabel lainnya pada tabel sumber log biner. Untuk informasi lebih lanjut, lihat FAQ.

  • Untuk mencegah kesalahan, pastikan presisi tipe DECIMAL sama di Flink dan Hologres. Untuk informasi lebih lanjut, lihat FAQ.

  • Jika Anda menggunakan mode INITIAL untuk konsumsi penuh dan inkremental terpadu data tabel sumber, pengurutan global tidak dijamin. Jika aplikasi downstream Anda bergantung pada field waktu untuk perhitungan, Anda harus menggunakan mode konsumsi log biner murni yang berbeda.

Aktifkan binary logging

Tidak ada tabel yang dibuat

Fitur konsumsi real-time dinonaktifkan secara default. Saat membuat tabel di Konsol Hologres, Anda harus mengatur parameter binlog.level dan binlog.ttl dalam pernyataan DDL. Kode berikut memberikan contoh.

begin;
create table test_table(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_table', 'orientation', 'row');--Buat tabel berorientasi baris bernama test_table.
call set_table_property('test_table', 'clustering_key', 'id');--Buat kunci pengelompokan pada kolom id.
call set_table_property('test_table', 'binlog.level', 'replica');--Atur properti tabel untuk mengaktifkan binary logging.
call set_table_property('test_table', 'binlog.ttl', '86400');--binlog.ttl menentukan TTL log biner dalam detik.
commit;

Untuk tabel yang sudah ada

Di Konsol Hologres, Anda dapat menjalankan pernyataan berikut untuk mengaktifkan binary logging untuk tabel yang sudah ada dan mengatur TTL log biner. table_name menentukan nama tabel tempat Anda ingin mengaktifkan binary logging.

-- Atur properti tabel untuk mengaktifkan binary logging.
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;

-- Atur properti tabel untuk mengonfigurasi TTL log biner dalam detik.
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;

Opsi konektor

Mulai dari VVR 11, parameter WITH disesuaikan untuk mendukung Hologres dengan lebih baik. Beberapa parameter mungkin diganti namanya atau dihapus. VVR 11 kompatibel mundur dengan VVR 8. Temukan deskripsi parameter yang sesuai dengan versi VVR Anda.

Pemetaan tipe

Untuk informasi lebih lanjut tentang pemetaan tipe data antara Flink dan Hologres, lihat Pemetaan tipe data antara Flink dan Hologres.

Contoh

Contoh tabel sumber

Tabel sumber Binlog

Mode CDC

Dalam mode ini, untuk data log biner yang dikonsumsi oleh sumber, tipe Flink RowKind secara otomatis dan akurat diatur untuk setiap baris berdasarkan hg_binlog_event_type. Anda tidak perlu mendeklarasikan tipe secara eksplisit. Contohnya termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER. Hal ini memungkinkan sinkronisasi data cermin dari tabel, yang mirip dengan fitur CDC MySQL atau Postgres. Kode berikut memberikan contoh pernyataan DDL tabel sumber.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='${secret_values.ak_id}',            --Gunakan manajemen variabel untuk Pasangan Kunci Akses Anda guna mencegah kebocoran kunci. 
  'password'='${secret_values.ak_secret}',        
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL',  --Baca semua jenis ChangeLog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
  'retry-count'='10',                     --Jumlah percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'retry-sleep-step-ms'='5000',           --Waktu tunggu bertahap untuk percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'        --Jumlah baris data yang dibaca dari log biner dalam setiap batch.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan manajemen variabel untuk Pasangan Kunci Akses Anda guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc',
  'binlogMaxRetryTimes' = '10',     --Jumlah percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'binlogRetryIntervalMs' = '500',  --Interval percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'binlogBatchReadSize' = '100'     --Jumlah baris data yang dibaca dari log biner dalam setiap batch.
);

Mode non-CDC

Dalam mode ini, data log biner yang dikonsumsi oleh sumber diteruskan ke node turunan sebagai data Flink biasa. Artinya semua data bertipe INSERT. Anda dapat memilih cara memproses data dengan tipe hg_binlog_event_type tertentu sesuai kebutuhan. Kode berikut memberikan contoh pernyataan DDL tabel sumber.

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan manajemen variabel untuk Pasangan Kunci Akses Anda guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY',  --Semua jenis ChangeLog diproses sebagai INSERT.
  'retry-count'='10',                     --Jumlah percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'retry-sleep-step-ms'='5000',           --Waktu tunggu bertahap untuk percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'        --Jumlah baris data yang dibaca dari log biner dalam setiap batch.
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan manajemen variabel untuk Pasangan Kunci Akses Anda guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',     --Jumlah percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'binlogRetryIntervalMs' = '500',  --Interval percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'binlogBatchReadSize' = '100'     --Jumlah baris data yang dibaca dari log biner dalam setiap batch.
);

Tabel sumber Non-Binlog

VVR 11+

Penting

Untuk VVR 11.1 dan seterusnya, data log biner dikonsumsi secara default. Untuk informasi lebih lanjut, lihat Tabel sumber Binlog.

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan manajemen variabel untuk Pasangan Kunci Akses Anda guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='false'                      --Menentukan apakah akan mengonsumsi data log biner.
);

VVR 8+

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan manajemen variabel untuk Pasangan Kunci Akses Anda guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

Contoh tabel sink

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan manajemen variabel untuk Pasangan Kunci Akses Anda guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;

Contoh tabel dimensi

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --Gunakan manajemen variabel untuk Pasangan Kunci Akses Anda guna mencegah kebocoran kunci.   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Deskripsi fitur terperinci

Konsumsi penuh dan inkremental terpadu

Skenario

  • Fitur ini hanya berlaku untuk skenario di mana tabel sink memiliki kunci primer. Kami menyarankan Anda menggunakan tabel sumber Hologres penuh dan inkremental dalam mode CDC.

  • Hologres mendukung pengaktifan binary logging sesuai kebutuhan. Anda dapat mengaktifkan binary logging untuk tabel yang berisi data historis.

Contoh kode

VVR 11+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.startup-mode' = 'INITIAL',   --Pertama membaca semua data historis, lalu mengonsumsi log biner secara inkremental.
  'retry-count'='10',                         --Jumlah percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'retry-sleep-step-ms'='5000',               --Waktu tunggu bertahap untuk percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'            --Jumlah baris data yang dibaca dari log biner dalam setiap batch.
  );
Catatan
  • Anda dapat mengatur source.binlog.startup-mode ke INITIAL untuk pertama kali mengonsumsi semua data lalu membaca log biner untuk konsumsi inkremental.

  • Jika Anda mengatur parameter startTime atau memilih waktu mulai di halaman startup, binlogStartUpMode dipaksakan menjadi timestamp. Mode konsumsi lain tidak berlaku karena parameter startTime memiliki prioritas lebih tinggi.

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --Pertama membaca semua data historis, lalu mengonsumsi log biner secara inkremental.
  'binlogMaxRetryTimes' = '10',     --Jumlah percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'binlogRetryIntervalMs' = '500',  --Interval percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'binlogBatchReadSize' = '100'     --Jumlah baris data yang dibaca dari log biner dalam setiap batch.
  );
Catatan
  • Anda dapat mengatur binlogStartUpMode ke initial untuk pertama kali mengonsumsi semua data lalu membaca log biner untuk konsumsi inkremental.

  • Jika Anda mengatur parameter startTime atau memilih waktu mulai di halaman startup, binlogStartUpMode dipaksakan menjadi timestamp. Mode konsumsi lain tidak berlaku karena parameter startTime memiliki prioritas lebih tinggi.

Kebijakan penanganan konflik kunci primer

Konektor menyediakan tiga kebijakan untuk menangani data dengan kunci primer duplikat saat Anda menulis data ke Hologres.

VVR 11+

Anda dapat menentukan parameter sink.on-conflict-action untuk menerapkan kebijakan penanganan berbeda.

Nilai sink.on-conflict-action

Makna

INSERT_OR_IGNORE

Menyimpan kemunculan pertama data dan mengabaikan duplikat berikutnya.

INSERT_OR_REPLACE

Mengganti data yang ada dengan data berikutnya.

INSERT_OR_UPDATE (default)

Hanya memperbarui field yang disediakan di sink. Field lain tetap tidak berubah.

VVR 8+

Anda dapat menentukan parameter mutatetype untuk menerapkan kebijakan penanganan berbeda.

Nilai mutatetype

Makna

insertorignore (default)

Menyimpan kemunculan pertama data dan mengabaikan duplikat berikutnya.

insertorreplace

Mengganti data yang ada dengan data berikutnya.

insertorupdate

Hanya memperbarui field yang disediakan di sink. Field lain tetap tidak berubah.

Asumsikan sebuah tabel memiliki field a, b, c, dan d, dengan a sebagai kunci primer. Jika tabel sink hanya menyediakan field a dan b, dan Anda mengatur kebijakan ke INSERT_OR_UPDATE, hanya field b yang diperbarui, sedangkan c dan d tetap tidak berubah.
Catatan

Jumlah field di tabel sink dapat kurang dari jumlah field di tabel fisik Hologres. Namun, field yang hilang harus mengizinkan nilai null. Jika tidak, operasi penulisan gagal.

Menulis ke tabel partisi

Secara default, sink Hologres hanya mendukung impor data ke satu tabel tunggal. Untuk mengimpor data ke tabel induk dari tabel partisi, Anda harus mengaktifkan konfigurasi berikut:

VVR 11+

Anda dapat mengatur sink.create-missing-partition ke true. Jika partisi anak belum dibuat, partisi tersebut akan dibuat secara otomatis.

Catatan
  • VVR 11.1 dan seterusnya mendukung penulisan ke tabel partisi secara default dan secara otomatis mengarahkan data ke partisi anak yang sesuai.

  • Anda harus mengatur parameter tablename ke nama tabel induk.

  • Jika tabel anak belum dibuat sebelumnya dan sink.create-missing-partition=true tidak diatur, operasi penulisan gagal.

VVR 8+

  • Anda dapat mengatur partitionRouter ke true untuk secara otomatis mengarahkan data ke partisi anak yang sesuai.

  • Anda dapat mengatur createparttable ke true. Jika partisi anak belum dibuat, partisi tersebut akan dibuat secara otomatis.

Catatan
  • Anda harus mengatur parameter tablename ke nama tabel induk.

  • Jika tabel anak belum dibuat sebelumnya dan createparttable=true tidak diatur, operasi penulisan gagal.

Penggabungan tabel lebar dan pembaruan parsial untuk penulisan multi-stream

Saat Anda menulis beberapa aliran data ke tabel lebar Hologres yang sama, sistem mendukung penggabungan otomatis data yang memiliki kunci primer yang sama. Anda dapat memilih untuk hanya memperbarui kolom yang berubah alih-alih mengganti seluruh baris. Hal ini meningkatkan efisiensi penulisan dan konsistensi data.

Batasan

  • Tabel lebar harus memiliki kunci primer.

  • Data di setiap aliran data harus berisi field kunci primer lengkap.

  • Dalam skenario di mana tabel lebar berorientasi kolom digabungkan dengan jumlah permintaan per detik (RPS) tinggi, pemanfaatan CPU mungkin tinggi. Kami menyarankan Anda menonaktifkan fitur Dictionary Encoding untuk field-field di tabel.

Contoh

Asumsikan ada dua aliran data Flink. Salah satunya berisi field a, b, dan c, sedangkan yang lain berisi field a, d, dan e. Tabel lebar Hologres WIDE_TABLE berisi field a, b, c, d, dan e, dengan a sebagai kunci primer.

VVR 11+

// source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan lima field: a, b, c, d, dan e.
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- Tabel lebar Hologres yang berisi lima field: a, b, c, d, dan e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE',   -- Perbarui beberapa kolom data berdasarkan kunci primer.
  'sink.delete-strategy'='IGNORE_DELETE',         -- Kebijakan untuk menangani pesan retraction. IGNORE_DELETE cocok untuk skenario di mana Anda hanya perlu memasukkan atau memperbarui data, bukan menghapusnya.
  'sink.partial-insert.enabled'='true'            -- Aktifkan parameter pembaruan kolom parsial. Field yang didefinisikan dalam pernyataan INSERT didorong ke konektor, sehingga hanya field yang dideklarasikan yang dapat diperbarui atau dimasukkan.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Deklarasikan bahwa hanya tiga field yang dimasukkan: a, b, dan c.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Deklarasikan bahwa hanya tiga field yang dimasukkan: a, d, dan e.
END;

VVR 8+

// source1 dan source2 sudah didefinisikan.
CREATE TEMPORARY TABLE hologres_sink ( -- Deklarasikan lima field: a, b, c, d, dan e.
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- Tabel lebar Hologres yang berisi lima field: a, b, c, d, dan e.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'mutatetype'='insertorupdate',    -- Perbarui beberapa kolom data berdasarkan kunci primer.
  'ignoredelete'='true',            -- Abaikan permintaan Delete yang dihasilkan oleh pesan retraction.
  'partial-insert.enabled'='true'   -- Aktifkan parameter pembaruan kolom parsial untuk mendukung pembaruan hanya field yang dideklarasikan dalam pernyataan INSERT.
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- Deklarasikan bahwa hanya tiga field yang dimasukkan: a, b, dan c.
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- Deklarasikan bahwa hanya tiga field yang dimasukkan: a, d, dan e.
END;
Catatan

Anda dapat mengatur ignoredelete ke true untuk mengabaikan permintaan Delete yang dihasilkan oleh pesan retraction. Untuk VVR 8.0.8 dan seterusnya, kami menyarankan Anda menggunakan parameter sink.delete-strategy untuk mengonfigurasi berbagai kebijakan untuk pembaruan parsial.

Mengonsumsi log biner tabel partisi (pratinjau publik)

Tabel partisi membantu dalam pengarsipan data dan optimasi kueri. Konektor Hologres mendukung konsumsi log biner dari tabel partisi fisik dan logis. Untuk informasi lebih lanjut tentang perbedaan antara tabel partisi fisik dan logis, lihat BUAT TABEL PARTISI LOGIS.

Mengonsumsi log biner tabel partisi fisik

Konektor Hologres mendukung konsumsi log biner tabel partisi dengan satu pekerjaan dan dapat secara dinamis mendengarkan partisi baru. Hal ini secara signifikan meningkatkan efisiensi dan kegunaan pemrosesan data real-time.

Catatan

  • Hanya VVR 8.0.11 dan seterusnya, instans Hologres versi V2.1.27 atau seterusnya, dan tabel sumber log biner dalam mode JDBC yang mendukung konsumsi log biner tabel partisi.

  • Nama partisi harus secara ketat terdiri dari nama tabel induk, garis bawah, dan nilai partisi, dalam format {parent_table}_{partition_value}. Partisi yang tidak mengikuti format ini mungkin tidak dikonsumsi. Untuk informasi lebih lanjut, lihat Manajemen partisi dinamis.

    Penting
    • Untuk mode DYNAMIC, VVR 8.0.11 tidak mendukung field partisi dengan pemisah -, seperti YYYY-MM-DD.

    • Mulai dari VVR 11.1, Anda dapat mengonsumsi field partisi dalam format kustom.

    • Batasan format ini tidak berlaku saat Anda menulis data ke tabel partisi.

  • Saat mendeklarasikan tabel sumber Hologres di Flink, Anda harus menyertakan field partisi dari tabel partisi Hologres.

  • Untuk mode DYNAMIC, tabel partisi harus memiliki manajemen partisi dinamis diaktifkan. Parameter pra-pembuatan partisi auto_partitioning.num_precreate harus lebih besar dari 1. Jika tidak, pekerjaan melempar pengecualian saat mencoba mengonsumsi partisi terbaru.

  • Dalam mode DYNAMIC, setelah partisi baru ditambahkan, pembaruan data berikutnya di partisi lama tidak lagi dibaca.

Contoh

Jenis mode

Fitur

Deskripsi skenario

DYNAMIC

Konsumsi partisi dinamis

Secara otomatis mendengarkan partisi baru dan secara dinamis memajukan progres konsumsi secara kronologis. Cocok untuk skenario aliran data real-time.

STATIC

Konsumsi partisi statis

Hanya mengonsumsi partisi yang sudah ada atau partisi yang ditentukan secara manual. Tidak secara otomatis menemukan partisi baru. Cocok untuk memproses data historis dalam rentang tetap.

Mode DYNAMIC

VVR 11+

Asumsikan tabel partisi DDL berikut ada di Hologres, dan binary logging serta partisi dinamis diaktifkan.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- Aktifkan partisi dinamis.
    auto_partitioning_time_unit = 'DAY',  -- Gunakan DAY sebagai satuan waktu. Contoh nama partisi: test_message_src1_20250512, test_message_src1_20250513.
    auto_partitioning_num_precreate = '2' -- Pra-buat dua partisi.
);
-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.

Di Flink, Anda dapat menggunakan pernyataan SQL berikut untuk mendeklarasikan konsumsi mode DYNAMIC untuk tabel partisi test_message_src1.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Field partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- Tabel induk dengan partisi dinamis diaktifkan.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- Secara dinamis mendengarkan partisi terbaru.
  'source.binlog.startup-mode' = 'initial'           -- Pertama membaca semua data historis, lalu mengonsumsi log biner secara inkremental.
);

VVR 8.0.11

Asumsikan tabel partisi DDL berikut ada di Hologres, dan binary logging serta partisi dinamis diaktifkan.

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- Aktifkan partisi dinamis.
    auto_partitioning_time_unit = 'DAY',  -- Gunakan DAY sebagai satuan waktu. Contoh nama partisi: test_message_src1_20241027, test_message_src1_20241028.
    auto_partitioning_num_precreate = '2' -- Pra-buat dua partisi.
);

-- Untuk tabel partisi yang sudah ada, Anda juga dapat mengaktifkan partisi dinamis menggunakan ALTER TABLE.

Di Flink, Anda dapat menggunakan pernyataan SQL berikut untuk mendeklarasikan konsumsi mode DYNAMIC untuk tabel partisi test_message_src1.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Field partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- Tabel induk dengan partisi dinamis diaktifkan.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'DYNAMIC',  -- Secara dinamis mendengarkan partisi terbaru.
  'binlogstartUpMode' = 'initial',      -- Pertama membaca semua data historis, lalu mengonsumsi log biner secara inkremental.
  'sdkMode' = 'jdbc_fixed'              -- Gunakan mode ini untuk menghindari batas koneksi.
);

Mode STATIC

VVR 11+

Asumsikan tabel partisi DDL berikut ada di Hologres, dan binary logging diaktifkan.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Di Flink, Anda dapat menggunakan pernyataan SQL berikut untuk mendeklarasikan konsumsi mode STATIC untuk tabel partisi test_message_src2.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Field partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- Tabel partisi.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'STATIC', -- Mengonsumsi partisi tetap.
  'source.binlog.partition-values-to-read' = 'red,blue,green',  -- Hanya mengonsumsi tiga partisi yang dikonfigurasi. Partisi 'black' tidak dikonsumsi. Partisi baru tidak akan dikonsumsi. Jika tidak diatur, semua partisi dari tabel induk dikonsumsi.
  'source.binlog.startup-mode' = 'initial'  -- Pertama membaca semua data historis, lalu mengonsumsi log biner secara inkremental.
);

VVR 8.0.11

Asumsikan tabel partisi DDL berikut ada di Hologres, dan binary logging diaktifkan.

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Di Flink, Anda dapat menggunakan pernyataan SQL berikut untuk mendeklarasikan konsumsi mode STATIC untuk tabel partisi test_message_src2.

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Field partisi dari tabel partisi Hologres.
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- Tabel partisi.
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'STATIC', -- Mengonsumsi partisi tetap.
  'partition-values-to-read' = 'red,blue,green',  -- Hanya mengonsumsi tiga partisi yang dikonfigurasi. Partisi 'black' tidak dikonsumsi. Partisi baru tidak akan dikonsumsi. Jika tidak diatur, semua partisi dari tabel induk dikonsumsi.
  'binlogstartUpMode' = 'initial',  -- Pertama membaca semua data historis, lalu mengonsumsi log biner secara inkremental.
  'sdkMode' = 'jdbc_fixed' -- Gunakan mode ini untuk menghindari batas koneksi.
);

Mengonsumsi log biner tabel partisi logis

Konektor Hologres mendukung konsumsi log biner tabel partisi logis dan dapat mengonsumsi partisi tertentu melalui parameter.

Catatan

  • Hanya VVR 11.0.0 dan seterusnya serta instans Hologres versi V3.1 atau seterusnya yang mendukung konsumsi log biner partisi tertentu dalam tabel partisi logis.

  • Mengonsumsi log biner semua partisi dalam tabel partisi logis sama dengan mengonsumsi log biner tabel Hologres biasa. Untuk informasi lebih lanjut, lihat Contoh tabel sumber.

Contoh

Nama parameter

Deskripsi

Contoh

source.binlog.logical-partition-filter-column-names

Nama kolom kunci partisi untuk partisi tertentu yang log binernya akan dikonsumsi dari tabel partisi logis. Nama kolom kunci partisi harus diapit tanda kutip ganda ("). Pisahkan beberapa kolom kunci partisi dengan koma (,). Jika nama kolom berisi tanda kutip ganda, escape dengan tanda kutip ganda lainnya.

'source.binlog.logical-partition-filter-column-names'='"Pt","id"'

Ada dua kolom kunci partisi: Pt dan id.

source.binlog.logical-partition-filter-column-values

Nilai kolom kunci partisi untuk partisi tertentu yang log binernya akan dikonsumsi dari tabel partisi logis. Setiap partisi dapat ditentukan oleh nilai beberapa kolom kunci partisi. Pisahkan nilai kolom kunci partisi dengan koma (,). Apit nilai dengan tanda kutip ganda ("). Jika nilai berisi tanda kutip ganda, escape dengan tanda kutip ganda lainnya. Pisahkan beberapa partisi dengan titik koma (;).

'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"'

Menentukan bahwa dua partisi dikonsumsi. Ada dua kolom kunci partisi. Nilai untuk partisi pertama adalah (20240910, 0), dan nilai untuk partisi kedua adalah (special"value, 9).

Asumsikan tabel berikut telah dibuat di Hologres.

CREATE TABLE holo_table (
    id int not null,
    name text,
    age numeric(18,4),
    "Pt" text,
    primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
    binlog_level ='replica'
);

Anda dapat mengonsumsi log biner tabel ini di Flink.

CREATE TEMPORARY TABLE test_src_binlog_table(
  id INTEGER,
  name VARCHAR,
  age decimal(18,4),
  `Pt` VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='holo_table',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='true',
  'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
  'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
  'source.binlog.change-log-mode'='ALL',  --Baca semua jenis ChangeLog, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER.
  'retry-count'='10',                     --Jumlah percobaan ulang setelah terjadi kesalahan pembacaan log biner.
  'retry-sleep-step-ms'='5000',           --Waktu tunggu bertahap untuk percobaan ulang. Percobaan ulang pertama menunggu 5 detik, yang kedua 10 detik, dan seterusnya.
  'source.binlog.batch-size'='512'        --Jumlah baris data yang dibaca dari log biner dalam setiap batch.
);

API DataStream

Penting

Untuk membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi lebih lanjut tentang cara mengatur konektor DataStream, lihat Cara menggunakan konektor DataStream. Konektor DataStream Hologres tersedia di repositori pusat Maven. Untuk debugging lokal, Anda harus menggunakan Uber JAR yang sesuai. Untuk informasi lebih lanjut, lihat Jalankan dan debug pekerjaan yang berisi konektor secara lokal.

Tabel sumber Hologres

Tabel sumber Binlog

VVR menyediakan kelas implementasi HologresBinlogSource dari Source untuk membaca data log biner Hologres. Kode berikut memberikan contoh cara membangun sumber log biner Hologres.

VVR 8.0.11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan dibaca. Field harus sesuai dengan skema tabel Hologres. Anda hanya dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Parameter terkait Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Bangun HologresBinlogSource.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet(),
                new ArrayList<>()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan dibaca. Field harus sesuai dengan skema tabel Hologres. Anda hanya dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Parameter terkait Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Bangun HologresBinlogSource.
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 6.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan dibaca. Field harus sesuai dengan skema tabel Hologres. Anda hanya dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .build();
         // Parameter terkait Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // Atur atau buat nama slot default.
        config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

        boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        // Bangun Konverter Catatan Binlog.
        JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
                jdbcOptions.getTable(),
                schema,
                new HologresConnectionParam(config),
                cdcMode,
                Collections.emptySet());
        
        // Bangun HologresBinlogSource.
        long startTimeMs = 0;
        HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.TIMESTAMP,
                recordConverter,
                "",
                -1);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
Penting

Untuk versi VVR sebelum 8.0.5 atau versi Hologres sebelum 2.1, Anda harus memeriksa apakah Anda adalah superuser atau memiliki izin Replication Role. Untuk informasi lebih lanjut, lihat Masalah izin Hologres.

Tabel sumber Non-Binlog

VVR menyediakan kelas implementasi HologresBulkreadInputFormat dari RichInputFormat untuk membaca data tabel Hologres. Kode berikut memberikan contoh cara membangun sumber Hologres untuk membaca data tabel.

public class Sample {
    public static void main(String[] args) throws Exception {
        // siapkan API DataStream Java
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan dibaca. Field harus sesuai dengan skema tabel Hologres. Anda hanya dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Parameter terkait Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        // Bangun JDBCOptions.
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
                new HologresConnectionParam(config),
                jdbcOptions,
                schema,
                "",
                -1);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
        env.execute();
    }
}

Dependensi Maven

Konektor DataStream Hologres tersedia di repositori pusat Maven.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Tabel sink Hologres

VVR menyediakan kelas implementasi HologresSinkFunction dari OutputFormatSinkFunction untuk menulis data. Kode berikut memberikan contoh cara membangun sink Hologres.

public class Sample {
    public static void main(String[] args) throws Exception {
        // siapkan API DataStream Java
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inisialisasi skema tabel yang akan dibaca. Field harus sesuai dengan skema tabel Hologres. Anda hanya dapat mendefinisikan subset field.
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .build();
        // Parameter terkait Hologres.
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
        
         // Bangun penulis Hologres untuk menulis data dalam format RowData.
        AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
                hologresConnectionParam, 
                schema, 
                HologresTableSchema.get(hologresConnectionParam), 
                new Integer[0]);
        // Bangun sink Hologres.
        HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

Kolom metadata

Tabel sumber log biner di VVR 8.0.11 dan seterusnya mendukung kolom metadata. Mulai dari versi ini, kami menyarankan Anda mendeklarasikan field log biner seperti hg_binlog_event_type sebagai kolom metadata. Kolom metadata adalah ekstensi dari standar SQL. Anda dapat menggunakan kolom metadata untuk mengakses informasi tertentu, seperti nama database, nama tabel, jenis perubahan data, dan waktu pembuatan tabel sumber. Anda dapat menyesuaikan logika pemrosesan berdasarkan informasi ini, seperti menyaring data jenis perubahan DELETE.

Nama bidang

Tipe field

Deskripsi

db_name

STRING NOT NULL

Nama database yang berisi baris tersebut.

table_name

STRING NOT NULL

Nama tabel yang berisi baris tersebut.

hg_binlog_lsn

BIGINT NOT NULL

Field sistem log biner yang menunjukkan nomor urutan log biner. Nomor ini meningkat secara monoton tetapi tidak kontinu dalam satu shard. Keunikan dan urutan tidak dijamin di berbagai shard berbeda.

hg_binlog_timestamp_us

BIGINT NOT NULL

Timestamp perubahan untuk catatan ini di database, dalam mikrodetik (us).

hg_binlog_event_type

BIGINT NOT NULL

Jenis perubahan catatan ini. Nilai yang valid:

  • 5: menunjukkan pesan INSERT.

  • 2: menunjukkan pesan DELETE.

  • 3: menunjukkan pesan UPDATE_BEFORE.

  • 7: menunjukkan pesan UPDATE_AFTER.

hg_shard_id

INT NOT NULL

Shard data tempat data berada. Untuk informasi lebih lanjut tentang konsep dasar shard, lihat Grup Tabel dan Shard.

Dalam pernyataan DDL, Anda dapat mendeklarasikan kolom metadata menggunakan <meta_column_name> <datatype> METADATA VIRTUAL. Kode berikut memberikan contoh:

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn bigint METADATA VIRTUAL
  hg_binlog_event_type bigint METADATA VIRTUAL
  hg_binlog_timestamp_us bigint METADATA VIRTUAL
  hg_shard_id int METADATA VIRTUAL
  db_name string METADATA VIRTUAL
  table_name string METADATA VIRTUAL
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  ...
  );

FAQ

Referensi