Use the ApsaraDB for HBase connector to read from ApsaraDB for HBase as a dimension table or write to it as a sink table in streaming jobs.
- Supported table types: dimension table, sink table
- Running mode: streaming
- API: SQL
- Data update or deletion in a sink table: supported
- Sink metrics: numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond, currentSendTime
For more information about sink metrics, see Metrics.
Usage notes
Before you use this connector, confirm the type of your database instance:
- This connector is for ApsaraDB for HBase instances only. Using it with other instance types can cause unexpected issues.
- Lindorm instances are compatible with Apache HBase. Use the Lindorm connector for Lindorm instances.
- Using this connector to connect Realtime Compute for Apache Flink to an open source HBase database does not guarantee data validity.
Prerequisites
Before you begin, ensure that you have:
- An ApsaraDB for HBase cluster purchased and an ApsaraDB for HBase table created. See Purchase a cluster.
- A whitelist configured for the ApsaraDB for HBase cluster. See Configure a whitelist.
DDL syntax
CREATE TABLE hbase_table (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
Schema rules:
- Declare each column family as `ROW<column_name type, ...>`. The field name maps to the column family name.
- Each field inside a `ROW` maps to a column within that column family. For example, `q2` and `q3` are columns in `family2`.
- Include exactly one atomic-type field (such as `INT` or `STRING`). This field is the row key.
- The row key must be the primary key of a sink table. If no primary key is defined, the row key serves as the primary key.
- Declare only the column families and columns your job actually uses.
Usage examples
Dimension table example
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
`proc_time` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hbase_dim (
rowkey INT,
family1 ROW<col1 INT>,
family2 ROW<col1 STRING, col2 BIGINT>,
family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
f1c1 INT,
f3c3 STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT a, family1.col1 AS f1c1, family3.col3 AS f3c3
FROM datagen_source
JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` AS h
ON datagen_source.a = h.rowkey;
Sink table example
CREATE TEMPORARY TABLE datagen_source (
rowkey INT,
f1q1 INT,
f2q1 STRING,
f2q2 BIGINT,
f3q1 DOUBLE,
f3q2 BOOLEAN,
f3q3 STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hbase_sink (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q1 STRING, q2 BIGINT>,
family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3)
FROM datagen_source;
Dynamic column sink
When `dynamic.table` is set to `true`, each column family must declare exactly two fields: the first represents the dynamic column name, and the second represents its value.
CREATE TEMPORARY TABLE datagen_source (
id INT,
f1hour STRING,
f1deal BIGINT,
f2day STRING,
f2deal BIGINT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hbase_sink (
rowkey INT,
f1 ROW<`hour` STRING, deal BIGINT>,
f2 ROW<`day` STRING, deal BIGINT>
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>',
'dynamic.table' = 'true'
);
INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal)
FROM datagen_source;
For example, if the source row has id=1, f1hour='10', f1deal=100, f2day='2020-7-26', and f2deal=10000, the resulting HBase row has rowkey=1, f1:10=100, and f2:2020-7-26=10000.
Connector options
General options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | — | Set to `cloudhbase`. |
| table-name | String | Yes | — | The name of the ApsaraDB for HBase table. |
| zookeeper.quorum | String | Yes | — | The ZooKeeper connection string for the ApsaraDB for HBase cluster. |
| zookeeper.znode.parent | String | No | /hbase | The root directory of ApsaraDB for HBase in ZooKeeper. Applies to Standard Edition only. |
| userName | String | No | — | The username for database access. Applies to Performance-enhanced Edition only. |
| password | String | No | — | The password for database access. Applies to Performance-enhanced Edition only. |
| haclient.cluster.id | String | No | — | The cluster ID for high availability (HA) mode. Required for zone-disaster recovery clusters. Applies to Performance-enhanced Edition only. |
| retries.number | Integer | No | 31 | The number of connection retry attempts for the HBase client. |
| null-string-literal | String | No | null | The value written to HBase when a Flink STRING field is null. |
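For a Performance-enhanced Edition instance, the credentials from the table above go directly into the WITH clause. A minimal sketch; the table name and placeholder values are illustrative:

```sql
CREATE TEMPORARY TABLE hbase_table (
  rowkey INT,
  family1 ROW<q1 INT>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  -- Performance-enhanced Edition only: database credentials
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);
```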
Sink options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| sink.buffer-flush.max-size | String | No | 2MB | The maximum data size to buffer before flushing. Larger values improve write throughput but increase latency and memory usage. Units: B, KB, MB, or GB (case-insensitive). Set to 0 to disable buffering. |
| sink.buffer-flush.max-rows | Integer | No | 1000 | The maximum number of records to buffer before flushing. Larger values improve write throughput but increase latency and memory usage. Set to 0 to disable buffering. |
| sink.buffer-flush.interval | Duration | No | 1s | The flush interval for buffered data. Controls write latency. Units: ms, s, min, h, or d. Set to 0 to disable periodic flushing. |
| sink.sync-write | Boolean | No | true | The write mode. true: synchronous, data is written in order but at lower throughput. false: asynchronous, higher throughput, but write order is not guaranteed. |
| sink.buffer-flush.batch-rows | Integer | No | 100 | The number of records to batch in synchronous write mode. Larger values improve throughput but increase latency and memory usage. Takes effect only when sink.sync-write is true. |
| dynamic.table | Boolean | No | false | Enables dynamic column mode. When set to true, each column family row must declare exactly two fields (column name and value). |
| sink.ignore-delete | Boolean | No | false | Specifies whether to ignore DELETE and UPDATE_BEFORE events from upstream. Set to true when multiple sink tasks concurrently update different fields of the same row, to prevent data inconsistency caused by concurrent deletions and partial updates. When true, only INSERT and UPDATE_AFTER records are processed. |
| sink.ignore-null | Boolean | No | false | Specifies whether to skip writing null field values. When set to true, null-string-literal has no effect. Requires Ververica Runtime (VVR) 8.0.9 or later. |
Dimension table options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| cache | String | No | ALL | The cache policy. Valid values: None, LRU, ALL. See the cache policy descriptions below. |
| cacheSize | Long | No | 10000 | The maximum number of rows to cache. Takes effect only when cache is set to LRU. |
| cacheTTLMs | Long | No | — | The cache expiry behavior depends on the cache setting. LRU: cache entry timeout in milliseconds (no expiry by default). ALL: cache reload interval in milliseconds (no reload by default). None: not applicable. |
| cacheEmpty | Boolean | No | true | Specifies whether to cache empty lookup results. |
| cacheReloadTimeBlackList | String | No | — | Time periods during which cache reloads are suspended. Takes effect only when cache is set to ALL. Format: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Separate multiple periods with commas; use -> to separate the start and end of each period. |
| cacheScanLimit | Integer | No | 100 | The number of rows returned per remote procedure call (RPC) when loading the full dimension table. Takes effect only when cache is set to ALL. |
Cache policies:
- None: No caching. Every lookup queries ApsaraDB for HBase directly.
- LRU: Caches a subset of rows. On a cache miss, the system queries ApsaraDB for HBase directly. Configure `cacheSize` and `cacheTTLMs` when using this policy.
- ALL (default): Loads the entire dimension table into cache before the job starts. All subsequent lookups hit the cache. If a key is not found in the cache, it does not exist in the source table. The cache is reloaded after it expires. Configure `cacheTTLMs` and optionally `cacheReloadTimeBlackList` when using this policy.

When `cache` is set to `ALL`, the node that performs the join requires additional memory, approximately twice the size of the dimension table, because the system loads all data asynchronously before the job starts. This can slow down job startup.
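For instance, a dimension table that keeps at most 20,000 hot rows in an LRU cache and expires entries after 10 minutes could be declared as follows (the cache values are illustrative, not tuned recommendations):

```sql
CREATE TEMPORARY TABLE hbase_dim_lru (
  rowkey INT,
  family1 ROW<col1 INT>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  -- LRU policy: cache up to 20,000 rows, expire entries after 10 minutes
  'cache' = 'LRU',
  'cacheSize' = '20000',
  'cacheTTLMs' = '600000'
);
```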
Data type mappings
All Flink values are serialized to and deserialized from HBase byte arrays using org.apache.hadoop.hbase.util.Bytes. Two special cases apply:
- A non-STRING field that reads an empty byte array is decoded as `null`.
- A STRING field that reads the byte array matching `null-string-literal` is decoded as `null`.
| Flink SQL type | HBase conversion |
|---|---|
| CHAR / VARCHAR / STRING | Stored as a UTF-8 string byte array using toBytes(String s) / toString(byte[] b). |
| BOOLEAN | toBytes(boolean b) / toBoolean(byte[] b) |
| BINARY / VARBINARY | Stored as a raw byte[]. |
| DECIMAL | toBytes(BigDecimal v) / toBigDecimal(byte[] b) |
| TINYINT | new byte[] { val } / bytes[0] |
| SMALLINT | toBytes(short val) / toShort(byte[] bytes) |
| INT | toBytes(int val) / toInt(byte[] bytes) |
| BIGINT | toBytes(long val) / toLong(byte[] bytes) |
| FLOAT | toBytes(float val) / toFloat(byte[] bytes) |
| DOUBLE | toBytes(double val) / toDouble(byte[] bytes) |
| DATE | Stored as the number of days since 1970-01-01, serialized as an INT. |
| TIME | Stored as the number of milliseconds since 00:00:00, serialized as an INT. |
| TIMESTAMP | Stored as the number of milliseconds since 00:00:00 on 1970-01-01, serialized as a LONG. |