The HBase data source provides a bidirectional channel to read from and write to HBase. This topic describes its data synchronization capabilities within DataWorks.
Supported versions
HBase plugins are categorized as HBase and HBase{xx}xsql. The HBase{xx}xsql plugin must be used with both HBase and Phoenix.
HBase plugin
The HBase plugin supports HBase 0.94.x, HBase 1.1.x, and HBase 2.x. You can configure the plugin by using the codeless UI or the code editor, and you can use the hbaseVersion parameter to specify the HBase version.
If you use HBase 0.94.x, set the hbaseVersion parameter to 094x for both the reader and writer plugins.
"reader": { "hbaseVersion": "094x" }
"writer": { "hbaseVersion": "094x" }
If you use HBase 1.1.x or HBase 2.x, set the hbaseVersion parameter to 11x for both the reader and writer plugins. The HBase 1.1.x plugin is compatible with HBase 2.0.
"reader": { "hbaseVersion": "11x" }
"writer": { "hbaseVersion": "11x" }
HBase{xx}xsql plugin
HBase20xsql plugin: Supports HBase 2.x and Phoenix 5.x. Only the code editor is supported.
HBase11xsql plugin: Supports HBase 1.1.x and Phoenix 5.x. Only the code editor is supported.
The HBase{xx}xsql writer plugins let you import data in batches into SQL tables (Phoenix) in HBase. Because Phoenix encodes rowkeys, you must manually convert the data if you write it by using the HBase API directly, which is complex and error-prone. The HBase{xx}xsql writer plugins provide a simple way to import data into SQL tables.
Note The plugin uses the Phoenix Java Database Connectivity (JDBC) driver to execute UPSERT statements and write data to tables in batches. Because the plugin uses a high-level interface, it can also update index tables.
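For example, importing two columns into a Phoenix table conceptually corresponds to statements of the form UPSERT INTO "TEST" ("ID", "NAME") VALUES (?, ?) that the driver executes and commits in batches. The table and column names here are placeholders for illustration only; the plugin generates the actual statements from your column configuration.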
Limits
HBase Reader | HBase20xsql Reader | HBase11xsql Writer |
Supported features
HBase Reader
HBase Reader supports normal and multiVersionFixedColumn modes.
normal mode: Reads an HBase table as a standard two-dimensional table and retrieves the latest version of the data.
hbase(main):017:0> scan 'users'
ROW           COLUMN+CELL
 lisi         column=address:city, timestamp=1457101972764, value=beijing
 lisi         column=address:contry, timestamp=1457102773908, value=china
 lisi         column=address:province, timestamp=1457101972736, value=beijing
 lisi         column=info:age, timestamp=1457101972548, value=27
 lisi         column=info:birthday, timestamp=1457101972604, value=1987-06-17
 lisi         column=info:company, timestamp=1457101972653, value=baidu
 xiaoming     column=address:city, timestamp=1457082196082, value=hangzhou
 xiaoming     column=address:contry, timestamp=1457082195729, value=china
 xiaoming     column=address:province, timestamp=1457082195773, value=zhejiang
 xiaoming     column=info:age, timestamp=1457082218735, value=29
 xiaoming     column=info:birthday, timestamp=1457082186830, value=1987-06-17
 xiaoming     column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0580 seconds
The following table shows the data after it is read.
rowKey | address:city | address:contry | address:province | info:age | info:birthday | info:company |
lisi | beijing | china | beijing | 27 | 1987-06-17 | baidu |
xiaoming | hangzhou | china | zhejiang | 29 | 1987-06-17 | alibaba |
multiVersionFixedColumn mode: Reads an HBase table as a vertical table. Each record that is read has a four-column format: rowKey, family:qualifier, timestamp, and value. You must specify the columns to read. The value in each cell is treated as a record. If multiple versions exist, multiple records are generated.
hbase(main):018:0> scan 'users',{VERSIONS=>5}
ROW           COLUMN+CELL
 lisi         column=address:city, timestamp=1457101972764, value=beijing
 lisi         column=address:contry, timestamp=1457102773908, value=china
 lisi         column=address:province, timestamp=1457101972736, value=beijing
 lisi         column=info:age, timestamp=1457101972548, value=27
 lisi         column=info:birthday, timestamp=1457101972604, value=1987-06-17
 lisi         column=info:company, timestamp=1457101972653, value=baidu
 xiaoming     column=address:city, timestamp=1457082196082, value=hangzhou
 xiaoming     column=address:contry, timestamp=1457082195729, value=china
 xiaoming     column=address:province, timestamp=1457082195773, value=zhejiang
 xiaoming     column=info:age, timestamp=1457082218735, value=29
 xiaoming     column=info:age, timestamp=1457082178630, value=24
 xiaoming     column=info:birthday, timestamp=1457082186830, value=1987-06-17
 xiaoming     column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0260 seconds
The following table shows the four-column data after it is read.
rowKey | family:qualifier | timestamp | value |
lisi | address:city | 1457101972764 | beijing |
lisi | address:contry | 1457102773908 | china |
lisi | address:province | 1457101972736 | beijing |
lisi | info:age | 1457101972548 | 27 |
lisi | info:birthday | 1457101972604 | 1987-06-17 |
lisi | info:company | 1457101972653 | baidu |
xiaoming | address:city | 1457082196082 | hangzhou |
xiaoming | address:contry | 1457082195729 | china |
xiaoming | address:province | 1457082195773 | zhejiang |
xiaoming | info:age | 1457082218735 | 29 |
xiaoming | info:age | 1457082178630 | 24 |
xiaoming | info:birthday | 1457082186830 | 1987-06-17 |
xiaoming | info:company | 1457082189826 | alibaba |
HBase Writer
rowkey generation rule: HBase Writer supports concatenating multiple source fields to generate the rowkey for the HBase table.
Version (timestamp) support for writing data to HBase (a configuration sketch follows this list):
Use the current time as the version.
Use a source column as the version.
Use a specific time as the version.
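A minimal sketch of how these options map to the versionColumn parameter in the code editor is shown below. The column index and timestamp value are hypothetical, and the parameter is described in the HBase Writer script parameters section later in this topic.
// Use a source column (for example, the source column at index 1) as the version.
"versionColumn": {
    "index": "1"
}
// Use a specific time as the version: set index to -1 and provide a timestamp in milliseconds.
"versionColumn": {
    "index": "-1",
    "value": "1457082218735"
}
// If versionColumn is not configured, the current time is used as the version.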
Supported field types
Offline read
The following table lists the readable HBase data types and the mappings between HBase types and HBase Reader types.
Category | Data Integration column type | Database data type |
Integer | long | short, int, and long |
Floating-point | double | float and double |
String | string | binary_string and string |
Date and time | date | date |
Byte | bytes | bytes |
Boolean | boolean | boolean |
HBase20xsql Reader supports most Phoenix types. However, some types are not supported. You must check your data types.
The following table lists the mappings between Phoenix types and HBase20xsql Reader types.
DataX internal type | Phoenix data type |
long | INTEGER, TINYINT, SMALLINT, BIGINT |
double | FLOAT, DECIMAL, DOUBLE |
string | CHAR, VARCHAR |
date | DATE, TIME, TIMESTAMP |
bytes | BINARY, VARBINARY |
boolean | BOOLEAN |
Offline write
The following table lists the writable HBase data types and the mappings between HBase types and HBase Writer types.
The column configuration must be consistent with the column types in the HBase table.
Only the field types listed in the following table are supported.
Category | Database data type |
Integer | INT, LONG, and SHORT |
Floating-point | FLOAT and DOUBLE |
Boolean | BOOLEAN |
String | STRING |
Notes
If you receive the error message "tried to access method com.google.common.base.Stopwatch" when you test the connection, you must add the hbaseVersion property to the data source configuration to specify the HBase version.
Add a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view parameter descriptions in the DataWorks console to understand the meanings of the parameters when you add a data source.
Develop a data synchronization task
For information about the entry point and the procedure for configuring a synchronization task, see the following configuration guides.
Configuration guide for single-table offline synchronization tasks
For more information, see Configure a sync task in the codeless UI and Configure a sync task in the code editor.
When you use the codeless UI, field mappings are not displayed by default because HBase is a data source with no fixed structure. You must manually configure the field mappings:
If HBase is the source, configure the Source Field. The format is Field Type|Column Family:Column Name.
If HBase is the destination, configure the Destination Field and rowkey. The format for Destination Field is Source Field Index|Field Type|Column Family:Column Name. The format for rowkey is Source Primary Key Index|Field Type.
Note Enter each field on a new line. Hypothetical examples of these formats are shown after this note.
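The following lines are hypothetical examples of these formats; the column family, column names, and indexes are placeholders for illustration only.
Source Field entries when HBase is the source:
string|info:name
long|info:age
Destination Field entries when HBase is the destination:
0|string|info:name
1|long|info:age
rowkey entry when HBase is the destination:
0|string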
For a complete list of parameters and a script demo for the code editor, see Appendix: Script demos and parameter descriptions.
FAQ
Q: What is a suitable concurrency? Does increasing the concurrency improve a slow speed?
A: The default Java Virtual Machine (JVM) heap size for a data import process is 2 GB. Concurrency, which is the number of channels, is implemented using multiple threads. Increasing the number of threads does not always improve the import speed. It can even degrade performance due to frequent garbage collection (GC). We recommend that you set the concurrency (channel) to a value from 5 to 10.
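For example, a speed setting within the recommended range might look like the following sketch; the mbps value is illustrative.
"speed": {
    "throttle": true,// Enable rate limiting so that the mbps parameter takes effect.
    "concurrent": 5,// A channel count within the recommended range of 5 to 10.
    "mbps": "12"// The rate limit. 1 mbps = 1 MB/s.
}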
Q: What is a suitable batchSize?
A: The default value is 256. However, you should calculate the optimal batchSize based on the size of each row. The data volume of a single operation is typically 2 MB to 4 MB. To determine the batchSize, divide this value by the row size.
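For example, assuming an average row size of about 10 KB, dividing 2 MB to 4 MB by 10 KB gives a batchSize of roughly 200 to 400 rows, which is close to the default value of 256. The 10 KB row size is only an assumed value for illustration.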
Appendix: Script demos and parameter descriptions
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a task in the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
HBase Reader script demo
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"hbase",// The plugin name.
"parameter":{
"mode":"normal",// The mode to read data from HBase. Valid values: normal and multiVersionFixedColumn.
"scanCacheSize":"256",// The number of rows that the HBase client reads from the server in each remote procedure call (RPC).
"scanBatchSize":"100",// The number of columns that the HBase client reads from the server in each RPC.
"hbaseVersion":"094x/11x",// The HBase version.
"column":[// The fields.
{
"name":"rowkey",// The field name.
"type":"string"// The data type.
},
{
"name":"columnFamilyName1:columnName1",
"type":"string"
},
{
"name":"columnFamilyName2:columnName2",
"format":"yyyy-MM-dd",
"type":"date"
},
{
"name":"columnFamilyName3:columnName3",
"type":"long"
}
],
"range":{// Specifies the range of rowkeys for HBase Reader to read.
"endRowkey":"",// The end rowkey.
"isBinaryRowkey":true,// Specifies how to convert the configured startRowkey and endRowkey to byte arrays. The default value is false.
"startRowkey":""// The start rowkey.
},
"maxVersion":"",// The number of versions that HBase Reader reads in multi-version mode.
"encoding":"UTF-8",// The encoding format.
"table":"",// The table name.
"hbaseConfig":{// The configuration information required to connect to the HBase cluster, in JSON format.
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs://ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The number of error records.
},
"speed":{
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect and no rate limiting is applied. If throttle is set to true, rate limiting is applied.
"concurrent":1,// The number of concurrent jobs.
"mbps":"12"// The rate limit. 1 mbps = 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
HBase Reader script parameters
Parameter | Description | Required | Default value |
haveKerberos | If haveKerberos is set to true, Kerberos authentication is required for the HBase cluster. | No | false |
hbaseConfig | The configuration information required to connect to the HBase cluster, in JSON format. The hbase.zookeeper.quorum parameter is required. It specifies the ZooKeeper endpoint of the HBase cluster. You can also add more HBase client configurations, such as scan cache and batch, to optimize the interaction with the server. Note If you use an ApsaraDB for HBase database, connect to it using its private network endpoint. | Yes | None |
mode | The mode to read data from HBase. Valid values: normal and multiVersionFixedColumn. | Yes | None |
table | The name of the HBase table to read. This parameter is case-sensitive. | Yes | None |
encoding | The encoding format, such as UTF-8 or GBK. This is used to convert the binary-stored HBase byte[] to a string. | No | utf-8 |
column | The HBase fields to read. This parameter is required in both normal mode and multiVersionFixedColumn mode. For each field, specify name (rowkey or a value in the Column Family:Column Name format) and type, as shown in the script demo above. | Yes | None |
maxVersion | The number of versions that HBase Reader reads in multi-version mode. The value can only be -1 or a number greater than 1. A value of -1 indicates that all versions are read. | Required in multiVersionFixedColumn mode. | None |
range | Specifies the range of rowkeys for HBase Reader to read. The startRowkey, endRowkey, and isBinaryRowkey fields are shown in the script demo above and in the sketch after this table. | No | None |
scanCacheSize | The number of rows that HBase Reader reads from HBase at a time. | No | 256 |
scanBatchSize | The number of columns that HBase Reader reads from HBase at a time. If you set this parameter to -1, all columns are returned. Note The value of scanBatchSize should be greater than the actual number of columns to avoid data quality issues. | No | 100 |
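The following sketch shows a bounded-range read that reuses the range fields from the script demo above; the rowkey values are hypothetical.
"range": {
    "startRowkey": "user_0000",// Hypothetical start rowkey of the scan.
    "endRowkey": "user_9999",// Hypothetical end rowkey of the scan.
    "isBinaryRowkey": false// The rowkeys above are plain strings rather than binary byte arrays.
}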
HBase Writer script demo
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"hbase",// The plugin name.
"parameter":{
"mode":"normal",// The mode to write data to HBase.
"walFlag":"false",// Disables (false) writing WAL logs.
"hbaseVersion":"094x",// The HBase version.
"rowkeyColumn":[// The rowkey column to write to HBase.
{
"index":"0",// The serial number.
"type":"string"// The data type.
},
{
"index":"-1",
"type":"string",
"value":"_"
}
],
"nullMode":"skip",// How to handle null values.
"column":[// The HBase fields to write to.
{
"name":"columnFamilyName1:columnName1",// The field name.
"index":"0",// The index number.
"type":"string"// The data type.
},
{
"name":"columnFamilyName2:columnName2",
"index":"1",
"type":"string"
},
{
"name":"columnFamilyName3:columnName3",
"index":"2",
"type":"string"
}
],
"encoding":"utf-8",// The encoding format.
"table":"",// The table name.
"hbaseConfig":{// The configuration information required to connect to the HBase cluster, in JSON format.
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs: //ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The number of error records.
},
"speed":{
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect and no rate limiting is applied. If throttle is set to true, rate limiting is applied.
"concurrent":1, // The number of concurrent jobs.
"mbps":"12"// The rate limit.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
HBase Writer script parameters
Parameter | Description | Required | Default value |
haveKerberos | If haveKerberos is set to true, Kerberos authentication is required for the HBase cluster. | No | false |
hbaseConfig | The configuration information required to connect to the HBase cluster, in JSON format. The hbase.zookeeper.quorum parameter is required. It specifies the ZooKeeper endpoint of the HBase cluster. You can also add more HBase client configurations, such as scan cache and batch, to optimize the interaction with the server. Note If you use an ApsaraDB for HBase database, connect to it using its private network endpoint. | Yes | None |
mode | The mode to write data to HBase. Currently, only normal mode is supported. Dynamic column mode may be supported in the future. | Yes | None |
table | The name of the HBase table to write to. This parameter is case-sensitive. | Yes | None |
encoding | The encoding format, such as UTF-8 or GBK. This is used to convert a STRING to an HBase byte[]. | No | utf-8 |
column | The HBase fields to write to. For each field, specify index (the index of the source column), name (in the Column Family:Column Name format), and type, as shown in the script demo above. | Yes | None |
rowkeyColumn | The rowkey column to write to HBase. An example of the configuration format is shown after this table. | Yes | None |
versionColumn | Specifies the timestamp to write to HBase. You can use the current time, a specified time column, or a specified time. If you do not configure this parameter, the current time is used. An example of the configuration format is shown after this table. | No | None |
nullMode | Specifies how to handle null values in the read data. | No | skip |
walFlag | When an HBase client submits data (Put/Delete operations) to a RegionServer in the cluster, it first writes to the Write-Ahead Log (WAL). The WAL is also known as HLog. All regions on a RegionServer share one HLog. The data is written to the MemStore only after it is successfully written to the WAL. Then, the client is notified that the data is submitted. If the write to the WAL fails, the client is notified that the submission failed. You can set this parameter to false to disable writing to the WAL. This improves data write performance. | No | false |
writeBufferSize | Sets the size of the HBase client's write buffer in bytes. This parameter is used together with the autoflush setting, which is disabled by default. | No | 8 MB |
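The following sketch shows the rowkeyColumn configuration format that is referenced in the table above. It concatenates the source column at index 0, a constant underscore, and the source column at index 1 to form the rowkey; the indexes and the constant are hypothetical. The versionColumn parameter follows the same index and value convention, as shown in the sketch in the Supported features section.
"rowkeyColumn": [
    {
        "index": "0",// Use the first source column as the leading part of the rowkey.
        "type": "string"
    },
    {
        "index": "-1",// An index of -1 indicates a constant instead of a source column.
        "type": "string",
        "value": "_"// Hypothetical separator between the two source columns.
    },
    {
        "index": "1",// Append the second source column to the rowkey.
        "type": "string"
    }
]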
HBase20xsql Reader script demo
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"hbase20xsql",// The plugin name.
"parameter":{
"queryServerAddress": "http://127.0.0.1:8765", // The Phoenix QueryServer endpoint.
"serialization": "PROTOBUF", // The QueryServer serialization format.
"table": "TEST", // The table to read.
"column": ["ID", "NAME"], // The columns to read.
"splitKey": "ID" // The sharding column, which must be the primary key of the table.
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The number of error records.
},
"speed":{
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect and no rate limiting is applied. If throttle is set to true, rate limiting is applied.
"concurrent":1,// The number of concurrent jobs.
"mbps":"12"// The rate limit. 1 mbps = 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
HBase20xsql Reader script parameters
Parameter | Description | Required | Default value |
queryServerAddress | The HBase20xsql Reader plugin connects to Phoenix QueryServer through a lightweight Phoenix client, so you must specify the QueryServer endpoint here. If you are an ApsaraDB for HBase Performance-enhanced Edition (Lindorm) user and need to pass through the user and password parameters, you can append them as optional properties after queryServerAddress. | Yes | None |
serialization | The serialization protocol used by QueryServer. | No | PROTOBUF |
table | The name of the table to read. This parameter is case-sensitive. | Yes | None |
schema | The schema where the table resides. | No | None |
column | The set of column names to synchronize from the configured table. Use a JSON array to describe the field information. An empty value indicates that all columns are read. The default value is empty. | No | All columns |
splitKey | Shards the table when reading data. If you specify splitKey, the system uses the field represented by splitKey to shard the data, and the data synchronization task starts concurrent subtasks to sync the data, which improves efficiency. If splitPoints is empty, the system automatically shards the data based on the maximum and minimum values of the sharding column by default. | Yes | None |
splitPoints | Sharding based on the maximum and minimum values of the sharding column cannot guarantee the avoidance of data hot spots. Therefore, we recommend that you set the sharding points based on the startkey and endkey of the Region. This ensures that each query corresponds to a single Region. | No | None |
where | The filter condition. You can add a filter condition to the table query. HBase20xsql Reader concatenates an SQL statement based on the specified column, table, and where conditions, and then extracts data based on that SQL statement. | No | None |
querySql | In some business scenarios, the where parameter is not sufficient to describe the filter condition. You can use this parameter to customize the filter SQL. If you configure this parameter, the queryServerAddress parameter is still required, but HBase20xsql Reader ignores the column, table, where, and splitKey conditions. It uses the content of this parameter to filter the data. | No | None |
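The following sketch combines sharding and filtering for HBase20xsql Reader and reuses the parameters from the script demo and table above; the table name, columns, and filter condition are hypothetical.
"parameter": {
    "queryServerAddress": "http://127.0.0.1:8765",// The Phoenix QueryServer endpoint.
    "serialization": "PROTOBUF",
    "table": "TEST",
    "column": ["ID", "NAME"],
    "splitKey": "ID",// Shard concurrent subtasks by the primary key column.
    "where": "NAME IS NOT NULL"// Hypothetical filter condition that is appended to the generated SQL statement.
}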
HBase11xsql Writer script demo
{
"type": "job",
"version": "1.0",
"configuration": {
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect and no rate limiting is applied. If throttle is set to true, rate limiting is applied.
"concurrent":1, // The number of concurrent jobs.
"mbps":"1"// The rate limit. 1 mbps = 1 MB/s.
}
},
"reader": {
"plugin": "odps",
"parameter": {
"datasource": "",
"table": "",
"column": [],
"partition": ""
}
},
"plugin": "hbase11xsql",
"parameter": {
"table": "The destination HBase table name, case-sensitive",
"hbaseConfig": {
"hbase.zookeeper.quorum": "The ZooKeeper server endpoint of the destination HBase cluster",
"zookeeper.znode.parent": "The znode of the destination HBase cluster"
},
"column": [
"columnName"
],
"batchSize": 256,
"nullMode": "skip"
}
}
}
HBase11xsql Writer script parameters
Parameter | Description | Required | Default value |
plugin | The plugin name. Must be hbase11xsql. | Yes | None |
table | The name of the table to import data into. This parameter is case-sensitive. Phoenix table names are typically in uppercase. | Yes | None |
column | The column name. This parameter is case-sensitive. Phoenix column names are typically in uppercase. | Yes | None |
hbaseConfig | The HBase cluster connection information. The ZooKeeper endpoint (hbase.zookeeper.quorum) is required. The format is ip1,ip2,ip3. | Yes | None |
batchSize | The maximum number of rows for a batch write. | No | 256 |
nullMode | Specifies how to handle null column values in the read data. | No | skip |