Gunakan konektor ApsaraDB for HBase untuk membaca dari ApsaraDB for HBase sebagai tabel dimensi atau menulis ke dalamnya sebagai tabel sink pada pekerjaan streaming.
Jenis tabel yang didukung: Tabel dimensi · Tabel sink Mode eksekusi: Streaming API: SQL Pembaruan atau penghapusan data pada tabel sink: Didukung Metrik sink: numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond, currentSendTime
Untuk informasi selengkapnya mengenai metrik sink, lihat Metrics.
Catatan penggunaan
Sebelum menggunakan konektor ini, pastikan tipe instans database Anda:
-
Konektor ini hanya untuk instans ApsaraDB for HBase. Menggunakannya dengan tipe instans lain dapat menyebabkan masalah tak terduga.
-
Instans Lindorm kompatibel dengan Apache HBase. Gunakan konektor Lindorm untuk instans Lindorm.
-
Menggunakan konektor ini untuk menghubungkan Realtime Compute for Apache Flink ke database HBase open source tidak menjamin validitas data.
Prasyarat
Sebelum memulai, pastikan Anda telah:
-
Membeli kluster ApsaraDB for HBase dan membuat tabel ApsaraDB for HBase. Lihat Purchase a cluster
-
Mengonfigurasi daftar putih untuk kluster ApsaraDB for HBase. Lihat Configure a whitelist
Sintaksis DDL
CREATE TABLE hbase_table (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
Aturan skema:
-
Deklarasikan setiap keluarga kolom sebagai
ROW<column_name type, ...>. Nama bidang dipetakan ke nama keluarga kolom. -
Setiap bidang di dalam
ROWdipetakan ke kolom dalam keluarga kolom tersebut. Misalnya,q2danq3adalah kolom dalamfamily2. -
Sertakan tepat satu bidang bertipe atomik (seperti
INTatauSTRING). Bidang ini merupakan kunci baris (row key). -
Kunci baris harus menjadi primary key tabel sink. Jika tidak ada primary key yang didefinisikan, kunci baris berfungsi sebagai primary key.
-
Deklarasikan hanya keluarga kolom dan kolom yang benar-benar digunakan oleh pekerjaan Anda.
Contoh penggunaan
Contoh tabel dimensi
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
`proc_time` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hbase_dim (
rowkey INT,
family1 ROW<col1 INT>,
family2 ROW<col1 STRING, col2 BIGINT>,
family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
f1c1 INT,
f3c3 STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT a, family1.col1 AS f1c1, family3.col3 AS f3c3
FROM datagen_source
JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` AS h
ON datagen_source.a = h.rowkey;
Contoh tabel sink
CREATE TEMPORARY TABLE datagen_source (
rowkey INT,
f1q1 INT,
f2q1 STRING,
f2q2 BIGINT,
f3q1 DOUBLE,
f3q2 BOOLEAN,
f3q3 STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hbase_sink (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q1 STRING, q2 BIGINT>,
family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3)
FROM datagen_source;
Sink kolom dinamis
Ketika dynamic.table diatur ke true, setiap keluarga kolom harus mendeklarasikan tepat dua bidang: yang pertama merepresentasikan nama kolom dinamis, dan yang kedua merepresentasikan nilainya.
CREATE TEMPORARY TABLE datagen_source (
id INT,
f1hour STRING,
f1deal BIGINT,
f2day STRING,
f2deal BIGINT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hbase_sink (
rowkey INT,
f1 ROW<`hour` STRING, deal BIGINT>,
f2 ROW<`day` STRING, deal BIGINT>
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>',
'dynamic.table' = 'true'
);
INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal)
FROM datagen_source;
Sebagai contoh, jika baris sumber memiliki id=1, f1hour='10', f1deal=100, f2day='2020-7-26', dan f2deal=10000, maka baris HBase hasilnya memiliki rowkey=1, f1:10=100, dan f2:2020-7-26=10000.
Opsi konektor
Opsi umum
| Opsi | Tipe | Wajib | Bawaan | Deskripsi |
|---|---|---|---|---|
connector |
String | Ya | — | Diatur ke cloudhbase. |
table-name |
String | Ya | — | Nama tabel ApsaraDB for HBase. |
zookeeper.znode.quorum |
String | Ya | — | String koneksi ZooKeeper untuk kluster ApsaraDB for HBase. |
zookeeper.znode.parent |
String | Tidak | /hbase |
Direktori root ApsaraDB for HBase di ZooKeeper. Hanya berlaku untuk Edisi Standar. |
userName |
String | Tidak | — | Username untuk akses database. Hanya berlaku untuk Edisi Performance-enhanced. |
password |
String | Tidak | — | Password untuk akses database. Hanya berlaku untuk Edisi Performance-enhanced. |
haclient.cluster.id |
String | Tidak | — | ID kluster untuk mode high availability (HA). Wajib untuk kluster zone-disaster recovery. Hanya berlaku untuk Edisi Performance-enhanced. |
retires.number |
Integer | Tidak | 31 |
Jumlah upaya percobaan koneksi ulang untuk client HBase. |
null-string-literal |
String | Tidak | null |
Nilai yang ditulis ke HBase ketika bidang STRING Flink bernilai null. |
Opsi sink
| Opsi | Tipe | Wajib | Bawaan | Deskripsi |
|---|---|---|---|---|
sink.buffer-flush.max-size |
String | Tidak | 2MB |
Ukuran maksimum data yang dibuffer sebelum flush. Nilai yang lebih besar meningkatkan throughput tulis tetapi menambah latensi dan penggunaan memori. Satuan: B, KB, MB, atau GB (tidak case-sensitive). Atur ke 0 untuk menonaktifkan buffering. |
sink.buffer-flush.max-rows |
Integer | Tidak | 1000 |
Jumlah maksimum catatan yang dibuffer sebelum flush. Nilai yang lebih besar meningkatkan throughput tulis tetapi menambah latensi dan penggunaan memori. Atur ke 0 untuk menonaktifkan buffering. |
sink.buffer-flush.interval |
Duration | Tidak | 1s |
Interval flush untuk data yang dibuffer. Mengontrol latensi tulis. Satuan: ms, s, min, h, atau d. Atur ke 0 untuk menonaktifkan flushing periodik. |
sink.sync-write |
Boolean | Tidak | true |
Mode penulisan. true: sinkron — data ditulis secara berurutan tetapi dengan throughput lebih rendah. false: asinkron — throughput lebih tinggi, tetapi urutan penulisan tidak dijamin. |
sink.buffer-flush.batch-rows |
Integer | Tidak | 100 |
Jumlah catatan yang dibatch dalam mode tulis sinkron. Nilai yang lebih besar meningkatkan throughput tetapi menambah latensi dan penggunaan memori. Hanya berlaku ketika sink.sync-write bernilai true. |
dynamic.table |
Boolean | Tidak | false |
Mengaktifkan mode kolom dinamis. Ketika diatur ke true, setiap baris keluarga kolom harus mendeklarasikan tepat dua bidang (nama kolom dan nilai). |
sink.ignore-delete |
Boolean | Tidak | false |
Menentukan apakah akan mengabaikan event DELETE dan UPDATE_BEFORE dari upstream. Atur ke true ketika beberapa tugas sink memperbarui bidang berbeda dari baris yang sama secara konkuren, untuk mencegah inkonsistensi data akibat penghapusan konkuren dan pembaruan parsial. Ketika true, hanya catatan INSERT dan UPDATE_AFTER yang diproses. |
sink.ignore-null |
Boolean | Tidak | false |
Menentukan apakah akan melewatkan penulisan nilai bidang null. Ketika diatur ke true, null-string-literal tidak berpengaruh. Memerlukan Ververica Runtime (VVR) 8.0.9 atau lebih baru. |
Opsi tabel dimensi
| Opsi | Tipe | Wajib | Bawaan | Deskripsi |
|---|---|---|---|---|
cache |
String | Tidak | ALL |
Kebijakan cache. Nilai yang valid: None, LRU, ALL. Lihat deskripsi kebijakan cache di bawah ini. |
cacheSize |
Long | Tidak | 10000 |
Jumlah maksimum baris yang dicache. Hanya berlaku ketika cache diatur ke LRU. |
cacheTTLMs |
Long | Tidak | — | Perilaku kedaluwarsa cache bergantung pada pengaturan cache: LRU — timeout entri cache dalam milidetik (tidak kedaluwarsa secara default); ALL — interval muat ulang cache dalam milidetik (tidak dimuat ulang secara default); None — tidak berlaku. |
cacheEmpty |
Boolean | Tidak | true |
Menentukan apakah akan mencache hasil lookup kosong. |
cacheReloadTimeBlackList |
String | Tidak | — | Periode waktu saat muat ulang cache ditangguhkan. Hanya berlaku ketika cache diatur ke ALL. Format: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Pisahkan beberapa periode dengan koma; gunakan -> untuk memisahkan awal dan akhir setiap periode. |
cacheScanLimit |
Integer | Tidak | 100 |
Jumlah baris yang dikembalikan per panggilan prosedur remote (RPC) saat memuat seluruh tabel dimensi. Hanya berlaku ketika cache diatur ke ALL. |
Kebijakan cache:
-
None — Tanpa caching. Setiap lookup langsung mengakses ApsaraDB for HBase.
-
LRU — Mencache subset baris. Saat terjadi cache miss, sistem langsung mengakses ApsaraDB for HBase. Konfigurasikan
cacheSizedancacheTTLMssaat menggunakan kebijakan ini. -
ALL (default) — Memuat seluruh tabel dimensi ke dalam cache sebelum pekerjaan dimulai. Semua lookup selanjutnya mengenai cache. Jika kunci tidak ditemukan di cache, artinya kunci tersebut tidak ada di tabel sumber. Cache dimuat ulang setelah kedaluwarsa. Konfigurasikan
cacheTTLMsdan opsionalcacheReloadTimeBlackListsaat menggunakan kebijakan ini.Ketika
cachediatur keALL, node yang melakukan join memerlukan memori tambahan — kira-kira dua kali ukuran tabel dimensi — karena sistem memuat semua data secara asinkron sebelum pekerjaan dimulai. Hal ini dapat memperlambat startup pekerjaan.
Pemetaan tipe data
Semua nilai Flink diserialisasi ke dan dideserialisasi dari array byte HBase menggunakan org.apache.hadoop.hbase.util.Bytes. Dua kasus khusus berlaku:
-
Bidang non-STRING yang membaca array byte kosong didekode sebagai
null. -
Bidang
STRINGyang membaca array byte yang sesuai dengannull-string-literaldidekode sebagainull.
| Tipe Flink SQL | Konversi HBase |
|---|---|
| CHAR / VARCHAR / STRING | Disimpan sebagai array byte string UTF-8 menggunakan toBytes(String s) / toString(byte[] b). |
| BOOLEAN | toBytes(boolean b) / toBoolean(byte[] b) |
| BINARY / VARBINARY | Disimpan sebagai byte[] mentah. |
| DECIMAL | toBytes(BigDecimal v) / toBigDecimal(byte[] b) |
| TINYINT | new byte[] { val } / bytes[0] |
| SMALLINT | toBytes(short val) / toShort(byte[] bytes) |
| INT | toBytes(int val) / toInt(byte[] bytes) |
| BIGINT | toBytes(long val) / toLong(byte[] bytes) |
| FLOAT | toBytes(float val) / toFloat(byte[] bytes) |
| DOUBLE | toBytes(double val) / toDouble(byte[] bytes) |
| DATE | Disimpan sebagai jumlah hari sejak 1970-01-01, diserialisasi sebagai INT. |
| TIME | Disimpan sebagai jumlah milidetik sejak 00:00:00, diserialisasi sebagai INT. |
| TIMESTAMP | Disimpan sebagai jumlah milidetik sejak 00:00:00 pada 1970-01-01, diserialisasi sebagai LONG. |