全部产品
Search
文档中心

实时计算Flink版:云数据库Redis

更新时间:May 08, 2023

本文为您介绍如何使用云数据库Redis连接器。

背景信息

阿里云数据库Redis是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求,更多内容详情请参见阿里云数据库Redis版

Redis连接器支持的信息如下。

类别

详情

支持类型

维表和结果表

支持模式

流模式

数据格式

String

特有监控指标

  • 维表:无

  • 结果表:

    • numBytesOut

    • numRecordsOutPerSecond

    • numBytesOutPerSecond

    • currentSendTime

说明

指标的含义及如何查看监控指标,请参见查看监

API 种类

SQL

是否支持更新或删除结果表数据

前提条件

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持云数据库Redis连接器。

  • Redis维表必须声明且只能声明一个主键。

  • Redis维表仅支持声明两个字段,且字段类型必须为STRING。

  • Redis维表仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。

  • Redis维表JOIN时,ON条件必须包含所有主键的等值条件。

  • Redis维表仅支持None和LRU两种缓存策略。

语法结构

CREATE TABLE redis_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- 必填。
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>'
);

云数据库Redis版维表DDL必须满足如下要求:

  • Redis维表必须声明且只能声明一个主键。

  • Redis维表仅支持声明两个字段,且字段类型必须为STRING。

  • Redis维表仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。

  • Redis维表JOIN时,ON条件必须包含所有主键的等值条件。

  • Redis维表仅支持None和LRU两种缓存策略。

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为redis。

    host

    Redis Server连接地址。

    String

    推荐您使用内网地址。

    说明

    由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。

    port

    Redis Server连接端口。

    Int

    6379

    无。

    password

    Redis数据库密码。

    String

    空字符串,表示不进行校验。

    无。

    dbNum

    选择操作的数据库编号。

    Int

    0

    无。

    clusterMode

    Redis集群是否为集群模式。

    Boolean

    false

    无。

    hostAndPorts

    Redis集群的主机和端口号。

    说明

    如果启用了集群模式,且不需要连接高可用,可以通过host和port配置项只配置其中一台主机,也可以只配置该项。该配置项的优先级高于独立的host和port配置项。

    String

    如果ClusterMode = true,同时需要支持Jedis到自建Redis集群连接的高可用,必须配置该项。配置格式为字符串: "host1:port1,host2:port2"

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    mode

    对应Redis的数据结构。

    String

    参数取值如下:

    • string

    • list

    • set

    • hashmap

    • sortedset

    ignoreDelete

    是否忽略Retraction消息。

    Boolean

    false

    参数取值如下:

    • true:收到Retraction消息时,忽略Retraction消息。

    • false:收到Retraction消息时,同时删除数据对应的key及已插入的数据。

    expiration

    为写入数据对应的Key设置TTL。

    Long

    0,代表不设置TTL。

    如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。

    说明

    仅实时计算引擎VVR 4.0.13及以上版本支持该参数。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    hashName

    Hash模式下的Hash Key名称。

    String

    空字符串

    通常,Redis维表中的数据类型为STRING类型,即key-value对。如果设置hashName参数,则Redis维表中的数据类型为HASHMAP类型,即key-{field-value}对,其中:

    • key为hashName参数值。

    • field为您在CREATE TABLE中指明的key参数值。

    • value为key对应的赋值,和STRING类型key-value中value语义相同。

    cache

    缓存策略。

    String

    None

    云数据库Redis维表支持以下两种缓存策略:

    • None(默认值):无缓存。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    说明

    需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

    cacheSize

    缓存大小。

    Long

    10000

    选择LRU缓存策略后,可以设置缓存大小。

    cacheTTLMs

    缓存超时时长,单位为毫秒。

    Long

    cacheTTLMs配置和cache有关:

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    cacheEmpty

    是否缓存空结果。

    Boolean

    true

    无。

类型映射

类型

Redis字段类型

Flink字段类型

通用

STRING

VARCHAR

结果表独有

SCORE

DOUBLE

说明
  • 因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。

  • 云数据库Redis版结果表支持5种Redis数据结构,其DDL必须按指定格式定义且主键必须被定义。指定格式详情请参见Redis数据结构格式

表 1. Redis数据结构格式

类型

格式

Redis插入数据的命令

STRING类型

DDL为两列:

  • 第1列为key,STRING类型。

  • 第2列为value,STRING类型。

set key value

LIST类型

DDL为两列:

  • 第1列为key,STRING类型。

  • 第2列为value,STRING类型。

lpush key value

SET类型

DDL为两列:

  • 第1列为key,STRING类型。

  • 第2列为value,STRING类型。

sadd key value

HASHMAP类型

DDL为三列:

  • 第1列为key,STRING类型。

  • 第2列为hash_key,STRING类型。

  • 第3列为hash_key对应的hash_value,STRING类型。

hmset key hash_key hash_value

SORTEDSET类型

DDL为三列:

  • 第1列为key,STRING类型。

  • 第2列为score,DOUBLE类型。

  • 第3列为value,STRING类型。

zadd key score value

使用示例

  • 结果表

    CREATE TEMPORARY TABLE datagen_source (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE redis_sink (
      a STRING,
      b STRING,
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'mode' = 'string',
      'host' = '<yourHost>',
      'port' = '<yourPort>',
      'password' = '<yourPassword>'
    );
    
    INSERT INTO redis_sink
    SELECT v, p
    FROM datagen_source;
  • 维表

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE redis_dim (
      id STRING,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED --Redis中的Row Key字段。
    ) WITH (
      'connector' = 'redis',
      'host' = '<yourHost>',
      'port' = '<yourPort>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN redis_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;