すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ApsaraDB for HBase コネクタ

最終更新日:Nov 09, 2025

このトピックでは、ApsaraDB for HBase コネクタの使用方法について説明します。

背景情報

ApsaraDB for HBase は、コスト効率が高く、スケーラブルでインテリジェントなビッグデータ NoSQL サービスです。標準の HBase アクセスプロトコルと互換性があり、低コストのストレージ、スケーラブルなスループット、インテリジェントなデータ処理などの主要な利点を提供します。ApsaraDB for HBase は、Taobao のレコメンデーション、Ant Group のリスク管理、広告配信、モニタリングダッシュボード、Cainiao の物流追跡、Alipay の請求書、Taobao Mobile のメッセージなど、多くの Alibaba のコアサービスをサポートしています。ペタバイト規模のストレージ、高い同時実行性、数秒でのスケーリング、ミリ秒レベルの応答、データセンター間の高可用性、グローバル配信など、エンタープライズレベルの機能を提供するフルマネージドデータベースです。

次の表に、ApsaraDB for HBase コネクタがサポートする機能を示します。

カテゴリ

詳細

サポートされるテーブルタイプ

ディメンションテーブルと結果テーブル

実行モード

ストリーミングモード

データフォーマット

サポートされていません

特定のモニタリングメトリック

モニタリングメトリック

  • ソーステーブル

    サポートされるメトリック: なし。

  • ディメンションテーブル

    サポートされるメトリック: なし。

  • シンクテーブル

    サポートされるメトリック: numBytesOut、numBytesOutPerSecond、numRecordsOut、numRecordsOutPerSecond、currentSendTime。

    説明

    メトリックの詳細については、「メトリック」をご参照ください。

API タイプ

SQL

結果テーブル内のデータの更新または削除

はい。

前提条件

  • ApsaraDB for HBase クラスターを購入し、テーブルを作成済みであること。 詳細については、「クラスターの購入」をご参照ください。

  • ホワイトリストを設定済みであること。 詳細については、「ホワイトリストの設定」をご参照ください。

使用上の注意

コネクタを使用する前に、データベースインスタンスのタイプを確認し、正しいコネクタを選択してください。間違ったコネクタを使用すると、予期しない問題が発生する可能性があります。

  • ApsaraDB for HBase インスタンスの場合は、このトピックで説明する ApsaraDB for HBase コネクタを使用してください。

  • Lindorm インスタンスは HBase モードと互換性があります。Lindorm インスタンスには Lindorm コネクタを使用してください。詳細については、「Lindorm」をご参照ください。

  • オープンソースの HBase データベースに接続する場合、データの正確性は保証されません。

構文

CREATE TABLE hbase_table(
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>
) WITH (
  'connector'='cloudhbase',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>'
);
  • HBase テーブルのカラムファミリーは ROW 型として宣言する必要があります。カラムファミリーの名前は ROW のフィールド名です。たとえば、DDL 定義では、family1、family2、family3 の 3 つのカラムファミリーが宣言されています。

  • HBase カラムファミリーの各カラムは、対応する ROW 型のネストされたフィールドに対応します。カラム名はフィールド名として使用されます。たとえば、DDL 定義では、family2 カラムファミリーは q2 と q3 の 2 つのカラムを宣言しています。

  • ROW 型のフィールドに加えて、STRING や BIGINT などのアトミック型のフィールドは 1 つだけ許可されます。このフィールドは、HBase テーブルの行キーとして扱われます。DDL 定義の Rowkey フィールドがその例です。

  • HBase の行キーは、結果テーブルのプライマリキーとして定義する必要があります。プライマリキーを明示的に定義しない場合、行キーがデフォルトでプライマリキーとして使用されます。

  • 結果テーブルには、HBase テーブルの必要なカラムファミリーとカラムのみを宣言する必要があります。

WITH パラメーター

  • 一般パラメーター

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルタイプ。

    String

    はい

    なし

    静的な値 cloudhbase に設定します。

    table-name

    HBase テーブルの名前。

    String

    はい

    なし

    なし。

    zookeeper.znode.quorum

    HBase の ZooKeeper アドレス。

    String

    はい

    なし

    なし。

    zookeeper.znode.parent

    ZooKeeper 内の HBase のルートディレクトリ。

    String

    いいえ

    /hbase

    このパラメーターは ApsaraDB for HBase Standard Edition でのみ有効です。

    userName

    ユーザー名。

    String

    いいえ

    なし

    このパラメーターは ApsaraDB for HBase Performance-enhanced Edition でのみ有効です。

    password

    パスワード。

    String

    いいえ

    なし

    このパラメーターは ApsaraDB for HBase Performance-enhanced Edition でのみ有効です。

    haclient.cluster.id

    高可用性 (HA) HBase インスタンスの ID。

    String

    いいえ

    なし

    このパラメーターは、同じ都市のプライマリ/セカンダリインスタンスにアクセスする場合にのみ必要です。このパラメーターは ApsaraDB for HBase Performance-enhanced Edition でのみ有効です。

    retires.number

    HBase クライアントのリトライ回数。

    Integer

    いいえ

    31

    なし。

    null-string-literal

    HBase のフィールドタイプが文字列の場合、Flink のフィールドデータが null であれば、フィールドには null-string-literal の値が割り当てられ、HBase に書き込まれます。

    String

    いいえ

    null

    なし。

  • シンク固有のパラメーター

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    sink.buffer-flush.max-size

    HBase に書き込む前にメモリにキャッシュするデータのサイズ (バイト単位)。値を大きくすると HBase の書き込みパフォーマンスが向上しますが、書き込みレイテンシーとメモリ使用量が増加します。

    String

    いいえ

    2 MB

    バイト単位 B、KB、MB、GB をサポートします。単位は大文字と小文字を区別しません。0 に設定するとキャッシュが無効になります。

    sink.buffer-flush.max-rows

    HBase に書き込む前にメモリにキャッシュするデータレコードの数。値を大きくすると HBase の書き込みパフォーマンスが向上しますが、書き込みレイテンシーとメモリ使用量が増加します。

    Integer

    いいえ

    1000

    0 に設定するとキャッシュが無効になります。

    sink.buffer-flush.interval

    キャッシュされたデータが定期的に HBase に書き込まれる間隔。このパラメーターは、HBase への書き込みレイテンシーを制御します。

    Duration

    いいえ

    1s

    時間単位 ms、s、min、h、d をサポートします。0 に設定すると定期的な書き込みが無効になります。

    dynamic.table

    動的カラムをサポートする HBase テーブルを使用するかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true: 動的カラムをサポートする HBase テーブルを使用します。

    • false: 動的カラムをサポートする HBase テーブルを使用しません。

    sink.ignore-delete

    リトラクションメッセージを無視するかどうかを指定します。

    Boolean

    いいえ

    false

    Flink SQL によって生成されたストリームに delete または pre-update レコードが含まれ、複数のシンクタスクが同じテーブルの異なるフィールドを同時に更新すると、データ不整合が発生する可能性があります。

    たとえば、レコードが削除された後、別のタスクが一部のフィールドのみを更新する場合があります。更新されなかったフィールドは null またはデフォルト値になり、データエラーが発生します。

    この問題を回避するには、`sink.ignore-delete` を `true` に設定して、アップストリームの DELETE および UPDATE_BEFORE 操作を無視します。

    説明
    • UPDATE_BEFORE は Flink のリトラクションメカニズムの一部であり、更新操作中に古い値をリトラクトするために使用されます。

    • `ignoreDelete` が `true` に設定されている場合、DELETE および UPDATE_BEFORE タイプのすべてのレコードはスキップされます。INSERT および UPDATE_AFTER レコードのみが処理されます。

    sink.sync-write

    HBase にデータを同期的に書き込むかどうかを指定します。

    Boolean

    いいえ

    true

    有効な値:

    • true: 同期書き込み。順序は保証されますが、パフォーマンスがいくらか犠牲になります。

    • false: 非同期書き込み。順序は保証されませんが、パフォーマンスは向上します。

    sink.buffer-flush.batch-rows

    HBase への同期書き込み中にメモリにキャッシュするデータレコードの数。値を大きくすると HBase の書き込みパフォーマンスが向上しますが、書き込みレイテンシーとメモリ使用量が増加します。

    Integer

    いいえ

    100

    このパラメーターは、sink.sync-write が true に設定されている場合にのみ有効です。

    sink.ignore-null

    null 値の書き込みを無視するかどうかを指定します。

    Boolean

    いいえ

    false

    説明
    • このパラメーターが true に設定されている場合、null-string-literal パラメーターは有効ではなくなります。

    • このパラメーターは、リアルタイムコンピューティングエンジン VVR 8.0.9 以降のバージョンでのみサポートされます。

  • ディメンション固有のパラメーター (キャッシュパラメーターなど)

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    cache

    キャッシュポリシー。

    String

    いいえ

    ALL

    ApsaraDB for HBase ディメンションテーブルは現在、次の 3 つのキャッシュポリシーをサポートしています。

    • None: キャッシュなし。

    • LRU: ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュ内のデータを検索します。見つからない場合は、物理ディメンションテーブルを検索します。

      説明

      関連するパラメーター、キャッシュサイズ (cacheSize) とキャッシュ更新間隔 (cacheTTLMs) を設定する必要があります。

    • ALL (デフォルト): ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。ディメンションテーブルでの後続のすべてのルックアップは、キャッシュを介して実行されます。キャッシュにデータが見つからない場合、そのキーは存在しません。フルキャッシュは、キャッシュの有効期限が切れた後に再読み込みされます。

      説明
      • このポリシーは、リモートテーブルのデータ量が少なく、欠落しているキーが多い (JOIN 中に ON 条件がソーステーブルデータとディメンションテーブルを関連付けられない) シナリオに適しています。関連するパラメーター、キャッシュ更新間隔 cacheTTLMs とキャッシュ更新禁止時間 cacheReloadTimeBlackList を設定する必要があります。

      • ディメンションテーブルのすべてのデータをキャッシュにロードすると、ジョブの起動が遅くなる可能性があります。必要に応じてキャッシュポリシーを設定できます。

    システムはディメンションテーブルのデータを非同期にロードするため、CACHE ALL を使用する場合は、ディメンションテーブルの JOIN ノードのメモリを増やす必要があります。増加させるメモリサイズは、リモートテーブルのデータサイズの 2 倍にする必要があります。

    cacheSize

    キャッシュサイズ。

    Long

    いいえ

    10000

    キャッシュポリシーが LRU に設定されている場合にキャッシュサイズを設定できます。

    cacheTTLMs

    キャッシュの有効期限 (ミリ秒単位)。

    Long

    いいえ

    なし

    cacheTTLMs の設定は、[cache] パラメーターに依存します。

    • `cache` が None に設定されている場合、cacheTTLMs を設定する必要はありません。これは、キャッシュがタイムアウトしないことを意味します。

    • `cache` が LRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間です。デフォルトでは、キャッシュは有効期限切れになりません。

    • `cache` が ALL に設定されている場合、cacheTTLMs はキャッシュの再読み込み時間です。デフォルトでは、キャッシュは再読み込みされません。

    cacheEmpty

    空の結果をキャッシュするかどうかを指定します。

    Boolean

    いいえ

    true

    なし。

    cacheReloadTimeBlackList

    キャッシュ更新禁止時間。キャッシュポリシーが ALL に設定されている場合、キャッシュ更新禁止時間を有効にして、この期間 (たとえば、独身の日のショッピングフェスティバル中) のキャッシュ更新を防ぐことができます。

    String

    いいえ

    なし

    フォーマットは 2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00 です。次のように区切り文字を使用します。

    • 複数のブラックリスト期間を区切るには、カンマ (,) を使用します。

    • ブラックリスト期間の開始時刻と終了時刻を区切るには、矢印 (->) を使用します。

    cacheScanLimit

    HBase の全データを読み取る際に、リモートプロシージャコール (RPC) サーバーが一度にクライアントに返す行数。

    Integer

    いいえ

    100

    このパラメーターは、キャッシュポリシーが ALL に設定されている場合にのみ有効です。

型マッピング

Flink のデータ型は、org.apache.hadoop.hbase.util.Bytes を使用して HBase のバイト配列に変換されます。デコードプロセスは次のように機能します。

  • Flink の非文字列型の場合、HBase の値が空のバイト配列であれば、null としてデコードされます。

  • Flink の文字列型の場合、HBase の値が null-string-literal バイト配列であれば、null としてデコードされます。

Flink SQL 型

ApsaraDB for HBase にバイトを書き込む際の変換関数

ApsaraDB for HBase からバイトを読み取る際の変換関数

CHAR

byte[] toBytes(String s)

String toString(byte[] b)

VARCHAR

STRING

BOOLEAN

byte[] toBytes(boolean b)

boolean toBoolean(byte[] b)

BINARY

byte[]

byte[]

VARBINARY

DECIMAL

byte[] toBytes(BigDecimal v)

BigDecimal toBigDecimal(byte[] b)

TINYINT

new byte[] { val }

bytes[0]

SMALLINT

byte[] toBytes(short val)

short toShort(byte[] bytes)

INT

byte[] toBytes(int val)

int toInt(byte[] bytes)

BIGINT

byte[] toBytes(long val)

long toLong(byte[] bytes)

FLOAT

byte[] toBytes(float val)

float toFloat(byte[] bytes)

DOUBLE

byte[] toBytes(double val)

double toDouble(byte[] bytes)

DATE

日付を 1970-01-01 からの日数 (int で表現) に変換し、byte[] toBytes(int val) を使用してバイト配列に変換します。

HBase バイト配列は int toInt(byte[] bytes) を使用して int に変換されます。これは 1970-01-01 からの日数を表します。

TIME

時刻を 00:00:00 からのミリ秒数 (int で表現) に変換し、byte[] toBytes(int val) を使用してバイト配列に変換します。

HBase バイト配列は int toInt(byte[] bytes) を使用して int に変換されます。これは 00:00:00 からのミリ秒数を表します。

TIMESTAMP

タイムスタンプを 1970-01-01 00:00:00 からのミリ秒数 (long で表現) に変換し、byte[] toBytes(long val) を使用してバイト配列に変換します。

HBase バイト配列は long toLong(byte[] bytes) を使用して変換されます。これは 1970-01-01 00:00:00 からのミリ秒数を表します。

コード例

  • ディメンションテーブルの例

    CREATE TEMPORARY TABLE datagen_source (
      a INT,
      b BIGINT,
      c STRING,
      `proc_time` AS PROCTIME()
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE hbase_dim (
      rowkey INT,
      family1 ROW<col1 INT>,
      family2 ROW<col1 STRING, col2 BIGINT>,
      family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>
    ) WITH (
      'connector' = 'cloudhbase',
      'table-name' = '<yourTableName>',
      'zookeeper.quorum' = '<yourZookeeperQuorum>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      f1c1 INT,
      f3c3 STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
         SELECT a, family1.col1 as f1c1,  family3.col3 as f3c3 FROM datagen_source
    JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` as h ON datagen_source.a = h.rowkey;
  • 結果テーブルの例

    CREATE TEMPORARY TABLE datagen_source (
      rowkey INT,
      f1q1 INT,
      f2q1 STRING,
      f2q2 BIGINT,
      f3q1 DOUBLE,
      f3q2 BOOLEAN,
      f3q3 STRING
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE hbase_sink (
      rowkey INT,
      family1 ROW<q1 INT>,
      family2 ROW<q1 STRING, q2 BIGINT>,
      family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
      PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>'
    );
     
    INSERT INTO hbase_sink
    SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;
  • 動的な結果テーブルの例

    CREATE TEMPORARY TABLE datagen_source (
      id INT,
      f1hour STRING,
      f1deal BIGINT,
      f2day STRING,
      f2deal BIGINT
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE hbase_sink (
      rowkey INT,
      f1 ROW<`hour` STRING, deal BIGINT>,
      f2 ROW<`day` STRING, deal BIGINT>
    ) WITH (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>',
      'dynamic.table'='true'
    );
    
    INSERT INTO hbase_sink
    SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
    • dynamic.table パラメーターが true に設定されている場合、コネクタは動的カラムをサポートする HBase テーブルを使用します。

    • 各カラムファミリーに対応する ROW 型には 2 つのフィールドを宣言する必要があります。最初のフィールドは動的カラム名を表し、2 番目のフィールドはその動的カラムの値を表します。

    • たとえば、datagen_source table に ID が 1 のプロダクトのレコードが含まれているとします。10:00 から 11:00 までの取引額は 100 で、2020 年 7 月 26 日の合計取引額は 10,000 です。その後、行キーが 1 の行が HBase テーブルに挿入され、f1:10 の値は 100、f2:2020-7-26 の値は 10,000 になります。