全部產品
Search
文件中心

Realtime Compute for Apache Flink:雲資料庫Tair(Redis開源版)

更新時間:Dec 04, 2025

本文為您介紹如何使用雲資料庫Tair連接器(Redis開源版)。

背景資訊

阿里雲資料庫Tair是相容開源Redis協議標準、提供記憶體加硬碟混合儲存的資料庫服務,基於高可靠雙機熱備架構及可平滑擴充的叢集架構,充分滿足高吞吐、低延遲及彈性變更配置的業務需求,更多內容詳情請參見什麼是雲資料庫 Tair(相容 Redis)

Redis連接器支援的資訊如下。

類別

詳情

支援類型

維表和結果表

支援模式

流模式

資料格式

String

特有監控指標

  • 維表:無

  • 結果表:

    • numBytesOut

    • numRecordsOutPerSecond

    • numBytesOutPerSecond

    • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API 種類

SQL

是否支援更新或刪除結果表資料

前提條件

使用限制

  • 目前Redis連接器是僅提供Best Effort語義,無法保證資料的Exactly Once,需要您自行保證語義上的等冪性。

  • 維表使用限制有:

    • 僅支援讀取Redis資料存放區中STRING和HASHMAP類型的資料。

    • 維表的欄位必須為STRING,且必須聲明且只能聲明一個主鍵。

    • 維表JOIN時,ON條件必須包含主鍵的等值條件。

已知缺陷及解決方案

Realtime Compute引擎VVR 8.0.9版本緩衝功能存在問題,需要在結果表WITH參數中添加 'sink.buffer-flush.max-rows' = '0' 禁用。

文法結構

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING'  -- 結果表時必填。
);

WITH參數

通用

參數

說明

資料類型

是否必填

預設值

備忘

connector

表類型。

String

固定值為redis。

host

Redis Server串連地址。

String

推薦您使用內網地址。

說明

由於網路延遲和頻寬節流設定等因素,串連公網地址時可能會出現不穩定的情況。

port

Redis Server串連連接埠。

Int

6379

無。

password

Redis資料庫密碼。

String

Null 字元串,表示不進行校正。

無。

dbNum

選擇操作的資料庫編號。

Int

0

無。

clusterMode

Redis叢集是否為叢集模式。

Boolean

false

無。

hostAndPorts

Redis叢集的主機和連接埠號碼。

說明

如果啟用了叢集模式,且不需要串連高可用,可以通過host和port配置項只配置其中一台主機,也可以只配置該項。該配置項的優先順序高於獨立的host和port配置項。

String

如果ClusterMode = true,同時需要支援Jedis到自建Redis叢集串連的高可用,必須配置該項。配置格式為字串:"host1:port1,host2:port2"

key-prefix

表主索引值的首碼。

String

配置後,Redis維表和結果表的主鍵欄位值在查詢或者寫入Redis時會被自動添加首碼,該首碼是由鍵首碼(key-prefix)和其後的首碼分隔字元(key-prefix-delimiter)組成。

說明

僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。

key-prefix-delimiter

表主索引值與表主索引值首碼之間的分隔字元。

String

connection.pool.max-total

串連池可以分配的最大串連數。

Int

8

說明

僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

connection.pool.max-idle

串連池中最大空閑串連數。

Int

8

connection.pool.min-idle

串連池中最小空閑串連數。

Int

0

connect.timeout

建立串連的逾時時間。

Duration

3000ms

socket.timeout

從Redis伺服器接收資料的逾時時間(即通訊端逾時)。

Duration

3000ms

cacert.filepath

SSL/TLS認證檔案的完整路徑,檔案格式必須為jks。

String

無,表示不開啟SSL/TLS加密。

參考開啟TLS加密下載CA認證,並在作業的附加依賴檔案中上傳,上傳後,CA認證將儲存在/flink/usrlib目錄下。如何在附加依賴檔案中上傳檔案,詳情請參見部署作業。樣本:'cacert.filepath' = '/flink/usrlib/ca.jks'

說明

僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。

結果表專屬

參數

說明

資料類型

是否必填

預設值

備忘

mode

對應Redis的資料類型。

String

雲資料庫Tair結果表支援5種Redis資料類型,其DDL必須按指定格式定義且主鍵必須被定義。詳情請參見Redis結果表資料類型格式

flattenHash

是否按照多值模式寫入HASHMAP類型資料。

Boolean

false

參數取值如下:

  • true:按照多值模式寫入。此時,您需要在DDL中聲明多個非主鍵欄位,主鍵欄位值對應key,每個非主鍵欄位的欄位名對應一個field,欄位值對應該field的value。

  • false:按照單值模式寫入。此時您需要在DDL中聲明三個欄位,第一個主鍵欄位的欄位值對應key,第二個非主鍵欄位的欄位值對應field,第三個非主鍵欄位的欄位值對應value。

說明
  • 該參數僅在mode參數取值為HASHMAP時生效。

  • 僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。

ignoreDelete

是否忽略Retraction訊息。

Boolean

false

參數取值如下:

  • true:收到Retraction訊息時,忽略Retraction訊息。

  • false:收到Retraction訊息時,同時刪除資料對應的key及已插入的資料。

expiration

為寫入資料對應的Key設定TTL。

Long

0,代表不設定TTL。

如果該參數的值大於0,則寫入資料對應的Key會被設定相應的TTL,單位為毫秒。

sink.buffer-flush.max-rows

緩衝可儲存的最大記錄數。

Int

200

緩衝記錄包括所有追加、修改和刪除的事件,超過最大記錄數時將刷寫緩衝。

說明
  • 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

  • 僅VVR 11.4.0及更高版本支援Redis叢集執行個體。

  • 若使用低於VVR 11.4.0的版本寫入 Redis 叢集執行個體,請將該參數設為0以禁用。

sink.buffer-flush.interval

緩衝刷寫時間間隔。

Duration

1000ms

非同步刷寫緩衝。

說明
  • 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。

  • 僅VVR 11.4.0及更高版本支援Redis叢集執行個體。

  • 若使用低於VVR 11.4.0的版本寫入Redis叢集執行個體,請將該參數設為0以禁用。

維表專屬

參數

說明

資料類型

是否必填

預設值

備忘

mode

讀取Redis的資料類型。

String

STRING

參數取值如下:

STRING:預設以STRING類型讀取。

HASHMAP:按照多值模式讀取HASHMAP類型資料。

此時DDL需要聲明多個非主鍵欄位。

  • 主鍵欄位 :主鍵欄位的值將作為HASHMAP中的key。

  • 非主鍵欄位 :每個非主鍵欄位的欄位名將作為HASHMAP中的field,而欄位的值則對應value。

說明

僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。

如果您需要以單值模式讀取HASHMAP類型資料時,請配置hashName參數。

hashName

單值模式讀取HASHMAP類型資料時使用的key。

String

如果您未指定mode參數,還希望以單值模式讀取HASHMAP類型資料。此時,您需要配置hashName。

此時DDL僅需要聲明兩個欄位,第一個主鍵欄位的欄位值對應field,第二個非主鍵欄位的欄位值對應value。

cache

緩衝策略。

String

None

雲資料庫Tair維表支援以下緩衝策略:

None(預設值):無緩衝。

LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。

ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在。全量的Cache有一個到期時間,到期後會重新載入一遍全量Cache。

重要
  • 僅Realtime Compute引擎VVR 8.0.3及以上版本支援ALL緩衝策略。

  • Realtime Compute引擎VVR 8.0.3及以上、11.1以下版本的ALL緩衝策略,僅支援單值模式讀取hashmap類型資料。在DDL中需要聲明三個欄位,第一個在with參數中指定hashName作為key,第二個主鍵欄位的欄位值對應field,第三個非主鍵欄位的欄位值對應value。

  • Realtime Compute引擎VVR 11.1及以上版本的ALL緩衝策略支援多值模式讀取hashmap類型資料。在DDL中需要聲明多個非主鍵欄位,主鍵欄位值對應key,每個非主鍵欄位的欄位名對應一個field,欄位值對應該field的value,同時在with參數中將mode指定為HASHMAP。

  • 需要同時配置緩衝大小(cacheSize)以及緩衝更新時間間隔(cacheTTLMs)參數。

cacheSize

緩衝大小。

Long

10000

當選擇LRU緩衝策略時,需要設定緩衝大小。

cacheTTLMs

緩衝逾時時間長度,單位為毫秒。

Long

cacheTTLMs配置和cache有關:

如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。

如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。

如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。

cacheEmpty

是否緩衝空結果。

Boolean

true

無。

cacheReloadTimeBlackList

更新時間黑名單。在緩衝策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11情境)。

String

格式為:

  • 完整日期時間範圍:2017-10-24 14:00 -> 2017-10-24 15:00。

  • 跨天時間段:2017-11-10 23:30 -> 2017-11-11 08:00。

  • 每日固定時間段:12:00 -> 14:00, 22:00 -> 2:00。

    說明

    僅Realtime Compute引擎VVR 11.1及以上版本支援未指定日期的時間段會對每天生效。

分隔字元的使用方式如下所示:

  • 用英文逗號(,)來分隔多個時間黑名單。

  • 用箭頭(->)來分割黑名單的起始結束時間。

async

是否非同步返回資料。

Boolean

false

  • true:表示非同步返回資料。非同步返回資料預設是無序的。

  • false(預設值):表示不進行非同步返回資料。

Redis結果表資料類型格式

類型

格式

Redis插入資料的命令

STRING類型

DDL為兩列:

  • 第1列為key,STRING類型。

  • 第2列為value,STRING類型。

set key value

LIST類型

DDL為兩列:

  • 第1列為key,STRING類型。

  • 第2列為value,STRING類型。

lpush key value

SET類型

DDL為兩列:

  • 第1列為key,STRING類型。

  • 第2列為value,STRING類型。

sadd key value

HASHMAP類型

預設情況下,DDL為三列:

  • 第1列為key,STRING類型。

  • 第2列為field,STRING類型。

  • 第3列為value,STRING類型。

hmset key field value

flattenHash參數配置為true時,DDL支援多列,以4列的情況為例:

  • 第1列為key,STRING類型。

  • 第2列的欄位名(假設為col1)對應一個field,欄位值(假設為value1)對應該field的value,STRING類型。

  • 第3列的欄位名(假設為col2)對應一個field,欄位值(假設為value2)對應該field的value,STRING類型。

  • 第4列的欄位名(假設為col3)對應一個field,欄位值(假設為value3)對應該field的value,STRING類型。

hmset key col1 value1 col2 value2 col3 value3

SORTEDSET類型

DDL為三列:

  • 第1列為key,STRING類型。

  • 第2列為score,DOUBLE類型。

  • 第3列為value,STRING類型。

zadd key score value

類型映射

類型

Redis欄位類型

Flink欄位類型

通用

STRING

STRING

結果表專屬

SCORE

DOUBLE

說明

因為Redis的SCORE類型應用於SORTEDSET(有序集合),所以需要手動為每個Value設定一個DOUBLE類型的SCORE,Value才能按照該SCORE從小到大進行排序。

使用樣本

  • 結果表

    • 寫入STRING類型資料:在程式碼範例中,redis_sink結果表中user_id列的值會作為key,login_time列的值會作為value寫入到Redis中。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,          -- 使用者ID
        login_time STRING        -- 登入時間戳記
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_logins',      -- Kafka主題
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址
        'format' = 'json',            -- 資料格式為JSON
        'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        user_id STRING,               -- Redis的key
        login_time STRING,            -- Redis的value
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',   -- 使用STRING模式
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • 多值模式寫入HASHMAP類型資料:在程式碼範例中,redis_sink結果表中的order_id列的值會作為key,product_name列的值會作為field為product_name的value,quantity列的值會作為field為quantity的value,amount列的值會作為field為amount的value,寫入到Redis中。

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,     -- 訂單 ID
        product_name STRING, -- 商品名稱
        quantity STRING,         -- 商品數量
        amount STRING         -- 訂單金額
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka主題
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址
        'format' = 'json',        -- 資料格式為JSON
        'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,        -- 訂單ID,作為Redis的key
        product_name STRING,    -- 商品名稱,作為Redis Hash的field
        quantity STRING,        -- 商品數量,作為Redis Hash的field
        amount STRING,          -- 訂單金額,作為Redis Hash的field
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',   -- 使用HASHMAP模式
        'flattenHash' = 'true',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • 單值模式寫入HASHMAP類型資料:在程式碼範例中,redis_sink結果表中的order_id列的值會作為key,product_name列的值會作為field,quantity列的值會作為value寫入到Redis中。

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,          -- 訂單ID
        product_name STRING,      -- 商品名稱
        quantity STRING           -- 商品數量
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka主題
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址
        'format' = 'json',        -- 資料格式為JSON
        'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,          -- Redis的key
        product_name STRING,      -- Redis的field
        quantity STRING,          -- Redis的value
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
  • 維表

    • 讀取STRING類型資料:在程式碼範例中,redis_dim維表中的user_id列的值對應key,user_name列的值對應value。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- 使用者ID
        proctime AS PROCTIME()        -- 處理時間
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka主題
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址
        'format' = 'json',            -- 資料格式為JSON
        'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- 使用者ID(Redis key)
        user_name STRING,             -- 使用者名稱(Redis value)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',    -- Redis主機地址
        'port' = 'yourPort',    -- Redis連接埠
        'password' = 'yourPassword',  -- Redis密碼
        'mode' = 'STRING'             -- 使用STRING模式
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- 使用者ID
        redis_user_id STRING,         -- Redis中的使用者ID
        user_name STRING              -- 使用者名稱
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,                   -- 使用者ID(來自 Kafka)
        t2.user_id,                   -- Redis中的使用者ID
        t2.user_name                  -- 使用者名稱(來自Redis)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • 多值模式讀取HASHMAP類型資料:在程式碼範例中,redis_dim維表中的user_id列的值對應key,user_name列的值對應field為user_name的value,email列的值對應field為email的value,register_time列的值對應field為register_time的value。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- 使用者ID
        click_time TIMESTAMP(3),      -- 點擊時間
        proctime AS PROCTIME()        -- 處理時間
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka主題
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址
        'format' = 'json',            -- 資料格式為JSON
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,     -- 使用者ID(Redis key)
        user_name STRING,    -- 使用者名稱(Redis field-value對的一部分)
        email STRING,        -- 郵箱(Redis field-value對的一部分)
        register_time STRING, -- 註冊時間(Redis field-value對的一部分)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword',
        'mode' = 'HASHMAP'    -- 使用HASHMAP模式
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,        -- 使用者ID
        user_name STRING,      -- 使用者名稱
        email STRING,          -- 郵箱
        register_time STRING,   -- 註冊時間
        click_time TIMESTAMP(3) -- 點擊時間
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,    -- 使用者ID
        t2.user_name,  -- 使用者名稱
        t2.email,      -- 郵箱
        t2.register_time,  -- 註冊時間
        t1.click_time      -- 點擊時間
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • 單值模式讀取HASHMAP類型資料:在程式碼範例中,hashName參數的值testKey為key,redis_dim維表中的user_id列的值對應field,user_name列的值對應value。

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,            -- 使用者ID
        proctime AS PROCTIME()     -- 處理時間
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka主題
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址
        'format' = 'json',            -- 資料格式為JSON
        'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- 使用者ID(Redis hash field)
        user_name STRING,             -- 使用者名稱(Redis hash value)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',        -- Redis主機地址
        'port' = 'yourPort',        -- Redis連接埠
        'password' = 'yourPassword',-- Redis密碼
        'hashName' = 'testkey'      -- 固定的Redis hash名稱
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- 使用者ID
        redis_user_id STRING,         -- Redis中的使用者ID
        user_name STRING              -- 使用者名稱
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
         t1.user_id,     -- 使用者ID(來自Kafka)
         t2.user_id,     -- Redis中的使用者ID
         t2.user_name    -- 使用者名稱(來自 Redis)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;