すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Tair (Redis OSS-compatible)

最終更新日:Mar 05, 2026

このトピックでは、Tair (Redis OSS-compatible) コネクタの使用方法について説明します。

背景情報

Alibaba Cloud Tair は、オープンソース Redis プロトコルと互換性のあるデータベースサービスであり、ハイブリッドメモリ・ディスクストレージを提供します。Tair は、高スループット、低レイテンシー、柔軟なスケールアップ/スケールダウンといったビジネス要件を満たすために、高可用性 (HA) アクティブ/アクティブアーキテクチャとスケーラブルなクラスタアーキテクチャを使用しています。詳細については、「Tair (Redis OSS-compatible) とは?」をご参照ください。

以下の表は、Redis コネクタでサポートされる特徴について説明しています。

カテゴリ

詳細

サポートされるタイプ

ディメンションテーブルと結果テーブル

サポートされるモード

ストリーミングモード

データフォーマット

文字列

特定のモニタリングメトリック

  • ディメンションテーブル: なし

  • 結果テーブル:

    • numBytesOut

    • numRecordsOutPerSecond

    • numBytesOutPerSecond

    • currentSendTime

説明

メトリックの詳細については、「モニタリングメトリック」をご参照ください。

API タイプ

SQL

結果テーブルでのデータ更新または削除のサポート

はい

前提条件

制限事項

  • Redis コネクタはベストエフォートセマンティクスを提供し、Exactly-once 配信を保証しません。ご利用のアプリケーションでべき等性を確保する必要があります。

  • ディメンションテーブルには、以下の制限が適用されます。

    • Redis からは STRING および HASHMAP データの型のみを読み取ることができます。

    • ディメンションテーブル内のすべてのフィールドは STRING タイプである必要があります。プライマリキーを正確に 1 つ宣言する必要があります。

    • ディメンションテーブルを結合する場合、ON 句にはプライマリキーの同等条件を含める必要があります。

既知の問題とソリューション

Ververica Runtime (VVR) 8.0.9 のキャッシュ機能には問題があります。これを無効にするには、結果テーブルの WITH 句に sink.buffer-flush.max-rows = '0' を追加します。

構文

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- Required.
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING'  -- Required for sink tables.
);

WITH パラメーター

一般

パラメーター

説明

データの型

必須

デフォルト

備考

connector

テーブルタイプ。

String

はい

なし

値は redis である必要があります。

host

Redis サーバーの接続エンドポイント。

String

はい

なし

内部ネットワークエンドポイントを使用します。

説明

パブリックネットワークエンドポイントへの接続は、ネットワーク遅延や帯域幅制限などの要因により不安定になる場合があります。

port

Redis サーバーの接続ポート。

Int

いいえ

6379

なし。

password

Redis データベースのパスワード。

String

いいえ

空の文字列。これは検証が実行されないことを意味します。

なし。

dbNum

操作するデータベースの番号。

Int

いいえ

0

なし。

clusterMode

Redis クラスターがクラスターモードであるかどうかを指定します。

Boolean

いいえ

false

なし。

hostAndPorts

Redis クラスターのホストとポート番号。

説明

クラスターモードが有効で HA が不要な場合、host および port パラメーターを使用して 1 つのホストのみを構成するか、このパラメーターのみを構成できます。このパラメーターは、個々の host および port パラメーターよりも優先度が高くなります。

String

いいえ

ClusterMode = true で、セルフマネージド Redis クラスターへの Jedis 接続に HA が必要な場合は、このパラメーターを構成する必要があります。フォーマットは文字列です: "host1:port1,host2:port2"

key-prefix

テーブルのプライマリキー値のプレフィックス。

String

いいえ

なし

このパラメーターを構成すると、データのクエリまたは書き込み時に、プライマリキーフィールドの値にプレフィックスが自動的に追加されます。プレフィックスは、キープレフィックス (key-prefix) とプレフィックスデリミタ (key-prefix-delimiter) で構成されます。

説明

このパラメーターは VVR 8.0.7 以降でのみサポートされます。

key-prefix-delimiter

プライマリキー値とそのプレフィックス間のデリミタ。

String

いいえ

なし

connection.pool.max-total

接続プールが割り当てることができる最大接続数。

Int

いいえ

8

説明

このパラメーターは VVR 8.0.9 以降でのみサポートされます。

connection.pool.max-idle

接続プール内の最大アイドル接続数。

Int

いいえ

8

connection.pool.min-idle

接続プール内の最小アイドル接続数。

Int

いいえ

0

connect.timeout

接続確立のタイムアウト。

Duration

いいえ

3000 ms

socket.timeout

Redis サーバーからデータを受信する際のタイムアウト (ソケットタイムアウト)。

Duration

いいえ

3000 ms

cacert.filepath

SSL/TLS 証明書ファイルの完全なパス。ファイルフォーマットは jks である必要があります。

String

いいえ

なし。これは SSL/TLS 暗号化が無効であることを意味します。

TLS 暗号化の有効化」で説明されているように CA 証明書をダウンロードし、ジョブの [追加の依存関係ファイル] セクションに証明書をアップロードします。アップロード後、CA 証明書は /flink/usrlib ディレクトリに保存されます。「追加の依存関係ファイル」セクションでファイルをアップロードする方法の詳細については、「ジョブのデプロイ」をご参照ください。例: 'cacert.filepath' = '/flink/usrlib/ca.jks'

説明

このパラメーターは VVR 11.1 以降でのみサポートされます。

シンク固有

パラメーター

説明

データの型

必須

デフォルト

備考

mode

使用する Redis データの型。

String

はい

なし

Tair 結果テーブルは 5 種類の Redis データの型をサポートしています。DDL は指定されたフォーマットで定義され、プライマリキーが定義されている必要があります。詳細については、「Redis 結果テーブルのデータ型フォーマット」をご参照ください。

flattenHash

HASHMAP データをマルチバリューモードで書き込むかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: データをマルチバリューモードで書き込みます。この場合、DDL で複数の非プライマリキーフィールドを宣言します。プライマリキーフィールドの値はキーに対応します。各非プライマリキーフィールドの名前はフィールドに対応し、その値はフィールドの値に対応します。

  • false: データをシングルバリューモードで書き込みます。この場合、DDL で 3 つのフィールドを宣言します。最初のプライマリキーフィールドの値はキーに対応します。2 番目の非プライマリキーフィールドの値はフィールドに対応します。3 番目の非プライマリキーフィールドの値は値に対応します。

説明
  • このパラメーターは、mode パラメーターが HASHMAP に設定されている場合にのみ有効です。

  • このパラメーターは VVR 8.0.7 以降でのみサポートされます。

ignoreDelete

リトラクションメッセージを無視するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • true: リトラクションメッセージが受信されたときに無視します。

  • false: リトラクションメッセージが受信されたときに、データの対応するキーと挿入されたデータを削除します。

expiration

書き込まれたデータのキーに TTL を設定します。

Long

いいえ

0。これは TTL が設定されないことを意味します。

このパラメーターの値が 0 より大きい場合、書き込まれたデータのキーにそれに対応する TTL が設定されます。単位はミリ秒です。

sink.buffer-flush.max-rows

キャッシュに保存できる最大レコード数。

Int

いいえ

200

キャッシュされたレコードには、すべての追加、更新、および削除イベントが含まれます。最大レコード数を超過すると、キャッシュがフラッシュされます。

説明
  • このパラメーターは VVR 8.0.9 以降でのみサポートされます。非クラスターモード (clusterMode = false) でのみ有効です。

  • VVR 11.4.0 以降では、このパラメーターは Redis クラスターモード (clusterMode = true) でもサポートされます。

sink.buffer-flush.interval

キャッシュフラッシュ間隔。

Duration

いいえ

1000 ms

キャッシュは非同期的にフラッシュされます。

説明
  • このパラメーターは VVR 8.0.9 以降でのみサポートされます。非クラスターモード (clusterMode = false) でのみ有効です。

  • VVR 11.4.0 以降では、このパラメーターは Redis クラスターモード (clusterMode = true) でもサポートされます。

ディメンションテーブル固有

パラメーター

説明

データの型

必須

デフォルト

備考

mode

読み取る Redis データの型。

String

いいえ

STRING

有効な値:

STRING: デフォルトで STRING タイプとしてデータを読み取ります。

HASHMAP: HASHMAP データをマルチバリューモードで読み取ります。

この場合、DDL で複数の非プライマリキーフィールドを宣言する必要があります。

  • プライマリキーフィールド: プライマリキーフィールドの値は HASHMAP のキーとして使用されます。

  • 非プライマリキーフィールド: 各非プライマリキーフィールドの名前はフィールドとして使用され、その値はフィールドの値として使用されます。

説明

このパラメーターは VVR 8.0.7 以降でのみサポートされます。

HASHMAP データをシングルバリューモードで読み取るには、hashName パラメーターを構成します。

hashName

HASHMAP データをシングルバリューモードで読み取る際に使用するキー。

String

いいえ

なし

mode パラメーターを指定しないが、HASHMAP データをシングルバリューモードで読み取りたい場合は、hashName を構成する必要があります。

この場合、DDL は 2 つのフィールドのみを宣言する必要があります。最初のプライマリキーフィールドの値はフィールドに対応し、2 番目の非プライマリキーフィールドの値は値に対応します。

cache

キャッシュポリシー。

String

いいえ

なし

Tair ディメンションテーブルは、以下のキャッシュポリシーをサポートしています。

None (デフォルト): キャッシュなし。

LRU: ディメンションテーブルデータの一部をキャッシュします。ソーステーブルからの各レコードについて、システムはまずキャッシュを検索します。データが見つからない場合、システムは物理ディメンションテーブルをクエリします。

ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブの実行前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。その後のすべてのルックアップはキャッシュに対して実行されます。キャッシュにデータが見つからない場合、キーは存在しません。フルキャッシュには有効期限があり、その後再ロードされます。

重要
  • ALL キャッシュポリシーは VVR 8.0.3 以降でのみサポートされます。

  • VVR 8.0.3 から 11.0 までのバージョンでは、ALL キャッシュポリシーは HASHMAP データをシングルバリューモードでのみ読み取ることをサポートしています。DDL では、3 つのフィールドを宣言する必要があります。最初のフィールドは、WITH 句の hashName パラメーターを使用してキーとして指定されます。2 番目のプライマリキーフィールドの値はフィールドに対応します。3 番目の非プライマリキーフィールドの値は値に対応します。

  • VVR 11.1 以降では、ALL キャッシュポリシーは HASHMAP データをマルチバリューモードで読み取ることをサポートしています。DDL では、複数の非プライマリキーフィールドを宣言します。プライマリキーフィールドの値はキーに対応します。各非プライマリキーフィールドの名前はフィールドに対応し、その値はフィールドの値に対応します。また、WITH 句で mode パラメーターを HASHMAP に設定する必要があります。

  • キャッシュサイズ (cacheSize) とキャッシュ更新間隔 (cacheTTLMs) パラメーターも構成する必要があります。

cacheSize

キャッシュサイズ。

Long

いいえ

10000

LRU キャッシュポリシーを選択する場合、キャッシュサイズを設定する必要があります。

cacheTTLMs

キャッシュタイムアウト期間 (ミリ秒)。

Long

いいえ

なし

cacheTTLMs の構成は、キャッシュ設定によって異なります。

キャッシュが None に設定されている場合、cacheTTLMs を構成する必要はありません。これは、キャッシュがタイムアウトしないことを意味します。

キャッシュが LRU に設定されている場合、cacheTTLMs はキャッシュタイムアウト期間です。デフォルトでは、キャッシュは有効期限切れになりません。

キャッシュが ALL に設定されている場合、cacheTTLMs はキャッシュ再ロード時間です。デフォルトでは、キャッシュは再ロードされません。

cacheEmpty

空の結果をキャッシュするかどうかを指定します。

Boolean

いいえ

true

なし。

cacheReloadTimeBlackList

キャッシュ更新禁止時間。キャッシュポリシーが ALL に設定されている場合、独身の日ショッピングフェスティバルなどの期間中にキャッシュがリフレッシュされるのを防ぐために、禁止時間を有効にすることができます。

String

いいえ

なし

フォーマットは以下のとおりです。

  • 完全な日付と時間範囲: 2017-10-24 14:00 -> 2017-10-24 15:00。

  • 日をまたぐ期間: 2017-11-10 23:30 -> 2017-11-11 08:00。

  • 毎日固定の期間: 12:00 -> 14:00, 22:00 -> 2:00。

    説明

    VVR 11.1 以降では、日付が指定されていない期間は毎日適用されます。

セパレータは以下のとおりです。

  • 複数の禁止期間を区切るには、コンマ (,) を使用します。

  • 禁止期間の開始時刻と終了時刻を区切るには、矢印 (->) を使用します。

async

データを非同期的に返すかどうかを指定します。

Boolean

いいえ

false

  • true: データを非同期的に返します。非同期的に返されるデータは、デフォルトで順序付けされません。

  • false (デフォルト): データを非同期的に返しません。

Redis 結果テーブルのデータ型フォーマット

タイプ

フォーマット

Redis 挿入コマンド

STRING タイプ

2 カラム DDL:

  • カラム 1: キー (STRING)

  • カラム 2: 値 (STRING)

set key value

LIST タイプ

2 カラム DDL:

  • カラム 1: キー (STRING)

  • カラム 2: 値 (STRING)

lpush key value

SET タイプ

2 カラム DDL:

  • カラム 1: キー (STRING)

  • カラム 2: 値 (STRING)

sadd key value

HASHMAP タイプ

デフォルトでは、3 カラム DDL:

  • カラム 1: キー (STRING)

  • カラム 2: フィールド (STRING)

  • カラム 3: 値 (STRING)

hmset key field value

flattenHash が true に設定されている場合、DDL は複数のカラムをサポートします。以下の例では 4 つのカラムを使用しています。

  • カラム 1: キー (STRING)

  • カラム 2: カラム名 (例: col1) はフィールドに対応し、カラム値 (例: value1) はフィールドの値 (STRING) に対応します。

  • カラム 3: カラム名 (例: col2) はフィールドに対応し、カラム値 (例: value2) はフィールドの値 (STRING) に対応します。

  • カラム 4: カラム名 (例: col3) はフィールドに対応し、カラム値 (例: value3) はフィールドの値 (STRING) に対応します。

hmset key col1 value1 col2 value2 col3 value3

SORTEDSET タイプ

3 カラム DDL:

  • カラム 1: キー (STRING)

  • カラム 2: スコア (DOUBLE)

  • カラム 3: 値 (STRING)

zadd key score value

タイプマッピング

タイプ

Redis フィールドタイプ

Flink フィールドタイプ

一般

STRING

STRING

結果テーブルのみ対象

SCORE

DOUBLE

説明

Redis SCORE タイプは SORTEDSET (ソートセット) に適用されます。各値に DOUBLE スコアを手動で割り当てる必要があります。値はその後、スコアに基づいて昇順でソートされます。

使用例

  • シンクテーブル

    • STRING データの書き込み: コード例では、redis_sink 結果テーブルの user_id カラムの値が Redis にキーとして書き込まれ、login_time カラムの値が値として書き込まれます。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,          -- ユーザー ID
        login_time STRING        -- ログオンタイムスタンプ
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_logins',      -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',            -- データフォーマットは JSON
        'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        user_id STRING,               -- Redis キー
        login_time STRING,            -- Redis 値
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',   -- STRING モードを使用する
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • HASHMAP データをマルチバリューモードで書き込む: コード例では、redis_sink 結果テーブルの order_id カラムの値が Redis にキーとして書き込まれます。product_namequantity、および amount カラムの値は、それぞれ product_name、quantity、および amount フィールドに書き込まれます。

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,     -- 注文 ID
        product_name STRING, -- 製品名
        quantity STRING,         -- 数量
        amount STRING         -- 注文金額
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',        -- データフォーマットは JSON
        'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,        -- 注文 ID (Redis キーとして使用)
        product_name STRING,    -- 製品名 (Redis Hash フィールドとして使用)
        quantity STRING,        -- 数量 (Redis Hash フィールドとして使用)
        amount STRING,          -- 注文金額 (Redis Hash フィールドとして使用)
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',   -- HASHMAP モードを使用する
        'flattenHash' = 'true',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • HASHMAP データをシングルバリューモードで書き込む: コード例では、redis_sink 結果テーブルの order_id カラムの値が Redis にキーとして書き込まれます。product_name カラムの値はフィールドとして書き込まれ、quantity カラムの値は値として書き込まれます。

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,          -- 注文 ID
        product_name STRING,      -- 製品名
        quantity STRING           -- 数量
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',        -- データフォーマットは JSON
        'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,          -- Redis キー
        product_name STRING,      -- Redis フィールド
        quantity STRING,          -- Redis 値
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
  • ディメンションテーブル

    • STRING データの読み取り: コード例では、redis_dim ディメンションテーブルの user_id カラムの値がキーに対応し、user_name カラムの値が値に対応します。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- ユーザー ID
        proctime AS PROCTIME()        -- 処理時間
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',            -- データフォーマットは JSON
        'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- ユーザー ID (Redis キー)
        user_name STRING,             -- ユーザー名 (Redis 値)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',    -- Redis ホストアドレス
        'port' = 'yourPort',    -- Redis ポート
        'password' = 'yourPassword',  -- Redis パスワード
        'mode' = 'STRING'             -- STRING モードを使用する
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- ユーザー ID
        redis_user_id STRING,         -- Redis からのユーザー ID
        user_name STRING              -- ユーザー名
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,                   -- ユーザー ID (Kafka から)
        t2.user_id,                   -- ユーザー ID (Redis から)
        t2.user_name                  -- ユーザー名 (Redis から)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • HASHMAP データをマルチバリューモードで読み取る: コード例では、redis_dim ディメンションテーブルの user_id カラムの値がキーです。user_name カラムの値は user_name フィールドにマッピングされます。email カラムの値は email フィールドにマッピングされます。register_time カラムの値は register_time フィールドにマッピングされます。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- ユーザー ID
        click_time TIMESTAMP(3),      -- クリック時間
        proctime AS PROCTIME()        -- 処理時間
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',            -- データフォーマットは JSON
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,     -- ユーザー ID (Redis キー)
        user_name STRING,    -- ユーザー名 (Redis フィールドと値のペアの一部)
        email STRING,        -- メールアドレス (Redis フィールドと値のペアの一部)
        register_time STRING, -- 登録時間 (Redis フィールドと値のペアの一部)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword',
        'mode' = 'HASHMAP'    -- HASHMAP モードを使用する
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,        -- ユーザー ID
        user_name STRING,      -- ユーザー名
        email STRING,          -- メールアドレス
        register_time STRING,   -- 登録時間
        click_time TIMESTAMP(3) -- クリック時間
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,    -- ユーザー ID
        t2.user_name,  -- ユーザー名
        t2.email,      -- メールアドレス
        t2.register_time,  -- 登録時間
        t1.click_time      -- クリック時間
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • HASHMAP データをシングルバリューモードで読み取る: コード例では、hashName パラメーターの値は testKey であり、キーとして使用されます。redis_dim ディメンションテーブルの user_id カラムの値はフィールドであり、user_name カラムの値は値です。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,            -- ユーザー ID
        proctime AS PROCTIME()     -- 処理時間
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka トピック
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka ブローカーアドレス
        'format' = 'json',            -- データフォーマットは JSON
        'scan.startup.mode' = 'earliest-offset' -- 最も古いオフセットから消費する
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- ユーザー ID (Redis ハッシュフィールド)
        user_name STRING,             -- ユーザー名 (Redis ハッシュ値)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',        -- Redis ホストアドレス
        'port' = 'yourPort',        -- Redis ポート
        'password' = 'yourPassword',-- Redis パスワード
        'hashName' = 'testkey'      -- 固定 Redis ハッシュ名
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- ユーザー ID
        redis_user_id STRING,         -- Redis からのユーザー ID
        user_name STRING              -- ユーザー名
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
         t1.user_id,     -- ユーザー ID (Kafka から)
         t2.user_id,     -- ユーザー ID (Redis から)
         t2.user_name    -- ユーザー名 (Redis から)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;