すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Tair (Redis OSS-compatible)

最終更新日:Dec 04, 2025

このトピックでは、Tair (Redis OSS-compatible) コネクタの使用方法について説明します。

背景情報

Alibaba Cloud Tair は、オープンソースの Redis プロトコルと互換性のあるデータベースサービスです。メモリとディスクの両方を使用するハイブリッドストレージを提供します。Tair は、アクティブ/アクティブの高可用性アーキテクチャとスケーラブルなクラスターアーキテクチャを特徴とし、高スループット、低レイテンシー、柔軟なスケーリングといったビジネスニーズに対応します。詳細については、「Tair (Redis OSS 互換) とは」をご参照ください。

Redis コネクタは、以下の機能をサポートしています。

カテゴリ

詳細

サポートされるタイプ

ディメンションテーブルとシンクテーブル

サポートされるモード

ストリーミング

データ形式

String

特定のモニタリングメトリック

  • ディメンションテーブル:なし

  • 結果テーブル:

    • numBytesOut

    • numRecordsOutPerSecond

    • numBytesOutPerSecond

    • currentSendTime

説明

メトリックの詳細については、「メトリックの説明」をご参照ください。

API タイプ

SQL

結果テーブルでのデータ更新または削除のサポート

はい

前提条件

制限事項

  • Redis コネクタはベストエフォートセマンティクスのみを提供し、exactly-once 配信を保証することはできません。したがって、べき等性を保証する必要があります。

  • ディメンションテーブルには以下の制限が適用されます:

    • Redis データストレージからは STRING および HASHMAP データのみを読み取ることができます。

    • ディメンションテーブルのすべてのフィールドは STRING 型である必要があります。プライマリキーを 1 つだけ宣言する必要があります。

    • ディメンションテーブルを結合する場合、ON 句にはプライマリキーの等価条件を含める必要があります。

既知の問題と解決策

Ververica Runtime (VVR) 8.0.9 には、キャッシュ機能に関する既知の問題があります。この問題を回避するには、結果テーブルの WITH 句で 'sink.buffer-flush.max-rows' = '0' を設定して、この機能を無効にすることができます。

構文

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- 必須。
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING'  -- 結果テーブルに必須。
);

WITH パラメーター

一般

パラメーター

説明

データ型

必須

デフォルト値

備考

connector

テーブルのタイプ。

String

はい

なし

値は redis である必要があります。

host

Redis サーバーの接続アドレス。

String

はい

なし

内部ネットワークアドレスを使用します。

説明

パブリックネットワークアドレスへの接続は、ネットワーク遅延や帯域幅制限により不安定になる場合があります。

port

Redis サーバーの接続ポート。

Int

いいえ

6379

なし。

password

Redis データベースのパスワード。

String

いいえ

空の文字列。これは検証が実行されないことを意味します。

なし。

dbNum

操作対象のデータベースの番号。

Int

いいえ

0

なし。

clusterMode

Redis クラスターがクラスターモードであるかどうかを指定します。

Boolean

いいえ

false

なし。

hostAndPorts

Redis クラスターのホストとポート番号。

説明

クラスターモードが有効で、高可用性接続が不要な場合は、host および port パラメーターを使用して 1 つのホストのみを構成できます。このパラメーターのみを構成することもできます。このパラメーターは、個別の host および port パラメーターよりも優先度が高くなります。

String

いいえ

Empty

ClusterMode = true で、自己管理型 Redis クラスターへの Jedis 接続に高可用性が必要な場合は、このパラメーターを構成する必要があります。形式は文字列です:"host1:port1,host2:port2"

key-prefix

テーブルのプライマリキー値のプレフィックス。

String

いいえ

なし

このパラメーターを構成すると、データのクエリまたは書き込み時に、プライマリキーのフィールド値にプレフィックスが自動的に追加されます。プレフィックスは、キープレフィックス (key-prefix) とプレフィックス区切り文字 (key-prefix-delimiter) で構成されます。

説明

このパラメーターは VVR 8.0.7 以降でのみサポートされます。

key-prefix-delimiter

プライマリキー値とそのプレフィックスの間の区切り文字。

String

いいえ

なし

connection.pool.max-total

接続プールが割り当てることができる最大接続数。

Int

いいえ

8

説明

このパラメーターは VVR 8.0.9 以降でのみサポートされます。

connection.pool.max-idle

接続プール内のアイドル接続の最大数。

Int

いいえ

8

connection.pool.min-idle

接続プール内のアイドル接続の最小数。

Int

いいえ

0

connect.timeout

接続確立のタイムアウト。

Duration

いいえ

3000ms

socket.timeout

Redis サーバーからのデータ受信のタイムアウト (ソケットタイムアウト)。

Duration

いいえ

3000ms

cacert.filepath

SSL/TLS 証明書ファイルの完全なパス。ファイル形式は JKS である必要があります。

String

いいえ

なし。これは SSL/TLS 暗号化が無効であることを意味します。

詳細については、「TLS 暗号化の有効化」をご参照いただき、CA 証明書をダウンロードしてください。ジョブの [追加依存ファイル] セクションで証明書をアップロードします。アップロード後、CA 証明書は /flink/usrlib フォルダーに保存されます。[追加依存ファイル] セクションでファイルをアップロードする方法の詳細については、「ジョブのデプロイ」をご参照ください。例:'cacert.filepath' = '/flink/usrlib/ca.jks'

説明

このパラメーターは VVR 11.1 以降でのみサポートされます。

結果テーブル固有

パラメーター

説明

データ型

必須

デフォルト値

備考

mode

対応する Redis のデータの型。

String

はい

なし

Tair の結果テーブルは 5 種類の Redis データ型をサポートしています。DDL は指定された形式で定義し、プライマリキーを定義する必要があります。詳細については、「Redis 結果テーブルのデータ形式」をご参照ください。

flattenHash

HASHMAP データをマルチバリューモードで書き込むかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: マルチバリューモードでデータを書き込みます。この場合、DDL で複数の非プライマリキーフィールドを宣言します。プライマリキーのフィールド値はキーに対応します。各非プライマリキーフィールドの名前はフィールドに対応し、そのフィールドの値はフィールドの値に対応します。

  • false: シングルバリューモードでデータを書き込みます。この場合、DDL で 3 つのフィールドを宣言します。最初のプライマリキーフィールドの値はキーに対応します。2 番目の非プライマリキーフィールドの値はフィールドに対応し、3 番目の非プライマリキーフィールドの値は値に対応します。

説明
  • このパラメーターは、mode パラメーターが HASHMAP に設定されている場合にのみ有効です。

  • このパラメーターは VVR 8.0.7 以降でのみサポートされます。

ignoreDelete

リトラクションメッセージを無視するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: リトラクションメッセージを受信したときに無視します。

  • false: リトラクションメッセージを受信したときにキーと挿入されたデータを削除します。

expiration

書き込まれたデータのキーに TTL を設定します。

Long

いいえ

0。これは TTL が設定されていないことを意味します。

このパラメーターの値が 0 より大きい場合、書き込まれたデータのキーに対応する TTL が設定されます。単位はミリ秒です。

sink.buffer-flush.max-rows

キャッシュに保存できるレコードの最大数。

Int

いいえ

200

キャッシュされたレコードには、すべての追加、更新、削除イベントが含まれます。レコードの最大数を超えると、キャッシュはフラッシュされます。

説明
  • このパラメーターは VVR 8.0.9 以降でのみサポートされます。

  • Redis クラスターインスタンスは VVR 11.4.0 以降でのみサポートされます。

  • VVR 11.4.0 より前のバージョンを使用して Redis クラスターインスタンスにデータを書き込む場合は、このパラメーターを 0 に設定して無効にしてください。

sink.buffer-flush.interval

キャッシュのフラッシュ間隔。

Duration

いいえ

1000ms

キャッシュを非同期でフラッシュします。

説明
  • このパラメーターは VVR 8.0.9 以降でのみサポートされます。

  • Redis クラスターインスタンスは VVR 11.4.0 以降でのみサポートされます。

  • VVR 11.4.0 より前のバージョンを使用して Redis クラスターインスタンスにデータを書き込む場合は、このパラメーターを 0 に設定して無効にしてください。

ディメンションテーブル固有

パラメーター

説明

データの型

必須

デフォルト値

備考

mode

Redis から読み取るデータの型。

String

いいえ

STRING

有効な値:

STRING: デフォルトでデータを STRING 型として読み取ります。

HASHMAP: データをマルチバリューモードで HASHMAP 型として読み取ります。

この場合、DDL で複数の非プライマリキーフィールドを宣言する必要があります。

  • プライマリキーフィールド:プライマリキーフィールドの値は HASHMAP のキーとして使用されます。

  • 非プライマリキーフィールド:各非プライマリキーフィールドの名前は HASHMAP のフィールドとして使用され、そのフィールドの値は値に対応します。

説明

このパラメーターは VVR 8.0.7 以降でのみサポートされます。

HASHMAP データをシングルバリューモードで読み取るには、hashName パラメーターを構成します。

hashName

HASHMAP データをシングルバリューモードで読み取る際に使用するキー。

String

いいえ

なし

mode パラメーターを指定せず、HASHMAP データをシングルバリューモードで読み取る場合は、hashName を構成する必要があります。

この場合、DDL で 2 つのフィールドのみを宣言する必要があります。最初のプライマリキーフィールドの値はフィールドに対応し、2 番目の非プライマリキーフィールドの値は値に対応します。

cache

キャッシュポリシー。

String

いいえ

None

Tair ディメンションテーブルは、以下のキャッシュポリシーをサポートしています:

None (デフォルト): キャッシュなし。

LRU: ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュを検索します。データが見つからない場合は、物理ディメンションテーブルを検索します。

ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。以降のすべてのルックアップはキャッシュに対して実行されます。キャッシュにデータが見つからない場合、そのキーは存在しません。フルキャッシュには有効期限があります。有効期限が切れると、フルキャッシュは再読み込みされます。

重要
  • ALL キャッシュポリシーは VVR 8.0.3 以降でのみサポートされます。

  • VVR バージョン 8.0.3 から 11.0 まで、ALL キャッシュポリシーは HASHMAP データのシングルバリューモードでの読み取りのみをサポートします。DDL では、3 つのフィールドを宣言する必要があります。1 つ目は WITH 句の hashName パラメーターを使用してキーとして指定されます。2 つ目のプライマリキーフィールドはフィールドに対応し、3 つ目の非プライマリキーフィールドは値に対応します。

  • VVR 11.1 以降では、ALL キャッシュポリシーは HASHMAP データのマルチバリューモードでの読み取りをサポートします。DDL では、複数の非プライマリキーフィールドを宣言します。プライマリキーフィールドの値はキーに対応します。各非プライマリキーフィールドの名前はフィールドに対応し、その値はフィールドの値に対応します。また、WITH 句で mode パラメーターを HASHMAP に設定します。

  • キャッシュサイズ (cacheSize) とキャッシュ更新間隔 (cacheTTLMs) パラメーターも構成する必要があります。

cacheSize

キャッシュサイズ。

Long

いいえ

10000

LRU キャッシュポリシーを選択する場合は、キャッシュサイズを設定する必要があります。

cacheTTLMs

キャッシュのタイムアウト (ミリ秒)。

Long

いいえ

なし

cacheTTLMs の構成は、cache の設定に依存します:

cache が None に設定されている場合、cacheTTLMs を構成する必要はありません。これはキャッシュがタイムアウトしないことを意味します。

cache が LRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間です。デフォルトでは有効期限はありません。

cache が ALL に設定されている場合、cacheTTLMs はキャッシュの再読み込み時間です。デフォルトでは再読み込みされません。

cacheEmpty

空の結果をキャッシュするかどうかを指定します。

Boolean

いいえ

true

なし。

cacheReloadTimeBlackList

キャッシュ更新禁止時間。キャッシュポリシーが ALL に設定されている場合、独身の日 (ダブルイレブン) イベント中など、この期間中のキャッシュ更新を防ぐために禁止時間を有効にできます。

String

いいえ

なし

形式:

  • 完全な日時範囲:2017-10-24 14:00 -> 2017-10-24 15:00。

  • 日をまたぐ時間範囲:2017-11-10 23:30 -> 2017-11-11 08:00。

  • 毎日の固定時間範囲:12:00 -> 14:00, 22:00 -> 2:00。

    説明

    VVR 11.1 以降では、日付が指定されていない時間範囲は毎日適用されます。

区切り文字は次のように使用します:

  • 複数の禁止時間を区切るには、コンマ (,) を使用します。

  • 禁止期間の開始時刻と終了時刻を区切るには、矢印 (->) を使用します。

async

データを非同期で返すかどうかを指定します。

Boolean

いいえ

false

  • true: データを非同期で返します。非同期で返されるデータは、デフォルトで順序付けされません。

  • false (デフォルト): データを非同期で返しません。

Redis 結果テーブルのデータ形式

タイプ

形式

データを挿入する Redis コマンド

STRING 型

DDL には 2 つの列があります:

  • 列 1 はキー、STRING 型。

  • 列 2 は値、STRING 型。

set key value

LIST 型

DDL には 2 つの列があります:

  • 列 1 はキー、STRING 型。

  • 列 2 は値、STRING 型。

lpush key value

SET 型

DDL には 2 つの列があります:

  • 列 1 はキー、STRING 型。

  • 列 2 は値、STRING 型。

sadd key value

HASHMAP 型

デフォルトでは、DDL には 3 つの列があります:

  • 列 1 はキー、STRING 型。

  • 列 2 はフィールド、STRING 型。

  • 列 3 は値、STRING 型。

hmset key field value

flattenHash パラメーターが true に設定されている場合、DDL は複数の列をサポートします。次の例では 4 つの列を使用します:

  • 列 1 はキー、STRING 型。

  • 列 2 の名前 (例:col1) はフィールドに対応し、その値 (例:value1) はフィールドの値に対応します。STRING 型。

  • 列 3 の名前 (例:col2) はフィールドに対応し、その値 (例:value2) はフィールドの値に対応します。STRING 型。

  • 列 4 の名前 (例:col3) はフィールドに対応し、その値 (例:value3) はフィールドの値に対応します。STRING 型。

hmset key col1 value1 col2 value2 col3 value3

SORTEDSET 型

DDL には 3 つの列があります:

  • 列 1 はキー、STRING 型。

  • 列 2 はスコア、DOUBLE 型。

  • 列 3 は値、STRING 型。

zadd key score value

型マッピング

カテゴリ

Redis フィールド型

Flink フィールド型

一般

STRING

STRING

結果テーブル固有

SCORE

DOUBLE

説明

Redis の SCORE 型は SORTEDSET (ソート済みセット) データ構造に適用されます。したがって、各値に手動で DOUBLE のスコアを設定する必要があります。値はスコアに基づいて昇順にソートされます。

  • 結果テーブル

    • STRING データの書き込み:この例では、redis_sink 結果テーブルの user_id 列の値がキーとして使用され、login_time 列の値が値として使用されます。両方とも Redis に書き込まれます。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,          -- ユーザー ID
        login_time STRING        -- ログインタイムスタンプ
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_logins',      -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',            -- データ形式は JSON です。
        'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        user_id STRING,               -- Redis キー
        login_time STRING,            -- Redis 値
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',   -- STRING モードを使用します。
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • マルチバリューモードでの HASHMAP データの書き込み:この例では、redis_sink 結果テーブルの order_id 列の値がキーとして使用されます。product_namequantity、および amount 列の値は、それぞれ Redis の `product_name`、`quantity`、および `amount` フィールドに書き込まれます。

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,     -- 注文 ID
        product_name STRING, -- 製品名
        quantity STRING,         -- 製品数量
        amount STRING         -- 注文金額
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',        -- データ形式は JSON です。
        'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,        -- 注文 ID、Redis キーとして使用されます。
        product_name STRING,    -- 製品名、Redis Hash フィールドとして使用されます。
        quantity STRING,        -- 製品数量、Redis Hash フィールドとして使用されます。
        amount STRING,          -- 注文金額、Redis Hash フィールドとして使用されます。
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',   -- HASHMAP モードを使用します。
        'flattenHash' = 'true',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • シングルバリューモードでの HASHMAP データの書き込み:この例では、redis_sink 結果テーブルの order_id 列の値がキーとして使用されます。product_name 列の値はフィールドとして、quantity 列の値は値として使用されます。すべてのデータは Redis に書き込まれます。

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,          -- 注文 ID
        product_name STRING,      -- 製品名
        quantity STRING           -- 製品数量
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',        -- データ形式は JSON です。
        'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,          -- Redis キー
        product_name STRING,      -- Redis フィールド
        quantity STRING,          -- Redis 値
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
  • ディメンションテーブル

    • STRING データの読み取り:この例では、redis_dim ディメンションテーブルの user_id 列の値がキーで、user_name 列の値が値です。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- ユーザー ID
        proctime AS PROCTIME()        -- 処理時間
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',            -- データ形式は JSON です。
        'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- ユーザー ID (Redis キー)
        user_name STRING,             -- ユーザー名 (Redis 値)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',    -- Redis ホストアドレス
        'port' = 'yourPort',    -- Redis ポート
        'password' = 'yourPassword',  -- Redis パスワード
        'mode' = 'STRING'             -- STRING モードを使用します。
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- ユーザー ID
        redis_user_id STRING,         -- Redis 内のユーザー ID
        user_name STRING              -- ユーザー名
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,                   -- ユーザー ID (Kafka から)
        t2.user_id,                   -- Redis 内のユーザー ID
        t2.user_name                  -- ユーザー名 (Redis から)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • マルチバリューモードでの HASHMAP データの読み取り:この例では、redis_dim ディメンションテーブルの user_id 列の値がキーです。user_nameemail、および register_time 列の値は、それぞれ `user_name`、`email`、および `register_time` フィールドの値に対応します。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- ユーザー ID
        click_time TIMESTAMP(3),      -- クリック時間
        proctime AS PROCTIME()        -- 処理時間
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',            -- データ形式は JSON です。
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,     -- ユーザー ID (Redis キー)
        user_name STRING,    -- ユーザー名 (Redis フィールド-値ペアの一部)
        email STRING,        -- メールアドレス (Redis フィールド-値ペアの一部)
        register_time STRING, -- 登録時間 (Redis フィールド-値ペアの一部)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword',
        'mode' = 'HASHMAP'    -- HASHMAP モードを使用します。
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,        -- ユーザー ID
        user_name STRING,      -- ユーザー名
        email STRING,          -- メールアドレス
        register_time STRING,   -- 登録時間
        click_time TIMESTAMP(3) -- クリック時間
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,    -- ユーザー ID
        t2.user_name,  -- ユーザー名
        t2.email,      -- メールアドレス
        t2.register_time,  -- 登録時間
        t1.click_time      -- クリック時間
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • シングルバリューモードでの HASHMAP データの読み取り:この例では、hashName パラメーターの値 `testKey` がキーです。redis_dim ディメンションテーブルの user_id 列の値はフィールドで、user_name 列の値は値です。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,            -- ユーザー ID
        proctime AS PROCTIME()     -- 処理時間
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',            -- データ形式は JSON です。
        'scan.startup.mode' = 'earliest-offset' -- 最も早いオフセットから消費を開始します。
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- ユーザー ID (Redis ハッシュフィールド)
        user_name STRING,             -- ユーザー名 (Redis ハッシュ値)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',        -- Redis ホストアドレス
        'port' = 'yourPort',        -- Redis ポート
        'password' = 'yourPassword',-- Redis パスワード
        'hashName' = 'testkey'      -- 固定 Redis ハッシュ名
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- ユーザー ID
        redis_user_id STRING,         -- Redis 内のユーザー ID
        user_name STRING              -- ユーザー名
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
         t1.user_id,     -- ユーザー ID (Kafka から)
         t2.user_id,     -- Redis 内のユーザー ID
         t2.user_name    -- ユーザー名 (Redis から)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;