すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Tair (Enterprise)

最終更新日:Mar 10, 2026

このトピックでは、Tair (Enterprise Edition) コネクタの使用方法について説明します。

背景情報

Tair (Redis OSS-compatible) は、オープンソース Redis システムのプロトコルと互換性のあるデータベースサービスです。Tair (Redis OSS-compatible) は、メモリとハードディスクを組み合わせたハイブリッドストレージをサポートしています。また、高可用性を確保するためのホットスタンバイアーキテクチャを提供し、スケーラブルなクラスタアーキテクチャにより、高スループット、低遅延の操作、柔軟な構成変更といったビジネス要件に対応します。

Tair コネクタは以下の機能をサポートしています。

カテゴリ

説明

サポートされるタイプ

結果テーブル

実行モード

ストリームモードのみサポートされています。

データフォーマット

STRING

固有メトリック

  • numBytesSend

  • numBytesSendPerSecond

  • numRecordsSend

  • numRecordsSendPerSecond

  • numRecordSendErrors

  • currentSendTime

説明

メトリックの詳細については、「Metrics」をご参照ください。

API タイプ

SQL

結果テーブルでのデータ更新または削除

はい。

前提条件

  • Tair (Enterprise Edition) インスタンスが作成されました。詳細については、「手順 1: インスタンスの作成」をご参照ください。

  • Tair (Enterprise Edition) インスタンスに対して IP アドレスホワイトリストが設定済みである必要があります。詳細については、「Step 2: Configure whitelists」をご参照ください。

制限事項

  • Ververica Runtime (VVR) 6.0.6 以降を使用する Realtime Compute for Apache Flink のみが、Tair (Enterprise Edition) コネクタをサポートしています。

  • Tair (Enterprise Edition) コネクタでは、複数のホストを設定することはできません。

構文

Tair (Enterprise Edition) は、STRING、LIST、SET、HASHMAP、SORTEDSET といった Redis データ構造との互換性に基づき、独自開発の Tair データ構造すべてをサポートしています。

以下は DDL 文の例です。

CREATE TABLE tair_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- 必須。
) WITH (
  'connector'= 'tair',
  'host' = '<yourHost>'
);
説明

Tair は Redis データ構造と互換性があります。Redis データ構造の構文例の詳細については、「Tair (Redis OSS-compatible) connector」をご参照ください。

WITH 句のパラメーター

パラメーター

説明

データの型

必須

デフォルト値

備考

connector

テーブルのタイプ。

String

はい

なし

静的フィールドの値は tair です。

host

Tair サーバーのエンドポイント。

String

はい

なし

内部エンドポイントの使用を推奨します。

説明

ネットワーク遅延や帯域幅制限などの要因により、パブリックネットワーク接続は不安定になる可能性があります。

mode

Tair のデータ構造。

STRING

はい

なし

有効な値:

  • string

  • list

  • set

  • hashmap

  • sortedset

  • tairstring

  • tairhash

  • tairzset

  • tairbloom

  • tairdoc

  • tairsearch

  • tairts

  • taircpc

  • tairroaring

  • tairgis

  • tairvector

Tair は Redis データ構造および独自開発の Tair データ構造をサポートしています。有効な値の詳細については、「ApsaraDB for Redis 結果テーブルでサポートされるデータ構造」および「Tair データ構造のフォーマット」をご参照ください。

説明
  • TairTs、TairCpc、TairRoaring、TairVector、TairGis は、VVR 8.0.1 以降を使用する Realtime Compute for Apache Flink のみがサポートしています。

  • Tair 結果テーブルは、独自開発の Tair データ構造をサポートしています。Tair 結果テーブルを作成する DDL 文では、指定されたフォーマットに基づいてテーブルを定義し、プライマリキーを指定する必要があります。

port

Tair サーバーのポート番号。

INT

いいえ

6379

該当なし。

password

Tair データベースへのアクセスに使用するパスワード。

String

いいえ

空文字列

空文字列は認証を行わないことを示します。

dbNum

ターゲットデータベースの ID。

INT

いいえ

0

該当なし。

clusterMode

クラスタアーキテクチャを使用するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true:クラスタアーキテクチャを使用します。

  • false:スタンドアロンモードで動作します。

ignoreDelete

取り消しメッセージを無視するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • false:取り消しメッセージを受信した場合、挿入されたデータおよびそのキーを削除します(デフォルト値)。

  • true:取り消しメッセージを受信した場合でも、挿入されたデータおよびそのキーを保持します。

expiration

挿入されたデータのキーに設定する生存時間 (TTL)。

LONG

いいえ

0

値 0 は TTL が設定されていないことを示します。このパラメーターの値が 0 より大きい場合、挿入されたデータのキーに対して TTL が設定されます。単位:ミリ秒。

expirationAt

挿入されたデータのキーに設定する絶対有効期限。

LONG

いいえ

0

単位:ミリ秒。デフォルト値:0。デフォルト値は有効期限が設定されていないことを示します。

このパラメーターの値が 0 より大きく、かつ expiration パラメーターが 0 に設定されている場合、挿入されたデータのキーに対して絶対有効期限が設定されます。

incrMode

Tair データベースの sink モード。

STRING

いいえ

None

有効な値:

  • None:挿入操作を示します(デフォルト値)。

  • int:INCRBY 操作を示します。INCR の値は incrValue パラメーターの値に固定されます。

  • float:INCRBYFLOAT 操作を示します。INCR の値は incrValue パラメーターの値に固定されます。

  • dynamic_int:incrby 操作を表します。増分値は incrValue で指定されたカラムから取得されます。

  • dynamic_float:incrbyfloat 操作を表します。増分値は incrValue カラムで指定されます。

incrValue

incrMode パラメーターの値に基づいて決定される INCR の値。

STRING

いいえ

None

このパラメーターは以下の値を受け入れます:

  • incrMode が None の場合、このパラメーターは指定されません。

  • incrMode パラメーターが int または float に設定されている場合、incrValue パラメーターの値が INCR の値になります。

  • incrMode パラメーターが dynamic_int または dynamic_float に設定されている場合、incrValue パラメーターの値は DDL 文における INCR 値が属するカラム名になります。

fieldExpireMode

TairHash のフィールドまたは TairTS の skey の有効期限モード。

String

いいえ

None

有効な値:

  • None:有効期限を指定しません。

  • millisecond:相対的な有効期限を指定します。有効期限は fieldExpireValue パラメーターの値に固定されます。

    説明

    TairTS の skey の有効期限モードは millisecond である必要があります。

  • unixtime:絶対的な有効期限を指定します。有効期限は fieldExpireValue パラメーターの値に固定されます。

  • dynamic_millisecond:fieldExpireValue に対応するカラムで指定される相対的な有効期限。

  • dynamic_unixtime:fieldExpireValue で指定されたカラムによって提供される絶対的な有効期限。

fieldExpireValue

TairHash のフィールドまたは TairTS の skey の有効期限。

String

いいえ

None

有効な値:

  • fieldExpireMode パラメーターが None に設定されている場合、有効期限は指定されません。

  • fieldExpireMode パラメーターが millisecond または unixtime に設定されている場合、fieldExpireValue パラメーターの値が有効期限になります。

  • fieldExpireMode が dynamic_millisecond または dynamic_unixtime の場合、fieldExpireValue は生存時間 (TTL) を持つ DDL カラム名になります。

データ型のマッピング

Flink フィールドの型

Tair のデータ型

VARCHAR

STRING

DOUBLE

DOUBLE

Tair データ構造のフォーマット

タイプ

フォーマット

コマンド

TairString

incrMode パラメーターが None に設定されている場合、DDL 文には 2 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型の値が含まれます。

exset key value

incrMode パラメーターが int または float に設定されている場合、DDL 文には 1 つのカラムのみがあり、そのカラムには STRING 型のキーが含まれます。

exincrby/exincrbyfloat key incrValue

incrMode パラメーターが dynamic_int または dynamic_float に設定されている場合、DDL 文には 2 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型の incrValue 値が含まれます。

exincrby/exincrbyfloat key incrValue

TairHash

incrMode パラメーターが None に設定されている場合、DDL 文には 3 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型のフィールドが含まれます。

  • 3 番目のカラムには STRING 型のフィールド値が含まれます。

exhset key field value

incrMode パラメーターが int または float に設定されている場合、DDL 文には 2 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型のフィールドが含まれます。

 exhincrby/exincrbyfloat key field incrValue

incrMode パラメーターが dynamic_int または dynamic_float に設定されている場合、DDL 文には 3 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型のフィールドが含まれます。

  • 3 番目のカラムにはフィールドに対応する STRING 型の incrValue 値が含まれます。

exhincrby/exincrbyfloat key field incrValue

TairZset

incrMode パラメーターが None に設定されている場合、TairZset は多次元データのソートをサポートします。TairZset では、最大 256 次元の DOUBLE 型データをソートできます。そのため、DDL 文には 3 ~ 258 のカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムは member であり、データ型は STRING です。

  • 残りのカラムには DOUBLE 型のスコアが含まれます。

exzadd key score member
説明

多次元でデータをソートする場合は、すべての次元のスコアフォーマットが同一であることを確認してください。

incrMode パラメーターが int または float に設定されている場合、DDL 文には 2 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型のメンバーが含まれます。

exzincyby key member incrValue

incrMode パラメーターが dynamic_int または dynamic_float に設定されている場合、DDL 文には 3 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型のメンバーが含まれます。

  • 3 番目のカラムには STRING 型の incrValue 値が含まれます。

exzincyby key member incrValue

TairBloom

incrMode パラメーターは None に設定する必要があります。

データが Tair 結果テーブルに初めて挿入される際、デフォルト容量 100 要素、エラー率 0.01 の TairBloom キーが作成されます。DDL 文には 2 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型のアイテムが含まれます。

BF.ADD key item

TairDoc

incrMode パラメーターは None に設定する必要があります。DDL 文には 3 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型のパスが含まれます。

  • 3 番目のカラムは STRING 型で、JSON データを格納します。

JSON.SET key path json

TairSearch

incrMode パラメーターが None に設定されている場合、DDL 文には 4 つのカラムがあります。

  • 1 番目のカラムはインデックスであり、データ型は STRING です。

  • 2 番目のカラムには STRING 型のドキュメント ID が含まれます。

  • 3 番目のカラムには STRING 型のドキュメントが含まれます。ドキュメントは JSON 形式である必要があります。

  • 4 番目のカラムには STRING 型のマッピングが含まれます。

TFT.ADDDOC index document docid
説明

Tair 結果テーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。サンプルコマンド:

TFT.CREATEINDEX index mappings

incrMode パラメーターが int または float に設定されている場合、DDL 文には 4 つのカラムがあります。

  • 1 番目のカラムには STRING 型のインデックスが含まれます。

  • 2 番目のカラムには STRING 型のドキュメント ID が含まれます。

  • 3 番目のカラムには STRING 型のフィールドが含まれます。

  • 4 番目のカラムには STRING 型のマッピングが含まれます。

ドキュメント操作のサンプルコマンド:

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
説明

Tair 結果テーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。サンプルコマンド:

TFT.CREATEINDEX index mappings

incrMode パラメーターが dynamic_int または dynamic_float に設定されている場合、DDL 文には 5 つのカラムがあります。

  • 1 番目のカラムには STRING 型のインデックスが含まれます。

  • 2 番目のカラムには STRING 型のドキュメント ID が含まれます。

  • 3 番目のカラムには STRING 型のフィールドが含まれます。

  • 4 番目のカラムには STRING 型のマッピングが含まれます。

  • 5 番目のカラムには STRING 型の incrValue 値が含まれます。

ドキュメント操作に使用されるコマンドは以下のとおりです。

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
説明

Tair 結果テーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。サンプルコマンド:

TFT.CREATEINDEX index mappings

TairCpc

incrMode パラメーターは None に設定する必要があります。DDL 文には 2 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型のアイテムが含まれます。

CPC.UPDATE key item

TairGis

incrMode パラメーターは None に設定する必要があります。DDL 文には 3 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには STRING 型のポリゴン名が含まれます。

  • 3 番目のカラムにはポリゴンの Well-known Text (WKT) 値が含まれます。値のデータ型は STRING です。

GIS.ADD area polygonName polygonWkt

TairRoaring

incrMode パラメーターは None に設定する必要があります。DDL 文には 3 つのカラムがあります。

  • 1 番目のカラムには STRING 型のキーが含まれます。

  • 2 番目のカラムには BIGINT 型の指定オフセットが含まれます。

  • 3 番目のカラムには BIGINT 型の値が含まれます。有効な値:0 および 1。

TR.SETBIT key offset value

Vector

incrMode パラメーターは None に設定する必要があります。DDL 文には 6 つのカラムがあります。

  • 1 番目のカラムには STRING 型のインデックス名が含まれます。

  • 2 番目のカラムにはレコードのプライマリキーが含まれます。値のデータ型は STRING です。

  • 3 番目のカラムには STRING 型のベクトルデータが含まれます。

  • 4 番目のカラムには INT 型のベクトル次元が含まれます。

  • 5 番目のカラムにはインデックスの構築およびクエリに使用されるアルゴリズムが含まれます。値のデータ型は STRING です。

  • 6 番目のカラムにはベクトル距離の計算に使用される距離メソッドが含まれます。値のデータ型は STRING です。

TVS.HSET index_name key VECTOR vector_data
説明

Tair 結果テーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。サンプルコマンド:

TVS.CREATEINDEX index_name dims algorithm distance_method

TairTs

incrMode パラメーターが None に設定されている場合、DDL 文には 4 つのカラムがあります。

  • 1 番目のカラムには STRING 型の pkey が含まれます。pkey はタイムラインのグループを示します。

  • 2 番目のカラムには STRING 型の skey が含まれます。skey はタイムラインを示します。

  • 3 番目のカラムには STRING 型のタイムスタンプが含まれます。

  • 4 番目のカラムには STRING 型の値が含まれます。

EXTS.S.RAW_MODIFY Pkey Skey timestamp value

incrMode パラメーターが float に設定されている場合、DDL 文には 3 つのカラムがあります。

  • 1 番目のカラムには STRING 型の pkey が含まれます。

  • 2 番目のカラムには STRING 型の skey が含まれます。

  • 3 番目のカラムには STRING 型のタイムスタンプが含まれます。

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

incrMode パラメーターが dynamic_float に設定されている場合、DDL 文には 4 つのカラムがあります。

  • 1 番目のカラムには STRING 型の pkey が含まれます。

  • 2 番目のカラムには STRING 型の skey が含まれます。

  • 3 番目のカラムには STRING 型のタイムスタンプが含まれます。

  • 4 番目のカラムには STRING 型の incrValue 値が含まれます。incrValue 値は、3 番目のカラムに記載されているタイムスタンプに対応します。

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

サンプルコード

  • 通常モードで Tair 結果テーブルにデータを挿入するサンプルコード

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      index_name STRING,
      doc_id STRING,
      doc STRING,
      mapping STRING,
      PRIMARY KEY(index_name) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairsearch',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}'
    );
    
    INSERT INTO tair_output
    SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping
    FROM datagen_stream;
  • incr パターンでの挿入データの例

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      step STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'dynamic_float',
      'incrValue' = 'step'
    );
    
    INSERT INTO tair_output
    SELECT *
    FROM datagen_stream;           
    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'float',
      'incrValue' = '11.11'
    );
    
    INSERT INTO tair_output
    SELECT v
    FROM datagen_stream;
  • fieldExpire パターンを使用して結果テーブルに書き込まれるデータの例

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING,
      s STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_ouput (
    	key STRING,
    	field STRING,
    	value STRING,
    	PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairhash',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'fieldExpireMode' = 'millisecond',
      'fieldExpireValue' = '1000'
    );
    
    INSERT INTO tair_output
    SELECT v, p, s
    FROM datagen_stream;