All Products
Search
Document Center

Realtime Compute for Apache Flink:ApsaraDB for HBase

Last Updated:Mar 27, 2026

Gunakan konektor ApsaraDB for HBase untuk membaca dari ApsaraDB for HBase sebagai tabel dimensi atau menulis ke dalamnya sebagai tabel sink pada pekerjaan streaming.

Jenis tabel yang didukung: Tabel dimensi · Tabel sink Mode eksekusi: Streaming API: SQL Pembaruan atau penghapusan data pada tabel sink: Didukung Metrik sink: numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond, currentSendTime

Untuk informasi selengkapnya mengenai metrik sink, lihat Metrics.

Catatan penggunaan

Sebelum menggunakan konektor ini, pastikan tipe instans database Anda:

  • Konektor ini hanya untuk instans ApsaraDB for HBase. Menggunakannya dengan tipe instans lain dapat menyebabkan masalah tak terduga.

  • Instans Lindorm kompatibel dengan Apache HBase. Gunakan konektor Lindorm untuk instans Lindorm.

  • Menggunakan konektor ini untuk menghubungkan Realtime Compute for Apache Flink ke database HBase open source tidak menjamin validitas data.

Prasyarat

Sebelum memulai, pastikan Anda telah:

Sintaksis DDL

CREATE TABLE hbase_table (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

Aturan skema:

  • Deklarasikan setiap keluarga kolom sebagai ROW<column_name type, ...>. Nama bidang dipetakan ke nama keluarga kolom.

  • Setiap bidang di dalam ROW dipetakan ke kolom dalam keluarga kolom tersebut. Misalnya, q2 dan q3 adalah kolom dalam family2.

  • Sertakan tepat satu bidang bertipe atomik (seperti INT atau STRING). Bidang ini merupakan kunci baris (row key).

  • Kunci baris harus menjadi primary key tabel sink. Jika tidak ada primary key yang didefinisikan, kunci baris berfungsi sebagai primary key.

  • Deklarasikan hanya keluarga kolom dan kolom yang benar-benar digunakan oleh pekerjaan Anda.

Contoh penggunaan

Contoh tabel dimensi

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proc_time` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hbase_dim (
  rowkey INT,
  family1 ROW<col1 INT>,
  family2 ROW<col1 STRING, col2 BIGINT>,
  family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  f1c1 INT,
  f3c3 STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT a, family1.col1 AS f1c1, family3.col3 AS f3c3
FROM datagen_source
JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` AS h
ON datagen_source.a = h.rowkey;

Contoh tabel sink

CREATE TEMPORARY TABLE datagen_source (
  rowkey INT,
  f1q1 INT,
  f2q1 STRING,
  f2q2 BIGINT,
  f3q1 DOUBLE,
  f3q2 BOOLEAN,
  f3q3 STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q1 STRING, q2 BIGINT>,
  family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3)
FROM datagen_source;

Sink kolom dinamis

Ketika dynamic.table diatur ke true, setiap keluarga kolom harus mendeklarasikan tepat dua bidang: yang pertama merepresentasikan nama kolom dinamis, dan yang kedua merepresentasikan nilainya.

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  'dynamic.table' = 'true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal)
FROM datagen_source;

Sebagai contoh, jika baris sumber memiliki id=1, f1hour='10', f1deal=100, f2day='2020-7-26', dan f2deal=10000, maka baris HBase hasilnya memiliki rowkey=1, f1:10=100, dan f2:2020-7-26=10000.

Opsi konektor

Opsi umum

Opsi Tipe Wajib Bawaan Deskripsi
connector String Ya Diatur ke cloudhbase.
table-name String Ya Nama tabel ApsaraDB for HBase.
zookeeper.znode.quorum String Ya String koneksi ZooKeeper untuk kluster ApsaraDB for HBase.
zookeeper.znode.parent String Tidak /hbase Direktori root ApsaraDB for HBase di ZooKeeper. Hanya berlaku untuk Edisi Standar.
userName String Tidak Username untuk akses database. Hanya berlaku untuk Edisi Performance-enhanced.
password String Tidak Password untuk akses database. Hanya berlaku untuk Edisi Performance-enhanced.
haclient.cluster.id String Tidak ID kluster untuk mode high availability (HA). Wajib untuk kluster zone-disaster recovery. Hanya berlaku untuk Edisi Performance-enhanced.
retires.number Integer Tidak 31 Jumlah upaya percobaan koneksi ulang untuk client HBase.
null-string-literal String Tidak null Nilai yang ditulis ke HBase ketika bidang STRING Flink bernilai null.

Opsi sink

Opsi Tipe Wajib Bawaan Deskripsi
sink.buffer-flush.max-size String Tidak 2MB Ukuran maksimum data yang dibuffer sebelum flush. Nilai yang lebih besar meningkatkan throughput tulis tetapi menambah latensi dan penggunaan memori. Satuan: B, KB, MB, atau GB (tidak case-sensitive). Atur ke 0 untuk menonaktifkan buffering.
sink.buffer-flush.max-rows Integer Tidak 1000 Jumlah maksimum catatan yang dibuffer sebelum flush. Nilai yang lebih besar meningkatkan throughput tulis tetapi menambah latensi dan penggunaan memori. Atur ke 0 untuk menonaktifkan buffering.
sink.buffer-flush.interval Duration Tidak 1s Interval flush untuk data yang dibuffer. Mengontrol latensi tulis. Satuan: ms, s, min, h, atau d. Atur ke 0 untuk menonaktifkan flushing periodik.
sink.sync-write Boolean Tidak true Mode penulisan. true: sinkron — data ditulis secara berurutan tetapi dengan throughput lebih rendah. false: asinkron — throughput lebih tinggi, tetapi urutan penulisan tidak dijamin.
sink.buffer-flush.batch-rows Integer Tidak 100 Jumlah catatan yang dibatch dalam mode tulis sinkron. Nilai yang lebih besar meningkatkan throughput tetapi menambah latensi dan penggunaan memori. Hanya berlaku ketika sink.sync-write bernilai true.
dynamic.table Boolean Tidak false Mengaktifkan mode kolom dinamis. Ketika diatur ke true, setiap baris keluarga kolom harus mendeklarasikan tepat dua bidang (nama kolom dan nilai).
sink.ignore-delete Boolean Tidak false Menentukan apakah akan mengabaikan event DELETE dan UPDATE_BEFORE dari upstream. Atur ke true ketika beberapa tugas sink memperbarui bidang berbeda dari baris yang sama secara konkuren, untuk mencegah inkonsistensi data akibat penghapusan konkuren dan pembaruan parsial. Ketika true, hanya catatan INSERT dan UPDATE_AFTER yang diproses.
sink.ignore-null Boolean Tidak false Menentukan apakah akan melewatkan penulisan nilai bidang null. Ketika diatur ke true, null-string-literal tidak berpengaruh. Memerlukan Ververica Runtime (VVR) 8.0.9 atau lebih baru.

Opsi tabel dimensi

Opsi Tipe Wajib Bawaan Deskripsi
cache String Tidak ALL Kebijakan cache. Nilai yang valid: None, LRU, ALL. Lihat deskripsi kebijakan cache di bawah ini.
cacheSize Long Tidak 10000 Jumlah maksimum baris yang dicache. Hanya berlaku ketika cache diatur ke LRU.
cacheTTLMs Long Tidak Perilaku kedaluwarsa cache bergantung pada pengaturan cache: LRU — timeout entri cache dalam milidetik (tidak kedaluwarsa secara default); ALL — interval muat ulang cache dalam milidetik (tidak dimuat ulang secara default); None — tidak berlaku.
cacheEmpty Boolean Tidak true Menentukan apakah akan mencache hasil lookup kosong.
cacheReloadTimeBlackList String Tidak Periode waktu saat muat ulang cache ditangguhkan. Hanya berlaku ketika cache diatur ke ALL. Format: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Pisahkan beberapa periode dengan koma; gunakan -> untuk memisahkan awal dan akhir setiap periode.
cacheScanLimit Integer Tidak 100 Jumlah baris yang dikembalikan per panggilan prosedur remote (RPC) saat memuat seluruh tabel dimensi. Hanya berlaku ketika cache diatur ke ALL.

Kebijakan cache:

  • None — Tanpa caching. Setiap lookup langsung mengakses ApsaraDB for HBase.

  • LRU — Mencache subset baris. Saat terjadi cache miss, sistem langsung mengakses ApsaraDB for HBase. Konfigurasikan cacheSize dan cacheTTLMs saat menggunakan kebijakan ini.

  • ALL (default) — Memuat seluruh tabel dimensi ke dalam cache sebelum pekerjaan dimulai. Semua lookup selanjutnya mengenai cache. Jika kunci tidak ditemukan di cache, artinya kunci tersebut tidak ada di tabel sumber. Cache dimuat ulang setelah kedaluwarsa. Konfigurasikan cacheTTLMs dan opsional cacheReloadTimeBlackList saat menggunakan kebijakan ini.

    Ketika cache diatur ke ALL, node yang melakukan join memerlukan memori tambahan — kira-kira dua kali ukuran tabel dimensi — karena sistem memuat semua data secara asinkron sebelum pekerjaan dimulai. Hal ini dapat memperlambat startup pekerjaan.

Pemetaan tipe data

Semua nilai Flink diserialisasi ke dan dideserialisasi dari array byte HBase menggunakan org.apache.hadoop.hbase.util.Bytes. Dua kasus khusus berlaku:

  • Bidang non-STRING yang membaca array byte kosong didekode sebagai null.

  • Bidang STRING yang membaca array byte yang sesuai dengan null-string-literal didekode sebagai null.

Tipe Flink SQL Konversi HBase
CHAR / VARCHAR / STRING Disimpan sebagai array byte string UTF-8 menggunakan toBytes(String s) / toString(byte[] b).
BOOLEAN toBytes(boolean b) / toBoolean(byte[] b)
BINARY / VARBINARY Disimpan sebagai byte[] mentah.
DECIMAL toBytes(BigDecimal v) / toBigDecimal(byte[] b)
TINYINT new byte[] { val } / bytes[0]
SMALLINT toBytes(short val) / toShort(byte[] bytes)
INT toBytes(int val) / toInt(byte[] bytes)
BIGINT toBytes(long val) / toLong(byte[] bytes)
FLOAT toBytes(float val) / toFloat(byte[] bytes)
DOUBLE toBytes(double val) / toDouble(byte[] bytes)
DATE Disimpan sebagai jumlah hari sejak 1970-01-01, diserialisasi sebagai INT.
TIME Disimpan sebagai jumlah milidetik sejak 00:00:00, diserialisasi sebagai INT.
TIMESTAMP Disimpan sebagai jumlah milidetik sejak 00:00:00 pada 1970-01-01, diserialisasi sebagai LONG.