このトピックでは、ApsaraDB for HBase コネクタの使用方法について説明します。
背景情報
ApsaraDB for HBase は、コスト効率が高く、スケーラブルでインテリジェントなビッグデータ NoSQL サービスです。標準の HBase アクセスプロトコルと互換性があり、低コストのストレージ、スケーラブルなスループット、インテリジェントなデータ処理などの主要な利点を提供します。ApsaraDB for HBase は、Taobao のレコメンデーション、Ant Group のリスク管理、広告配信、モニタリングダッシュボード、Cainiao の物流追跡、Alipay の請求書、Taobao Mobile のメッセージなど、多くの Alibaba のコアサービスをサポートしています。ペタバイト規模のストレージ、高い同時実行性、数秒でのスケーリング、ミリ秒レベルの応答、データセンター間の高可用性、グローバル配信など、エンタープライズレベルの機能を提供するフルマネージドデータベースです。
次の表に、ApsaraDB for HBase コネクタがサポートする機能を示します。
カテゴリ | 詳細 |
サポートされるテーブルタイプ | ディメンションテーブルと結果テーブル |
実行モード | ストリーミングモード |
データフォーマット | サポートされていません |
特定のモニタリングメトリック | |
API タイプ | SQL |
結果テーブル内のデータの更新または削除 | はい。 |
前提条件
ApsaraDB for HBase クラスターを購入し、テーブルを作成済みであること。 詳細については、「クラスターの購入」をご参照ください。
ホワイトリストを設定済みであること。 詳細については、「ホワイトリストの設定」をご参照ください。
使用上の注意
コネクタを使用する前に、データベースインスタンスのタイプを確認し、正しいコネクタを選択してください。間違ったコネクタを使用すると、予期しない問題が発生する可能性があります。
ApsaraDB for HBase インスタンスの場合は、このトピックで説明する ApsaraDB for HBase コネクタを使用してください。
Lindorm インスタンスは HBase モードと互換性があります。Lindorm インスタンスには Lindorm コネクタを使用してください。詳細については、「Lindorm」をご参照ください。
オープンソースの HBase データベースに接続する場合、データの正確性は保証されません。
構文
CREATE TABLE hbase_table(
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
'connector'='cloudhbase',
'table-name'='<yourTableName>',
'zookeeper.quorum'='<yourZookeeperQuorum>'
);HBase テーブルのカラムファミリーは ROW 型として宣言する必要があります。カラムファミリーの名前は ROW のフィールド名です。たとえば、DDL 定義では、family1、family2、family3 の 3 つのカラムファミリーが宣言されています。
HBase カラムファミリーの各カラムは、対応する ROW 型のネストされたフィールドに対応します。カラム名はフィールド名として使用されます。たとえば、DDL 定義では、family2 カラムファミリーは q2 と q3 の 2 つのカラムを宣言しています。
ROW 型のフィールドに加えて、STRING や BIGINT などのアトミック型のフィールドは 1 つだけ許可されます。このフィールドは、HBase テーブルの行キーとして扱われます。DDL 定義の Rowkey フィールドがその例です。
HBase の行キーは、結果テーブルのプライマリキーとして定義する必要があります。プライマリキーを明示的に定義しない場合、行キーがデフォルトでプライマリキーとして使用されます。
結果テーブルには、HBase テーブルの必要なカラムファミリーとカラムのみを宣言する必要があります。
WITH パラメーター
一般パラメーター
パラメーター
説明
データ型
必須
デフォルト値
備考
connector
テーブルタイプ。
String
はい
なし
静的な値
cloudhbaseに設定します。table-name
HBase テーブルの名前。
String
はい
なし
なし。
zookeeper.znode.quorum
HBase の ZooKeeper アドレス。
String
はい
なし
なし。
zookeeper.znode.parent
ZooKeeper 内の HBase のルートディレクトリ。
String
いいえ
/hbaseこのパラメーターは ApsaraDB for HBase Standard Edition でのみ有効です。
userName
ユーザー名。
String
いいえ
なし
このパラメーターは ApsaraDB for HBase Performance-enhanced Edition でのみ有効です。
password
パスワード。
String
いいえ
なし
このパラメーターは ApsaraDB for HBase Performance-enhanced Edition でのみ有効です。
haclient.cluster.id
高可用性 (HA) HBase インスタンスの ID。
String
いいえ
なし
このパラメーターは、同じ都市のプライマリ/セカンダリインスタンスにアクセスする場合にのみ必要です。このパラメーターは ApsaraDB for HBase Performance-enhanced Edition でのみ有効です。
retires.number
HBase クライアントのリトライ回数。
Integer
いいえ
31
なし。
null-string-literal
HBase のフィールドタイプが文字列の場合、Flink のフィールドデータが null であれば、フィールドには
null-string-literalの値が割り当てられ、HBase に書き込まれます。String
いいえ
null
なし。
シンク固有のパラメーター
パラメーター
説明
データ型
必須
デフォルト値
備考
sink.buffer-flush.max-size
HBase に書き込む前にメモリにキャッシュするデータのサイズ (バイト単位)。値を大きくすると HBase の書き込みパフォーマンスが向上しますが、書き込みレイテンシーとメモリ使用量が増加します。
String
いいえ
2 MB
バイト単位 B、KB、MB、GB をサポートします。単位は大文字と小文字を区別しません。0 に設定するとキャッシュが無効になります。
sink.buffer-flush.max-rows
HBase に書き込む前にメモリにキャッシュするデータレコードの数。値を大きくすると HBase の書き込みパフォーマンスが向上しますが、書き込みレイテンシーとメモリ使用量が増加します。
Integer
いいえ
1000
0 に設定するとキャッシュが無効になります。
sink.buffer-flush.interval
キャッシュされたデータが定期的に HBase に書き込まれる間隔。このパラメーターは、HBase への書き込みレイテンシーを制御します。
Duration
いいえ
1s
時間単位 ms、s、min、h、d をサポートします。0 に設定すると定期的な書き込みが無効になります。
dynamic.table
動的カラムをサポートする HBase テーブルを使用するかどうかを指定します。
Boolean
いいえ
false
有効な値:
true: 動的カラムをサポートする HBase テーブルを使用します。
false: 動的カラムをサポートする HBase テーブルを使用しません。
sink.ignore-delete
リトラクションメッセージを無視するかどうかを指定します。
Boolean
いいえ
false
Flink SQL によって生成されたストリームに delete または pre-update レコードが含まれ、複数のシンクタスクが同じテーブルの異なるフィールドを同時に更新すると、データ不整合が発生する可能性があります。
たとえば、レコードが削除された後、別のタスクが一部のフィールドのみを更新する場合があります。更新されなかったフィールドは null またはデフォルト値になり、データエラーが発生します。
この問題を回避するには、`sink.ignore-delete` を `true` に設定して、アップストリームの DELETE および UPDATE_BEFORE 操作を無視します。
説明UPDATE_BEFORE は Flink のリトラクションメカニズムの一部であり、更新操作中に古い値をリトラクトするために使用されます。
`ignoreDelete` が `true` に設定されている場合、DELETE および UPDATE_BEFORE タイプのすべてのレコードはスキップされます。INSERT および UPDATE_AFTER レコードのみが処理されます。
sink.sync-write
HBase にデータを同期的に書き込むかどうかを指定します。
Boolean
いいえ
true
有効な値:
true: 同期書き込み。順序は保証されますが、パフォーマンスがいくらか犠牲になります。
false: 非同期書き込み。順序は保証されませんが、パフォーマンスは向上します。
sink.buffer-flush.batch-rows
HBase への同期書き込み中にメモリにキャッシュするデータレコードの数。値を大きくすると HBase の書き込みパフォーマンスが向上しますが、書き込みレイテンシーとメモリ使用量が増加します。
Integer
いいえ
100
このパラメーターは、sink.sync-write が true に設定されている場合にのみ有効です。
sink.ignore-null
null 値の書き込みを無視するかどうかを指定します。
Boolean
いいえ
false
説明このパラメーターが true に設定されている場合、
null-string-literalパラメーターは有効ではなくなります。このパラメーターは、リアルタイムコンピューティングエンジン VVR 8.0.9 以降のバージョンでのみサポートされます。
ディメンション固有のパラメーター (キャッシュパラメーターなど)
パラメーター
説明
データ型
必須
デフォルト値
備考
cache
キャッシュポリシー。
String
いいえ
ALL
ApsaraDB for HBase ディメンションテーブルは現在、次の 3 つのキャッシュポリシーをサポートしています。
None: キャッシュなし。
LRU: ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュ内のデータを検索します。見つからない場合は、物理ディメンションテーブルを検索します。
説明関連するパラメーター、キャッシュサイズ (cacheSize) とキャッシュ更新間隔 (cacheTTLMs) を設定する必要があります。
ALL (デフォルト): ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。ディメンションテーブルでの後続のすべてのルックアップは、キャッシュを介して実行されます。キャッシュにデータが見つからない場合、そのキーは存在しません。フルキャッシュは、キャッシュの有効期限が切れた後に再読み込みされます。
説明このポリシーは、リモートテーブルのデータ量が少なく、欠落しているキーが多い (JOIN 中に ON 条件がソーステーブルデータとディメンションテーブルを関連付けられない) シナリオに適しています。関連するパラメーター、キャッシュ更新間隔 cacheTTLMs とキャッシュ更新禁止時間 cacheReloadTimeBlackList を設定する必要があります。
ディメンションテーブルのすべてのデータをキャッシュにロードすると、ジョブの起動が遅くなる可能性があります。必要に応じてキャッシュポリシーを設定できます。
システムはディメンションテーブルのデータを非同期にロードするため、CACHE ALL を使用する場合は、ディメンションテーブルの JOIN ノードのメモリを増やす必要があります。増加させるメモリサイズは、リモートテーブルのデータサイズの 2 倍にする必要があります。
cacheSize
キャッシュサイズ。
Long
いいえ
10000
キャッシュポリシーが LRU に設定されている場合にキャッシュサイズを設定できます。
cacheTTLMs
キャッシュの有効期限 (ミリ秒単位)。
Long
いいえ
なし
cacheTTLMs の設定は、[cache] パラメーターに依存します。
`cache` が None に設定されている場合、cacheTTLMs を設定する必要はありません。これは、キャッシュがタイムアウトしないことを意味します。
`cache` が LRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間です。デフォルトでは、キャッシュは有効期限切れになりません。
`cache` が ALL に設定されている場合、cacheTTLMs はキャッシュの再読み込み時間です。デフォルトでは、キャッシュは再読み込みされません。
cacheEmpty
空の結果をキャッシュするかどうかを指定します。
Boolean
いいえ
true
なし。
cacheReloadTimeBlackList
キャッシュ更新禁止時間。キャッシュポリシーが ALL に設定されている場合、キャッシュ更新禁止時間を有効にして、この期間 (たとえば、独身の日のショッピングフェスティバル中) のキャッシュ更新を防ぐことができます。
String
いいえ
なし
フォーマットは 2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00 です。次のように区切り文字を使用します。
複数のブラックリスト期間を区切るには、カンマ (,) を使用します。
ブラックリスト期間の開始時刻と終了時刻を区切るには、矢印 (->) を使用します。
cacheScanLimit
HBase の全データを読み取る際に、リモートプロシージャコール (RPC) サーバーが一度にクライアントに返す行数。
Integer
いいえ
100
このパラメーターは、キャッシュポリシーが ALL に設定されている場合にのみ有効です。
型マッピング
Flink のデータ型は、org.apache.hadoop.hbase.util.Bytes を使用して HBase のバイト配列に変換されます。デコードプロセスは次のように機能します。
Flink の非文字列型の場合、HBase の値が空のバイト配列であれば、null としてデコードされます。
Flink の文字列型の場合、HBase の値が
null-string-literalバイト配列であれば、null としてデコードされます。
Flink SQL 型 | ApsaraDB for HBase にバイトを書き込む際の変換関数 | ApsaraDB for HBase からバイトを読み取る際の変換関数 |
CHAR | byte[] toBytes(String s) | String toString(byte[] b) |
VARCHAR | ||
STRING | ||
BOOLEAN | byte[] toBytes(boolean b) | boolean toBoolean(byte[] b) |
BINARY | byte[] | byte[] |
VARBINARY | ||
DECIMAL | byte[] toBytes(BigDecimal v) | BigDecimal toBigDecimal(byte[] b) |
TINYINT | new byte[] { val } | bytes[0] |
SMALLINT | byte[] toBytes(short val) | short toShort(byte[] bytes) |
INT | byte[] toBytes(int val) | int toInt(byte[] bytes) |
BIGINT | byte[] toBytes(long val) | long toLong(byte[] bytes) |
FLOAT | byte[] toBytes(float val) | float toFloat(byte[] bytes) |
DOUBLE | byte[] toBytes(double val) | double toDouble(byte[] bytes) |
DATE | 日付を 1970-01-01 からの日数 (int で表現) に変換し、 | HBase バイト配列は |
TIME | 時刻を 00:00:00 からのミリ秒数 (int で表現) に変換し、 | HBase バイト配列は |
TIMESTAMP | タイムスタンプを 1970-01-01 00:00:00 からのミリ秒数 (long で表現) に変換し、 | HBase バイト配列は |
コード例
ディメンションテーブルの例
CREATE TEMPORARY TABLE datagen_source ( a INT, b BIGINT, c STRING, `proc_time` AS PROCTIME() ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_dim ( rowkey INT, family1 ROW<col1 INT>, family2 ROW<col1 STRING, col2 BIGINT>, family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING> ) WITH ( 'connector' = 'cloudhbase', 'table-name' = '<yourTableName>', 'zookeeper.quorum' = '<yourZookeeperQuorum>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, f1c1 INT, f3c3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT a, family1.col1 as f1c1, family3.col3 as f3c3 FROM datagen_source JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` as h ON datagen_source.a = h.rowkey;結果テーブルの例
CREATE TEMPORARY TABLE datagen_source ( rowkey INT, f1q1 INT, f2q1 STRING, f2q2 BIGINT, f3q1 DOUBLE, f3q2 BOOLEAN, f3q3 STRING ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_sink ( rowkey INT, family1 ROW<q1 INT>, family2 ROW<q1 STRING, q2 BIGINT>, family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>' ); INSERT INTO hbase_sink SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;動的な結果テーブルの例
CREATE TEMPORARY TABLE datagen_source ( id INT, f1hour STRING, f1deal BIGINT, f2day STRING, f2deal BIGINT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE hbase_sink ( rowkey INT, f1 ROW<`hour` STRING, deal BIGINT>, f2 ROW<`day` STRING, deal BIGINT> ) WITH ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>', 'dynamic.table'='true' ); INSERT INTO hbase_sink SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;dynamic.table パラメーターが true に設定されている場合、コネクタは動的カラムをサポートする HBase テーブルを使用します。
各カラムファミリーに対応する ROW 型には 2 つのフィールドを宣言する必要があります。最初のフィールドは動的カラム名を表し、2 番目のフィールドはその動的カラムの値を表します。
たとえば、datagen_source table に ID が 1 のプロダクトのレコードが含まれているとします。10:00 から 11:00 までの取引額は 100 で、2020 年 7 月 26 日の合計取引額は 10,000 です。その後、行キーが 1 の行が HBase テーブルに挿入され、f1:10 の値は 100、f2:2020-7-26 の値は 10,000 になります。