
Realtime Compute for Apache Flink:Tair (Redis OSS-compatible) connector

Last Updated: Mar 26, 2026

The Tair (Redis OSS-compatible) connector lets you read from Tair as a dimension table and write to Tair as a sink table in Flink SQL streaming jobs.

Connector overview

Item Details
Table types Dimension table, sink table
Running mode Streaming
Data format STRING
Metrics Dimension tables: none. Sink tables: numBytesOut, numRecordsOutPerSecond, numBytesOutPerSecond, currentSendTime. For details, see Monitoring metrics.
API type SQL API
Data updates and deletes in sink tables Supported

Prerequisites

Before you begin, ensure that you have:

Limitations

  • Delivery semantics: The connector supports only best-effort delivery. Exactly-once semantics are not supported. Make sure your write operations are idempotent.

  • Dimension table data types: Dimension tables can read only STRING and HASHMAP data. All fields must be of the STRING type.

  • Dimension table primary key: Each dimension table must have exactly one primary key. The ON clause in a dimension table JOIN must use equality conditions on the primary key.

Known issues

VVR 8.0.9 Buffered Writer cache bug: VVR 8.0.9 contains a Buffered Writer cache issue. To work around it, set sink.buffer-flush.max-rows to 0 in the sink table's WITH clause.
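The workaround requires only one extra option in the sink DDL. A minimal sketch (table schema and connection values are placeholders):

```sql
CREATE TEMPORARY TABLE redis_sink (
  user_id    STRING,
  login_time STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode'      = 'STRING',
  'host'      = '<yourHost>',
  -- Workaround for the VVR 8.0.9 Buffered Writer cache issue:
  -- disable buffering so every record is written through directly.
  'sink.buffer-flush.max-rows' = '0'
);
```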

Syntax

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- Required.
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING' -- Required for sink tables.
);

Connector options

General options

These options apply to both sink tables and dimension tables.

Option Data type Required Default Description
connector STRING Yes Set to redis.
host STRING Yes IP address used to connect to the ApsaraDB for Redis database. Use the internal endpoint whenever possible. Internet connections may experience higher latency or bandwidth limitations.
port INT No 6379 Port number.
password STRING No (empty string) Access password.
dbNum INT No 0 Number (index) of the database to connect to.
clusterMode BOOLEAN No false Whether the database is in cluster mode.
hostAndPorts STRING No Host and port pairs in the format "host1:port1,host2:port2". Required when clusterMode is true and high availability (HA) is needed for Jedis connections to the self-managed Redis cluster. Takes precedence over host and port. If clusterMode is true but HA is not required, you can configure only host and port to specify a single node.
key-prefix STRING No Prefix added to the primary key value when reading from a dimension table or writing to a sink table. A delimiter specified by key-prefix-delimiter separates the prefix and the primary key value. Requires VVR 8.0.7 or later.
key-prefix-delimiter STRING No Delimiter between the key prefix and primary key value.
connection.pool.max-total INT No 8 Maximum number of connections that can be allocated by the connection pool. Requires VVR 8.0.9 or later.
connection.pool.max-idle INT No 8 Maximum number of idle connections in the connection pool.
connection.pool.min-idle INT No 0 Minimum number of idle connections in the connection pool.
connect.timeout DURATION No 3000ms Timeout for connection setup.
socket.timeout DURATION No 3000ms Timeout for receiving data from the Redis server.
cacert.filepath STRING No Full path to the SSL/TLS certificate. The file must be in JKS format. If not set, SSL/TLS encryption is disabled. To enable encryption, download the CA certificate and upload it as an additional dependency; uploaded dependencies are stored in the /flink/usrlib directory. Example: 'cacert.filepath' = '/flink/usrlib/ca.jks'. Requires VVR 11.1 or later.
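For example, assuming the CA certificate has been uploaded as an additional dependency named ca.jks (the file name here is an assumption), SSL/TLS could be enabled as follows:

```sql
CREATE TEMPORARY TABLE redis_sink (
  user_id   STRING,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector'       = 'redis',
  'mode'            = 'STRING',
  'host'            = '<yourHost>',
  -- Uploaded dependencies are placed under /flink/usrlib.
  -- Requires VVR 11.1 or later; the certificate must be in JKS format.
  'cacert.filepath' = '/flink/usrlib/ca.jks'
);
```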

Sink table options

Option Data type Required Default Description
mode STRING Yes Redis data structure for the sink table. Five structures are supported: STRING, LIST, SET, HASHMAP, and SORTEDSET. The DDL statement must match the chosen structure. See Data structures for sink tables.
flattenHash BOOLEAN No false Whether to write HASHMAP data in multi-value mode. When true, declare multiple non-primary key fields: the primary key maps to the Redis key, each non-primary key field name maps to a Hash field, and each field value maps to the Hash value. When false (single-value mode), declare exactly three fields: the primary key maps to the key, the first non-primary key field maps to the Hash field, and the second maps to the Hash value. Takes effect only when mode is HASHMAP. Requires VVR 8.0.7 or later.
ignoreDelete BOOLEAN No false Whether to ignore retraction messages. When true, retraction messages are dropped. When false, the key and its data are deleted when a retraction message is received.
expiration LONG No 0 Time-to-live (TTL) for inserted keys, in milliseconds. 0 disables TTL.
sink.buffer-flush.max-rows INT No 200 Maximum number of records (append, modify, and delete events) held in the buffer before it flushes. Requires VVR 8.0.9 or later for clusterMode = false; VVR 11.4.0 or later for clusterMode = true.
sink.buffer-flush.interval DURATION No 1000ms Interval at which the buffer flushes asynchronously. Requires VVR 8.0.9 or later for clusterMode = false; VVR 11.4.0 or later for clusterMode = true.
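As an illustration, the two buffering options can be combined so that the sink flushes when either threshold is reached first; the values below are arbitrary, not recommendations:

```sql
CREATE TEMPORARY TABLE redis_sink (
  user_id    STRING,
  login_time STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode'      = 'STRING',
  'host'      = '<yourHost>',
  -- Flush after 500 buffered records or after 2 seconds, whichever comes first.
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s'
);
```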

Dimension table options

Option Data type Required Default Description
mode STRING No STRING Data type to read from the dimension table. STRING reads STRING data. HASHMAP reads nested Hash data (Key → Map<Field, Value>): declare multiple non-primary key columns, where the primary key maps to the Redis key, each non-primary key column name maps to a Hash field, and its value maps to the field value. Requires VVR 8.0.7 or later. To read HASHMAP data in single-value mode, set hashName instead.
hashName STRING No Fixed Hash key used when reading HASHMAP data in single-value mode. When set, declare two fields: the primary key maps to the Hash field, and the non-primary key maps to the Hash value.
cache STRING No None Cache policy. None disables caching. LRU caches a subset of data — on cache miss, the connector queries the dimension table. ALL loads the full dimension table into cache before the deployment runs; all subsequent lookups use the cache, and the cache reloads after entries expire. See Usage notes for the cache option.
cacheSize LONG No 10000 Maximum number of rows to cache. Required when cache is LRU.
cacheTTLMs LONG No Cache timeout in milliseconds. For LRU, sets the per-entry expiration (no expiration by default). For ALL, sets the cache reload interval (no reload by default). Has no effect when cache is None.
cacheEmpty BOOLEAN No true Whether to cache empty (no-match) results.
cacheReloadTimeBlackList STRING No Time periods during which the ALL cache policy does not reload. Useful during high-traffic events. Format: use -> to separate start and end times, and , to separate multiple periods. Examples: single-day 2017-10-24 14:00 -> 2017-10-24 15:00; cross-day 2017-11-10 23:30 -> 2017-11-11 08:00; daily recurring 12:00 -> 14:00, 22:00 -> 2:00 (requires VVR 11.1 or later).
async BOOLEAN No false Whether to enable asynchronous lookup. When true, results are returned out of order.
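For instance, an LRU cache with a 10-minute per-entry expiration might be declared as follows (the cache size and TTL below are illustrative, not recommendations):

```sql
CREATE TEMPORARY TABLE redis_dim (
  user_id   STRING,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector'  = 'redis',
  'host'       = '<yourHost>',
  'mode'       = 'STRING',
  -- LRU caching: keep up to 50,000 hot keys, expire each after 10 minutes.
  'cache'      = 'LRU',
  'cacheSize'  = '50000',
  'cacheTTLMs' = '600000'
);
```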

Usage notes for the cache option

  • ALL requires VVR 8.0.3 or later.

  • For VVR 8.0.3 to VVR 11.1 (exclusive), cache = ALL reads HASHMAP in single-value mode only. In the DDL WITH clause, set hashName to the key name; declare Field as the primary key and Value as a non-primary key column.

  • From VVR 11.1 onward, cache = ALL supports HASHMAP multi-value mode. Specify the Redis key as the primary key and declare multiple non-primary key columns for each Hash field. Set mode = HASHMAP in the WITH clause.

  • The cache option must be used together with cacheSize and cacheTTLMs.

Data structures for sink tables

Each Redis data structure corresponds to a specific DDL schema and write command.

Data structure DDL schema Write command
STRING Two columns: key (STRING), value (STRING) set key value
LIST Two columns: key (STRING), value (STRING) lpush key value
SET Two columns: key (STRING), value (STRING) sadd key value
HASHMAP (single-value mode, default) Three columns: key (STRING), field (STRING), value (STRING) hmset key field value
HASHMAP (multi-value mode, flattenHash = true) Multiple columns: key (STRING), then one column per Hash field, where each column name is the field name and its value is the field value hmset key col1 value1 col2 value2 ...
SORTEDSET Three columns: key (STRING), score (DOUBLE), value (STRING) zadd key score value
The ignoreDelete option controls how retraction messages are handled. When set to true, delete operations are skipped.

Data type mappings

Scope Tair (ApsaraDB for Redis) type Flink type
All table types STRING STRING
Sink tables only SCORE DOUBLE
The SCORE type is used with SORTEDSET data. Each value in a sorted set requires a DOUBLE score, and values are sorted in ascending order of their scores.
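The SCORE-to-DOUBLE mapping comes into play with SORTEDSET sinks, which the examples below do not cover. A sketch of such a sink (column names are illustrative), which issues zadd key score value for each record:

```sql
CREATE TEMPORARY TABLE redis_sink (
  leaderboard STRING,   -- Redis key (the sorted set name)
  score       DOUBLE,   -- SORTEDSET score (maps to the SCORE type)
  player_id   STRING,   -- Member value, ordered by ascending score
  PRIMARY KEY (leaderboard) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode'      = 'SORTEDSET',
  'host'      = '<yourHost>'
);
```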

Examples

Sink table examples

All sink table examples read from a Kafka source and write to a Tair sink.

Write STRING data

This example uses user_id as the Redis key and login_time as the Redis value.

CREATE TEMPORARY TABLE kafka_source (
  user_id    STRING,   -- User ID
  login_time STRING    -- Login time (Unix timestamp)
) WITH (
  'connector'                     = 'kafka',
  'topic'                         = 'user_logins',
  'properties.bootstrap.servers'  = '<yourKafkaBroker>',
  'format'                        = 'json',
  'scan.startup.mode'             = 'earliest-offset'
);

CREATE TEMPORARY TABLE redis_sink (
  user_id    STRING,   -- Redis key
  login_time STRING,   -- Redis value
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode'      = 'STRING',
  'host'      = '<yourHost>',
  'port'      = '<yourPort>',
  'password'  = '<yourPassword>'
);

INSERT INTO redis_sink
SELECT * FROM kafka_source;

Write HASHMAP data in multi-value mode

This example uses order_id as the Redis key and writes product_name, quantity, and amount as separate Hash fields.

CREATE TEMPORARY TABLE kafka_source (
  order_id     STRING,  -- Order ID
  product_name STRING,  -- Product name
  quantity     STRING,  -- Product quantity
  amount       STRING   -- Order amount
) WITH (
  'connector'                     = 'kafka',
  'topic'                         = 'orders_topic',
  'properties.bootstrap.servers'  = '<yourKafkaBroker>',
  'format'                        = 'json',
  'scan.startup.mode'             = 'earliest-offset'
);

CREATE TEMPORARY TABLE redis_sink (
  order_id     STRING,  -- Redis key
  product_name STRING,  -- Hash field: product_name
  quantity     STRING,  -- Hash field: quantity
  amount       STRING,  -- Hash field: amount
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'   = 'redis',
  'mode'        = 'HASHMAP',
  'flattenHash' = 'true',
  'host'        = '<yourHost>',
  'port'        = '<yourPort>',
  'password'    = '<yourPassword>'
);

INSERT INTO redis_sink
SELECT * FROM kafka_source;

Write HASHMAP data in single-value mode

This example uses order_id as the Redis key, product_name as the Hash field, and quantity as the Hash value.

CREATE TEMPORARY TABLE kafka_source (
  order_id     STRING,  -- Order ID
  product_name STRING,  -- Product name
  quantity     STRING   -- Product quantity
) WITH (
  'connector'                     = 'kafka',
  'topic'                         = 'orders_topic',
  'properties.bootstrap.servers'  = '<yourKafkaBroker>',
  'format'                        = 'json',
  'scan.startup.mode'             = 'earliest-offset'
);

CREATE TEMPORARY TABLE redis_sink (
  order_id     STRING,  -- Redis key
  product_name STRING,  -- Redis Hash field
  quantity     STRING,  -- Redis Hash value
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode'      = 'HASHMAP',
  'host'      = '<yourHost>',
  'port'      = '<yourPort>',
  'password'  = '<yourPassword>'
);

INSERT INTO redis_sink
SELECT * FROM kafka_source;

Dimension table examples

All dimension table examples look up user information from a Tair dimension table and join it with a Kafka stream.

Read STRING data

This example uses user_id as the Redis key and retrieves user_name as the Redis value.

CREATE TEMPORARY TABLE kafka_source (
  user_id  STRING,
  proctime AS PROCTIME()
) WITH (
  'connector'                     = 'kafka',
  'topic'                         = 'user_clicks',
  'properties.bootstrap.servers'  = '<yourKafkaBroker>',
  'format'                        = 'json',
  'scan.startup.mode'             = 'earliest-offset'
);

CREATE TEMPORARY TABLE redis_dim (
  user_id   STRING,   -- Redis key
  user_name STRING,   -- Redis value
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'host'      = '<yourHost>',
  'port'      = '<yourPort>',
  'password'  = '<yourPassword>',
  'mode'      = 'STRING'
);

CREATE TEMPORARY TABLE blackhole_sink (
  user_id       STRING,
  redis_user_id STRING,
  user_name     STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  t1.user_id,
  t2.user_id,
  t2.user_name
FROM kafka_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.user_id = t2.user_id;

Read HASHMAP data in multi-value mode

This example uses user_id as the Redis key and retrieves multiple Hash fields (user_name, email, and register_time) in a single lookup.

CREATE TEMPORARY TABLE kafka_source (
  user_id    STRING,
  click_time TIMESTAMP(3),
  proctime   AS PROCTIME()
) WITH (
  'connector'                     = 'kafka',
  'topic'                         = 'user_clicks',
  'properties.bootstrap.servers'  = '<yourKafkaBroker>',
  'format'                        = 'json',
  'scan.startup.mode'             = 'earliest-offset'
);

CREATE TEMPORARY TABLE redis_dim (
  user_id       STRING,   -- Redis key
  user_name     STRING,   -- Hash field: user_name
  email         STRING,   -- Hash field: email
  register_time STRING,   -- Hash field: register_time
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'host'      = '<yourHost>',
  'port'      = '<yourPort>',
  'password'  = '<yourPassword>',
  'mode'      = 'HASHMAP'
);

CREATE TEMPORARY TABLE blackhole_sink (
  user_id       STRING,
  user_name     STRING,
  email         STRING,
  register_time STRING,
  click_time    TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  t1.user_id,
  t2.user_name,
  t2.email,
  t2.register_time,
  t1.click_time
FROM kafka_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.user_id = t2.user_id;

Read HASHMAP data in single-value mode

This example uses a fixed Hash key (testkey) set via hashName. The user_id column maps to the Hash field, and user_name maps to the Hash value.

CREATE TEMPORARY TABLE kafka_source (
  user_id  STRING,
  proctime AS PROCTIME()
) WITH (
  'connector'                     = 'kafka',
  'topic'                         = 'user_clicks',
  'properties.bootstrap.servers'  = '<yourKafkaBroker>',
  'format'                        = 'json',
  'scan.startup.mode'             = 'earliest-offset'
);

CREATE TEMPORARY TABLE redis_dim (
  user_id   STRING,   -- Hash field
  user_name STRING,   -- Hash value
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'host'      = '<yourHost>',
  'port'      = '<yourPort>',
  'password'  = '<yourPassword>',
  'hashName'  = 'testkey'   -- Fixed Hash key
);

CREATE TEMPORARY TABLE blackhole_sink (
  user_id       STRING,
  redis_user_id STRING,
  user_name     STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  t1.user_id,
  t2.user_id,
  t2.user_name
FROM kafka_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.user_id = t2.user_id;