Realtime Compute for Apache Flink: ApsaraDB for Redis connector

Last Updated: Jan 03, 2024

This topic describes how to use the ApsaraDB for Redis connector.

Background information

ApsaraDB for Redis is a database service that is compatible with the open source Redis protocol and supports hybrid storage that combines memory and disks. It provides a hot standby architecture for high availability and a scalable cluster architecture for workloads that require high throughput, low latency, and flexible configuration changes. For more information, see What is ApsaraDB for Redis?

The following table describes the capabilities supported by the ApsaraDB for Redis connector.

| Item | Description |
|---|---|
| Table type | Dimension table and result table |
| Running mode | Streaming mode |
| Data format | STRING |
| Metric | Dimension tables: none. Result tables: numBytesOut, numRecordsOutPerSecond, numBytesOutPerSecond, and currentSendTime. |
| API type | SQL API |
| Data update or deletion in a result table | Supported |

Note

For more information about the metrics and how to view the metrics, see Report metrics of fully managed Flink to other platforms.

Prerequisites

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the ApsaraDB for Redis connector.

  • The ApsaraDB for Redis connector supports only best-effort delivery and does not support exactly-once semantics. You must make sure that your writes are idempotent.

  • You must declare exactly one primary key for an ApsaraDB for Redis dimension table.

  • You must declare exactly two fields for an ApsaraDB for Redis dimension table. Both fields must be of the STRING type.

  • ApsaraDB for Redis dimension tables can read only the data of the STRING and HASHMAP type from ApsaraDB for Redis databases.

  • When you join an ApsaraDB for Redis dimension table with another table, the ON clause must contain equality (=) conditions on all the primary key fields.

  • You must set the cache parameter to None or LRU for an ApsaraDB for Redis dimension table.

Syntax

CREATE TABLE redis_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- Required. 
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>'
);

The DDL statement of an ApsaraDB for Redis dimension table must meet the dimension table requirements that are described in the Limits section.
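
As a minimal sketch of a dimension table DDL that satisfies these requirements (one primary key, two STRING fields, and the cache parameter set to LRU), the following statement may help. The table name, column names, and cache values are hypothetical placeholders, not values taken from this topic.

```sql
-- Hypothetical dimension table: names and cache settings are illustrative only.
CREATE TEMPORARY TABLE redis_dim_lru (
  user_id STRING,                       -- Maps to the Redis key.
  user_name STRING,                     -- Maps to the Redis value.
  PRIMARY KEY (user_id) NOT ENFORCED    -- Exactly one primary key.
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'cache' = 'LRU',                      -- None or LRU, as the requirements state.
  'cacheSize' = '10000',                -- Maximum number of cached rows.
  'cacheTTLMs' = '60000'                -- Cache entries expire after 60 seconds.
);
```

The cache, cacheSize, and cacheTTLMs parameters are described in the parameter tables later in this topic.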

Parameters in the WITH clause

  • Common parameters

    | Parameter | Description | Data type | Required | Default value | Remarks |
    |---|---|---|---|---|---|
    | connector | The type of the table. | STRING | Yes | No default value | Set the value to redis. |
    | host | The IP address that is used to access the ApsaraDB for Redis database. | STRING | Yes | No default value | We recommend that you use an internal endpoint. Note: If you access the ApsaraDB for Redis database over the Internet, the connection may be unstable because of issues such as network latency and bandwidth limits. |
    | port | The port number that is used to access the ApsaraDB for Redis database. | INT | No | 6379 | N/A |
    | password | The password that is used to access the ApsaraDB for Redis database. | STRING | No | An empty string | The default value is an empty string, which indicates that permission verification is not required. |
    | dbNum | The sequence number of the ApsaraDB for Redis database. | INT | No | 0 | N/A |
    | clusterMode | Specifies whether the ApsaraDB for Redis database is in cluster mode. | BOOLEAN | No | false | N/A |
    | hostAndPorts | The hosts and port numbers of the ApsaraDB for Redis database. Note: If the clusterMode parameter is set to true and connections in high availability (HA) mode are not required, you can configure the host and port parameters to specify only one of the hosts. You can also configure only this parameter. This parameter takes precedence over the host and port parameters. | STRING | No | No default value | If the clusterMode parameter is set to true and HA is required for the connection from Jedis to the self-managed Redis database in cluster mode, you must configure this parameter. The value is a string in the "host1:port1,host2:port2" format. |
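
    The cluster-related parameters above could be combined as in the following sketch. It assumes that hostAndPorts is configured on its own, which the note in the table allows; the table name, column names, and host placeholders are hypothetical. If your VVR version still requires the host parameter, add it as well.

    ```sql
    -- Hypothetical dimension table on a Redis database in cluster mode.
    CREATE TEMPORARY TABLE redis_cluster_dim (
      k STRING,
      v STRING,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'clusterMode' = 'true',
      -- Comma-separated host:port pairs; takes precedence over host and port.
      'hostAndPorts' = '<host1>:<port1>,<host2>:<port2>',
      'password' = '<yourPassword>'
    );
    ```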

  • Parameters only for result tables

    | Parameter | Description | Data type | Required | Default value | Remarks |
    |---|---|---|---|---|---|
    | mode | The data type of the data that is stored in the ApsaraDB for Redis result table. | STRING | Yes | No default value | ApsaraDB for Redis result tables support five data structures. The DDL statement that creates the result table must comply with the specified data structure and must specify a primary key. For more information, see Data structures supported by ApsaraDB for Redis. |
    | ignoreDelete | Specifies whether to ignore retraction messages. | BOOLEAN | No | false | Valid values: true (retraction messages are ignored) and false (the inserted data and the key of the data are deleted when a retraction message is received). |
    | expiration | The time-to-live (TTL) of the key of the inserted data. Unit: milliseconds. | LONG | No | 0 | The default value 0 indicates that no TTL is configured for the key of the inserted data. If the value is greater than 0, the TTL is configured for the key. Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. |
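
    A result table that sets the optional parameters above might look like the following sketch. The table name, column names, and the 24-hour TTL are hypothetical choices for illustration; the expiration parameter assumes VVR 4.0.13 or later, as noted in the table.

    ```sql
    -- Hypothetical result table that ignores retractions and expires keys.
    CREATE TEMPORARY TABLE redis_sink_ttl (
      k STRING,
      v STRING,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'mode' = 'STRING',
      'host' = '<yourHost>',
      'ignoreDelete' = 'true',      -- Retraction messages do not delete keys.
      'expiration' = '86400000'     -- TTL in milliseconds (24 hours).
    );
    ```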

  • Parameters only for dimension tables

    | Parameter | Description | Data type | Required | Default value | Remarks |
    |---|---|---|---|---|---|
    | hashName | The hash key name in hash mode. | STRING | No | An empty string | In most cases, the data stored in the ApsaraDB for Redis dimension table is of the STRING data type and is expressed as key-value pairs. If you configure the hashName parameter, the dimension table stores data of the HASHMAP type in the format of hash key-{field-value}. hash key: the value of the hashName parameter. field: the value of the key parameter that you specify in the CREATE TABLE statement. value: the value that is assigned to key, which is equivalent to the value in a key-value pair of the STRING data type. |
    | cache | The cache policy. | STRING | No | None | Valid values: None (default): data is not cached. LRU: only specific data in the dimension table is cached. Each time the system receives a data record, the system searches the cache. If the record is not found in the cache, the system searches the physical dimension table. ALL: all data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table to the cache, and all subsequent lookups are served from the cache. If a key is not found in the cache, the key does not exist. The system reloads all data after cache entries expire. See the Important note below this table. |
    | cacheSize | The maximum number of rows of data that can be cached. | LONG | No | 10000 | If you set the cache parameter to LRU, you can configure this parameter. |
    | cacheTTLMs | The cache timeout period. Unit: milliseconds. | LONG | No | No default value | The meaning varies based on the value of the cache parameter. None: the parameter can be left empty, which indicates that cache entries do not expire. LRU: the timeout period of cache entries. By default, cache entries do not expire. ALL: the interval at which the system reloads the cache. By default, the cache is not reloaded. |
    | cacheEmpty | Specifies whether to cache empty results. | BOOLEAN | No | true | N/A |
    | cacheReloadTimeBlackList | The time periods during which the cache is not refreshed. This parameter takes effect only when the cache parameter is set to ALL. It is suitable for large-scale online promotional events such as Double 11. | STRING | No | No default value | Example: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Separate multiple time periods with commas (,). Separate the start time and end time of each time period with an arrow (->), which is a hyphen (-) followed by a closing angle bracket (>). |

    Important

    The following requirements apply to the ALL cache policy:

    • Only Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports the ALL cache policy.

    • If you set the cache parameter to ALL, the hash structure is supported. The hash structure indicates that the hashName parameter is not empty.

    • If this cache policy is used, you must configure the cacheSize and cacheTTLMs parameters.
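
    The hash-mode and ALL-cache parameters above could be combined as in the following sketch. It assumes VVR 8.0.3 or later, because the ALL policy requires it. The table name, column names, hash key name, and cache values are hypothetical. Both cacheSize and cacheTTLMs are set because the Important note asks for them, although cacheSize may have no effect under the ALL policy.

    ```sql
    -- Hypothetical dimension table backed by a single Redis hash.
    CREATE TEMPORARY TABLE redis_hash_dim (
      field_name STRING,                     -- Field inside the hash.
      field_value STRING,                    -- Value stored for that field.
      PRIMARY KEY (field_name) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'host' = '<yourHost>',
      'hashName' = 'user_profile',           -- Hypothetical hash key name.
      'cache' = 'ALL',                       -- Load the whole hash before lookups.
      'cacheSize' = '10000',
      'cacheTTLMs' = '3600000',              -- Reload the cache every hour.
      -- Do not reload the cache during this period.
      'cacheReloadTimeBlackList' = '2017-11-10 23:30 -> 2017-11-11 08:00'
    );
    ```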

Data structures supported by ApsaraDB for Redis

| Data type | Data structure | Command used to insert data into an ApsaraDB for Redis result table |
|---|---|---|
| STRING | The DDL statement has two columns. The first column lists keys of the STRING type. The second column lists values of the STRING type. | set key value |
| LIST | The DDL statement has two columns. The first column lists keys of the STRING type. The second column lists values of the STRING type. | lpush key value |
| SET | The DDL statement has two columns. The first column lists keys of the STRING type. The second column lists values of the STRING type. | sadd key value |
| HASHMAP | The DDL statement has three columns. The first column lists keys of the STRING type. The second column lists hash keys of the STRING type. The third column lists hash values of the STRING type. | hmset key hash_key hash_value |
| SORTEDSET | The DDL statement has three columns. The first column lists keys of the STRING type. The second column lists scores of the DOUBLE type. The third column lists values of the STRING type. | zadd key score value |
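
The three-column HASHMAP layout could be declared as in the following sketch. It assumes that the mode value matches the data type name, as 'STRING' does in the sample code of this topic, and that the primary key is declared on the first column. The table and column names are hypothetical.

```sql
-- Hypothetical HASHMAP result table: key, hash key, hash value.
CREATE TEMPORARY TABLE redis_hash_sink (
  redis_key STRING,     -- First column: the Redis key.
  hash_key STRING,      -- Second column: the field inside the hash.
  hash_value STRING,    -- Third column: the value stored for that field.
  PRIMARY KEY (redis_key) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'HASHMAP',
  'host' = '<yourHost>'
);
```

Each row written to this table would correspond to an hmset redis_key hash_key hash_value command.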

Data type mappings

| Category | Data type of ApsaraDB for Redis | Data type of Flink |
|---|---|---|
| Data type mappings for all types of tables | STRING | VARCHAR |
| Data type mappings for only result tables | SCORE | DOUBLE |

Note

The SCORE type applies to the SORTEDSET data structure in ApsaraDB for Redis. You must manually set a score of the DOUBLE type for each value. The values are sorted by score in ascending order.
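
To illustrate the SCORE-to-DOUBLE mapping, the following sketch writes to a SORTEDSET result table. It assumes that 'SORTEDSET' is accepted as the mode value and that the primary key is declared on the first column. All table names, column names, and the fixed key 'leaderboard' are hypothetical.

```sql
-- Hypothetical source that produces a member name and a numeric score.
CREATE TEMPORARY TABLE score_source (
  player STRING,
  points DOUBLE
) WITH (
  'connector' = 'datagen'
);

-- Hypothetical SORTEDSET result table: key, score (DOUBLE), value.
CREATE TEMPORARY TABLE redis_leaderboard (
  board STRING,
  score_value DOUBLE,
  member_name STRING,
  PRIMARY KEY (board) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'SORTEDSET',
  'host' = '<yourHost>'
);

-- Each row corresponds to: zadd leaderboard <points> <player>
INSERT INTO redis_leaderboard
SELECT CAST('leaderboard' AS STRING), points, player
FROM score_source;
```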

Sample code

  • Sample code for a result table

    CREATE TEMPORARY TABLE datagen_source (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE redis_sink (
      a STRING,
      b STRING,
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'mode' = 'STRING',
      'host' = '<yourHost>',
      'port' = '<yourPort>',
      'password' = '<yourPassword>'
    );
    
    INSERT INTO redis_sink
    SELECT v, p
    FROM datagen_source;

  • Sample code for a dimension table

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      data STRING,
      proctime AS PROCTIME() -- Processing-time attribute that is defined as a computed column.
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE redis_dim (
      id STRING,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED -- Corresponds to the key in the ApsaraDB for Redis database.
    ) WITH (
      'connector' = 'redis',
      'host' = '<yourHost>',
      'port' = '<yourPort>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT e.id, e.data, w.name -- Select exactly the three columns that the sink declares.
    FROM datagen_source AS e
    JOIN redis_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;