本文为您介绍如何使用云数据库Redis连接器。
背景信息
阿里云数据库Redis是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求,更多内容详情请参见阿里云数据库Redis版。
Redis连接器支持的信息如下。
类别 | 详情 |
支持类型 | 维表和结果表 |
支持模式 | 流模式 |
数据格式 | String |
特有监控指标 |
说明 指标的含义及如何查看监控指标,请参见查看监。 |
API 种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
使用限制
仅Flink计算引擎VVR 2.0.0及以上版本支持云数据库Redis连接器。
Redis维表必须声明且只能声明一个主键。
Redis维表仅支持声明两个字段,且字段类型必须为STRING。
Redis维表仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。
Redis维表JOIN时,ON条件必须包含所有主键的等值条件。
Redis维表仅支持None和LRU两种缓存策略。
语法结构
CREATE TABLE redis_table (
a STRING,
b STRING,
PRIMARY KEY (a) NOT ENFORCED -- 必填。
) WITH (
'connector' = 'redis',
'host' = '<yourHost>'
);
云数据库Redis版维表DDL必须满足如下要求:
Redis维表必须声明且只能声明一个主键。
Redis维表仅支持声明两个字段,且字段类型必须为STRING。
Redis维表仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。
Redis维表JOIN时,ON条件必须包含所有主键的等值条件。
Redis维表仅支持None和LRU两种缓存策略。
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为redis。
host
Redis Server连接地址。
String
是
无
推荐您使用内网地址。
说明由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。
port
Redis Server连接端口。
Int
否
6379
无。
password
Redis数据库密码。
String
否
空字符串,表示不进行校验。
无。
dbNum
选择操作的数据库编号。
Int
否
0
无。
clusterMode
Redis集群是否为集群模式。
Boolean
否
false
无。
hostAndPorts
Redis集群的主机和端口号。
说明如果启用了集群模式,且不需要连接高可用,可以通过host和port配置项只配置其中一台主机,也可以只配置该项。该配置项的优先级高于独立的host和port配置项。
String
否
空
如果
ClusterMode = true
,同时需要支持Jedis到自建Redis集群连接的高可用,必须配置该项。配置格式为字符串:"host1:port1,host2:port2"
。结果表独有
参数
说明
数据类型
是否必填
默认值
备注
mode
对应Redis的数据结构。
String
是
无
参数取值如下:
string
list
set
hashmap
sortedset
ignoreDelete
是否忽略Retraction消息。
Boolean
否
false
参数取值如下:
true:收到Retraction消息时,忽略Retraction消息。
false:收到Retraction消息时,同时删除数据对应的key及已插入的数据。
expiration
为写入数据对应的Key设置TTL。
Long
否
0,代表不设置TTL。
如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。
说明仅实时计算引擎VVR 4.0.13及以上版本支持该参数。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
hashName
Hash模式下的Hash Key名称。
String
否
空字符串
通常,Redis维表中的数据类型为STRING类型,即key-value对。如果设置hashName参数,则Redis维表中的数据类型为HASHMAP类型,即key-{field-value}对,其中:
key为hashName参数值。
field为您在CREATE TABLE中指明的key参数值。
value为key对应的赋值,和STRING类型key-value中value语义相同。
cache
缓存策略。
String
否
None
云数据库Redis维表支持以下两种缓存策略:
None(默认值):无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
说明需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
cacheSize
缓存大小。
Long
否
10000
选择LRU缓存策略后,可以设置缓存大小。
cacheTTLMs
缓存超时时长,单位为毫秒。
Long
否
无
cacheTTLMs配置和cache有关:
如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
cacheEmpty
是否缓存空结果。
Boolean
否
true
无。
类型映射
类型 | Redis字段类型 | Flink字段类型 |
通用 | STRING | VARCHAR |
结果表独有 | SCORE | DOUBLE |
因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。
云数据库Redis版结果表支持5种Redis数据结构,其DDL必须按指定格式定义且主键必须被定义。指定格式详情请参见Redis数据结构格式。
类型 | 格式 | Redis插入数据的命令 |
STRING类型 | DDL为两列:
|
|
LIST类型 | DDL为两列:
|
|
SET类型 | DDL为两列:
|
|
HASHMAP类型 | DDL为三列:
|
|
SORTEDSET类型 | DDL为三列:
|
|
使用示例
结果表
CREATE TEMPORARY TABLE datagen_source ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE redis_sink ( a STRING, b STRING, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'string', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT v, p FROM datagen_source;
维表
CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE redis_dim ( id STRING, name STRING, PRIMARY KEY (id) NOT ENFORCED --Redis中的Row Key字段。 ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, name STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN redis_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;