このトピックでは、Tair (Redis OSS-compatible) コネクタの使用方法について説明します。
背景情報
Alibaba Cloud Tair は、オープンソースの Redis プロトコルと互換性のあるデータベースサービスです。メモリとディスクの両方を使用するハイブリッドストレージを提供します。Tair は、アクティブ/アクティブの高可用性アーキテクチャとスケーラブルなクラスターアーキテクチャを特徴とし、高スループット、低レイテンシー、柔軟なスケーリングといったビジネスニーズに対応します。詳細については、「Tair (Redis OSS 互換) とは」をご参照ください。
Redis コネクタは、以下の機能をサポートしています。
カテゴリ | 詳細 |
サポートされるタイプ | ディメンションテーブルとシンクテーブル |
サポートされるモード | ストリーミング |
データ形式 | String |
特定のモニタリングメトリック |
説明 メトリックの詳細については、「メトリックの説明」をご参照ください。 |
API タイプ | SQL |
結果テーブルでのデータ更新または削除のサポート | はい |
前提条件
Tair (Redis OSS 互換) インスタンスを作成します。詳細については、「ステップ 1:インスタンスの作成」をご参照ください。
ホワイトリストを設定します。詳細については、「ステップ 2:ホワイトリストの設定」をご参照ください。
制限事項
Redis コネクタはベストエフォートセマンティクスのみを提供し、exactly-once 配信を保証することはできません。したがって、べき等性を保証する必要があります。
ディメンションテーブルには以下の制限が適用されます:
Redis データストレージからは STRING および HASHMAP データのみを読み取ることができます。
ディメンションテーブルのすべてのフィールドは STRING 型である必要があります。プライマリキーを 1 つだけ宣言する必要があります。
ディメンションテーブルを結合する場合、ON 句にはプライマリキーの等価条件を含める必要があります。
既知の問題と解決策
Ververica Runtime (VVR) 8.0.9 には、キャッシュ機能に関する既知の問題があります。この問題を回避するには、結果テーブルの WITH 句で 'sink.buffer-flush.max-rows' = '0' を設定して、この機能を無効にすることができます。
構文
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- 必須。
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- 結果テーブルに必須。
);WITH パラメーター
一般
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | テーブルのタイプ。 | String | はい | なし | 値は redis である必要があります。 |
host | Redis サーバーの接続アドレス。 | String | はい | なし | 内部ネットワークアドレスを使用します。 説明 パブリックネットワークアドレスへの接続は、ネットワーク遅延や帯域幅制限により不安定になる場合があります。 |
port | Redis サーバーの接続ポート。 | Int | いいえ | 6379 | なし。 |
password | Redis データベースのパスワード。 | String | いいえ | 空の文字列。これは検証が実行されないことを意味します。 | なし。 |
dbNum | 操作対象のデータベースの番号。 | Int | いいえ | 0 | なし。 |
clusterMode | Redis クラスターがクラスターモードであるかどうかを指定します。 | Boolean | いいえ | false | なし。 |
hostAndPorts | Redis クラスターのホストとポート番号。 説明 クラスターモードが有効で、高可用性接続が不要な場合は、host および port パラメーターを使用して 1 つのホストのみを構成できます。このパラメーターのみを構成することもできます。このパラメーターは、個別の host および port パラメーターよりも優先度が高くなります。 | String | いいえ | Empty |
|
key-prefix | テーブルのプライマリキー値のプレフィックス。 | String | いいえ | なし | このパラメーターを構成すると、データのクエリまたは書き込み時に、プライマリキーのフィールド値にプレフィックスが自動的に追加されます。プレフィックスは、キープレフィックス (key-prefix) とプレフィックス区切り文字 (key-prefix-delimiter) で構成されます。 説明 このパラメーターは VVR 8.0.7 以降でのみサポートされます。 |
key-prefix-delimiter | プライマリキー値とそのプレフィックスの間の区切り文字。 | String | いいえ | なし | |
connection.pool.max-total | 接続プールが割り当てることができる最大接続数。 | Int | いいえ | 8 | 説明 このパラメーターは VVR 8.0.9 以降でのみサポートされます。 |
connection.pool.max-idle | 接続プール内のアイドル接続の最大数。 | Int | いいえ | 8 | |
connection.pool.min-idle | 接続プール内のアイドル接続の最小数。 | Int | いいえ | 0 | |
connect.timeout | 接続確立のタイムアウト。 | Duration | いいえ | 3000ms | |
socket.timeout | Redis サーバーからのデータ受信のタイムアウト (ソケットタイムアウト)。 | Duration | いいえ | 3000ms | |
cacert.filepath | SSL/TLS 証明書ファイルの完全なパス。ファイル形式は JKS である必要があります。 | String | いいえ | なし。これは SSL/TLS 暗号化が無効であることを意味します。 | 詳細については、「TLS 暗号化の有効化」をご参照いただき、CA 証明書をダウンロードしてください。ジョブの [追加依存ファイル] セクションで証明書をアップロードします。アップロード後、CA 証明書は /flink/usrlib フォルダーに保存されます。[追加依存ファイル] セクションでファイルをアップロードする方法の詳細については、「ジョブのデプロイ」をご参照ください。例: 説明 このパラメーターは VVR 11.1 以降でのみサポートされます。 |
結果テーブル固有
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
mode | 対応する Redis のデータの型。 | String | はい | なし | Tair の結果テーブルは 5 種類の Redis データ型をサポートしています。DDL は指定された形式で定義し、プライマリキーを定義する必要があります。詳細については、「Redis 結果テーブルのデータ形式」をご参照ください。 |
flattenHash | HASHMAP データをマルチバリューモードで書き込むかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
説明
|
ignoreDelete | リトラクションメッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
|
expiration | 書き込まれたデータのキーに TTL を設定します。 | Long | いいえ | 0。これは TTL が設定されていないことを意味します。 | このパラメーターの値が 0 より大きい場合、書き込まれたデータのキーに対応する TTL が設定されます。単位はミリ秒です。 |
sink.buffer-flush.max-rows | キャッシュに保存できるレコードの最大数。 | Int | いいえ | 200 | キャッシュされたレコードには、すべての追加、更新、削除イベントが含まれます。レコードの最大数を超えると、キャッシュはフラッシュされます。 説明
|
sink.buffer-flush.interval | キャッシュのフラッシュ間隔。 | Duration | いいえ | 1000ms | キャッシュを非同期でフラッシュします。 説明
|
ディメンションテーブル固有
パラメーター | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
mode | Redis から読み取るデータの型。 | String | いいえ | STRING | 有効な値: STRING: デフォルトでデータを STRING 型として読み取ります。 HASHMAP: データをマルチバリューモードで HASHMAP 型として読み取ります。 この場合、DDL で複数の非プライマリキーフィールドを宣言する必要があります。
説明 このパラメーターは VVR 8.0.7 以降でのみサポートされます。 HASHMAP データをシングルバリューモードで読み取るには、hashName パラメーターを構成します。 |
hashName | HASHMAP データをシングルバリューモードで読み取る際に使用するキー。 | String | いいえ | なし | mode パラメーターを指定せず、HASHMAP データをシングルバリューモードで読み取る場合は、hashName を構成する必要があります。 この場合、DDL で 2 つのフィールドのみを宣言する必要があります。最初のプライマリキーフィールドの値はフィールドに対応し、2 番目の非プライマリキーフィールドの値は値に対応します。 |
cache | キャッシュポリシー。 | String | いいえ | None | Tair ディメンションテーブルは、以下のキャッシュポリシーをサポートしています: None (デフォルト): キャッシュなし。 LRU: ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュを検索します。データが見つからない場合は、物理ディメンションテーブルを検索します。 ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。以降のすべてのルックアップはキャッシュに対して実行されます。キャッシュにデータが見つからない場合、そのキーは存在しません。フルキャッシュには有効期限があります。有効期限が切れると、フルキャッシュは再読み込みされます。 重要
|
cacheSize | キャッシュサイズ。 | Long | いいえ | 10000 | LRU キャッシュポリシーを選択する場合は、キャッシュサイズを設定する必要があります。 |
cacheTTLMs | キャッシュのタイムアウト (ミリ秒)。 | Long | いいえ | なし | cacheTTLMs の構成は、cache の設定に依存します: cache が None に設定されている場合、cacheTTLMs を構成する必要はありません。これはキャッシュがタイムアウトしないことを意味します。 cache が LRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間です。デフォルトでは有効期限はありません。 cache が ALL に設定されている場合、cacheTTLMs はキャッシュの再読み込み時間です。デフォルトでは再読み込みされません。 |
cacheEmpty | 空の結果をキャッシュするかどうかを指定します。 | Boolean | いいえ | true | なし。 |
cacheReloadTimeBlackList | キャッシュ更新禁止時間。キャッシュポリシーが ALL に設定されている場合、独身の日 (ダブルイレブン) イベント中など、この期間中のキャッシュ更新を防ぐために禁止時間を有効にできます。 | String | いいえ | なし | 形式:
区切り文字は次のように使用します:
|
async | データを非同期で返すかどうかを指定します。 | Boolean | いいえ | false |
|
Redis 結果テーブルのデータ形式
タイプ | 形式 | データを挿入する Redis コマンド |
STRING 型 | DDL には 2 つの列があります:
|
|
LIST 型 | DDL には 2 つの列があります:
|
|
SET 型 | DDL には 2 つの列があります:
|
|
HASHMAP 型 | デフォルトでは、DDL には 3 つの列があります:
|
|
flattenHash パラメーターが true に設定されている場合、DDL は複数の列をサポートします。次の例では 4 つの列を使用します:
|
| |
SORTEDSET 型 | DDL には 3 つの列があります:
|
|
型マッピング
カテゴリ | Redis フィールド型 | Flink フィールド型 |
一般 | STRING | STRING |
結果テーブル固有 | SCORE | DOUBLE |
Redis の SCORE 型は SORTEDSET (ソート済みセット) データ構造に適用されます。したがって、各値に手動で DOUBLE のスコアを設定する必要があります。値はスコアに基づいて昇順にソートされます。
例
結果テーブル
STRING データの書き込み:この例では、
redis_sink結果テーブルのuser_id列の値がキーとして使用され、login_time列の値が値として使用されます。両方とも Redis に書き込まれます。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- ユーザー ID login_time STRING -- ログインタイムスタンプ ) WITH ( 'connector' = 'kafka', 'topic' = 'user_logins', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データ形式は JSON です。 'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。 ); CREATE TEMPORARY TABLE redis_sink ( user_id STRING, -- Redis キー login_time STRING, -- Redis 値 PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', -- STRING モードを使用します。 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;マルチバリューモードでの HASHMAP データの書き込み:この例では、
redis_sink結果テーブルのorder_id列の値がキーとして使用されます。product_name、quantity、およびamount列の値は、それぞれ Redis の `product_name`、`quantity`、および `amount` フィールドに書き込まれます。CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 注文 ID product_name STRING, -- 製品名 quantity STRING, -- 製品数量 amount STRING -- 注文金額 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データ形式は JSON です。 'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。 ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- 注文 ID、Redis キーとして使用されます。 product_name STRING, -- 製品名、Redis Hash フィールドとして使用されます。 quantity STRING, -- 製品数量、Redis Hash フィールドとして使用されます。 amount STRING, -- 注文金額、Redis Hash フィールドとして使用されます。 PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', -- HASHMAP モードを使用します。 'flattenHash' = 'true', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;シングルバリューモードでの HASHMAP データの書き込み:この例では、
redis_sink結果テーブルのorder_id列の値がキーとして使用されます。product_name列の値はフィールドとして、quantity列の値は値として使用されます。すべてのデータは Redis に書き込まれます。CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 注文 ID product_name STRING, -- 製品名 quantity STRING -- 製品数量 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データ形式は JSON です。 'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。 ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- Redis キー product_name STRING, -- Redis フィールド quantity STRING, -- Redis 値 PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
ディメンションテーブル
STRING データの読み取り:この例では、
redis_dimディメンションテーブルのuser_id列の値がキーで、user_name列の値が値です。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- ユーザー ID proctime AS PROCTIME() -- 処理時間 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データ形式は JSON です。 'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。 ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- ユーザー ID (Redis キー) user_name STRING, -- ユーザー名 (Redis 値) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis ホストアドレス 'port' = 'yourPort', -- Redis ポート 'password' = 'yourPassword', -- Redis パスワード 'mode' = 'STRING' -- STRING モードを使用します。 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- ユーザー ID redis_user_id STRING, -- Redis 内のユーザー ID user_name STRING -- ユーザー名 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- ユーザー ID (Kafka から) t2.user_id, -- Redis 内のユーザー ID t2.user_name -- ユーザー名 (Redis から) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;マルチバリューモードでの HASHMAP データの読み取り:この例では、
redis_dimディメンションテーブルのuser_id列の値がキーです。user_name、email、およびregister_time列の値は、それぞれ `user_name`、`email`、および `register_time` フィールドの値に対応します。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- ユーザー ID click_time TIMESTAMP(3), -- クリック時間 proctime AS PROCTIME() -- 処理時間 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データ形式は JSON です。 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- ユーザー ID (Redis キー) user_name STRING, -- ユーザー名 (Redis フィールド-値ペアの一部) email STRING, -- メールアドレス (Redis フィールド-値ペアの一部) register_time STRING, -- 登録時間 (Redis フィールド-値ペアの一部) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword', 'mode' = 'HASHMAP' -- HASHMAP モードを使用します。 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- ユーザー ID user_name STRING, -- ユーザー名 email STRING, -- メールアドレス register_time STRING, -- 登録時間 click_time TIMESTAMP(3) -- クリック時間 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- ユーザー ID t2.user_name, -- ユーザー名 t2.email, -- メールアドレス t2.register_time, -- 登録時間 t1.click_time -- クリック時間 FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;シングルバリューモードでの HASHMAP データの読み取り:この例では、
hashNameパラメーターの値 `testKey` がキーです。redis_dimディメンションテーブルのuser_id列の値はフィールドで、user_name列の値は値です。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- ユーザー ID proctime AS PROCTIME() -- 処理時間 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データ形式は JSON です。 'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。 ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- ユーザー ID (Redis ハッシュフィールド) user_name STRING, -- ユーザー名 (Redis ハッシュ値) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis ホストアドレス 'port' = 'yourPort', -- Redis ポート 'password' = 'yourPassword',-- Redis パスワード 'hashName' = 'testkey' -- 固定 Redis ハッシュ名 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- ユーザー ID redis_user_id STRING, -- Redis 内のユーザー ID user_name STRING -- ユーザー名 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- ユーザー ID (Kafka から) t2.user_id, -- Redis 内のユーザー ID t2.user_name -- ユーザー名 (Redis から) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;