すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ApsaraDB for HBase

最終更新日:Mar 27, 2026

ApsaraDB for HBase コネクタを使用して、ストリーミングジョブで ApsaraDB for HBase をディメンションテーブルとして読み取るか、結果テーブルとして書き込みます。

サポートされるテーブルタイプ: ディメンションテーブル・結果テーブル 実行モード: ストリーミング API: SQL 結果テーブルでのデータの更新または削除: サポート sink メトリック: numBytesOutnumBytesOutPerSecondnumRecordsOutnumRecordsOutPerSecondcurrentSendTime

sink メトリックの詳細については、「メトリック」をご参照ください。

注意事項

このコネクタを使用する前に、ご利用のデータベースインスタンスのタイプを確認してください:

  • このコネクタは ApsaraDB for HBase インスタンス専用です。他のインスタンスタイプで使用すると、予期しない問題が発生する可能性があります。

  • Lindorm インスタンスは Apache HBase と互換があります。Lindorm インスタンスには、Lindorm コネクタを使用してください。

  • このコネクタを使用して Realtime Compute for Apache Flink をオープンソースの HBase データベースに接続する場合、データの有効性は保証されません。

前提条件

開始する前に、以下が完了していることを確認してください:

  • ApsaraDB for HBase クラスターが購入され、ApsaraDB for HBase テーブルが作成されていること。詳細については、「クラスターの購入」をご参照ください。

  • ApsaraDB for HBase クラスターのホワイトリストが設定されていること。詳細については、「ホワイトリストの設定」をご参照ください。

DDL 構文

CREATE TABLE hbase_table (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

スキーマのルール:

  • 各カラムファミリーを ROW<column_name type, ...> として宣言します。フィールド名はカラムファミリー名にマッピングされます。

  • ROW 内の各フィールドは、そのカラムファミリー内のカラムにマッピングされます。たとえば、q2q3family2 のカラムです。

  • アトミック型のフィールド (INTSTRING など) を 1 つだけ含めます。このフィールドが rowkey になります。

  • rowkey は結果テーブルのプライマリキーである必要があります。プライマリキーが定義されていない場合、rowkey がプライマリキーとして機能します。

  • ジョブで実際に使用するカラムファミリーとカラムのみを宣言してください。

使用例

ディメンションテーブルの例

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proc_time` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hbase_dim (
  rowkey INT,
  family1 ROW<col1 INT>,
  family2 ROW<col1 STRING, col2 BIGINT>,
  family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  f1c1 INT,
  f3c3 STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT a, family1.col1 AS f1c1, family3.col3 AS f3c3
FROM datagen_source
JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` AS h
ON datagen_source.a = h.rowkey;

シンクテーブルの例

CREATE TEMPORARY TABLE datagen_source (
  rowkey INT,
  f1q1 INT,
  f2q1 STRING,
  f2q2 BIGINT,
  f3q1 DOUBLE,
  f3q2 BOOLEAN,
  f3q3 STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q1 STRING, q2 BIGINT>,
  family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>'
);

INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3)
FROM datagen_source;

動的カラムの sink

dynamic.tabletrue に設定されている場合、各カラムファミリーは正確に 2 つのフィールドを宣言する必要があります。最初のフィールドは動的カラム名を表し、2 番目のフィールドはその値を表します。

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector' = 'cloudhbase',
  'table-name' = '<yourTableName>',
  'zookeeper.quorum' = '<yourZookeeperQuorum>',
  'dynamic.table' = 'true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal)
FROM datagen_source;

たとえば、ソース行に id=1f1hour='10'f1deal=100f2day='2020-7-26'、および f2deal=10000 がある場合、結果の HBase 行は rowkey=1f1:10=100、および f2:2020-7-26=10000 となります。

コネクタオプション

一般オプション

オプション 必須 デフォルト 説明
connector String はい cloudhbase に設定します。
table-name String はい ApsaraDB for HBase テーブルの名前。
zookeeper.znode.quorum String はい ApsaraDB for HBase クラスターの ZooKeeper 接続文字列。
zookeeper.znode.parent String いいえ /hbase ZooKeeper 内の ApsaraDB for HBase のルートディレクトリ。Standard Edition にのみ適用されます。
userName String いいえ データベースアクセス用のユーザー名。Performance-enhanced Edition にのみ適用されます。
password String いいえ データベースアクセス用のパスワード。Performance-enhanced Edition にのみ適用されます。
haclient.cluster.id String いいえ 高可用性 (HA) モードのクラスター ID。ゾーンディザスタリカバリクラスターに必要です。Performance-enhanced Edition にのみ適用されます。
retires.number Integer いいえ 31 HBase クライアントの接続リトライ回数。
null-string-literal String いいえ null Flink の STRING フィールドが null の場合に HBase に書き込まれる値。

sink オプション

オプション 必須 デフォルト 説明
sink.buffer-flush.max-size String いいえ 2MB フラッシュ前にバッファーに格納するデータの最大サイズ。値を大きくすると書き込みスループットは向上しますが、レイテンシーとメモリ使用量が増加します。単位: B、KB、MB、または GB (大文字と小文字は区別されません)。バッファリングを無効にするには 0 に設定します。
sink.buffer-flush.max-rows Integer いいえ 1000 フラッシュ前にバッファーに格納するレコードの最大数。値を大きくすると書き込みスループットは向上しますが、レイテンシーとメモリ使用量が増加します。バッファリングを無効にするには 0 に設定します。
sink.buffer-flush.interval Duration いいえ 1s バッファーデータのフラッシュ間隔。書き込みレイテンシーを制御します。単位: ms、s、min、h、または d。定期的なフラッシュを無効にするには 0 に設定します。
sink.sync-write Boolean いいえ true 書き込みモード。true: 同期 — データは順序通りに書き込まれますが、スループットは低くなります。false: 非同期 — スループットは高くなりますが、書き込み順序は保証されません。
sink.buffer-flush.batch-rows Integer いいえ 100 同期書き込みモードでバッチ処理するレコード数。値を大きくするとスループットは向上しますが、レイテンシーとメモリ使用量が増加します。sink.sync-writetrue の場合にのみ有効です。
dynamic.table Boolean いいえ false 動的カラムモードを有効にします。true に設定すると、各カラムファミリー行は正確に 2 つのフィールド (カラム名と値) を宣言する必要があります。
sink.ignore-delete Boolean いいえ false アップストリームからの DELETE および UPDATE_BEFORE イベントを無視するかどうかを指定します。複数の sink タスクが同じ行の異なるフィールドを同時に更新する場合に true に設定すると、同時削除と部分更新によるデータの不整合を防ぐことができます。true の場合、INSERT および UPDATE_AFTER レコードのみが処理されます。
sink.ignore-null Boolean いいえ false null フィールド値の書き込みをスキップするかどうかを指定します。true に設定すると、null-string-literal は効果がありません。Ververica Runtime (VVR) 8.0.9 以降が必要です。

ディメンションテーブルオプション

オプション 必須 デフォルト 説明
cache String いいえ ALL キャッシュポリシー。有効な値: NoneLRUALL。以下のキャッシュポリシーの説明をご参照ください。
cacheSize Long いいえ 10000 キャッシュする行の最大数。cacheLRU に設定されている場合にのみ有効です。
cacheTTLMs Long いいえ キャッシュの有効期限の動作は cache の設定に依存します: LRU — キャッシュエントリのタイムアウト (ミリ秒単位、デフォルトでは有効期限なし)。ALL — キャッシュの再読み込み間隔 (ミリ秒単位、デフォルトでは再読み込みなし)。None — 該当なし。
cacheEmpty Boolean いいえ true 空のルックアップ結果をキャッシュするかどうかを指定します。
cacheReloadTimeBlackList String いいえ キャッシュの再読み込みが一時停止される期間。cacheALL に設定されている場合にのみ有効です。フォーマット: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00。複数の期間はカンマで区切り、各期間の開始と終了は -> で区切ります。
cacheScanLimit Integer いいえ 100 完全なディメンションテーブルをロードする際に、リモートプロシージャコール (RPC) ごとに返される行数。cacheALL に設定されている場合にのみ有効です。

キャッシュポリシー:

  • None — キャッシュなし。すべてのルックアップは ApsaraDB for HBase に直接クエリを実行します。

  • LRU — 行のサブセットをキャッシュします。キャッシュミスが発生した場合、システムは ApsaraDB for HBase に直接クエリを実行します。このポリシーを使用する場合は、cacheSizecacheTTLMs を設定してください。

  • ALL (デフォルト) — ジョブが開始される前に、ディメンションテーブル全体をキャッシュにロードします。以降のすべてのルックアップはキャッシュにヒットします。キーがキャッシュに見つからない場合、そのキーはソーステーブルに存在しません。キャッシュは有効期限が切れると再読み込みされます。このポリシーを使用する場合は、cacheTTLMs と、オプションで cacheReloadTimeBlackList を設定してください。

    cacheALL に設定されている場合、ジョブが開始される前にシステムがすべてのデータを非同期でロードするため、結合を実行するノードには追加のメモリ (ディメンションテーブルのサイズの約 2 倍) が必要になります。これにより、ジョブの起動が遅くなる可能性があります。

データ型のマッピング

すべての Flink の値は、org.apache.hadoop.hbase.util.Bytes を使用して HBase のバイト配列との間でシリアル化および逆シリアル化されます。2 つの特殊なケースが適用されます:

  • 空のバイト配列を読み取る非 STRING フィールドは、null としてデコードされます。

  • null-string-literal に一致するバイト配列を読み取る STRING フィールドは、null としてデコードされます。

Flink SQL 型 HBase での変換
CHAR / VARCHAR / STRING toBytes(String s) / toString(byte[] b) を使用して、UTF-8 文字列のバイト配列として格納されます。
BOOLEAN toBytes(boolean b) / toBoolean(byte[] b)
BINARY / VARBINARY 生の byte[] として格納されます。
DECIMAL toBytes(BigDecimal v) / toBigDecimal(byte[] b)
TINYINT new byte[] { val } / bytes[0]
SMALLINT toBytes(short val) / toShort(byte[] bytes)
INT toBytes(int val) / toInt(byte[] bytes)
BIGINT toBytes(long val) / toLong(byte[] bytes)
FLOAT toBytes(float val) / toFloat(byte[] bytes)
DOUBLE toBytes(double val) / toDouble(byte[] bytes)
DATE 1970-01-01 からの日数を INT としてシリアル化して格納します。
TIME 00:00:00 からのミリ秒数を INT としてシリアル化して格納します。
TIMESTAMP 1970-01-01 00:00:00 からのミリ秒数を LONG としてシリアル化して格納します。