このトピックでは、Tair (Enterprise Edition) コネクタの使用方法について説明します。
背景情報
Tair (Redis OSS-compatible) は、オープンソース Redis システムのプロトコルと互換性のあるデータベースサービスです。Tair (Redis OSS-compatible) は、メモリとハードディスクを組み合わせたハイブリッドストレージをサポートしています。また、高可用性を確保するためのホットスタンバイアーキテクチャを提供し、スケーラブルなクラスタアーキテクチャにより、高スループット、低遅延の操作、柔軟な構成変更といったビジネス要件に対応します。
Tair コネクタは以下の機能をサポートしています。
カテゴリ | 説明 |
サポートされるタイプ | 結果テーブル |
実行モード | ストリームモードのみサポートされています。 |
データフォーマット | STRING |
固有メトリック |
説明 メトリックの詳細については、「Metrics」をご参照ください。 |
API タイプ | SQL |
結果テーブルでのデータ更新または削除 | はい。 |
前提条件
Tair (Enterprise Edition) インスタンスが作成されました。詳細については、「手順 1: インスタンスの作成」をご参照ください。
Tair (Enterprise Edition) インスタンスに対して IP アドレスホワイトリストが設定済みである必要があります。詳細については、「Step 2: Configure whitelists」をご参照ください。
制限事項
Ververica Runtime (VVR) 6.0.6 以降を使用する Realtime Compute for Apache Flink のみが、Tair (Enterprise Edition) コネクタをサポートしています。
Tair (Enterprise Edition) コネクタでは、複数のホストを設定することはできません。
構文
Tair (Enterprise Edition) は、STRING、LIST、SET、HASHMAP、SORTEDSET といった Redis データ構造との互換性に基づき、独自開発の Tair データ構造すべてをサポートしています。
以下は DDL 文の例です。
CREATE TABLE tair_table (
a STRING,
b STRING,
PRIMARY KEY (a) NOT ENFORCED -- 必須。
) WITH (
'connector'= 'tair',
'host' = '<yourHost>'
);Tair は Redis データ構造と互換性があります。Redis データ構造の構文例の詳細については、「Tair (Redis OSS-compatible) connector」をご参照ください。
WITH 句のパラメーター
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
connector | テーブルのタイプ。 | String | はい | なし | 静的フィールドの値は tair です。 |
host | Tair サーバーのエンドポイント。 | String | はい | なし | 内部エンドポイントの使用を推奨します。 説明 ネットワーク遅延や帯域幅制限などの要因により、パブリックネットワーク接続は不安定になる可能性があります。 |
mode | Tair のデータ構造。 | STRING | はい | なし | 有効な値:
Tair は Redis データ構造および独自開発の Tair データ構造をサポートしています。有効な値の詳細については、「ApsaraDB for Redis 結果テーブルでサポートされるデータ構造」および「Tair データ構造のフォーマット」をご参照ください。 説明
|
port | Tair サーバーのポート番号。 | INT | いいえ | 6379 | 該当なし。 |
password | Tair データベースへのアクセスに使用するパスワード。 | String | いいえ | 空文字列 | 空文字列は認証を行わないことを示します。 |
dbNum | ターゲットデータベースの ID。 | INT | いいえ | 0 | 該当なし。 |
clusterMode | クラスタアーキテクチャを使用するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
|
ignoreDelete | 取り消しメッセージを無視するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
|
expiration | 挿入されたデータのキーに設定する生存時間 (TTL)。 | LONG | いいえ | 0 | 値 0 は TTL が設定されていないことを示します。このパラメーターの値が 0 より大きい場合、挿入されたデータのキーに対して TTL が設定されます。単位:ミリ秒。 |
expirationAt | 挿入されたデータのキーに設定する絶対有効期限。 | LONG | いいえ | 0 | 単位:ミリ秒。デフォルト値:0。デフォルト値は有効期限が設定されていないことを示します。 このパラメーターの値が 0 より大きく、かつ expiration パラメーターが 0 に設定されている場合、挿入されたデータのキーに対して絶対有効期限が設定されます。 |
incrMode | Tair データベースの sink モード。 | STRING | いいえ | None | 有効な値:
|
incrValue | incrMode パラメーターの値に基づいて決定される INCR の値。 | STRING | いいえ | None | このパラメーターは以下の値を受け入れます:
|
fieldExpireMode | TairHash のフィールドまたは TairTS の skey の有効期限モード。 | String | いいえ | None | 有効な値:
|
fieldExpireValue | TairHash のフィールドまたは TairTS の skey の有効期限。 | String | いいえ | None | 有効な値:
|
データ型のマッピング
Flink フィールドの型 | Tair のデータ型 |
VARCHAR | STRING |
DOUBLE | DOUBLE |
Tair データ構造のフォーマット
タイプ | フォーマット | コマンド |
incrMode パラメーターが None に設定されている場合、DDL 文には 2 つのカラムがあります。
| | |
incrMode パラメーターが int または float に設定されている場合、DDL 文には 1 つのカラムのみがあり、そのカラムには STRING 型のキーが含まれます。 | | |
incrMode パラメーターが dynamic_int または dynamic_float に設定されている場合、DDL 文には 2 つのカラムがあります。
| | |
incrMode パラメーターが None に設定されている場合、DDL 文には 3 つのカラムがあります。
| | |
incrMode パラメーターが int または float に設定されている場合、DDL 文には 2 つのカラムがあります。
| | |
incrMode パラメーターが dynamic_int または dynamic_float に設定されている場合、DDL 文には 3 つのカラムがあります。
| | |
incrMode パラメーターが None に設定されている場合、TairZset は多次元データのソートをサポートします。TairZset では、最大 256 次元の DOUBLE 型データをソートできます。そのため、DDL 文には 3 ~ 258 のカラムがあります。
| 説明 多次元でデータをソートする場合は、すべての次元のスコアフォーマットが同一であることを確認してください。 | |
incrMode パラメーターが int または float に設定されている場合、DDL 文には 2 つのカラムがあります。
| | |
incrMode パラメーターが dynamic_int または dynamic_float に設定されている場合、DDL 文には 3 つのカラムがあります。
| | |
incrMode パラメーターは None に設定する必要があります。 データが Tair 結果テーブルに初めて挿入される際、デフォルト容量 100 要素、エラー率 0.01 の TairBloom キーが作成されます。DDL 文には 2 つのカラムがあります。
| | |
incrMode パラメーターは None に設定する必要があります。DDL 文には 3 つのカラムがあります。
| | |
incrMode パラメーターが None に設定されている場合、DDL 文には 4 つのカラムがあります。
| 説明 Tair 結果テーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。サンプルコマンド: | |
incrMode パラメーターが int または float に設定されている場合、DDL 文には 4 つのカラムがあります。
| ドキュメント操作のサンプルコマンド: 説明 Tair 結果テーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。サンプルコマンド: | |
incrMode パラメーターが dynamic_int または dynamic_float に設定されている場合、DDL 文には 5 つのカラムがあります。
| ドキュメント操作に使用されるコマンドは以下のとおりです。 説明 Tair 結果テーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。サンプルコマンド: | |
incrMode パラメーターは None に設定する必要があります。DDL 文には 2 つのカラムがあります。
| | |
incrMode パラメーターは None に設定する必要があります。DDL 文には 3 つのカラムがあります。
| | |
incrMode パラメーターは None に設定する必要があります。DDL 文には 3 つのカラムがあります。
| | |
incrMode パラメーターは None に設定する必要があります。DDL 文には 6 つのカラムがあります。
| 説明 Tair 結果テーブルにデータを挿入する前に、インデックスを作成し、マッピングを追加する必要があります。サンプルコマンド: | |
incrMode パラメーターが None に設定されている場合、DDL 文には 4 つのカラムがあります。
| | |
incrMode パラメーターが float に設定されている場合、DDL 文には 3 つのカラムがあります。
| | |
incrMode パラメーターが dynamic_float に設定されている場合、DDL 文には 4 つのカラムがあります。
| |
サンプルコード
通常モードで Tair 結果テーブルにデータを挿入するサンプルコード
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_output ( index_name STRING, doc_id STRING, doc STRING, mapping STRING, PRIMARY KEY(index_name) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairsearch', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}' ); INSERT INTO tair_output SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping FROM datagen_stream;incr パターンでの挿入データの例
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_output ( key STRING, step STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairstring', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'incrMode' = 'dynamic_float', 'incrValue' = 'step' ); INSERT INTO tair_output SELECT * FROM datagen_stream;CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_output ( key STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairstring', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'incrMode' = 'float', 'incrValue' = '11.11' ); INSERT INTO tair_output SELECT v FROM datagen_stream;fieldExpire パターンを使用して結果テーブルに書き込まれるデータの例
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING, s STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE tair_ouput ( key STRING, field STRING, value STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairhash', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'fieldExpireMode' = 'millisecond', 'fieldExpireValue' = '1000' ); INSERT INTO tair_output SELECT v, p, s FROM datagen_stream;