すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Tair (Enterprise Edition) コネクタ

最終更新日:Apr 24, 2025

このトピックでは、Tair (Enterprise Edition) コネクタの使用方法について説明します。

背景情報

Tair (Redis OSS 互換) は、オープンソース Redis システムのプロトコルと互換性のあるデータベースサービスです。 Tair (Redis OSS 互換) は、ストレージにメモリとハードディスクのハイブリッドをサポートしています。 Tair (Redis OSS 互換) は、高可用性を確保するためにホットスタンバイアーキテクチャを提供し、スケーラブルなクラスタアーキテクチャを使用して、高スループット、低レイテンシ操作、および柔軟な構成変更に対するビジネス要件を満たします。

次の表に、Tair コネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

シンクテーブル

実行モード

ストリーミングモード

データ形式

STRING

メトリック

  • numBytesSend

  • numBytesSendPerSecond

  • numRecordsSend

  • numRecordsSendPerSecond

  • numRecordSendErrors

  • currentSendTime

説明

メトリックの詳細については、「メトリック」をご参照ください。

API タイプ

SQL API

シンクテーブルでのデータの更新または削除

サポートされています

前提条件

  • Tair (Enterprise Edition) インスタンスが作成されていること。 詳細については、「手順 1: インスタンスの作成」をご参照ください。

  • Tair (Enterprise Edition) インスタンスに IP アドレスホワイトリストが構成されていること。 詳細については、「手順 2: ホワイトリストの構成」をご参照ください。

制限事項

  • Ververica Runtime (VVR) 6.0.6 以降を使用する Apache Flink 用 Realtime Compute のみ、Tair (Enterprise Edition) コネクタをサポートしています。

  • Tair (Enterprise Edition) コネクタでは、複数のホストを構成することはできません。

構文

Tair (Enterprise Edition) は、次の Redis データ構造との互換性に基づいて、すべての独自開発の Tair データ構造をサポートしています: STRING、LIST、SET、HASHMAP、SORTEDSET。

Tair シンクテーブルを作成するには、次の DDL ステートメントを実行します。

CREATE TABLE tair_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- 必須です。
) WITH (
  'connector'= 'tair',
  'host' = '<yourHost>' // ホストを指定します
);
説明

Tair は Redis データ構造と互換性があります。Redis データ構造の構文例の詳細については、「Tair (Redis OSS 互換) コネクタ」をご参照ください。

WITH 句のパラメータ

パラメータ

説明

データ型

必須

デフォルト値

備考

connector

テーブルのタイプ。

STRING

はい

デフォルト値なし

値を tair に設定します。

host

Tair サーバーのエンドポイント。

STRING

はい

デフォルト値なし

内部エンドポイントを使用することをお勧めします。

説明

インターネット経由で Tair データベースにアクセスする場合、ネットワークレイテンシや帯域幅の制限などの問題により、ネットワークが不安定になる可能性があります。

mode

Tair のデータ構造。

STRING

はい

デフォルト値なし

有効な値:

  • string

  • list

  • set

  • hashmap

  • sortedset

  • tairstring

  • tairhash

  • tairzset

  • tairbloom

  • tairdoc

  • tairsearch

  • tairts

  • taircpc

  • tairroaring

  • tairgis

  • tairvector

Tair は、Redis データ構造と独自開発の Tair データ構造をサポートしています。有効な値の詳細については、「ApsaraDB for Redis シンクテーブルでサポートされているデータ構造」および「Tair データ構造の形式」をご参照ください。

説明
  • VVR 8.0.1 以降を使用する Apache Flink 用 Realtime Compute のみ、TairTs、TairCpc、TairRoaring、TairVector、および TairGis をサポートしています。

  • Tair シンクテーブルは、独自開発の Tair データ構造をサポートしています。 Tair シンクテーブルを作成するために使用される DDL ステートメントは、指定された形式に基づいてテーブルを定義する必要があり、テーブルにプライマリキーを指定する必要があります。

port

Tair サーバーのポート番号。

INT

いいえ

6379

該当なし。

password

Tair データベースへのアクセスに使用するパスワード。

STRING

いいえ

空の文字列

空の文字列は、検証が実行されないことを示します。

dbNum

宛先データベースの ID。

INT

いいえ

0

該当なし。

clusterMode

クラスタアーキテクチャを使用するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true: クラスタアーキテクチャを使用します。

  • false: スタンドアロンアーキテクチャを使用します。

ignoreDelete

取り消しメッセージを無視するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • false: 取り消しメッセージを受信すると、挿入されたデータとそのデータのキーが削除されます。これはデフォルト値です。

  • true: 取り消しメッセージを受信しても、挿入されたデータとそのデータのキーは保持されます。

expiration

挿入されたデータのキーに指定された有効期限 (TTL)。

LONG

いいえ

0

値 0 は、TTL が構成されていないことを示します。このパラメータの値が 0 より大きい場合、挿入されたデータのキーに TTL が構成されます。単位: ミリ秒。

expirationAt

挿入されたデータのキーに指定された絶対有効期限。

LONG

いいえ

0

単位: ミリ秒。デフォルト値: 0。デフォルト値は、有効期限が指定されていないことを示します。

このパラメータの値が 0 より大きく、expiration パラメータが 0 に設定されている場合、挿入されたデータのキーに絶対有効期限が指定されます。

incrMode

Tair データベースのシンクモード。

STRING

いいえ

None

有効な値:

  • None: 挿入操作を示します。これはデフォルト値です。

  • int: INCRBY 操作を示します。 INCR の値は、incrValue パラメータの値に固定されます。

  • float: INCRBYFLOAT 操作を示します。 INCR の値は、incrValue パラメータの値に固定されます。

  • dynamic_int: INCRBY 操作を示します。 INCR の値は、DDL ステートメントで INCR の値が属する列の名前です。

  • dynamic_float: INCRBYFLOAT 操作を示します。 INCR の値は、DDL ステートメントで INCR の値が属する列の名前です。

incrValue

incrMode パラメータの値に基づいて決定される INCR の値。

STRING

いいえ

デフォルト値なし

incrValue パラメータの値は、incrMode パラメータの値によって異なります。

  • incrMode パラメータが None に設定されている場合、incrValue パラメータを構成する必要はありません。

  • incrMode パラメータが int または float に設定されている場合、incrValue パラメータの値は INCR の値です。

  • incrMode パラメータが dynamic_int または dynamic_float に設定されている場合、incrValue パラメータの値は、DDL ステートメントで INCR の値が属する列の名前です。

fieldExpireMode

TairHash のフィールドまたは TairTS の skeys の有効期限モード。

STRING

いいえ

None

有効な値:

  • None: 有効期限は指定されていません。

  • millisecond: 相対有効期限が指定されています。有効期限は、fieldExpireValue パラメータの値に固定されます。

    説明

    TairTS の skeys の有効期限モードは、millisecond である必要があります。

  • unixtime: 絶対有効期限が指定されています。有効期限は、fieldExpireValue パラメータの値に固定されます。

  • dynamic_millisecond: 相対有効期限が指定されています。有効期限は、DDL ステートメントで fieldExpireValue の値が属する列の名前です。

  • dynamic_unixtime: 絶対有効期限が指定されています。有効期限は、DDL ステートメントで fieldExpireValue の値が属する列の名前です。

fieldExpireValue

TairHash のフィールドまたは TairTS の skeys の有効期限。

STRING

いいえ

デフォルト値なし

有効な値:

  • fieldExpireMode パラメータが None に設定されている場合、有効期限は指定されていません。

  • fieldExpireMode パラメータが millisecond または unixtime に設定されている場合、fieldExpireValue パラメータの値は有効期限です。

  • fieldExpireMode パラメータが dynamic_millisecond または dynamic_unixtime に設定されている場合、fieldExpireValue パラメータの値は、DDL ステートメントで fieldExpireValue パラメータの値が属する列の名前です。

データ型マッピング

Apache Flink 用 Realtime Compute のデータ型

Tair のデータ型

VARCHAR

STRING

DOUBLE

DOUBLE

Tair データ構造の形式

データ構造

形式

Tair シンクテーブルにデータを挿入するためのコマンド

TairString

incrMode パラメータが None に設定されている場合、DDL ステートメントには 2 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型の値が一覧表示されます。

exset key value // key と value を設定します

incrMode パラメータが int または float に設定されている場合、DDL ステートメントには 1 つの列のみがあり、その列には STRING 型のキーが一覧表示されます。

exincrby/exincrbyfloat key incrValue // key と incrValue を設定します

incrMode パラメータが dynamic_int または dynamic_float に設定されている場合、DDL ステートメントには 2 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型の incrValue 値が一覧表示されます。

exincrby/exincrbyfloat key incrValue // key と incrValue を設定します

TairHash

incrMode パラメータが None に設定されている場合、DDL ステートメントには 3 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型のフィールドが一覧表示されます。

  • 3 番目の列には、STRING 型のフィールド値が一覧表示されます。

exhset key field value // key、field、value を設定します

incrMode パラメータが int または float に設定されている場合、DDL ステートメントには 2 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型のフィールドが一覧表示されます。

exhincrby/exincrbyfloat key field incrValue // key、field、incrValue を設定します

incrMode パラメータが dynamic_int または dynamic_float に設定されている場合、DDL ステートメントには 3 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型のフィールドが一覧表示されます。

  • 3 番目の列には、フィールドに対応する STRING 型の incrValue 値が一覧表示されます。

exhincrby/exincrbyfloat key field incrValue // key、field、incrValue を設定します

TairZset

incrMode パラメータが None に設定されている場合、TairZset は多次元データソートをサポートします。 TairZset では、最大 256 次元の DOUBLE 型のデータをソートできます。したがって、DDL ステートメントには 3 ~ 258 列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型のメンバーが一覧表示されます。

  • 残りの列には、DOUBLE 型のスコアが一覧表示されます。

exzadd key score member // key、score、member を設定します
説明

複数次元からデータをソートする場合は、すべての次元のスコア形式が同じであることを確認してください。

incrMode パラメータが int または float に設定されている場合、DDL ステートメントには 2 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型のメンバーが一覧表示されます。

exzincyby key member incrValue // key、member、incrValue を設定します

incrMode パラメータが dynamic_int または dynamic_float に設定されている場合、DDL ステートメントには 3 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型のメンバーが一覧表示されます。

  • 3 番目の列には、STRING 型の incrValue 値が一覧表示されます。

exzincyby key member incrValue // key、member、incrValue を設定します

TairBloom

incrMode パラメータは None に設定する必要があります。

Tair シンクテーブルに初めてデータを挿入すると、デフォルトの容量が 100 要素、エラー率が 0.01 の TairBloom キーが作成されます。 DDL ステートメントには 2 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型の項目が一覧表示されます。

BF.ADD key item // key と item を設定します

TairDoc

incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 3 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型のパスが一覧表示されます。

  • 3 番目の列には、STRING 型の JSON 要素が一覧表示されます。

JSON.SET key path json // key、path、json を設定します

TairSearch

incrMode パラメータが None に設定されている場合、DDL ステートメントには 4 つの列があります。

  • 最初の列には、STRING 型のインデックスが一覧表示されます。

  • 2 番目の列には、STRING 型のドキュメント ID が一覧表示されます。

  • 3 番目の列には、STRING 型のドキュメントが一覧表示されます。ドキュメントは JSON 形式である必要があります。

  • 4 番目の列には、STRING 型のマッピングが一覧表示されます。

TFT.ADDDOC index document docid // index、document、docid を設定します
説明

Tair シンクテーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。コマンド例:

TFT.CREATEINDEX index mappings // index と mappings を設定します

incrMode パラメータが int または float に設定されている場合、DDL ステートメントには 4 つの列があります。

  • 最初の列には、STRING 型のインデックスが一覧表示されます。

  • 2 番目の列には、STRING 型のドキュメント ID が一覧表示されます。

  • 3 番目の列には、STRING 型のフィールドが一覧表示されます。

  • 4 番目の列には、STRING 型のマッピングが一覧表示されます。

ドキュメント操作のコマンド例:

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment // index、doc_id、field、increment を設定します
説明

Tair シンクテーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。コマンド例:

TFT.CREATEINDEX index mappings // index と mappings を設定します

incrMode パラメータが dynamic_int または dynamic_float に設定されている場合、DDL ステートメントには 5 つの列があります。

  • 最初の列には、STRING 型のインデックスが一覧表示されます。

  • 2 番目の列には、STRING 型のドキュメント ID が一覧表示されます。

  • 3 番目の列には、STRING 型のフィールドが一覧表示されます。

  • 4 番目の列には、STRING 型のマッピングが一覧表示されます。

  • 5 番目の列には、STRING 型の incrValue 値が一覧表示されます。

ドキュメント操作のコマンド例:

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment // index、doc_id、field、increment を設定します
説明

Tair シンクテーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。コマンド例:

TFT.CREATEINDEX index mappings // index と mappings を設定します

TairCpc

incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 2 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型の項目が一覧表示されます。

CPC.UPDATE key item // key と item を設定します

TairGis

incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 3 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、STRING 型のポリゴン名が一覧表示されます。

  • 3 番目の列には、ポリゴンの Well-known Text (WKT) 値が一覧表示されます。値は STRING 型です。

GIS.ADD area polygonName polygonWkt // area、polygonName、polygonWkt を設定します

TairRoaring

incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 3 つの列があります。

  • 最初の列には、STRING 型のキーが一覧表示されます。

  • 2 番目の列には、BIGINT 型の指定されたオフセットが一覧表示されます。

  • 3 番目の列には、BIGINT 型の値が一覧表示されます。有効な値: 0 と 1。

TR.SETBIT key offset value // key offset value // key、offset、value を設定します

TairVector

incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 6 つの列があります。

  • 最初の列には、STRING 型のインデックス名が一覧表示されます。

  • 2 番目の列には、レコードのプライマリキーが一覧表示されます。値は STRING 型です。

  • 3 番目の列には、STRING 型のベクトルデータが一覧表示されます。

  • 4 番目の列には、INT 型のベクトル次元が一覧表示されます。

  • 5 番目の列には、インデックスの構築とクエリに使用されるアルゴリズムが一覧表示されます。値は STRING 型です。

  • 6 番目の列には、ベクトル距離の計算に使用される距離メソッドが一覧表示されます。値は STRING 型です。

TVS.HSET index_name key VECTOR vector_data // index_name、key、vector_data を設定します
説明

Tair シンクテーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。コマンド例:

TVS.CREATEINDEX index_name dims algorithm distance_method // index_name、dims、algorithm、distance_method を設定します

TairTs

incrMode パラメータが None に設定されている場合、DDL ステートメントには 4 つの列があります。

  • 最初の列には、STRING 型の pkey が一覧表示されます。 pkey はタイムラインのグループを示します。

  • 2 番目の列には、STRING 型の skey が一覧表示されます。 skey はタイムラインを示します。

  • 3 番目の列には、STRING 型のタイムスタンプが一覧表示されます。

  • 4 番目の列には、STRING 型の値が一覧表示されます。

EXTS.S.RAW_MODIFY Pkey Skey timestamp value // Pkey、Skey、timestamp、value を設定します

incrMode パラメータが float に設定されている場合、DDL ステートメントには 3 つの列があります。

  • 最初の列には、STRING 型の pkey が一覧表示されます。

  • 2 番目の列には、STRING 型の skey が一覧表示されます。

  • 3 番目の列には、STRING 型のタイムスタンプが一覧表示されます。

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue // Pkey、Skey、timestamp、incrValue を設定します

incrMode パラメータが dynamic_float に設定されている場合、DDL ステートメントには 4 つの列があります。

  • 最初の列には、STRING 型の pkey が一覧表示されます。

  • 2 番目の列には、STRING 型の skey が一覧表示されます。

  • 3 番目の列には、STRING 型のタイムスタンプが一覧表示されます。

  • 4 番目の列には、STRING 型の incrValue 値が一覧表示されます。 incrValue 値は、3 番目の列に一覧表示されているタイムスタンプに対応します。

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue // Pkey、Skey、timestamp、incrValue を設定します

サンプルコード

  • 共通モードで Tair シンクテーブルにデータを挿入するサンプルコード

    CREATE TEMPORARY TABLE datagen_stream ( // データ生成ストリームを作成します
      v STRING, // v を STRING 型として定義します
      p STRING // p を STRING 型として定義します
    ) WITH (
      'connector' = 'datagen' // コネクタを datagen に設定します
    );
    
    CREATE TEMPORARY TABLE tair_output ( // 一時的な Tair 出力テーブルを作成します
      index_name STRING, // index_name を STRING 型として定義します
      doc_id STRING, // doc_id を STRING 型として定義します
      doc STRING, // doc を STRING 型として定義します
      mapping STRING, // mapping を STRING 型として定義します
      PRIMARY KEY(index_name) NOT ENFORCED // index_name をプライマリキーとして設定しますが、強制しません
    ) WITH (
      'connector' = 'tair', // コネクタを tair に設定します
      'mode' = 'tairsearch', // モードを tairsearch に設定します
      'host' = '${tairHost}', // ホストを ${tairHost} に設定します
      'port' = '${tairPort}', // ポートを ${tairPort} に設定します
      'password' = '${password}' // パスワードを ${password} に設定します
    );
    
    INSERT INTO tair_output // tair_output テーブルにデータを挿入します
    SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping // index、v、p、およびマッピングを選択します
    FROM datagen_stream; // datagen_stream からデータを選択します
    
  • incrMode パラメータが構成されている場合に Tair シンクテーブルにデータを挿入するサンプルコード

    CREATE TEMPORARY TABLE datagen_stream ( // データ生成ストリームを作成します
      v STRING, // v を STRING 型として定義します
      p STRING // p を STRING 型として定義します
    ) WITH (
      'connector' = 'datagen' // コネクタを datagen に設定します
    );
    
    CREATE TEMPORARY TABLE tair_output ( // 一時的な Tair 出力テーブルを作成します
      key STRING, // key を STRING 型として定義します
      step STRING, // step を STRING 型として定義します
      PRIMARY KEY (key) NOT ENFORCED // key をプライマリキーとして設定しますが、強制しません
    ) WITH (
      'connector' = 'tair', // コネクタを tair に設定します
      'mode' = 'tairstring', // モードを tairstring に設定します
      'host' = '${tairHost}', // ホストを ${tairHost} に設定します
      'port' = '${tairPort}', // ポートを ${tairPort} に設定します
      'password' = '${password}', // パスワードを ${password} に設定します
      'incrMode' = 'dynamic_float', // incrMode を dynamic_float に設定します
      'incrValue' = 'step' // incrValue を step に設定します
    );
    
    INSERT INTO tair_output // tair_output テーブルにデータを挿入します
    SELECT * // すべての列を選択します
    FROM datagen_stream; // datagen_stream からデータを選択します           
    CREATE TEMPORARY TABLE datagen_stream ( // データ生成ストリームを作成します
      v STRING, // v を STRING 型として定義します
      p STRING // p を STRING 型として定義します
    ) WITH (
      'connector' = 'datagen' // コネクタを datagen に設定します
    );
    
    CREATE TEMPORARY TABLE tair_output ( // 一時的な Tair 出力テーブルを作成します
      key STRING, // key を STRING 型として定義します
      PRIMARY KEY (key) NOT ENFORCED // key をプライマリキーとして設定しますが、強制しません
    ) WITH (
      'connector' = 'tair', // コネクタを tair に設定します
      'mode' = 'tairstring', // モードを tairstring に設定します
      'host' = '${tairHost}', // ホストを ${tairHost} に設定します
      'port' = '${tairPort}', // ポートを ${tairPort} に設定します
      'password' = '${password}', // パスワードを ${password} に設定します
      'incrMode' = 'float', // incrMode を float に設定します
      'incrValue' = '11.11' // incrValue を 11.11 に設定します
    );
    
    INSERT INTO tair_output // tair_output テーブルにデータを挿入します
    SELECT v // v を選択します
    FROM datagen_stream; // datagen_stream からデータを選択します
    
  • fieldExpireMode パラメータが構成されている場合に Tair シンクテーブルにデータを挿入するサンプルコード

    CREATE TEMPORARY TABLE datagen_stream ( // データ生成ストリームを作成します
      v STRING, // v を STRING 型として定義します
      p STRING, // p を STRING 型として定義します
      s STRING // s を STRING 型として定義します
    ) WITH (
      'connector' = 'datagen' // コネクタを datagen に設定します
    );
    
    CREATE TEMPORARY TABLE tair_ouput ( // 一時的な Tair 出力テーブルを作成します
    	key STRING, // key を STRING 型として定義します
    	field STRING, // field を STRING 型として定義します
    	value STRING, // value を STRING 型として定義します
    	PRIMARY KEY (key) NOT ENFORCED // key をプライマリキーとして設定しますが、強制しません
    ) WITH (
      'connector' = 'tair', // コネクタを tair に設定します
      'mode' = 'tairhash', // モードを tairhash に設定します
      'host' = '${tairHost}', // ホストを ${tairHost} に設定します
      'port' = '${tairPort}', // ポートを ${tairPort} に設定します
      'password' = '${password}', // パスワードを ${password} に設定します
      'fieldExpireMode' = 'millisecond', // fieldExpireMode を millisecond に設定します
      'fieldExpireValue' = '1000' // fieldExpireValue を 1000 に設定します
    );
    
    INSERT INTO tair_output // tair_output テーブルにデータを挿入します
    SELECT v, p, s // v、p、s を選択します
    FROM datagen_stream; // datagen_stream からデータを選択します