このトピックでは、Tair (Redis OSS-compatible) コネクタの使用方法について説明します。
背景情報
Alibaba Cloud Tair は、オープンソース Redis プロトコルと互換性のあるデータベースサービスであり、ハイブリッドメモリ・ディスクストレージを提供します。Tair は、高スループット、低レイテンシー、柔軟なスケールアップ/スケールダウンといったビジネス要件を満たすために、高可用性 (HA) アクティブ/アクティブアーキテクチャとスケーラブルなクラスタアーキテクチャを使用しています。詳細については、「Tair (Redis OSS-compatible) とは?」をご参照ください。
以下の表は、Redis コネクタでサポートされる特徴について説明しています。
カテゴリ | 詳細 |
サポートされるタイプ | ディメンションテーブルと結果テーブル |
サポートされるモード | ストリーミングモード |
データフォーマット | 文字列 |
特定のモニタリングメトリック |
説明 メトリックの詳細については、「モニタリングメトリック」をご参照ください。 |
API タイプ | SQL |
結果テーブルでのデータ更新または削除のサポート | はい |
前提条件
Tair (Redis OSS-compatible) インスタンスを作成済みであること。詳細については、「ステップ 1: インスタンスの作成」をご参照ください。
ホワイトリストを構成済みであること。詳細については、「ステップ 2: ホワイトリストの構成」をご参照ください。
制限事項
Redis コネクタはベストエフォートセマンティクスを提供し、Exactly-once 配信を保証しません。ご利用のアプリケーションでべき等性を確保する必要があります。
ディメンションテーブルには、以下の制限が適用されます。
Redis からは STRING および HASHMAP データの型のみを読み取ることができます。
ディメンションテーブル内のすべてのフィールドは STRING タイプである必要があります。プライマリキーを正確に 1 つ宣言する必要があります。
ディメンションテーブルを結合する場合、ON 句にはプライマリキーの同等条件を含める必要があります。
既知の問題とソリューション
Ververica Runtime (VVR) 8.0.9 のキャッシュ機能には問題があります。これを無効にするには、結果テーブルの WITH 句に sink.buffer-flush.max-rows = '0' を追加します。
構文
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- Required.
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- Required for sink tables.
);WITH パラメーター
一般
パラメーター | 説明 | データの型 | 必須 | デフォルト | 備考 |
connector | テーブルタイプ。 | String | はい | なし | 値は redis である必要があります。 |
host | Redis サーバーの接続エンドポイント。 | String | はい | なし | 内部ネットワークエンドポイントを使用します。 説明 パブリックネットワークエンドポイントへの接続は、ネットワーク遅延や帯域幅制限などの要因により不安定になる場合があります。 |
port | Redis サーバーの接続ポート。 | Int | いいえ | 6379 | なし。 |
password | Redis データベースのパスワード。 | String | いいえ | 空の文字列。これは検証が実行されないことを意味します。 | なし。 |
dbNum | 操作するデータベースの番号。 | Int | いいえ | 0 | なし。 |
clusterMode | Redis クラスターがクラスターモードであるかどうかを指定します。 | Boolean | いいえ | false | なし。 |
hostAndPorts | Redis クラスターのホストとポート番号。 説明 クラスターモードが有効で HA が不要な場合、host および port パラメーターを使用して 1 つのホストのみを構成するか、このパラメーターのみを構成できます。このパラメーターは、個々の host および port パラメーターよりも優先度が高くなります。 | String | いいえ | 空 |
|
key-prefix | テーブルのプライマリキー値のプレフィックス。 | String | いいえ | なし | このパラメーターを構成すると、データのクエリまたは書き込み時に、プライマリキーフィールドの値にプレフィックスが自動的に追加されます。プレフィックスは、キープレフィックス (key-prefix) とプレフィックスデリミタ (key-prefix-delimiter) で構成されます。 説明 このパラメーターは VVR 8.0.7 以降でのみサポートされます。 |
key-prefix-delimiter | プライマリキー値とそのプレフィックス間のデリミタ。 | String | いいえ | なし | |
connection.pool.max-total | 接続プールが割り当てることができる最大接続数。 | Int | いいえ | 8 | 説明 このパラメーターは VVR 8.0.9 以降でのみサポートされます。 |
connection.pool.max-idle | 接続プール内の最大アイドル接続数。 | Int | いいえ | 8 | |
connection.pool.min-idle | 接続プール内の最小アイドル接続数。 | Int | いいえ | 0 | |
connect.timeout | 接続確立のタイムアウト。 | Duration | いいえ | 3000 ms | |
socket.timeout | Redis サーバーからデータを受信する際のタイムアウト (ソケットタイムアウト)。 | Duration | いいえ | 3000 ms | |
cacert.filepath | SSL/TLS 証明書ファイルの完全なパス。ファイルフォーマットは jks である必要があります。 | String | いいえ | なし。これは SSL/TLS 暗号化が無効であることを意味します。 | 「TLS 暗号化の有効化」で説明されているように CA 証明書をダウンロードし、ジョブの [追加の依存関係ファイル] セクションに証明書をアップロードします。アップロード後、CA 証明書は /flink/usrlib ディレクトリに保存されます。「追加の依存関係ファイル」セクションでファイルをアップロードする方法の詳細については、「ジョブのデプロイ」をご参照ください。例: 説明 このパラメーターは VVR 11.1 以降でのみサポートされます。 |
シンク固有
パラメーター | 説明 | データの型 | 必須 | デフォルト | 備考 |
mode | 使用する Redis データの型。 | String | はい | なし | Tair 結果テーブルは 5 種類の Redis データの型をサポートしています。DDL は指定されたフォーマットで定義され、プライマリキーが定義されている必要があります。詳細については、「Redis 結果テーブルのデータ型フォーマット」をご参照ください。 |
flattenHash | HASHMAP データをマルチバリューモードで書き込むかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
説明
|
ignoreDelete | リトラクションメッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
|
expiration | 書き込まれたデータのキーに TTL を設定します。 | Long | いいえ | 0。これは TTL が設定されないことを意味します。 | このパラメーターの値が 0 より大きい場合、書き込まれたデータのキーにそれに対応する TTL が設定されます。単位はミリ秒です。 |
sink.buffer-flush.max-rows | キャッシュに保存できる最大レコード数。 | Int | いいえ | 200 | キャッシュされたレコードには、すべての追加、更新、および削除イベントが含まれます。最大レコード数を超過すると、キャッシュがフラッシュされます。 説明
|
sink.buffer-flush.interval | キャッシュフラッシュ間隔。 | Duration | いいえ | 1000 ms | キャッシュは非同期的にフラッシュされます。 説明
|
ディメンションテーブル固有
パラメーター | 説明 | データの型 | 必須 | デフォルト | 備考 |
mode | 読み取る Redis データの型。 | String | いいえ | STRING | 有効な値: STRING: デフォルトで STRING タイプとしてデータを読み取ります。 HASHMAP: HASHMAP データをマルチバリューモードで読み取ります。 この場合、DDL で複数の非プライマリキーフィールドを宣言する必要があります。
説明 このパラメーターは VVR 8.0.7 以降でのみサポートされます。 HASHMAP データをシングルバリューモードで読み取るには、hashName パラメーターを構成します。 |
hashName | HASHMAP データをシングルバリューモードで読み取る際に使用するキー。 | String | いいえ | なし | mode パラメーターを指定しないが、HASHMAP データをシングルバリューモードで読み取りたい場合は、hashName を構成する必要があります。 この場合、DDL は 2 つのフィールドのみを宣言する必要があります。最初のプライマリキーフィールドの値はフィールドに対応し、2 番目の非プライマリキーフィールドの値は値に対応します。 |
cache | キャッシュポリシー。 | String | いいえ | なし | Tair ディメンションテーブルは、以下のキャッシュポリシーをサポートしています。 None (デフォルト): キャッシュなし。 LRU: ディメンションテーブルデータの一部をキャッシュします。ソーステーブルからの各レコードについて、システムはまずキャッシュを検索します。データが見つからない場合、システムは物理ディメンションテーブルをクエリします。 ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブの実行前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。その後のすべてのルックアップはキャッシュに対して実行されます。キャッシュにデータが見つからない場合、キーは存在しません。フルキャッシュには有効期限があり、その後再ロードされます。 重要
|
cacheSize | キャッシュサイズ。 | Long | いいえ | 10000 | LRU キャッシュポリシーを選択する場合、キャッシュサイズを設定する必要があります。 |
cacheTTLMs | キャッシュタイムアウト期間 (ミリ秒)。 | Long | いいえ | なし | cacheTTLMs の構成は、キャッシュ設定によって異なります。 キャッシュが None に設定されている場合、cacheTTLMs を構成する必要はありません。これは、キャッシュがタイムアウトしないことを意味します。 キャッシュが LRU に設定されている場合、cacheTTLMs はキャッシュタイムアウト期間です。デフォルトでは、キャッシュは有効期限切れになりません。 キャッシュが ALL に設定されている場合、cacheTTLMs はキャッシュ再ロード時間です。デフォルトでは、キャッシュは再ロードされません。 |
cacheEmpty | 空の結果をキャッシュするかどうかを指定します。 | Boolean | いいえ | true | なし。 |
cacheReloadTimeBlackList | キャッシュ更新禁止時間。キャッシュポリシーが ALL に設定されている場合、独身の日ショッピングフェスティバルなどの期間中にキャッシュがリフレッシュされるのを防ぐために、禁止時間を有効にすることができます。 | String | いいえ | なし | フォーマットは以下のとおりです。
セパレータは以下のとおりです。
|
async | データを非同期的に返すかどうかを指定します。 | Boolean | いいえ | false |
|
Redis 結果テーブルのデータ型フォーマット
タイプ | フォーマット | Redis 挿入コマンド |
STRING タイプ | 2 カラム DDL:
|
|
LIST タイプ | 2 カラム DDL:
|
|
SET タイプ | 2 カラム DDL:
|
|
HASHMAP タイプ | デフォルトでは、3 カラム DDL:
|
|
flattenHash が true に設定されている場合、DDL は複数のカラムをサポートします。以下の例では 4 つのカラムを使用しています。
|
| |
SORTEDSET タイプ | 3 カラム DDL:
|
|
タイプマッピング
タイプ | Redis フィールドタイプ | Flink フィールドタイプ |
一般 | STRING | STRING |
結果テーブルのみ対象 | SCORE | DOUBLE |
Redis SCORE タイプは SORTEDSET (ソートセット) に適用されます。各値に DOUBLE スコアを手動で割り当てる必要があります。値はその後、スコアに基づいて昇順でソートされます。
使用例
シンクテーブル
STRING データの書き込み: コード例では、
redis_sink結果テーブルのuser_idカラムの値が Redis にキーとして書き込まれ、login_timeカラムの値が値として書き込まれます。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- ユーザー ID login_time STRING -- ログオンタイムスタンプ ) WITH ( 'connector' = 'kafka', 'topic' = 'user_logins', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データフォーマットは JSON 'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する ); CREATE TEMPORARY TABLE redis_sink ( user_id STRING, -- Redis キー login_time STRING, -- Redis 値 PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', -- STRING モードを使用する 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;HASHMAP データをマルチバリューモードで書き込む: コード例では、
redis_sink結果テーブルのorder_idカラムの値が Redis にキーとして書き込まれます。product_name、quantity、およびamountカラムの値は、それぞれ product_name、quantity、および amount フィールドに書き込まれます。CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 注文 ID product_name STRING, -- 製品名 quantity STRING, -- 数量 amount STRING -- 注文金額 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データフォーマットは JSON 'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- 注文 ID (Redis キーとして使用) product_name STRING, -- 製品名 (Redis Hash フィールドとして使用) quantity STRING, -- 数量 (Redis Hash フィールドとして使用) amount STRING, -- 注文金額 (Redis Hash フィールドとして使用) PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', -- HASHMAP モードを使用する 'flattenHash' = 'true', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;HASHMAP データをシングルバリューモードで書き込む: コード例では、
redis_sink結果テーブルのorder_idカラムの値が Redis にキーとして書き込まれます。product_nameカラムの値はフィールドとして書き込まれ、quantityカラムの値は値として書き込まれます。CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 注文 ID product_name STRING, -- 製品名 quantity STRING -- 数量 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データフォーマットは JSON 'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- Redis キー product_name STRING, -- Redis フィールド quantity STRING, -- Redis 値 PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
ディメンションテーブル
STRING データの読み取り: コード例では、
redis_dimディメンションテーブルのuser_idカラムの値がキーに対応し、user_nameカラムの値が値に対応します。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- ユーザー ID proctime AS PROCTIME() -- 処理時間 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データフォーマットは JSON 'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- ユーザー ID (Redis キー) user_name STRING, -- ユーザー名 (Redis 値) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis ホストアドレス 'port' = 'yourPort', -- Redis ポート 'password' = 'yourPassword', -- Redis パスワード 'mode' = 'STRING' -- STRING モードを使用する ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- ユーザー ID redis_user_id STRING, -- Redis からのユーザー ID user_name STRING -- ユーザー名 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- ユーザー ID (Kafka から) t2.user_id, -- ユーザー ID (Redis から) t2.user_name -- ユーザー名 (Redis から) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;HASHMAP データをマルチバリューモードで読み取る: コード例では、
redis_dimディメンションテーブルのuser_idカラムの値がキーです。user_nameカラムの値は user_name フィールドにマッピングされます。emailカラムの値は email フィールドにマッピングされます。register_timeカラムの値は register_time フィールドにマッピングされます。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- ユーザー ID click_time TIMESTAMP(3), -- クリック時間 proctime AS PROCTIME() -- 処理時間 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データフォーマットは JSON 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- ユーザー ID (Redis キー) user_name STRING, -- ユーザー名 (Redis フィールドと値のペアの一部) email STRING, -- メールアドレス (Redis フィールドと値のペアの一部) register_time STRING, -- 登録時間 (Redis フィールドと値のペアの一部) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword', 'mode' = 'HASHMAP' -- HASHMAP モードを使用する ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- ユーザー ID user_name STRING, -- ユーザー名 email STRING, -- メールアドレス register_time STRING, -- 登録時間 click_time TIMESTAMP(3) -- クリック時間 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- ユーザー ID t2.user_name, -- ユーザー名 t2.email, -- メールアドレス t2.register_time, -- 登録時間 t1.click_time -- クリック時間 FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;HASHMAP データをシングルバリューモードで読み取る: コード例では、
hashNameパラメーターの値は testKey であり、キーとして使用されます。redis_dimディメンションテーブルのuser_idカラムの値はフィールドであり、user_nameカラムの値は値です。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- ユーザー ID proctime AS PROCTIME() -- 処理時間 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka トピック 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス 'format' = 'json', -- データフォーマットは JSON 'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- ユーザー ID (Redis ハッシュフィールド) user_name STRING, -- ユーザー名 (Redis ハッシュ値) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis ホストアドレス 'port' = 'yourPort', -- Redis ポート 'password' = 'yourPassword',-- Redis パスワード 'hashName' = 'testkey' -- 固定 Redis ハッシュ名 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- ユーザー ID redis_user_id STRING, -- Redis からのユーザー ID user_name STRING -- ユーザー名 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- ユーザー ID (Kafka から) t2.user_id, -- ユーザー ID (Redis から) t2.user_name -- ユーザー名 (Redis から) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;