
Realtime Compute for Apache Flink:ApsaraDB for HBase

Last Updated: Mar 26, 2026

Use the ApsaraDB for HBase connector to read from ApsaraDB for HBase as a dimension table or write to it as a sink table in streaming jobs.

Supported table types: Dimension table, Sink table
Running mode: Streaming
API: SQL
Data update or deletion in a sink table: Supported
Sink metrics: numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond, currentSendTime

For more information about sink metrics, see Metrics.

Usage notes

Before you use this connector, confirm the type of your database instance:

  • This connector is for ApsaraDB for HBase instances only. Using it with other instance types can cause unexpected issues.

  • Lindorm instances are compatible with Apache HBase. Use the Lindorm connector for Lindorm instances.

  • Using this connector to connect Realtime Compute for Apache Flink to an open source HBase database does not guarantee data validity.

Prerequisites

Before you begin, make sure that an ApsaraDB for HBase instance is created and that Realtime Compute for Apache Flink can access it over the network.

DDL syntax

CREATE TABLE hbase_table (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

Schema rules:

  • Declare each column family as ROW<column_name type, ...>. The field name maps to the column family name.

  • Each field inside a ROW maps to a column within that column family. For example, q2 and q3 are columns in family2.

  • Include exactly one atomic-type field (such as INT or STRING). This field is the row key.

  • Declare the row key field as the primary key of a sink table. If no primary key is declared, the row key field is used as the implicit primary key.

  • Declare only the column families and columns your job actually uses.
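The rules above can be combined into a minimal sketch. The table name, quorum address, and all field names here are placeholders, not values from your environment:

```sql
CREATE TABLE hbase_sketch (
  -- Exactly one atomic-type field: the HBase row key.
  rowkey STRING,
  -- Declare only the column family and qualifiers the job uses.
  cf ROW<total BIGINT, updated_at TIMESTAMP>,
  -- For a sink table, the row key must be the primary key.
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
```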

Usage examples

Dimension table example

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proc_time` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hbase_dim (
  rowkey INT,
  family1 ROW<col1 INT>,
  family2 ROW<col1 STRING, col2 BIGINT>,
  family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  f1c1 INT,
  f3c3 STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT a, family1.col1 AS f1c1, family3.col3 AS f3c3
FROM datagen_source
JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` AS h
ON datagen_source.a = h.rowkey;

Sink table example

CREATE TEMPORARY TABLE datagen_source (
  rowkey INT,
  f1q1 INT,
  f2q1 STRING,
  f2q2 BIGINT,
  f3q1 DOUBLE,
  f3q2 BOOLEAN,
  f3q3 STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q1 STRING, q2 BIGINT>,
  family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3)
FROM datagen_source;

Dynamic column sink

When dynamic.table is set to true, each column family must declare exactly two fields: the first represents the dynamic column name, and the second represents its value.

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  'dynamic.table' = 'true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal)
FROM datagen_source;

For example, if the source row has id=1, f1hour='10', f1deal=100, f2day='2020-7-26', and f2deal=10000, the resulting HBase row has rowkey=1, f1:10=100, and f2:2020-7-26=10000.

Connector options

General options

  • connector (String, required): Set to cloudhbase.

  • table-name (String, required): The name of the ApsaraDB for HBase table.

  • zookeeper.quorum (String, required): The ZooKeeper connection string for the ApsaraDB for HBase cluster.

  • zookeeper.znode.parent (String, optional, default /hbase): The root directory of ApsaraDB for HBase in ZooKeeper. Applies to Standard Edition only.

  • userName (String, optional): The username for database access. Applies to Performance-enhanced Edition only.

  • password (String, optional): The password for database access. Applies to Performance-enhanced Edition only.

  • haclient.cluster.id (String, optional): The cluster ID for high availability (HA) mode. Required for zone-disaster recovery clusters. Applies to Performance-enhanced Edition only.

  • retries.number (Integer, optional, default 31): The number of connection retry attempts for the HBase client.

  • null-string-literal (String, optional, default null): The value written to HBase when a Flink STRING field is null.
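For a Performance-enhanced Edition instance, the authentication options combine as in this sketch. All values are placeholders:

```sql
CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  family1 ROW<q1 INT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  -- Performance-enhanced Edition only:
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);
```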

Sink options

  • sink.buffer-flush.max-size (String, optional, default 2MB): The maximum data size to buffer before flushing. Larger values improve write throughput but increase latency and memory usage. Units: B, KB, MB, or GB (case-insensitive). Set to 0 to disable buffering.

  • sink.buffer-flush.max-rows (Integer, optional, default 1000): The maximum number of records to buffer before flushing. Larger values improve write throughput but increase latency and memory usage. Set to 0 to disable buffering.

  • sink.buffer-flush.interval (Duration, optional, default 1s): The flush interval for buffered data. Controls write latency. Units: ms, s, min, h, or d. Set to 0 to disable periodic flushing.

  • sink.sync-write (Boolean, optional, default true): The write mode. true: synchronous — data is written in order but at lower throughput. false: asynchronous — higher throughput, but write order is not guaranteed.

  • sink.buffer-flush.batch-rows (Integer, optional, default 100): The number of records to batch in synchronous write mode. Larger values improve throughput but increase latency and memory usage. Takes effect only when sink.sync-write is true.

  • dynamic.table (Boolean, optional, default false): Enables dynamic column mode. When set to true, each column family row must declare exactly two fields (column name and value).

  • sink.ignore-delete (Boolean, optional, default false): Specifies whether to ignore DELETE and UPDATE_BEFORE events from upstream. Set to true when multiple sink tasks concurrently update different fields of the same row, to prevent data inconsistency caused by concurrent deletions and partial updates. When true, only INSERT and UPDATE_AFTER records are processed.

  • sink.ignore-null (Boolean, optional, default false): Specifies whether to skip writing null field values. When set to true, null-string-literal has no effect. Requires Ververica Runtime (VVR) 8.0.9 or later.
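The buffering options can be combined as in the following sketch. The specific sizes and intervals are illustrative values to tune for your workload, not recommendations:

```sql
CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  family1 ROW<q1 INT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  -- Asynchronous writes trade ordering for throughput.
  'sink.sync-write' = 'false',
  -- Flush when 4 MB or 5000 rows are buffered, or every 2 seconds,
  -- whichever comes first.
  'sink.buffer-flush.max-size' = '4MB',
  'sink.buffer-flush.max-rows' = '5000',
  'sink.buffer-flush.interval' = '2s'
);
```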

Dimension table options

  • cache (String, optional, default ALL): The cache policy. Valid values: None, LRU, ALL. See the cache policy descriptions below.

  • cacheSize (Long, optional, default 10000): The maximum number of rows to cache. Takes effect only when cache is set to LRU.

  • cacheTTLMs (Long, optional): The cache expiry behavior depends on the cache setting: LRU — cache entry timeout in milliseconds (no expiry by default); ALL — cache reload interval in milliseconds (no reload by default); None — not applicable.

  • cacheEmpty (Boolean, optional, default true): Specifies whether to cache empty lookup results.

  • cacheReloadTimeBlackList (String, optional): Time periods during which cache reloads are suspended. Takes effect only when cache is set to ALL. Format: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Separate multiple periods with commas; use -> to separate the start and end of each period.

  • cacheScanLimit (Integer, optional, default 100): The number of rows returned per remote procedure call (RPC) when loading the full dimension table. Takes effect only when cache is set to ALL.

Cache policies:

  • None — No caching. Every lookup queries ApsaraDB for HBase directly.

  • LRU — Caches a subset of rows. On a cache miss, the system queries ApsaraDB for HBase directly. Configure cacheSize and cacheTTLMs when using this policy.

  • ALL (default) — Loads the entire dimension table into cache before the job starts. All subsequent lookups hit the cache. If a key is not found in the cache, it does not exist in the source table. The cache is reloaded after it expires. Configure cacheTTLMs and optionally cacheReloadTimeBlackList when using this policy.

    When cache is set to ALL, the node that performs the join requires additional memory — approximately twice the size of the dimension table — because the system loads all data asynchronously before the job starts. This can slow down job startup.
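For example, a dimension table that keeps a bounded LRU cache with a 10-minute entry timeout could be declared as follows. This is a sketch; the cache size and TTL are illustrative, and the table name and quorum are placeholders:

```sql
CREATE TEMPORARY TABLE hbase_dim (
  rowkey INT,
  family1 ROW<col1 INT>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  -- Cache at most 100000 rows; entries expire after 10 minutes.
  -- Misses fall through to ApsaraDB for HBase.
  'cache' = 'LRU',
  'cacheSize' = '100000',
  'cacheTTLMs' = '600000'
);
```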

Data type mappings

All Flink values are serialized to and deserialized from HBase byte arrays using org.apache.hadoop.hbase.util.Bytes. Two special cases apply:

  • A non-STRING field that reads an empty byte array is decoded as null.

  • A STRING field that reads the byte array matching null-string-literal is decoded as null.

  • CHAR / VARCHAR / STRING: Stored as a UTF-8 string byte array using toBytes(String s) / toString(byte[] b).

  • BOOLEAN: toBytes(boolean b) / toBoolean(byte[] b).

  • BINARY / VARBINARY: Stored as a raw byte[].

  • DECIMAL: toBytes(BigDecimal v) / toBigDecimal(byte[] b).

  • TINYINT: new byte[] { val } / bytes[0].

  • SMALLINT: toBytes(short val) / toShort(byte[] bytes).

  • INT: toBytes(int val) / toInt(byte[] bytes).

  • BIGINT: toBytes(long val) / toLong(byte[] bytes).

  • FLOAT: toBytes(float val) / toFloat(byte[] bytes).

  • DOUBLE: toBytes(double val) / toDouble(byte[] bytes).

  • DATE: Stored as the number of days since 1970-01-01, serialized as an INT.

  • TIME: Stored as the number of milliseconds since 00:00:00, serialized as an INT.

  • TIMESTAMP: Stored as the number of milliseconds since 1970-01-01 00:00:00, serialized as a LONG.