全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor ApsaraDB untuk HBase

更新时间:Nov 10, 2025

Topik ini menjelaskan cara menggunakan Konektor ApsaraDB for HBase.

Informasi latar belakang

ApsaraDB for HBase adalah layanan NoSQL berbasis cloud yang hemat biaya dan cerdas. Layanan ini menyediakan skalabilitas tinggi serta kompatibel dengan HBase open source. ApsaraDB for HBase menawarkan manfaat seperti penyimpanan rendah biaya, throughput tinggi, penskalaan otomatis, dan pemrosesan data cerdas. ApsaraDB for HBase mendukung layanan inti Alibaba seperti rekomendasi Taobao, pengendalian risiko Ant Credit Pay, periklanan, dasbor data, pelacakan logistik Cainiao, catatan transaksi Alipay, dan pesan Mobile Taobao. Sebagai layanan terkelola sepenuhnya, ApsaraDB for HBase menyediakan kemampuan tingkat perusahaan seperti pemrosesan petabyte data, konkurensi tinggi, penskalaan cepat dalam hitungan detik, latensi respons rendah dalam milidetik, ketersediaan tinggi lintas pusat data, dan distribusi global.

Tabel berikut menggambarkan kemampuan yang didukung oleh Konektor ApsaraDB for HBase.

Item

Deskripsi

Jenis tabel

Tabel dimensi dan tabel sink

Mode operasi

Mode streaming

Format data

Tidak tersedia

Metrik

监控指标

  • Source

    Tidak ada

  • Tabel dimensi

    Tidak ada

  • Sink

    numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond, dan currentSendTime

    Catatan

    Untuk informasi lebih lanjut mengenai metrik, lihat Metrik.

Jenis API

API SQL

Pembaruan atau penghapusan data di tabel sink

Didukung

Prasyarat

  • Sebuah kluster ApsaraDB for HBase telah dibeli dan tabel ApsaraDB for HBase telah dibuat. Untuk informasi lebih lanjut tentang cara membeli kluster ApsaraDB for HBase, lihat Beli Kluster.

  • Daftar putih telah dikonfigurasi untuk kluster ApsaraDB for HBase. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih.

Catatan penggunaan

Sebelum menggunakan Konektor ApsaraDB for HBase, konfirmasikan jenis instance database Anda dan pastikan bahwa jenis konektor yang dipilih benar. Penggunaan konektor yang tidak tepat dapat menyebabkan masalah tak terduga.

  • Konektor ApsaraDB for HBase yang dijelaskan dalam topik ini digunakan untuk instance ApsaraDB for HBase.

  • Instance Lindorm kompatibel dengan Apache HBase. Gunakan Konektor Lindorm untuk instance Lindorm. Untuk informasi lebih lanjut, lihat Konektor Lindorm.

  • Jika Anda menggunakan Konektor ApsaraDB for HBase untuk menghubungkan Realtime Compute for Apache Flink ke database HBase open source, validitas data tidak dapat dijamin.

Sintaksis

CREATE TABLE hbase_table(
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>'
);
  • Keluarga kolom tabel ApsaraDB for HBase harus dinyatakan sebagai tipe ROW. Setiap nama keluarga kolom adalah nama bidang dari baris. Dalam sintaksis DDL, keluarga kolom berikut dinyatakan: family1, family2, dan family3.

  • Sebuah kolom dalam keluarga kolom sesuai dengan sebuah bidang dalam baris. Nama kolom adalah nama bidang. Dalam sintaksis DDL, kolom q2 dan q3 dinyatakan dalam keluarga kolom family2.

  • Selain bidang tipe ROW, hanya satu bidang tipe atomik seperti STRING dan BIGINT yang dapat ada dalam tabel ApsaraDB for HBase. Bidang tipe atomik dianggap sebagai kunci baris tabel, seperti rowkey dalam pernyataan DDL.

  • Kunci baris tabel ApsaraDB for HBase harus didefinisikan sebagai kunci utama tabel sink. Jika tidak ada kunci utama yang didefinisikan, kunci baris digunakan sebagai kunci utama.

  • Anda hanya perlu mendeklarasikan keluarga kolom dan kolom yang diperlukan dari tabel ApsaraDB for HBase di tabel sink.

Opsi konektor dalam klausa WITH

  • Umum

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Jenis tabel.

    String

    Ya

    Tidak ada nilai default

    Atur nilainya menjadi cloudhbase.

    table-name

    Nama tabel ApsaraDB for HBase.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    zookeeper.znode.quorum

    URL yang digunakan untuk mengakses layanan ZooKeeper ApsaraDB for HBase.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    zookeeper.znode.parent

    Direktori root ApsaraDB for HBase dalam layanan ZooKeeper.

    String

    Tidak

    /hbase

    Parameter ini hanya berlaku di Edisi Standar ApsaraDB for HBase.

    userName

    Nama pengguna yang digunakan untuk mengakses database.

    String

    Tidak

    Tidak ada nilai default

    Parameter ini hanya berlaku di Edisi Performa-ditingkatkan ApsaraDB for HBase.

    password

    Kata sandi yang digunakan untuk mengakses database.

    String

    Tidak

    Tidak ada nilai default

    Parameter ini hanya berlaku di Edisi Performa-ditingkatkan ApsaraDB for HBase.

    haclient.cluster.id

    ID kluster ApsaraDB for HBase dalam mode high availability (HA).

    String

    Tidak

    Tidak ada nilai default

    Parameter ini hanya diperlukan saat Anda mengakses kluster pemulihan bencana zona. Parameter ini hanya berlaku di Edisi Performa-ditingkatkan ApsaraDB for HBase.

    retires.number

    Jumlah percobaan ulang yang diizinkan untuk klien ApsaraDB for HBase untuk terhubung ke database ApsaraDB for HBase.

    Integer

    Tidak

    31

    Tidak tersedia.

    null-string-literal

    Jika tipe data bidang ApsaraDB for HBase adalah STRING dan data bidang Realtime Compute for Apache Flink adalah null, null-string-literal ditetapkan ke bidang ApsaraDB for HBase dan ditulis ke database ApsaraDB for HBase.

    String

    Tidak

    null

    Tidak tersedia.

  • Spesifik Sink

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    sink.buffer-flush.max-size

    Ukuran data dalam byte yang disimpan di memori sebelum data ditulis ke database ApsaraDB for HBase. Nilai parameter ini yang lebih besar meningkatkan kinerja penulisan ApsaraDB for HBase tetapi memperpanjang latensi penulisan dan mengonsumsi lebih banyak memori.

    String

    Tidak

    2MB

    Unit: B, KB, MB, atau GB. Unit tidak peka huruf besar/kecil. Jika parameter ini diatur ke 0, tidak ada data yang disimpan.

    sink.buffer-flush.max-rows

    Jumlah rekaman data yang disimpan di memori sebelum data ditulis ke database ApsaraDB for HBase. Nilai parameter ini yang lebih besar meningkatkan kinerja penulisan ApsaraDB for HBase tetapi memperpanjang latensi penulisan dan mengonsumsi lebih banyak memori.

    Integer

    Tidak

    1000

    Jika parameter ini diatur ke 0, tidak ada data yang disimpan.

    sink.buffer-flush.interval

    Interval di mana data yang disimpan ditulis ke database ApsaraDB for HBase. Parameter ini mengontrol latensi penulisan data ke database ApsaraDB for HBase.

    Duration

    Tidak

    1s

    Unit: ms, s, min, h, atau d. Jika parameter ini diatur ke 0, penulisan data periodik dinonaktifkan.

    dynamic.table

    Menentukan apakah akan menggunakan tabel ApsaraDB for HBase yang mendukung kolom dinamis.

    Boolean

    Tidak

    false

    Nilai valid:

    • true

    • false

    sink.ignore-delete

    Menentukan apakah akan mengabaikan pesan retraksi.

    Boolean

    Tidak

    false

    Jika aliran berisi event DELETE atau UPDATE_BEFORE, dan beberapa tugas sink memperbarui bidang berbeda dari suatu tabel secara konkuren, ketidakkonsistenan data dapat terjadi.

    Misalnya, setelah suatu catatan dihapus, tugas lain memperbarui beberapa bidang. Bidang yang tidak diperbarui kemudian akan menjadi null atau diisi dengan nilai default, menyebabkan kesalahan data.

    Untuk mencegah masalah ini, atur sink.ignore-delete menjadi true untuk mengabaikan event DELETE dan UPDATE_BEFORE dari hulu.

    Catatan
    • UPDATE_BEFORE merupakan bagian dari mekanisme penarikan kembali Flink dan digunakan untuk menarik kembali nilai lama selama operasi pembaruan.

    • Jika ignoreDelete diatur ke true, semua event DELETE dan UPDATE_BEFORE dilewati. Hanya catatan INSERT dan UPDATE_AFTER yang diproses.

    sink.sync-write

    Menentukan apakah akan menulis data ke ApsaraDB for HBase dalam mode sinkron.

    Boolean

    Tidak

    true

    Nilai valid:

    • true: Data ditulis dalam mode sinkron. Dalam mode ini, data ditulis secara berurutan tetapi kinerja penulisan dikompromikan.

    • false: Data ditulis dalam mode asinkron. Dalam mode ini, data mungkin tidak ditulis secara berurutan tetapi kinerja penulisan ditingkatkan.

    sink.buffer-flush.batch-rows

    Jumlah rekaman data yang disimpan di memori saat data ditulis ke ApsaraDB for HBase dalam mode sinkron. Nilai yang lebih besar meningkatkan kinerja penulisan ApsaraDB for HBase, tetapi meningkatkan latensi penulisan dan penggunaan memori.

    Integer

    Tidak

    100

    Parameter ini hanya berlaku saat parameter sink.sync-write diatur ke true.

    sink.ignore-null

    Menentukan apakah akan mengabaikan nilai null.

    Boolean

    Tidak

    false

    Catatan
    • Jika parameter ini diatur ke true, parameter null-string-literal tidak berlaku.

    • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.9 atau lebih baru yang mendukung parameter ini.

  • Opsi Spesifik Dimensi (Terkait Cache)

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    cache

    Kebijakan cache.

    String

    Tidak

    ALL

    Nilai valid:

    • None: Tidak ada data yang disimpan.

    • LRU: Hanya datatertentu dalam tabel dimensi yang disimpan. Setiap kali sistem menerima rekaman data, sistem mencari cache. Jika sistem tidak menemukan rekaman di cache, sistem mencari rekaman data di tabel dimensi fisik.

      Catatan

      Jika kebijakan cache ini digunakan, Anda harus mengonfigurasi parameter cacheSize dan cacheTTLMs.

    • ALL: Semua data dalam tabel dimensi disimpan. Ini adalah nilai default. Sebelum pekerjaan berjalan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua kueri selanjutnya dalam tabel dimensi. Jika sistem tidak menemukan rekaman data di cache, kunci penggabungan tidak ada. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.

      Catatan
      • Jika jumlah data dalam tabel jarak jauh kecil dan sejumlah besar kunci yang hilang ada, kami sarankan Anda mengatur parameter ini ke ALL. Tabel sumber dan tabel dimensi tidak dapat dikaitkan berdasarkan klausa ON. Jika Anda menggunakan kebijakan cache ini, Anda harus mengonfigurasi parameter cacheTTLMs dan cacheReloadTimeBlackList.

      • Jika semua data dalam tabel dimensi dimuat ke cache, kecepatan startup penerapan mungkin melambat. Anda dapat mengonfigurasi kebijakan cache secara fleksibel berdasarkan kebutuhan bisnis Anda.

    Jika Anda mengatur parameter cache ke ALL, Anda harus meningkatkan memori node untuk bergabung dengan tabel karena sistem memuat data dari tabel dimensi secara asinkron. Ukuran memori yang ditingkatkan adalah dua kali lipat dari tabel jarak jauh.

    cacheSize

    Jumlah maksimum baris data yang dapat disimpan dalam cache.

    Long

    Tidak

    10000

    Anda dapat mengonfigurasi parameter ini saat Anda mengatur parameter cache ke LRU.

    cacheTTLMs

    Periode waktu habis cache. Unit: milidetik.

    Long

    Tidak

    Tidak ada nilai default

    Konfigurasi parameter cacheTTLMs bervariasi berdasarkan parameter cache.

    • Jika Anda mengatur parameter cache ke None, parameter cacheTTLMs dapat dibiarkan kosong. Ini menunjukkan bahwa entri cache tidak kedaluwarsa.

    • Jika Anda mengatur parameter cache ke LRU, parameter cacheTTLMs menentukan periode waktu habis cache. Secara default, entri cache tidak kedaluwarsa.

    • Jika Anda mengatur parameter cache ke ALL, parameter cacheTTLMs menentukan interval di mana sistem memuat ulang cache. Secara default, cache tidak dimuat ulang.

    cacheEmpty

    Menentukan apakah akan menyimpan hasil kosong.

    Boolean

    Tidak

    true

    Tidak tersedia.

    cacheReloadTimeBlackList

    Periode waktu di mana cache tidak diperbarui. Parameter ini berlaku saat parameter cache diatur ke ALL. Cache tidak diperbarui selama periode waktu yang Anda tentukan untuk parameter ini. Parameter ini cocok untuk acara promosi online berskala besar seperti Double 11.

    String

    Tidak

    Tidak ada nilai default

    Contoh berikut menunjukkan format nilai: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Gunakan pemisah berdasarkan aturan berikut:

    • Pisahkan beberapa periode waktu dengan koma (,).

    • Pisahkan waktu mulai dan waktu akhir setiap periode waktu dengan panah (->) yang merupakan kombinasi tanda hubung (-) dan tanda kurung penutup (>).

    cacheScanLimit

    Jumlah baris yang dikembalikan oleh server prosedur panggilan jarak jauh (RPC) ke klien ketika server membaca data penuh dari tabel dimensi ApsaraDB for HBase.

    Integer

    Tidak

    100

    Parameter ini hanya tersedia saat Anda mengatur parameter cache ke ALL.

Pemetaan tipe data

Nilai dari tipe data Realtime Compute for Apache Flink dikonversi menjadi larik byte menggunakan org.apache.hadoop.hbase.util.Bytes dalam tabel ApsaraDB for HBase. Proses dekode bervariasi berdasarkan skenario berikut:

  • Jika tipe data Realtime Compute for Apache Flink adalah tipe non-STRING dan nilai dalam tabel ApsaraDB for HBase adalah larik byte kosong, nilai tersebut didekode sebagai null.

  • Jika tipe data Realtime Compute for Apache Flink adalah tipe STRING dan nilai dalam tabel dimensi ApsaraDB for HBase adalah larik byte yang ditentukan oleh null-string-literal, nilai tersebut didekode sebagai null.

Tipe SQL Flink

Fungsi yang digunakan untuk mengonversi nilai menjadi byte untuk ApsaraDB for HBase

Fungsi yang digunakan untuk membaca byte dari ApsaraDB for HBase

CHAR

byte[] toBytes(String s)

String toString(byte[] b)

VARCHAR

STRING

BOOLEAN

byte[] toBytes(boolean b)

boolean toBoolean(byte[] b)

BINARY

byte[]

byte[]

VARBINARY

DECIMAL

byte[] toBytes(BigDecimal v)

BigDecimal toBigDecimal(byte[] b)

TINYINT

new byte[] { val }

bytes[0]

SMALLINT

byte[] toBytes(short val)

short toShort(byte[] bytes)

INT

byte[] toBytes(int val)

int toInt(byte[] bytes)

BIGINT

byte[] toBytes(long val)

long toLong(byte[] bytes)

FLOAT

byte[] toBytes(float val)

float toFloat(byte[] bytes)

DOUBLE

byte[] toBytes(double val)

double toDouble(byte[] bytes)

DATE

Mengonversi tanggal menjadi nilai INT yang mewakili jumlah hari sejak 1 Januari 1970 dan kemudian menjadi larik byte menggunakan byte[] toBytes(int val).

Mengonversi larik byte database ApsaraDB for HBase menjadi tipe data INT menggunakan int toInt(byte[] bytes). Nilai tipe data INT mewakili jumlah hari sejak 1 Januari 1970.

TIME

Mengonversi waktu menjadi nilai INT yang mewakili jumlah milidetik sejak 00:00:00 dan kemudian menjadi larik byte menggunakan byte[] toBytes(int val).

Mengonversi larik byte database ApsaraDB for HBase menjadi tipe data INT menggunakan int toInt(byte[] bytes). Nilai tipe data INT mewakili jumlah milidetik sejak 00:00:00.

TIMESTAMP

Mengonversi timestamp menjadi nilai LONG yang mewakili jumlah milidetik sejak 00:00:00 pada 1 Januari 1970 dan kemudian menjadi larik byte menggunakan byte[] toBytes(long val).

Mengonversi larik byte database ApsaraDB for HBase menjadi tipe data LONG menggunakan long toLong(byte[] bytes). Nilai tipe data LONG mewakili jumlah milidetik sejak 00:00:00 pada 1 Januari 1970.

Contoh kode

  • Contoh kode untuk tabel dimensi

    CREATE TEMPORARY TABLE datagen_source (
      a INT,
      b BIGINT,
      c STRING,
      `proc_time` AS PROCTIME()
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE hbase_dim (
      rowkey INT,
      family1 ROW<col1 INT>,
      family2 ROW<col1 STRING, col2 BIGINT>,
      family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
    ) WITH (
      'connector' = 'cloudhbase',
      'table-name' = '<yourTableName>',
      'zookeeper.quorum' = '<yourZookeeperQuorum>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      f1c1 INT,
      f3c3 STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
         SELECT a, family1.col1 as f1c1,  family3.col3 as f3c3 FROM datagen_source
    JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` as h ON datagen_source.a = h.rowkey;
  • Contoh kode untuk tabel sink

    CREATE TEMPORARY TABLE datagen_source (
      rowkey INT,
      f1q1 INT,
      f2q1 STRING,
      f2q2 BIGINT,
      f3q1 DOUBLE,
      f3q2 BOOLEAN,
      f3q3 STRING
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE hbase_sink (
      rowkey INT,
      family1 ROW<q1 INT>,
      family2 ROW<q1 STRING, q2 BIGINT>,
      family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
      PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>'
    );
     
    INSERT INTO hbase_sink
    SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;
  • Contoh kode untuk tabel sink yang mendukung kolom dinamis

    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      f1hour STRING,
      f1deal BIGINT,
      f2day STRING,
      f2deal BIGINT
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE hbase_sink (
      rowkey INT,
      f1 ROW<`hour` STRING, deal BIGINT>,
      f2 ROW<`day` STRING, deal BIGINT>
    ) WITH (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>',
      'dynamic.table'='true'
    );
    
    INSERT INTO hbase_sink
    SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
    • Jika dynamic.table diatur ke true, tabel ApsaraDB for HBase yang mendukung kolom dinamis digunakan.

    • Dua bidang harus dinyatakan dalam baris yang sesuai dengan setiap keluarga kolom. Nilai bidang pertama menunjukkan kolom dinamis, dan nilai bidang kedua menunjukkan nilai kolom dinamis.

    • Sebagai contoh, tabel datagen_source berisi satu baris data. Baris data tersebut menunjukkan bahwa ID komoditas adalah 1, jumlah transaksi komoditas antara pukul 10:00 dan 11:00 adalah 100, dan jumlah transaksi komoditas pada 26 Juli 2020 adalah 10000. Dalam hal ini, sebuah baris dengan rowkey 1 dimasukkan ke dalam tabel ApsaraDB for HBase. f1:10 adalah 100, dan f2:2020-7-26 adalah 10000.