This topic provides the DDL syntax that is used to create an ApsaraDB for HBase result table, describes the parameters in the WITH clause, and covers data type conversion, dynamic tables, and sample code.

What is ApsaraDB for HBase?

ApsaraDB for HBase is a cost-effective, cloud-based, intelligent NoSQL service that is highly scalable and compatible with open source HBase. It offers low storage costs, high throughput, and intelligent data processing. ApsaraDB for HBase supports core services of Alibaba such as Taobao recommendations, risk control for Ant Credit Pay, advertising, data dashboards, Cainiao logistics tracking, Alipay transaction records, and Taobao Mobile messages. ApsaraDB for HBase is a fully managed service that provides enterprise-level capabilities such as the processing of petabytes of data, high concurrency, scaling within seconds, millisecond-level response latency, high availability across data centers, and global distribution.

Prerequisites

  • An ApsaraDB for HBase cluster is purchased and an ApsaraDB for HBase table is created. For more information about how to purchase an ApsaraDB for HBase cluster, see Purchase a cluster.
  • A whitelist is configured for the ApsaraDB for HBase cluster. For more information, see Configure a whitelist.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the ApsaraDB for HBase connector.

DDL syntax

CREATE TABLE hbase_sink(
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) with (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>'
);
  • Column families of an ApsaraDB for HBase table must be declared as the ROW type. Each column family name is the field name of a row. In the DDL syntax, the following column families are declared: family1, family2, and family3.
  • A column in a column family corresponds to a field in a row. The column name is the field name. In the DDL syntax, the q2 and q3 columns are declared in the family2 column family.
  • You only need to declare the column families and columns of the ApsaraDB for HBase table that your job writes to. You do not need to declare the others in the result table.
  • In addition to the fields of the ROW type, the result table can contain only one field of an atomic type, such as STRING or BIGINT. This field is treated as the row key of the ApsaraDB for HBase table, such as rowkey in the DDL statement.
  • Define the row key of the ApsaraDB for HBase table as the primary key of the result table. If no primary key is defined, the row key is used as the primary key by default.
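The rules above can be illustrated with a minimal sketch that declares only part of an HBase table. It assumes a hypothetical ApsaraDB for HBase table that has the column families family1, family2, and family3, while the job writes only the q2 column of family2. The result table name hbase_sink_subset and the choice of column are illustrative assumptions, not part of the original example.

```sql
-- Sketch under stated assumptions: the physical HBase table has more column
-- families than are declared here. Only family2:q2 is written by this job.
CREATE TEMPORARY TABLE hbase_sink_subset (
  rowkey INT,                          -- the single atomic field, treated as the row key
  family2 ROW<q2 STRING>,              -- only the required column family and column
  PRIMARY KEY (rowkey) NOT ENFORCED    -- row key declared explicitly as the primary key
) WITH (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>'
);
```

Declaring the primary key explicitly is optional according to the rules above, but it makes the role of the rowkey field visible to readers of the DDL.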

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The type of the result table. | Yes | Set the value to cloudhbase. |
| table-name | The name of the ApsaraDB for HBase table. | Yes | N/A. |
| zookeeper.quorum | The URL that is used to access the ZooKeeper service of ApsaraDB for HBase. | Yes | N/A. |
| zookeeper.znode.parent | The root directory of ApsaraDB for HBase in the ZooKeeper service. | No | Default value: /hbase. Note: This parameter takes effect only in the ApsaraDB for HBase Standard Edition. |
| userName | The username that is used to access the ApsaraDB for HBase database. | No | This parameter takes effect only in the ApsaraDB for HBase Enhanced Edition. |
| password | The password that is used to access the ApsaraDB for HBase database. | No | This parameter takes effect only in the ApsaraDB for HBase Enhanced Edition. |
| haclient.cluster.id | The ID of the ApsaraDB for HBase cluster in high availability (HA) mode. | No | This parameter is required only when you access zone-disaster recovery clusters. Note: This parameter takes effect only in the ApsaraDB for HBase Enhanced Edition. |
| retries.number | The maximum number of attempts that the ApsaraDB for HBase client makes to connect to the ApsaraDB for HBase database. | No | Default value: 31. |
| null-string-literal | If the field data type of ApsaraDB for HBase is STRING and the Flink field data is null, null-string-literal is assigned to the field and is written to the ApsaraDB for HBase database. | No | Default value: null. |
| sink.buffer-flush.max-size | The size of data in bytes cached in the memory before data is written to the ApsaraDB for HBase database. A larger value of this parameter improves the write performance of ApsaraDB for HBase but prolongs the write latency and consumes more memory. | No | Default value: 2 MB. Unit: B, KB, MB, and GB. If this parameter is set to 0, no data is cached. |
| sink.buffer-flush.max-rows | The number of data records cached in the memory before data is written to the ApsaraDB for HBase database. A larger value of this parameter improves the write performance of ApsaraDB for HBase but prolongs the write latency and consumes more memory. | No | Default value: 1000. If this parameter is set to 0, no data is cached. |
| sink.buffer-flush.interval | The interval at which cached data is written to the ApsaraDB for HBase database. This parameter controls the latency of data writing to the ApsaraDB for HBase database. | No | Default value: 1s. Unit: milliseconds, seconds, minutes, hours, and days. If this parameter is set to 0, periodic data writing is disabled. |
| dynamic.table | Specifies whether to use an ApsaraDB for HBase table that supports dynamic columns. | No | Valid values: true (an ApsaraDB for HBase table that supports dynamic columns is used) and false (an ApsaraDB for HBase table that supports dynamic columns is not used; this is the default value). |
| sink.ignore-delete | Specifies whether to ignore retraction messages. | No | Valid values: true (retraction messages are ignored) and false (retraction messages are not ignored; this is the default value). Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.10 or later supports this parameter. |
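As a rough illustration of how the optional parameters combine, the following sketch sets the three buffer-flush parameters, null-string-literal, and sink.ignore-delete on one result table. The table name hbase_sink_tuned and every option value are illustrative assumptions rather than recommendations. The value formats ('4mb', '2s') follow the open source Flink convention and are assumed to be accepted by this connector.

```sql
-- Sketch only: all option values below are illustrative assumptions.
CREATE TEMPORARY TABLE hbase_sink_tuned (
  rowkey INT,
  family1 ROW<q1 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>',
  'sink.buffer-flush.max-size'='4mb',   -- cached data size threshold (default: 2 MB)
  'sink.buffer-flush.max-rows'='2000',  -- cached record count threshold (default: 1000)
  'sink.buffer-flush.interval'='2s',    -- periodic write interval (default: 1s)
  'null-string-literal'='NULL',         -- written when a STRING field is null
  'sink.ignore-delete'='true'           -- ignore retraction messages; VVR 4.0.10 or later
);
```

Larger buffer thresholds trade write latency and memory for throughput, as described in the table, so values like these are usually tuned per workload.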

Data type conversion

Flink data is converted into byte arrays that are stored in ApsaraDB for HBase by using org.apache.hadoop.hbase.util.Bytes. Null values are encoded differently based on the data type:
  • A null value of a non-STRING type is encoded as an empty byte array.
  • A null value of the STRING type is encoded as the value of null-string-literal.
The following table describes the relationships between the field data types of Flink and the data type conversion functions of ApsaraDB for HBase.
| Field data type of Flink | Data type conversion function of ApsaraDB for HBase |
| --- | --- |
| CHAR, VARCHAR, STRING | byte[] toBytes(String s) |
| BOOLEAN | byte[] toBytes(boolean b) |
| BINARY, VARBINARY | byte[] |
| DECIMAL | byte[] toBytes(BigDecimal v) |
| TINYINT | new byte[] { val } |
| SMALLINT | byte[] toBytes(short val) |
| INT | byte[] toBytes(int val) |
| BIGINT | byte[] toBytes(long val) |
| FLOAT | byte[] toBytes(float val) |
| DOUBLE | byte[] toBytes(double val) |
| DATE | Converts a date into the number of days represented by INT since January 1, 1970 and then into a byte array by using byte[] toBytes(int val). |
| TIME | Converts a time into the number of milliseconds represented by INT since 00:00:00 and then into a byte array by using byte[] toBytes(int val). |
| TIMESTAMP | Converts a timestamp into the number of milliseconds represented by LONG since 00:00:00 on January 1, 1970 and then into a byte array by using byte[] toBytes(long val). |
| ARRAY | Not supported. |
| MAP, MULTISET | Not supported. |
| ROW | Not supported. |
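To make the mapping more concrete, the following sketch declares a result table whose columns use several of the supported types, with a comment on each column that names the conversion listed in the table above. The table name hbase_sink_types, the column family cf, and all column names are hypothetical. The comments restate the table and do not describe additional connector behavior.

```sql
-- Sketch under stated assumptions: names are hypothetical, and each comment
-- repeats the conversion that the table above lists for that Flink type.
CREATE TEMPORARY TABLE hbase_sink_types (
  rowkey STRING,             -- toBytes(String s)
  cf ROW<
    flag BOOLEAN,            -- toBytes(boolean b)
    amount DECIMAL(10, 2),   -- toBytes(BigDecimal v)
    cnt BIGINT,              -- toBytes(long val)
    dt DATE,                 -- days since January 1, 1970 as INT, then toBytes(int val)
    ts TIMESTAMP(3),         -- milliseconds since the epoch as LONG, then toBytes(long val)
    note STRING              -- null is written as the value of null-string-literal
  >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>'
);
```

Types that are marked as not supported (ARRAY, MAP, MULTISET, and nested ROW) are left out of the sketch.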

Dynamic table

In a dynamic table, the value of one Flink field determines the name of the ApsaraDB for HBase column that is written, and the value of another field is written as the value of that column. The following sample code shows that the hourly transaction amount of a product is used as a dynamic column.
CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) with (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) with (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>',
  'dynamic.table'='true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
Note
  • If dynamic.table is set to true, an ApsaraDB for HBase table that supports dynamic columns is used.
  • Two fields must be declared in the ROW that corresponds to each column family. The value of the first field is used as the name of the dynamic column, and the value of the second field is written as the value of that column.
  • If the datagen_source table contains the data record (1, "10", 100, "2020-7-26", 10000), a row whose row key is 1 is inserted into the ApsaraDB for HBase table. The data record indicates that the product ID is 1, the transaction amount from 10:00 to 11:00 is 100, and the transaction amount on July 26, 2020 is 10,000. Based on the preceding statement, the f1:10 column is set to 100 and the f2:2020-7-26 column is set to 10000.
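Because the datagen connector produces random values, the record described in the note is easier to reproduce with a literal row. The following sketch repeats the dynamic result table and inserts that single record through a VALUES clause. The inline alias t and the explicit casts to BIGINT are assumptions made for this illustration.

```sql
-- Sketch only: writes the single record from the note by using a literal row
-- instead of the datagen source.
CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>',
  'dynamic.table'='true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal)
FROM (
  VALUES (1, '10', CAST(100 AS BIGINT), '2020-7-26', CAST(10000 AS BIGINT))
) AS t (id, f1hour, f1deal, f2day, f2deal);
-- Per the note: for row key 1, f1:10 is 100 and f2:2020-7-26 is 10000.
```

The first field of each ROW supplies the column name (10 and 2020-7-26), so the set of columns in the HBase table grows with the distinct values that the job emits.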

Sample code

CREATE TEMPORARY TABLE datagen_source (
  rowkey INT,
  f1q1 INT,
  f2q1 STRING,
  f2q2 BIGINT,
  f3q1 DOUBLE,
  f3q2 BOOLEAN,
  f3q3 STRING
) with (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q1 STRING, q2 BIGINT>,
  family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) with (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>'
);
 
INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;