このトピックでは、Tair (Enterprise Edition) コネクタの使用方法について説明します。
背景情報
Tair (Redis OSS 互換) は、オープンソース Redis システムのプロトコルと互換性のあるデータベースサービスです。 Tair (Redis OSS 互換) は、ストレージにメモリとハードディスクのハイブリッドをサポートしています。 Tair (Redis OSS 互換) は、高可用性を確保するためにホットスタンバイアーキテクチャを提供し、スケーラブルなクラスタアーキテクチャを使用して、高スループット、低レイテンシ操作、および柔軟な構成変更に対するビジネス要件を満たします。
次の表に、Tair コネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | シンクテーブル |
実行モード | ストリーミングモード |
データ形式 | STRING |
メトリック |
説明 メトリックの詳細については、「メトリック」をご参照ください。 |
API タイプ | SQL API |
シンクテーブルでのデータの更新または削除 | サポートされています |
前提条件
Tair (Enterprise Edition) インスタンスが作成されていること。 詳細については、「手順 1: インスタンスの作成」をご参照ください。
Tair (Enterprise Edition) インスタンスに IP アドレスホワイトリストが構成されていること。 詳細については、「手順 2: ホワイトリストの構成」をご参照ください。
制限事項
Ververica Runtime (VVR) 6.0.6 以降を使用する Apache Flink 用 Realtime Compute のみ、Tair (Enterprise Edition) コネクタをサポートしています。
Tair (Enterprise Edition) コネクタでは、複数のホストを構成することはできません。
構文
Tair (Enterprise Edition) は、次の Redis データ構造との互換性に基づいて、すべての独自開発の Tair データ構造をサポートしています: STRING、LIST、SET、HASHMAP、SORTEDSET。
Tair シンクテーブルを作成するには、次の DDL ステートメントを実行します。
CREATE TABLE tair_table (
a STRING,
b STRING,
PRIMARY KEY (a) NOT ENFORCED -- 必須です。
) WITH (
'connector'= 'tair',
'host' = '<yourHost>' // ホストを指定します
);Tair は Redis データ構造と互換性があります。Redis データ構造の構文例の詳細については、「Tair (Redis OSS 互換) コネクタ」をご参照ください。
WITH 句のパラメータ
パラメータ | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | テーブルのタイプ。 | STRING | はい | デフォルト値なし | 値を tair に設定します。 |
host | Tair サーバーのエンドポイント。 | STRING | はい | デフォルト値なし | 内部エンドポイントを使用することをお勧めします。 説明 インターネット経由で Tair データベースにアクセスする場合、ネットワークレイテンシや帯域幅の制限などの問題により、ネットワークが不安定になる可能性があります。 |
mode | Tair のデータ構造。 | STRING | はい | デフォルト値なし | 有効な値:
Tair は、Redis データ構造と独自開発の Tair データ構造をサポートしています。有効な値の詳細については、「ApsaraDB for Redis シンクテーブルでサポートされているデータ構造」および「Tair データ構造の形式」をご参照ください。 説明
|
port | Tair サーバーのポート番号。 | INT | いいえ | 6379 | 該当なし。 |
password | Tair データベースへのアクセスに使用するパスワード。 | STRING | いいえ | 空の文字列 | 空の文字列は、検証が実行されないことを示します。 |
dbNum | 宛先データベースの ID。 | INT | いいえ | 0 | 該当なし。 |
clusterMode | クラスタアーキテクチャを使用するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
|
ignoreDelete | 取り消しメッセージを無視するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
|
expiration | 挿入されたデータのキーに指定された有効期限 (TTL)。 | LONG | いいえ | 0 | 値 0 は、TTL が構成されていないことを示します。このパラメータの値が 0 より大きい場合、挿入されたデータのキーに TTL が構成されます。単位: ミリ秒。 |
expirationAt | 挿入されたデータのキーに指定された絶対有効期限。 | LONG | いいえ | 0 | 単位: ミリ秒。デフォルト値: 0。デフォルト値は、有効期限が指定されていないことを示します。 このパラメータの値が 0 より大きく、expiration パラメータが 0 に設定されている場合、挿入されたデータのキーに絶対有効期限が指定されます。 |
incrMode | Tair データベースのシンクモード。 | STRING | いいえ | None | 有効な値:
|
incrValue | incrMode パラメータの値に基づいて決定される INCR の値。 | STRING | いいえ | デフォルト値なし | incrValue パラメータの値は、incrMode パラメータの値によって異なります。
|
fieldExpireMode | TairHash のフィールドまたは TairTS の skeys の有効期限モード。 | STRING | いいえ | None | 有効な値:
|
fieldExpireValue | TairHash のフィールドまたは TairTS の skeys の有効期限。 | STRING | いいえ | デフォルト値なし | 有効な値:
|
データ型マッピング
Apache Flink 用 Realtime Compute のデータ型 | Tair のデータ型 |
VARCHAR | STRING |
DOUBLE | DOUBLE |
Tair データ構造の形式
データ構造 | 形式 | Tair シンクテーブルにデータを挿入するためのコマンド |
incrMode パラメータが None に設定されている場合、DDL ステートメントには 2 つの列があります。
| | |
incrMode パラメータが int または float に設定されている場合、DDL ステートメントには 1 つの列のみがあり、その列には STRING 型のキーが一覧表示されます。 | | |
incrMode パラメータが dynamic_int または dynamic_float に設定されている場合、DDL ステートメントには 2 つの列があります。
| | |
incrMode パラメータが None に設定されている場合、DDL ステートメントには 3 つの列があります。
| | |
incrMode パラメータが int または float に設定されている場合、DDL ステートメントには 2 つの列があります。
| | |
incrMode パラメータが dynamic_int または dynamic_float に設定されている場合、DDL ステートメントには 3 つの列があります。
| | |
incrMode パラメータが None に設定されている場合、TairZset は多次元データソートをサポートします。 TairZset では、最大 256 次元の DOUBLE 型のデータをソートできます。したがって、DDL ステートメントには 3 ~ 258 列があります。
| 説明 複数次元からデータをソートする場合は、すべての次元のスコア形式が同じであることを確認してください。 | |
incrMode パラメータが int または float に設定されている場合、DDL ステートメントには 2 つの列があります。
| | |
incrMode パラメータが dynamic_int または dynamic_float に設定されている場合、DDL ステートメントには 3 つの列があります。
| | |
incrMode パラメータは None に設定する必要があります。 Tair シンクテーブルに初めてデータを挿入すると、デフォルトの容量が 100 要素、エラー率が 0.01 の TairBloom キーが作成されます。 DDL ステートメントには 2 つの列があります。
| | |
incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 3 つの列があります。
| | |
incrMode パラメータが None に設定されている場合、DDL ステートメントには 4 つの列があります。
| 説明 Tair シンクテーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。コマンド例: | |
incrMode パラメータが int または float に設定されている場合、DDL ステートメントには 4 つの列があります。
| ドキュメント操作のコマンド例: 説明 Tair シンクテーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。コマンド例: | |
incrMode パラメータが dynamic_int または dynamic_float に設定されている場合、DDL ステートメントには 5 つの列があります。
| ドキュメント操作のコマンド例: 説明 Tair シンクテーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。コマンド例: | |
incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 2 つの列があります。
| | |
incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 3 つの列があります。
| | |
incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 3 つの列があります。
| | |
incrMode パラメータは None に設定する必要があります。 DDL ステートメントには 6 つの列があります。
| 説明 Tair シンクテーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。コマンド例: | |
incrMode パラメータが None に設定されている場合、DDL ステートメントには 4 つの列があります。
| | |
incrMode パラメータが float に設定されている場合、DDL ステートメントには 3 つの列があります。
| | |
incrMode パラメータが dynamic_float に設定されている場合、DDL ステートメントには 4 つの列があります。
| |
サンプルコード
共通モードで Tair シンクテーブルにデータを挿入するサンプルコード
CREATE TEMPORARY TABLE datagen_stream ( // データ生成ストリームを作成します v STRING, // v を STRING 型として定義します p STRING // p を STRING 型として定義します ) WITH ( 'connector' = 'datagen' // コネクタを datagen に設定します ); CREATE TEMPORARY TABLE tair_output ( // 一時的な Tair 出力テーブルを作成します index_name STRING, // index_name を STRING 型として定義します doc_id STRING, // doc_id を STRING 型として定義します doc STRING, // doc を STRING 型として定義します mapping STRING, // mapping を STRING 型として定義します PRIMARY KEY(index_name) NOT ENFORCED // index_name をプライマリキーとして設定しますが、強制しません ) WITH ( 'connector' = 'tair', // コネクタを tair に設定します 'mode' = 'tairsearch', // モードを tairsearch に設定します 'host' = '${tairHost}', // ホストを ${tairHost} に設定します 'port' = '${tairPort}', // ポートを ${tairPort} に設定します 'password' = '${password}' // パスワードを ${password} に設定します ); INSERT INTO tair_output // tair_output テーブルにデータを挿入します SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping // index、v、p、およびマッピングを選択します FROM datagen_stream; // datagen_stream からデータを選択しますincrMode パラメータが構成されている場合に Tair シンクテーブルにデータを挿入するサンプルコード
CREATE TEMPORARY TABLE datagen_stream ( // データ生成ストリームを作成します v STRING, // v を STRING 型として定義します p STRING // p を STRING 型として定義します ) WITH ( 'connector' = 'datagen' // コネクタを datagen に設定します ); CREATE TEMPORARY TABLE tair_output ( // 一時的な Tair 出力テーブルを作成します key STRING, // key を STRING 型として定義します step STRING, // step を STRING 型として定義します PRIMARY KEY (key) NOT ENFORCED // key をプライマリキーとして設定しますが、強制しません ) WITH ( 'connector' = 'tair', // コネクタを tair に設定します 'mode' = 'tairstring', // モードを tairstring に設定します 'host' = '${tairHost}', // ホストを ${tairHost} に設定します 'port' = '${tairPort}', // ポートを ${tairPort} に設定します 'password' = '${password}', // パスワードを ${password} に設定します 'incrMode' = 'dynamic_float', // incrMode を dynamic_float に設定します 'incrValue' = 'step' // incrValue を step に設定します ); INSERT INTO tair_output // tair_output テーブルにデータを挿入します SELECT * // すべての列を選択します FROM datagen_stream; // datagen_stream からデータを選択しますCREATE TEMPORARY TABLE datagen_stream ( // データ生成ストリームを作成します v STRING, // v を STRING 型として定義します p STRING // p を STRING 型として定義します ) WITH ( 'connector' = 'datagen' // コネクタを datagen に設定します ); CREATE TEMPORARY TABLE tair_output ( // 一時的な Tair 出力テーブルを作成します key STRING, // key を STRING 型として定義します PRIMARY KEY (key) NOT ENFORCED // key をプライマリキーとして設定しますが、強制しません ) WITH ( 'connector' = 'tair', // コネクタを tair に設定します 'mode' = 'tairstring', // モードを tairstring に設定します 'host' = '${tairHost}', // ホストを ${tairHost} に設定します 'port' = '${tairPort}', // ポートを ${tairPort} に設定します 'password' = '${password}', // パスワードを ${password} に設定します 'incrMode' = 'float', // incrMode を float に設定します 'incrValue' = '11.11' // incrValue を 11.11 に設定します ); INSERT INTO tair_output // tair_output テーブルにデータを挿入します SELECT v // v を選択します FROM datagen_stream; // datagen_stream からデータを選択しますfieldExpireMode パラメータが構成されている場合に Tair シンクテーブルにデータを挿入するサンプルコード
CREATE TEMPORARY TABLE datagen_stream ( // データ生成ストリームを作成します v STRING, // v を STRING 型として定義します p STRING, // p を STRING 型として定義します s STRING // s を STRING 型として定義します ) WITH ( 'connector' = 'datagen' // コネクタを datagen に設定します ); CREATE TEMPORARY TABLE tair_ouput ( // 一時的な Tair 出力テーブルを作成します key STRING, // key を STRING 型として定義します field STRING, // field を STRING 型として定義します value STRING, // value を STRING 型として定義します PRIMARY KEY (key) NOT ENFORCED // key をプライマリキーとして設定しますが、強制しません ) WITH ( 'connector' = 'tair', // コネクタを tair に設定します 'mode' = 'tairhash', // モードを tairhash に設定します 'host' = '${tairHost}', // ホストを ${tairHost} に設定します 'port' = '${tairPort}', // ポートを ${tairPort} に設定します 'password' = '${password}', // パスワードを ${password} に設定します 'fieldExpireMode' = 'millisecond', // fieldExpireMode を millisecond に設定します 'fieldExpireValue' = '1000' // fieldExpireValue を 1000 に設定します ); INSERT INTO tair_output // tair_output テーブルにデータを挿入します SELECT v, p, s // v、p、s を選択します FROM datagen_stream; // datagen_stream からデータを選択します