ApsaraDB for HBase コネクタを使用して、ストリーミングジョブで ApsaraDB for HBase をディメンションテーブルとして読み取るか、結果テーブルとして書き込みます。
サポートされるテーブルタイプ: ディメンションテーブル・結果テーブル 実行モード: ストリーミング API: SQL 結果テーブルでのデータの更新または削除: サポート sink メトリック: numBytesOut、numBytesOutPerSecond、numRecordsOut、numRecordsOutPerSecond、currentSendTime
sink メトリックの詳細については、「メトリック」をご参照ください。
注意事項
このコネクタを使用する前に、ご利用のデータベースインスタンスのタイプを確認してください:
-
このコネクタは ApsaraDB for HBase インスタンス専用です。他のインスタンスタイプで使用すると、予期しない問題が発生する可能性があります。
-
Lindorm インスタンスは Apache HBase と互換があります。Lindorm インスタンスには、Lindorm コネクタを使用してください。
-
このコネクタを使用して Realtime Compute for Apache Flink をオープンソースの HBase データベースに接続する場合、データの有効性は保証されません。
前提条件
開始する前に、以下が完了していることを確認してください:
-
ApsaraDB for HBase クラスターが購入され、ApsaraDB for HBase テーブルが作成されていること。詳細については、「クラスターの購入」をご参照ください。
-
ApsaraDB for HBase クラスターのホワイトリストが設定されていること。詳細については、「ホワイトリストの設定」をご参照ください。
DDL 構文
CREATE TABLE hbase_table (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
スキーマのルール:
-
各カラムファミリーを
ROW<column_name type, ...>として宣言します。フィールド名はカラムファミリー名にマッピングされます。 -
ROW内の各フィールドは、そのカラムファミリー内のカラムにマッピングされます。たとえば、q2とq3はfamily2のカラムです。 -
アトミック型のフィールド (
INTやSTRINGなど) を 1 つだけ含めます。このフィールドが rowkey になります。 -
rowkey は結果テーブルのプライマリキーである必要があります。プライマリキーが定義されていない場合、rowkey がプライマリキーとして機能します。
-
ジョブで実際に使用するカラムファミリーとカラムのみを宣言してください。
使用例
ディメンションテーブルの例
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
`proc_time` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hbase_dim (
rowkey INT,
family1 ROW<col1 INT>,
family2 ROW<col1 STRING, col2 BIGINT>,
family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
f1c1 INT,
f3c3 STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT a, family1.col1 AS f1c1, family3.col3 AS f3c3
FROM datagen_source
JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` AS h
ON datagen_source.a = h.rowkey;
シンクテーブルの例
CREATE TEMPORARY TABLE datagen_source (
rowkey INT,
f1q1 INT,
f2q1 STRING,
f2q2 BIGINT,
f3q1 DOUBLE,
f3q2 BOOLEAN,
f3q3 STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hbase_sink (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q1 STRING, q2 BIGINT>,
family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>'
);
INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3)
FROM datagen_source;
動的カラムの sink
dynamic.table が true に設定されている場合、各カラムファミリーは正確に 2 つのフィールドを宣言する必要があります。最初のフィールドは動的カラム名を表し、2 番目のフィールドはその値を表します。
CREATE TEMPORARY TABLE datagen_source (
id INT,
f1hour STRING,
f1deal BIGINT,
f2day STRING,
f2deal BIGINT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hbase_sink (
rowkey INT,
f1 ROW<`hour` STRING, deal BIGINT>,
f2 ROW<`day` STRING, deal BIGINT>
) WITH (
'connector' = 'cloudhbase',
'table-name' = '<yourTableName>',
'zookeeper.quorum' = '<yourZookeeperQuorum>',
'dynamic.table' = 'true'
);
INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal)
FROM datagen_source;
たとえば、ソース行に id=1、f1hour='10'、f1deal=100、f2day='2020-7-26'、および f2deal=10000 がある場合、結果の HBase 行は rowkey=1、f1:10=100、および f2:2020-7-26=10000 となります。
コネクタオプション
一般オプション
| オプション | 型 | 必須 | デフォルト | 説明 |
|---|---|---|---|---|
connector |
String | はい | — | cloudhbase に設定します。 |
table-name |
String | はい | — | ApsaraDB for HBase テーブルの名前。 |
zookeeper.znode.quorum |
String | はい | — | ApsaraDB for HBase クラスターの ZooKeeper 接続文字列。 |
zookeeper.znode.parent |
String | いいえ | /hbase |
ZooKeeper 内の ApsaraDB for HBase のルートディレクトリ。Standard Edition にのみ適用されます。 |
userName |
String | いいえ | — | データベースアクセス用のユーザー名。Performance-enhanced Edition にのみ適用されます。 |
password |
String | いいえ | — | データベースアクセス用のパスワード。Performance-enhanced Edition にのみ適用されます。 |
haclient.cluster.id |
String | いいえ | — | 高可用性 (HA) モードのクラスター ID。ゾーンディザスタリカバリクラスターに必要です。Performance-enhanced Edition にのみ適用されます。 |
retires.number |
Integer | いいえ | 31 |
HBase クライアントの接続リトライ回数。 |
null-string-literal |
String | いいえ | null |
Flink の STRING フィールドが null の場合に HBase に書き込まれる値。 |
sink オプション
| オプション | 型 | 必須 | デフォルト | 説明 |
|---|---|---|---|---|
sink.buffer-flush.max-size |
String | いいえ | 2MB |
フラッシュ前にバッファーに格納するデータの最大サイズ。値を大きくすると書き込みスループットは向上しますが、レイテンシーとメモリ使用量が増加します。単位: B、KB、MB、または GB (大文字と小文字は区別されません)。バッファリングを無効にするには 0 に設定します。 |
sink.buffer-flush.max-rows |
Integer | いいえ | 1000 |
フラッシュ前にバッファーに格納するレコードの最大数。値を大きくすると書き込みスループットは向上しますが、レイテンシーとメモリ使用量が増加します。バッファリングを無効にするには 0 に設定します。 |
sink.buffer-flush.interval |
Duration | いいえ | 1s |
バッファーデータのフラッシュ間隔。書き込みレイテンシーを制御します。単位: ms、s、min、h、または d。定期的なフラッシュを無効にするには 0 に設定します。 |
sink.sync-write |
Boolean | いいえ | true |
書き込みモード。true: 同期 — データは順序通りに書き込まれますが、スループットは低くなります。false: 非同期 — スループットは高くなりますが、書き込み順序は保証されません。 |
sink.buffer-flush.batch-rows |
Integer | いいえ | 100 |
同期書き込みモードでバッチ処理するレコード数。値を大きくするとスループットは向上しますが、レイテンシーとメモリ使用量が増加します。sink.sync-write が true の場合にのみ有効です。 |
dynamic.table |
Boolean | いいえ | false |
動的カラムモードを有効にします。true に設定すると、各カラムファミリー行は正確に 2 つのフィールド (カラム名と値) を宣言する必要があります。 |
sink.ignore-delete |
Boolean | いいえ | false |
アップストリームからの DELETE および UPDATE_BEFORE イベントを無視するかどうかを指定します。複数の sink タスクが同じ行の異なるフィールドを同時に更新する場合に true に設定すると、同時削除と部分更新によるデータの不整合を防ぐことができます。true の場合、INSERT および UPDATE_AFTER レコードのみが処理されます。 |
sink.ignore-null |
Boolean | いいえ | false |
null フィールド値の書き込みをスキップするかどうかを指定します。true に設定すると、null-string-literal は効果がありません。Ververica Runtime (VVR) 8.0.9 以降が必要です。 |
ディメンションテーブルオプション
| オプション | 型 | 必須 | デフォルト | 説明 |
|---|---|---|---|---|
cache |
String | いいえ | ALL |
キャッシュポリシー。有効な値: None、LRU、ALL。以下のキャッシュポリシーの説明をご参照ください。 |
cacheSize |
Long | いいえ | 10000 |
キャッシュする行の最大数。cache が LRU に設定されている場合にのみ有効です。 |
cacheTTLMs |
Long | いいえ | — | キャッシュの有効期限の動作は cache の設定に依存します: LRU — キャッシュエントリのタイムアウト (ミリ秒単位、デフォルトでは有効期限なし)。ALL — キャッシュの再読み込み間隔 (ミリ秒単位、デフォルトでは再読み込みなし)。None — 該当なし。 |
cacheEmpty |
Boolean | いいえ | true |
空のルックアップ結果をキャッシュするかどうかを指定します。 |
cacheReloadTimeBlackList |
String | いいえ | — | キャッシュの再読み込みが一時停止される期間。cache が ALL に設定されている場合にのみ有効です。フォーマット: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00。複数の期間はカンマで区切り、各期間の開始と終了は -> で区切ります。 |
cacheScanLimit |
Integer | いいえ | 100 |
完全なディメンションテーブルをロードする際に、リモートプロシージャコール (RPC) ごとに返される行数。cache が ALL に設定されている場合にのみ有効です。 |
キャッシュポリシー:
-
None — キャッシュなし。すべてのルックアップは ApsaraDB for HBase に直接クエリを実行します。
-
LRU — 行のサブセットをキャッシュします。キャッシュミスが発生した場合、システムは ApsaraDB for HBase に直接クエリを実行します。このポリシーを使用する場合は、
cacheSizeとcacheTTLMsを設定してください。 -
ALL (デフォルト) — ジョブが開始される前に、ディメンションテーブル全体をキャッシュにロードします。以降のすべてのルックアップはキャッシュにヒットします。キーがキャッシュに見つからない場合、そのキーはソーステーブルに存在しません。キャッシュは有効期限が切れると再読み込みされます。このポリシーを使用する場合は、
cacheTTLMsと、オプションでcacheReloadTimeBlackListを設定してください。cacheがALLに設定されている場合、ジョブが開始される前にシステムがすべてのデータを非同期でロードするため、結合を実行するノードには追加のメモリ (ディメンションテーブルのサイズの約 2 倍) が必要になります。これにより、ジョブの起動が遅くなる可能性があります。
データ型のマッピング
すべての Flink の値は、org.apache.hadoop.hbase.util.Bytes を使用して HBase のバイト配列との間でシリアル化および逆シリアル化されます。2 つの特殊なケースが適用されます:
-
空のバイト配列を読み取る非 STRING フィールドは、
nullとしてデコードされます。 -
null-string-literalに一致するバイト配列を読み取るSTRINGフィールドは、nullとしてデコードされます。
| Flink SQL 型 | HBase での変換 |
|---|---|
| CHAR / VARCHAR / STRING | toBytes(String s) / toString(byte[] b) を使用して、UTF-8 文字列のバイト配列として格納されます。 |
| BOOLEAN | toBytes(boolean b) / toBoolean(byte[] b) |
| BINARY / VARBINARY | 生の byte[] として格納されます。 |
| DECIMAL | toBytes(BigDecimal v) / toBigDecimal(byte[] b) |
| TINYINT | new byte[] { val } / bytes[0] |
| SMALLINT | toBytes(short val) / toShort(byte[] bytes) |
| INT | toBytes(int val) / toInt(byte[] bytes) |
| BIGINT | toBytes(long val) / toLong(byte[] bytes) |
| FLOAT | toBytes(float val) / toFloat(byte[] bytes) |
| DOUBLE | toBytes(double val) / toDouble(byte[] bytes) |
| DATE | 1970-01-01 からの日数を INT としてシリアル化して格納します。 |
| TIME | 00:00:00 からのミリ秒数を INT としてシリアル化して格納します。 |
| TIMESTAMP | 1970-01-01 00:00:00 からのミリ秒数を LONG としてシリアル化して格納します。 |