This topic provides the DDL syntax that is used to create an ApsaraDB for HBase dimension table, describes the parameters in the WITH and CACHE clauses, and lists the data type conversion functions of ApsaraDB for HBase.

What is ApsaraDB for HBase?

ApsaraDB for HBase is a cost-effective cloud-based intelligent NoSQL service that provides high scalability and is compatible with open source HBase. ApsaraDB for HBase provides benefits such as low storage costs, high throughput and scalability, and intelligent data processing. ApsaraDB for HBase supports many core services of Alibaba such as Taobao recommendations, risk control for Ant Credit Pay, advertising, data dashboards, Cainiao logistics track, Alipay transaction records, and Taobao Mobile messages. ApsaraDB for HBase is a fully managed service that provides enterprise-level capabilities such as the processing of petabytes of data and tens of millions of concurrent requests, quick scaling within seconds, low response latency within milliseconds, high availability across data centers, and global distribution.

Prerequisites

  • An ApsaraDB for HBase cluster is purchased and an ApsaraDB for HBase table is created. For more information, see Purchase an instance.
  • A whitelist is configured for the ApsaraDB for HBase cluster. For more information, see Configure a whitelist.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the ApsaraDB for HBase connector.

DDL syntax

CREATE TABLE hbase_dim(
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) with (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>'
);
  • Column families of an ApsaraDB for HBase table must be declared as the ROW type. Each column family name is the field name of a row. In the preceding DDL statement, three column families are declared: family1, family2, and family3.
  • A column in a column family corresponds to a field in a row. The column name is the field name. For example, the q2 and q3 columns are declared in the family2 column family.
  • Besides fields of the ROW type, an ApsaraDB for HBase table can contain only one field of an atomic type, such as STRING or BIGINT. This field is considered the row key of the table, such as rowkey in the preceding DDL statement.
  • The row key of an ApsaraDB for HBase table must be defined as the primary key of the dimension table. If no primary key is defined, the row key field is used as the primary key.
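The primary key can also be declared explicitly on the row key field. The following sketch extends the DDL above; the table name and ZooKeeper endpoint are placeholders:

```sql
CREATE TABLE hbase_dim (
  rowkey INT,
  family1 ROW<q1 INT>,
  -- Declare the row key field as the primary key of the dimension table.
  -- NOT ENFORCED is required because Flink does not validate the constraint.
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
```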

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the dimension table. | Yes | Set the value to cloudhbase. |
| table-name | The name of the ApsaraDB for HBase dimension table. | Yes | N/A. |
| zookeeper.quorum | The URL that is used to access the ZooKeeper service of ApsaraDB for HBase. | Yes | N/A. |
| zookeeper.znode.parent | The root directory of ApsaraDB for HBase in the ZooKeeper service. | No | Default value: /hbase. Note: This parameter takes effect only in ApsaraDB for HBase Standard Edition. |
| userName | The username that is used to log on to the ApsaraDB for HBase cluster. | No | Note: This parameter takes effect only in ApsaraDB for HBase Enhanced Edition. |
| password | The password that is used to log on to the ApsaraDB for HBase cluster. | No | Note: This parameter takes effect only in ApsaraDB for HBase Enhanced Edition. |
| haclient.cluster.id | The ID of the ApsaraDB for HBase cluster in high availability (HA) mode. | No | This parameter is required only when you access zone-disaster recovery instances. Note: This parameter takes effect only in ApsaraDB for HBase Enhanced Edition. |
| retries.number | The maximum number of attempts that the ApsaraDB for HBase client makes to connect to the ApsaraDB for HBase database. | No | Default value: 31. |
| null-string-literal | The string literal that represents null values. If a Flink field is of the STRING type and the value read from ApsaraDB for HBase equals the byte array of this literal, the value is decoded as null. | No | Default value: null. |
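The optional parameters above are specified in the same WITH clause as the required ones. The following sketch assumes an Enhanced Edition cluster; the table name, ZooKeeper endpoint, and credentials are placeholders:

```sql
CREATE TABLE hbase_dim (
  rowkey INT,
  family1 ROW<q1 INT>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  -- Optional: logon credentials. Take effect only in Enhanced Edition.
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>',
  -- Optional: decode this literal as null for STRING fields.
  'null-string-literal' = 'null',
  -- Optional: maximum number of connection attempts.
  'retries.number' = '31'
);
```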

Parameters in the CACHE clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| cache | The cache policy. | No | Valid values:<br>• None: Data is not cached.<br>• LRU: Only some data in the dimension table is cached. Each time the system receives a data record, it searches the cache first. If the record is not found in the cache, the system searches the physical dimension table. If you use this cache policy, you must configure the cacheSize and cacheTTLMs parameters.<br>• ALL (default): All data in the dimension table is cached. Before a job runs, the system loads all data in the dimension table to the cache, and all subsequent lookups are served from the cache. If a record is not found in the cache, the join key does not exist. The system reloads all data after cache entries expire. If the remote table contains a small amount of data and a large number of missing keys exist, we recommend that you set this parameter to ALL. (The source table and dimension table cannot be associated based on the ON clause.) If you use this cache policy, you must configure the cacheTTLMs and cacheReloadTimeBlackList parameters.<br>Note: If you set the cache parameter to ALL, you must increase the memory of the join node because the system asynchronously loads data from the dimension table. The increased memory size is twice the size of the remote table. |
| cacheSize | The maximum number of data records that can be cached. | No | This parameter takes effect only when you set the cache parameter to LRU. Default value: 10000. |
| cacheTTLMs | The timeout period of the cache. Unit: milliseconds. | No | The behavior of this parameter varies based on the value of the cache parameter:<br>• None: The cacheTTLMs parameter can be left empty. Cache entries do not expire.<br>• LRU: The cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.<br>• ALL: The cacheTTLMs parameter specifies the interval at which the cache is refreshed. By default, the cache is not refreshed. |
| cacheEmpty | Specifies whether to cache empty results. | No | Default value: true. |
| cacheReloadTimeBlackList | The time periods during which the cache is not refreshed. This parameter takes effect only when you set the cache parameter to ALL. This parameter is suitable for large-scale online promotional events such as Double 11. | No | By default, this parameter is empty. Example: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Use delimiters based on the following rules:<br>• Separate multiple time periods with a comma (,).<br>• Separate the start time and end time of each time period with a hyphen and a closing angle bracket (->). |
| cacheScanLimit | The number of rows that the remote procedure call (RPC) server returns to the client in each batch when the server reads full data from the ApsaraDB for HBase dimension table. | No | This parameter takes effect only when you set the cache parameter to ALL. Default value: 100. |
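The CACHE parameters are also specified in the WITH clause of the dimension table. The following sketch enables an LRU cache; the size and timeout values are illustrative only, and the table name and ZooKeeper endpoint are placeholders:

```sql
CREATE TABLE hbase_dim (
  rowkey INT,
  family1 ROW<q1 INT>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  -- Cache only part of the dimension table in memory.
  'cache' = 'LRU',
  -- Required for LRU: cache at most 10000 records.
  'cacheSize' = '10000',
  -- Required for LRU: expire cache entries after 10 minutes.
  'cacheTTLMs' = '600000'
);
```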

Data type conversion

ApsaraDB for HBase data is converted into Flink data types by using org.apache.hadoop.hbase.util.Bytes. The decoding process varies based on the following scenarios:
  • If the Flink data type is a non-STRING type and a value in the ApsaraDB for HBase dimension table is an empty byte array, the value is decoded as null.
  • If the Flink data type is the STRING type and a value in the ApsaraDB for HBase dimension table is the byte array specified by null-string-literal, the value is decoded as null.
The following table describes the relationships between the field data types of Flink and the data type conversion functions of ApsaraDB for HBase.
| Field data type of Flink | Data type conversion function of ApsaraDB for HBase |
| --- | --- |
| CHAR / VARCHAR / STRING | String toString(byte[] b) |
| BOOLEAN | boolean toBoolean(byte[] b) |
| BINARY / VARBINARY | byte[] (used as-is) |
| DECIMAL | BigDecimal toBigDecimal(byte[] b) |
| TINYINT | bytes[0] |
| SMALLINT | short toShort(byte[] bytes) |
| INT | int toInt(byte[] bytes) |
| BIGINT | long toLong(byte[] bytes) |
| FLOAT | float toFloat(byte[] bytes) |
| DOUBLE | double toDouble(byte[] bytes) |
| DATE | Converts a byte array of the ApsaraDB for HBase database into the INT data type by using int toInt(byte[] bytes). The INT value represents the number of days since January 1, 1970. |
| TIME | Converts a byte array of the ApsaraDB for HBase database into the INT data type by using int toInt(byte[] bytes). The INT value represents the number of milliseconds since 00:00:00. |
| TIMESTAMP | Converts a byte array of the ApsaraDB for HBase database into the LONG data type by using long toLong(byte[] bytes). The LONG value represents the number of milliseconds since 00:00:00 on January 1, 1970. |
| ARRAY | Not supported |
| MAP / MULTISET | Not supported |
| ROW | Not supported |
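Based on the mappings above, a dimension table can declare columns of these types directly. The following sketch is illustrative only and assumes that the corresponding HBase cells were written with the matching byte encodings; the column family, qualifier names, and connection values are placeholders:

```sql
CREATE TABLE typed_hbase_dim (
  rowkey INT,
  -- Each qualifier is decoded with the conversion function for its declared type:
  -- DECIMAL via toBigDecimal, BOOLEAN via toBoolean, DATE via toInt (days since
  -- 1970-01-01), TIMESTAMP via toLong (milliseconds since the epoch).
  f ROW<amount DECIMAL(10, 2), flag BOOLEAN, born DATE, updated TIMESTAMP(3)>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
```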

Sample code

The following sample code shows how to create an ApsaraDB for HBase dimension table in a Realtime Compute for Apache Flink job:
CREATE TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proc_time` AS PROCTIME()
) with (
  'connector'='datagen'
);

CREATE TABLE hbase_dim (
  rowkey INT,
  family1 ROW<col1 INT>,
  family2 ROW<col1 STRING, col2 BIGINT>,
  family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

CREATE TABLE blackhole_sink(
  a INT,
  f1c1 INT,
  f3c3 STRING
) with (
  'connector' = 'blackhole' 
);
  
INSERT INTO blackhole_sink
SELECT src.a, h.family1.col1 AS f1c1, h.family3.col3 AS f3c3
FROM datagen_source AS src
JOIN hbase_dim FOR SYSTEM_TIME AS OF src.`proc_time` AS h
ON src.a = h.rowkey;