本文為您介紹如何使用雲資料庫Tair連接器(Redis開源版)。
背景資訊
阿里雲資料庫Tair是相容開源Redis協議標準、提供記憶體加硬碟混合儲存的資料庫服務,基於高可靠雙機熱備架構及可平滑擴充的叢集架構,充分滿足高吞吐、低延遲及彈性變更配置的業務需求,更多內容詳情請參見什麼是雲資料庫 Tair(相容 Redis)。
Redis連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 維表和結果表 |
支援模式 | 流模式 |
資料格式 | String |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API 種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立雲資料庫Tair(Redis開源版)執行個體,詳情請參見步驟1:建立執行個體。
已設定白名單,詳情請參見步驟2:設定白名單。
使用限制
目前Redis連接器是僅提供Best Effort語義,無法保證資料的Exactly Once,需要您自行保證語義上的等冪性。
維表使用限制有:
僅支援讀取Redis資料存放區中STRING和HASHMAP類型的資料。
維表的欄位必須為STRING,且必須聲明且只能聲明一個主鍵。
維表JOIN時,ON條件必須包含主鍵的等值條件。
已知缺陷及解決方案
Realtime Compute引擎VVR 8.0.9版本緩衝功能存在問題,需要在結果表WITH參數中添加 'sink.buffer-flush.max-rows' = '0' 禁用。
文法結構
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- 結果表時必填。
);WITH參數
通用
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 表類型。 | String | 是 | 無 | 固定值為redis。 |
host | Redis Server串連地址。 | String | 是 | 無 | 推薦您使用內網地址。 說明 由於網路延遲和頻寬節流設定等因素,串連公網地址時可能會出現不穩定的情況。 |
port | Redis Server串連連接埠。 | Int | 否 | 6379 | 無。 |
password | Redis資料庫密碼。 | String | 否 | Null 字元串,表示不進行校正。 | 無。 |
dbNum | 選擇操作的資料庫編號。 | Int | 否 | 0 | 無。 |
clusterMode | Redis叢集是否為叢集模式。 | Boolean | 否 | false | 無。 |
hostAndPorts | Redis叢集的主機和連接埠號碼。 說明 如果啟用了叢集模式,且不需要串連高可用,可以通過host和port配置項只配置其中一台主機,也可以只配置該項。該配置項的優先順序高於獨立的host和port配置項。 | String | 否 | 空 | 如果 |
key-prefix | 表主索引值的首碼。 | String | 否 | 無 | 配置後,Redis維表和結果表的主鍵欄位值在查詢或者寫入Redis時會被自動添加首碼,該首碼是由鍵首碼(key-prefix)和其後的首碼分隔字元(key-prefix-delimiter)組成。 說明 僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。 |
key-prefix-delimiter | 表主索引值與表主索引值首碼之間的分隔字元。 | String | 否 | 無 | |
connection.pool.max-total | 串連池可以分配的最大串連數。 | Int | 否 | 8 | 說明 僅Realtime Compute引擎VVR 8.0.9及以上版本支援該參數。 |
connection.pool.max-idle | 串連池中最大空閑串連數。 | Int | 否 | 8 | |
connection.pool.min-idle | 串連池中最小空閑串連數。 | Int | 否 | 0 | |
connect.timeout | 建立串連的逾時時間。 | Duration | 否 | 3000ms | |
socket.timeout | 從Redis伺服器接收資料的逾時時間(即通訊端逾時)。 | Duration | 否 | 3000ms | |
cacert.filepath | SSL/TLS認證檔案的完整路徑,檔案格式必須為jks。 | String | 否 | 無,表示不開啟SSL/TLS加密。 | 參考開啟TLS加密下載CA認證,並在作業的附加依賴檔案中上傳,上傳後,CA認證將儲存在/flink/usrlib目錄下。如何在附加依賴檔案中上傳檔案,詳情請參見部署作業。樣本: 說明 僅Realtime Compute引擎VVR 11.1及以上版本支援該參數。 |
結果表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
mode | 對應Redis的資料類型。 | String | 是 | 無 | 雲資料庫Tair結果表支援5種Redis資料類型,其DDL必須按指定格式定義且主鍵必須被定義。詳情請參見Redis結果表資料類型格式。 |
flattenHash | 是否按照多值模式寫入HASHMAP類型資料。 | Boolean | 否 | false | 參數取值如下:
說明
|
ignoreDelete | 是否忽略Retraction訊息。 | Boolean | 否 | false | 參數取值如下:
|
expiration | 為寫入資料對應的Key設定TTL。 | Long | 否 | 0,代表不設定TTL。 | 如果該參數的值大於0,則寫入資料對應的Key會被設定相應的TTL,單位為毫秒。 |
sink.buffer-flush.max-rows | 緩衝可儲存的最大記錄數。 | Int | 否 | 200 | 緩衝記錄包括所有追加、修改和刪除的事件,超過最大記錄數時將刷寫緩衝。 說明
|
sink.buffer-flush.interval | 緩衝刷寫時間間隔。 | Duration | 否 | 1000ms | 非同步刷寫緩衝。 說明
|
維表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
mode | 讀取Redis的資料類型。 | String | 否 | STRING | 參數取值如下: STRING:預設以STRING類型讀取。 HASHMAP:按照多值模式讀取HASHMAP類型資料。 此時DDL需要聲明多個非主鍵欄位。
說明 僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。 如果您需要以單值模式讀取HASHMAP類型資料時,請配置hashName參數。 |
hashName | 單值模式讀取HASHMAP類型資料時使用的key。 | String | 否 | 無 | 如果您未指定mode參數,還希望以單值模式讀取HASHMAP類型資料。此時,您需要配置hashName。 此時DDL僅需要聲明兩個欄位,第一個主鍵欄位的欄位值對應field,第二個非主鍵欄位的欄位值對應value。 |
cache | 緩衝策略。 | String | 否 | None | 雲資料庫Tair維表支援以下緩衝策略: None(預設值):無緩衝。 LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。 ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在。全量的Cache有一個到期時間,到期後會重新載入一遍全量Cache。 重要
|
cacheSize | 緩衝大小。 | Long | 否 | 10000 | 當選擇LRU緩衝策略時,需要設定緩衝大小。 |
cacheTTLMs | 緩衝逾時時間長度,單位為毫秒。 | Long | 否 | 無 | cacheTTLMs配置和cache有關: 如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。 如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。 如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。 |
cacheEmpty | 是否緩衝空結果。 | Boolean | 否 | true | 無。 |
cacheReloadTimeBlackList | 更新時間黑名單。在緩衝策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11情境)。 | String | 否 | 無 | 格式為:
分隔字元的使用方式如下所示:
|
async | 是否非同步返回資料。 | Boolean | 否 | false |
|
Redis結果表資料類型格式
類型 | 格式 | Redis插入資料的命令 |
STRING類型 | DDL為兩列:
|
|
LIST類型 | DDL為兩列:
|
|
SET類型 | DDL為兩列:
|
|
HASHMAP類型 | 預設情況下,DDL為三列:
|
|
flattenHash參數配置為true時,DDL支援多列,以4列的情況為例:
|
| |
SORTEDSET類型 | DDL為三列:
|
|
類型映射
類型 | Redis欄位類型 | Flink欄位類型 |
通用 | STRING | STRING |
結果表專屬 | SCORE | DOUBLE |
因為Redis的SCORE類型應用於SORTEDSET(有序集合),所以需要手動為每個Value設定一個DOUBLE類型的SCORE,Value才能按照該SCORE從小到大進行排序。
使用樣本
結果表
寫入STRING類型資料:在程式碼範例中,
redis_sink結果表中user_id列的值會作為key,login_time列的值會作為value寫入到Redis中。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 使用者ID login_time STRING -- 登入時間戳記 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_logins', -- Kafka主題 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 資料格式為JSON 'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費 ); CREATE TEMPORARY TABLE redis_sink ( user_id STRING, -- Redis的key login_time STRING, -- Redis的value PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', -- 使用STRING模式 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;多值模式寫入HASHMAP類型資料:在程式碼範例中,
redis_sink結果表中的order_id列的值會作為key,product_name列的值會作為field為product_name的value,quantity列的值會作為field為quantity的value,amount列的值會作為field為amount的value,寫入到Redis中。CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 訂單 ID product_name STRING, -- 商品名稱 quantity STRING, -- 商品數量 amount STRING -- 訂單金額 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka主題 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 資料格式為JSON 'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費 ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- 訂單ID,作為Redis的key product_name STRING, -- 商品名稱,作為Redis Hash的field quantity STRING, -- 商品數量,作為Redis Hash的field amount STRING, -- 訂單金額,作為Redis Hash的field PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', -- 使用HASHMAP模式 'flattenHash' = 'true', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;單值模式寫入HASHMAP類型資料:在程式碼範例中,
redis_sink結果表中的order_id列的值會作為key,product_name列的值會作為field,quantity列的值會作為value寫入到Redis中。CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 訂單ID product_name STRING, -- 商品名稱 quantity STRING -- 商品數量 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka主題 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 資料格式為JSON 'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費 ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- Redis的key product_name STRING, -- Redis的field quantity STRING, -- Redis的value PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
維表
讀取STRING類型資料:在程式碼範例中,
redis_dim維表中的user_id列的值對應key,user_name列的值對應value。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 使用者ID proctime AS PROCTIME() -- 處理時間 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka主題 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 資料格式為JSON 'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費 ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- 使用者ID(Redis key) user_name STRING, -- 使用者名稱(Redis value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis主機地址 'port' = 'yourPort', -- Redis連接埠 'password' = 'yourPassword', -- Redis密碼 'mode' = 'STRING' -- 使用STRING模式 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- 使用者ID redis_user_id STRING, -- Redis中的使用者ID user_name STRING -- 使用者名稱 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- 使用者ID(來自 Kafka) t2.user_id, -- Redis中的使用者ID t2.user_name -- 使用者名稱(來自Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;多值模式讀取HASHMAP類型資料:在程式碼範例中,
redis_dim維表中的user_id列的值對應key,user_name列的值對應field為user_name的value,email列的值對應field為email的value,register_time列的值對應field為register_time的value。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 使用者ID click_time TIMESTAMP(3), -- 點擊時間 proctime AS PROCTIME() -- 處理時間 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka主題 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 資料格式為JSON 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- 使用者ID(Redis key) user_name STRING, -- 使用者名稱(Redis field-value對的一部分) email STRING, -- 郵箱(Redis field-value對的一部分) register_time STRING, -- 註冊時間(Redis field-value對的一部分) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword', 'mode' = 'HASHMAP' -- 使用HASHMAP模式 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- 使用者ID user_name STRING, -- 使用者名稱 email STRING, -- 郵箱 register_time STRING, -- 註冊時間 click_time TIMESTAMP(3) -- 點擊時間 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- 使用者ID t2.user_name, -- 使用者名稱 t2.email, -- 郵箱 t2.register_time, -- 註冊時間 t1.click_time -- 點擊時間 FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;單值模式讀取HASHMAP類型資料:在程式碼範例中,
hashName參數的值testKey為key,redis_dim維表中的user_id列的值對應field,user_name列的值對應value。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 使用者ID proctime AS PROCTIME() -- 處理時間 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka主題 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 資料格式為JSON 'scan.startup.mode' = 'earliest-offset' -- 從最早的訊息開始消費 ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- 使用者ID(Redis hash field) user_name STRING, -- 使用者名稱(Redis hash value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis主機地址 'port' = 'yourPort', -- Redis連接埠 'password' = 'yourPassword',-- Redis密碼 'hashName' = 'testkey' -- 固定的Redis hash名稱 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- 使用者ID redis_user_id STRING, -- Redis中的使用者ID user_name STRING -- 使用者名稱 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- 使用者ID(來自Kafka) t2.user_id, -- Redis中的使用者ID t2.user_name -- 使用者名稱(來自 Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;