Topik ini menjelaskan cara menggunakan Konektor ApsaraDB for HBase.
Informasi latar belakang
ApsaraDB for HBase adalah layanan NoSQL berbasis cloud yang hemat biaya dan cerdas. Layanan ini menyediakan skalabilitas tinggi serta kompatibel dengan HBase open source. ApsaraDB for HBase menawarkan manfaat seperti penyimpanan rendah biaya, throughput tinggi, penskalaan otomatis, dan pemrosesan data cerdas. ApsaraDB for HBase mendukung layanan inti Alibaba seperti rekomendasi Taobao, pengendalian risiko Ant Credit Pay, periklanan, dasbor data, pelacakan logistik Cainiao, catatan transaksi Alipay, dan pesan Mobile Taobao. Sebagai layanan terkelola sepenuhnya, ApsaraDB for HBase menyediakan kemampuan tingkat perusahaan seperti pemrosesan petabyte data, konkurensi tinggi, penskalaan cepat dalam hitungan detik, latensi respons rendah dalam milidetik, ketersediaan tinggi lintas pusat data, dan distribusi global.
Tabel berikut menggambarkan kemampuan yang didukung oleh Konektor ApsaraDB for HBase.
Item | Deskripsi |
Jenis tabel | Tabel dimensi dan tabel sink |
Mode operasi | Mode streaming |
Format data | Tidak tersedia |
Metrik | |
Jenis API | API SQL |
Pembaruan atau penghapusan data di tabel sink | Didukung |
Prasyarat
Sebuah kluster ApsaraDB for HBase telah dibeli dan tabel ApsaraDB for HBase telah dibuat. Untuk informasi lebih lanjut tentang cara membeli kluster ApsaraDB for HBase, lihat Beli Kluster.
Daftar putih telah dikonfigurasi untuk kluster ApsaraDB for HBase. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih.
Catatan penggunaan
Sebelum menggunakan Konektor ApsaraDB for HBase, konfirmasikan jenis instance database Anda dan pastikan bahwa jenis konektor yang dipilih benar. Penggunaan konektor yang tidak tepat dapat menyebabkan masalah tak terduga.
Konektor ApsaraDB for HBase yang dijelaskan dalam topik ini digunakan untuk instance ApsaraDB for HBase.
Instance Lindorm kompatibel dengan Apache HBase. Gunakan Konektor Lindorm untuk instance Lindorm. Untuk informasi lebih lanjut, lihat Konektor Lindorm.
Jika Anda menggunakan Konektor ApsaraDB for HBase untuk menghubungkan Realtime Compute for Apache Flink ke database HBase open source, validitas data tidak dapat dijamin.
Sintaksis
CREATE TABLE hbase_table(
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
'connector'='cloudhbase',
'table-name'='<yourTableName>',
'zookeeper.quorum'='<yourZookeeperQuorum>'
);Keluarga kolom tabel ApsaraDB for HBase harus dinyatakan sebagai tipe ROW. Setiap nama keluarga kolom adalah nama bidang dari baris. Dalam sintaksis DDL, keluarga kolom berikut dinyatakan: family1, family2, dan family3.
Sebuah kolom dalam keluarga kolom sesuai dengan sebuah bidang dalam baris. Nama kolom adalah nama bidang. Dalam sintaksis DDL, kolom q2 dan q3 dinyatakan dalam keluarga kolom family2.
Selain bidang tipe ROW, hanya satu bidang tipe atomik seperti STRING dan BIGINT yang dapat ada dalam tabel ApsaraDB for HBase. Bidang tipe atomik dianggap sebagai kunci baris tabel, seperti rowkey dalam pernyataan DDL.
Kunci baris tabel ApsaraDB for HBase harus didefinisikan sebagai kunci utama tabel sink. Jika tidak ada kunci utama yang didefinisikan, kunci baris digunakan sebagai kunci utama.
Anda hanya perlu mendeklarasikan keluarga kolom dan kolom yang diperlukan dari tabel ApsaraDB for HBase di tabel sink.
Opsi konektor dalam klausa WITH
Umum
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Jenis tabel.
String
Ya
Tidak ada nilai default
Atur nilainya menjadi
cloudhbase.table-name
Nama tabel ApsaraDB for HBase.
String
Ya
Tidak ada nilai default
Tidak tersedia.
zookeeper.znode.quorum
URL yang digunakan untuk mengakses layanan ZooKeeper ApsaraDB for HBase.
String
Ya
Tidak ada nilai default
Tidak tersedia.
zookeeper.znode.parent
Direktori root ApsaraDB for HBase dalam layanan ZooKeeper.
String
Tidak
/hbaseParameter ini hanya berlaku di Edisi Standar ApsaraDB for HBase.
userName
Nama pengguna yang digunakan untuk mengakses database.
String
Tidak
Tidak ada nilai default
Parameter ini hanya berlaku di Edisi Performa-ditingkatkan ApsaraDB for HBase.
password
Kata sandi yang digunakan untuk mengakses database.
String
Tidak
Tidak ada nilai default
Parameter ini hanya berlaku di Edisi Performa-ditingkatkan ApsaraDB for HBase.
haclient.cluster.id
ID kluster ApsaraDB for HBase dalam mode high availability (HA).
String
Tidak
Tidak ada nilai default
Parameter ini hanya diperlukan saat Anda mengakses kluster pemulihan bencana zona. Parameter ini hanya berlaku di Edisi Performa-ditingkatkan ApsaraDB for HBase.
retires.number
Jumlah percobaan ulang yang diizinkan untuk klien ApsaraDB for HBase untuk terhubung ke database ApsaraDB for HBase.
Integer
Tidak
31
Tidak tersedia.
null-string-literal
Jika tipe data bidang ApsaraDB for HBase adalah STRING dan data bidang Realtime Compute for Apache Flink adalah null,
null-string-literalditetapkan ke bidang ApsaraDB for HBase dan ditulis ke database ApsaraDB for HBase.String
Tidak
null
Tidak tersedia.
Spesifik Sink
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
sink.buffer-flush.max-size
Ukuran data dalam byte yang disimpan di memori sebelum data ditulis ke database ApsaraDB for HBase. Nilai parameter ini yang lebih besar meningkatkan kinerja penulisan ApsaraDB for HBase tetapi memperpanjang latensi penulisan dan mengonsumsi lebih banyak memori.
String
Tidak
2MB
Unit: B, KB, MB, atau GB. Unit tidak peka huruf besar/kecil. Jika parameter ini diatur ke 0, tidak ada data yang disimpan.
sink.buffer-flush.max-rows
Jumlah rekaman data yang disimpan di memori sebelum data ditulis ke database ApsaraDB for HBase. Nilai parameter ini yang lebih besar meningkatkan kinerja penulisan ApsaraDB for HBase tetapi memperpanjang latensi penulisan dan mengonsumsi lebih banyak memori.
Integer
Tidak
1000
Jika parameter ini diatur ke 0, tidak ada data yang disimpan.
sink.buffer-flush.interval
Interval di mana data yang disimpan ditulis ke database ApsaraDB for HBase. Parameter ini mengontrol latensi penulisan data ke database ApsaraDB for HBase.
Duration
Tidak
1s
Unit: ms, s, min, h, atau d. Jika parameter ini diatur ke 0, penulisan data periodik dinonaktifkan.
dynamic.table
Menentukan apakah akan menggunakan tabel ApsaraDB for HBase yang mendukung kolom dinamis.
Boolean
Tidak
false
Nilai valid:
true
false
sink.ignore-delete
Menentukan apakah akan mengabaikan pesan retraksi.
Boolean
Tidak
false
Jika aliran berisi event
DELETEatauUPDATE_BEFORE, dan beberapa tugas sink memperbarui bidang berbeda dari suatu tabel secara konkuren, ketidakkonsistenan data dapat terjadi.Misalnya, setelah suatu catatan dihapus, tugas lain memperbarui beberapa bidang. Bidang yang tidak diperbarui kemudian akan menjadi null atau diisi dengan nilai default, menyebabkan kesalahan data.
Untuk mencegah masalah ini, atur
sink.ignore-deletemenjaditrueuntuk mengabaikan eventDELETEdanUPDATE_BEFOREdari hulu.CatatanUPDATE_BEFOREmerupakan bagian dari mekanisme penarikan kembali Flink dan digunakan untuk menarik kembali nilai lama selama operasi pembaruan.Jika
ignoreDeletediatur ketrue, semua eventDELETEdanUPDATE_BEFOREdilewati. Hanya catatanINSERTdanUPDATE_AFTERyang diproses.
sink.sync-write
Menentukan apakah akan menulis data ke ApsaraDB for HBase dalam mode sinkron.
Boolean
Tidak
true
Nilai valid:
true: Data ditulis dalam mode sinkron. Dalam mode ini, data ditulis secara berurutan tetapi kinerja penulisan dikompromikan.
false: Data ditulis dalam mode asinkron. Dalam mode ini, data mungkin tidak ditulis secara berurutan tetapi kinerja penulisan ditingkatkan.
sink.buffer-flush.batch-rows
Jumlah rekaman data yang disimpan di memori saat data ditulis ke ApsaraDB for HBase dalam mode sinkron. Nilai yang lebih besar meningkatkan kinerja penulisan ApsaraDB for HBase, tetapi meningkatkan latensi penulisan dan penggunaan memori.
Integer
Tidak
100
Parameter ini hanya berlaku saat parameter sink.sync-write diatur ke true.
sink.ignore-null
Menentukan apakah akan mengabaikan nilai null.
Boolean
Tidak
false
CatatanJika parameter ini diatur ke true, parameter
null-string-literaltidak berlaku.Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.9 atau lebih baru yang mendukung parameter ini.
Opsi Spesifik Dimensi (Terkait Cache)
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
cache
Kebijakan cache.
String
Tidak
ALL
Nilai valid:
None: Tidak ada data yang disimpan.
LRU: Hanya datatertentu dalam tabel dimensi yang disimpan. Setiap kali sistem menerima rekaman data, sistem mencari cache. Jika sistem tidak menemukan rekaman di cache, sistem mencari rekaman data di tabel dimensi fisik.
CatatanJika kebijakan cache ini digunakan, Anda harus mengonfigurasi parameter cacheSize dan cacheTTLMs.
ALL: Semua data dalam tabel dimensi disimpan. Ini adalah nilai default. Sebelum pekerjaan berjalan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua kueri selanjutnya dalam tabel dimensi. Jika sistem tidak menemukan rekaman data di cache, kunci penggabungan tidak ada. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.
CatatanJika jumlah data dalam tabel jarak jauh kecil dan sejumlah besar kunci yang hilang ada, kami sarankan Anda mengatur parameter ini ke ALL. Tabel sumber dan tabel dimensi tidak dapat dikaitkan berdasarkan klausa ON. Jika Anda menggunakan kebijakan cache ini, Anda harus mengonfigurasi parameter cacheTTLMs dan cacheReloadTimeBlackList.
Jika semua data dalam tabel dimensi dimuat ke cache, kecepatan startup penerapan mungkin melambat. Anda dapat mengonfigurasi kebijakan cache secara fleksibel berdasarkan kebutuhan bisnis Anda.
Jika Anda mengatur parameter cache ke ALL, Anda harus meningkatkan memori node untuk bergabung dengan tabel karena sistem memuat data dari tabel dimensi secara asinkron. Ukuran memori yang ditingkatkan adalah dua kali lipat dari tabel jarak jauh.
cacheSize
Jumlah maksimum baris data yang dapat disimpan dalam cache.
Long
Tidak
10000
Anda dapat mengonfigurasi parameter ini saat Anda mengatur parameter cache ke LRU.
cacheTTLMs
Periode waktu habis cache. Unit: milidetik.
Long
Tidak
Tidak ada nilai default
Konfigurasi parameter cacheTTLMs bervariasi berdasarkan parameter cache.
Jika Anda mengatur parameter cache ke None, parameter cacheTTLMs dapat dibiarkan kosong. Ini menunjukkan bahwa entri cache tidak kedaluwarsa.
Jika Anda mengatur parameter cache ke LRU, parameter cacheTTLMs menentukan periode waktu habis cache. Secara default, entri cache tidak kedaluwarsa.
Jika Anda mengatur parameter cache ke ALL, parameter cacheTTLMs menentukan interval di mana sistem memuat ulang cache. Secara default, cache tidak dimuat ulang.
cacheEmpty
Menentukan apakah akan menyimpan hasil kosong.
Boolean
Tidak
true
Tidak tersedia.
cacheReloadTimeBlackList
Periode waktu di mana cache tidak diperbarui. Parameter ini berlaku saat parameter cache diatur ke ALL. Cache tidak diperbarui selama periode waktu yang Anda tentukan untuk parameter ini. Parameter ini cocok untuk acara promosi online berskala besar seperti Double 11.
String
Tidak
Tidak ada nilai default
Contoh berikut menunjukkan format nilai: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Gunakan pemisah berdasarkan aturan berikut:
Pisahkan beberapa periode waktu dengan koma (,).
Pisahkan waktu mulai dan waktu akhir setiap periode waktu dengan panah (->) yang merupakan kombinasi tanda hubung (-) dan tanda kurung penutup (>).
cacheScanLimit
Jumlah baris yang dikembalikan oleh server prosedur panggilan jarak jauh (RPC) ke klien ketika server membaca data penuh dari tabel dimensi ApsaraDB for HBase.
Integer
Tidak
100
Parameter ini hanya tersedia saat Anda mengatur parameter cache ke ALL.
Pemetaan tipe data
Nilai dari tipe data Realtime Compute for Apache Flink dikonversi menjadi larik byte menggunakan org.apache.hadoop.hbase.util.Bytes dalam tabel ApsaraDB for HBase. Proses dekode bervariasi berdasarkan skenario berikut:
Jika tipe data Realtime Compute for Apache Flink adalah tipe non-STRING dan nilai dalam tabel ApsaraDB for HBase adalah larik byte kosong, nilai tersebut didekode sebagai null.
Jika tipe data Realtime Compute for Apache Flink adalah tipe STRING dan nilai dalam tabel dimensi ApsaraDB for HBase adalah larik byte yang ditentukan oleh
null-string-literal, nilai tersebut didekode sebagai null.
Tipe SQL Flink | Fungsi yang digunakan untuk mengonversi nilai menjadi byte untuk ApsaraDB for HBase | Fungsi yang digunakan untuk membaca byte dari ApsaraDB for HBase |
CHAR | byte[] toBytes(String s) | String toString(byte[] b) |
VARCHAR | ||
STRING | ||
BOOLEAN | byte[] toBytes(boolean b) | boolean toBoolean(byte[] b) |
BINARY | byte[] | byte[] |
VARBINARY | ||
DECIMAL | byte[] toBytes(BigDecimal v) | BigDecimal toBigDecimal(byte[] b) |
TINYINT | new byte[] { val } | bytes[0] |
SMALLINT | byte[] toBytes(short val) | short toShort(byte[] bytes) |
INT | byte[] toBytes(int val) | int toInt(byte[] bytes) |
BIGINT | byte[] toBytes(long val) | long toLong(byte[] bytes) |
FLOAT | byte[] toBytes(float val) | float toFloat(byte[] bytes) |
DOUBLE | byte[] toBytes(double val) | double toDouble(byte[] bytes) |
DATE | Mengonversi tanggal menjadi nilai INT yang mewakili jumlah hari sejak 1 Januari 1970 dan kemudian menjadi larik byte menggunakan | Mengonversi larik byte database ApsaraDB for HBase menjadi tipe data INT menggunakan |
TIME | Mengonversi waktu menjadi nilai INT yang mewakili jumlah milidetik sejak 00:00:00 dan kemudian menjadi larik byte menggunakan | Mengonversi larik byte database ApsaraDB for HBase menjadi tipe data INT menggunakan |
TIMESTAMP | Mengonversi timestamp menjadi nilai LONG yang mewakili jumlah milidetik sejak 00:00:00 pada 1 Januari 1970 dan kemudian menjadi larik byte menggunakan | Mengonversi larik byte database ApsaraDB for HBase menjadi tipe data LONG menggunakan |
Contoh kode
Contoh kode untuk tabel dimensi
CREATE TEMPORARY TABLE datagen_source ( a INT, b BIGINT, c STRING, `proc_time` AS PROCTIME() ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_dim ( rowkey INT, family1 ROW<col1 INT>, family2 ROW<col1 STRING, col2 BIGINT>, family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING> ) WITH ( 'connector' = 'cloudhbase', 'table-name' = '<yourTableName>', 'zookeeper.quorum' = '<yourZookeeperQuorum>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, f1c1 INT, f3c3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT a, family1.col1 as f1c1, family3.col3 as f3c3 FROM datagen_source JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` as h ON datagen_source.a = h.rowkey;Contoh kode untuk tabel sink
CREATE TEMPORARY TABLE datagen_source ( rowkey INT, f1q1 INT, f2q1 STRING, f2q2 BIGINT, f3q1 DOUBLE, f3q2 BOOLEAN, f3q3 STRING ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_sink ( rowkey INT, family1 ROW<q1 INT>, family2 ROW<q1 STRING, q2 BIGINT>, family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>' ); INSERT INTO hbase_sink SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;Contoh kode untuk tabel sink yang mendukung kolom dinamis
CREATE TEMPORARY TABLE datagen_source ( id INT, f1hour STRING, f1deal BIGINT, f2day STRING, f2deal BIGINT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_sink ( rowkey INT, f1 ROW<`hour` STRING, deal BIGINT>, f2 ROW<`day` STRING, deal BIGINT> ) WITH ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>', 'dynamic.table'='true' ); INSERT INTO hbase_sink SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;Jika dynamic.table diatur ke true, tabel ApsaraDB for HBase yang mendukung kolom dinamis digunakan.
Dua bidang harus dinyatakan dalam baris yang sesuai dengan setiap keluarga kolom. Nilai bidang pertama menunjukkan kolom dinamis, dan nilai bidang kedua menunjukkan nilai kolom dinamis.
Sebagai contoh, tabel datagen_source berisi satu baris data. Baris data tersebut menunjukkan bahwa ID komoditas adalah 1, jumlah transaksi komoditas antara pukul 10:00 dan 11:00 adalah 100, dan jumlah transaksi komoditas pada 26 Juli 2020 adalah 10000. Dalam hal ini, sebuah baris dengan rowkey 1 dimasukkan ke dalam tabel ApsaraDB for HBase. f1:10 adalah 100, dan f2:2020-7-26 adalah 10000.