The Tair (Redis OSS-compatible) connector lets you read from Tair as a dimension table and write to Tair as a sink table in Flink SQL streaming jobs.
Connector overview
| Item | Details |
|---|---|
| Table types | Dimension table, sink table |
| Running mode | Streaming |
| Data format | STRING |
| Metrics | Dimension tables: none. Sink tables: numBytesOut, numRecordsOutPerSecond, numBytesOutPerSecond, currentSendTime. For details, see Monitoring metrics. |
| API type | SQL API |
| Data updates and deletes in sink tables | Supported |
Prerequisites
Before you begin, ensure that you have:
- A Tair (Redis OSS-compatible) instance. See Step 1: Create an instance.
- A whitelist configured for the instance. See Step 2: Configure whitelists.
Limitations
- Delivery semantics: The connector supports only best-effort delivery. Exactly-once semantics are not supported. Make sure your write operations are idempotent.
- Dimension table data types: Dimension tables can read only STRING and HASHMAP data. All fields must be of the STRING type.
- Dimension table primary key: Each dimension table must have exactly one primary key. The ON clause in a dimension table JOIN must use equality conditions on the primary key.
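Because delivery is best-effort, a replayed record must not corrupt state. The sketch below (plain Python dicts and lists standing in for Redis structures, not the connector itself) shows why SET-style writes in STRING mode are naturally idempotent while LPUSH-style appends in LIST mode are not:

```python
# Simulate the same record being delivered twice to two Redis-like structures.
# Plain Python stand-ins for Redis; illustrative only, not the connector's code.

def set_write(store: dict, key: str, value: str) -> None:
    """STRING mode: SET overwrites, so a replay leaves identical state."""
    store[key] = value

def list_write(store: dict, key: str, value: str) -> None:
    """LIST mode: LPUSH prepends, so a replay duplicates the element."""
    store.setdefault(key, []).insert(0, value)

strings, lists = {}, {}
for _ in range(2):  # the same record delivered twice
    set_write(strings, "user:1", "2024-01-01")
    list_write(lists, "user:1", "2024-01-01")

print(strings["user:1"])     # idempotent: one value either way
print(len(lists["user:1"]))  # not idempotent: 2 entries after the replay
```

When exactly-once is impossible, prefer modes whose write commands overwrite (STRING, HASHMAP, SORTEDSET) or deduplicate (SET) over appending modes (LIST).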
Known issues
VVR 8.0.9 Buffered Writer cache bug: A Buffered Writer cache issue exists in VVR 8.0.9. To work around it, set sink.buffer-flush.max-rows to 0 in the sink table's WITH clause.
Syntax
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- Required.
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- Required for sink tables.
);
Connector options
General options
These options apply to both sink tables and dimension tables.
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| connector | STRING | Yes | — | Set to redis. |
| host | STRING | Yes | — | IP address used to connect to the ApsaraDB for Redis database. Use the internal endpoint whenever possible. Internet connections may experience higher latency or bandwidth limitations. |
| port | INT | No | 6379 | Port number. |
| password | STRING | No | (empty string) | Access password. |
| dbNum | INT | No | 0 | Database sequence number. |
| clusterMode | BOOLEAN | No | false | Whether the database is in cluster mode. |
| hostAndPorts | STRING | No | — | Host and port pairs in the format "host1:port1,host2:port2". Required when clusterMode is true and high availability (HA) is needed for Jedis connections to the self-managed Redis cluster. Takes precedence over host and port. If clusterMode is true but HA is not required, you can configure only host and port to specify a single node. |
| key-prefix | STRING | No | — | Prefix added to the primary key value when reading from a dimension table or writing to a sink table. A delimiter specified by key-prefix-delimiter separates the prefix and the primary key value. Requires VVR 8.0.7 or later. |
| key-prefix-delimiter | STRING | No | — | Delimiter between the key prefix and the primary key value. |
| connection.pool.max-total | INT | No | 8 | Maximum number of connections that the connection pool can allocate. Requires VVR 8.0.9 or later. |
| connection.pool.max-idle | INT | No | 8 | Maximum number of idle connections in the connection pool. |
| connection.pool.min-idle | INT | No | 0 | Minimum number of idle connections in the connection pool. |
| connect.timeout | DURATION | No | 3000ms | Timeout for connection setup. |
| socket.timeout | DURATION | No | 3000ms | Timeout for receiving data from the Redis server. |
| cacert.filepath | STRING | No | — | Full path to the SSL/TLS certificate. The file must be in JKS format. If not set, SSL/TLS encryption is disabled. To enable encryption, download the CA certificate and upload it as an additional dependency; it is stored under the /flink/usrlib directory. Example: 'cacert.filepath' = '/flink/usrlib/ca.jks'. Requires VVR 11.1 or later. |
Sink table options
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| mode | STRING | Yes | — | Redis data structure for the sink table. Five structures are supported: STRING, LIST, SET, HASHMAP, and SORTEDSET. The DDL statement must match the chosen structure. See Data structures for sink tables. |
| flattenHash | BOOLEAN | No | false | Whether to write HASHMAP data in multi-value mode. When true, declare multiple non-primary key fields: the primary key maps to the Redis key, each non-primary key field name maps to a Hash field, and each field value maps to the Hash value. When false (single-value mode), declare exactly three fields: the primary key maps to the key, the first non-primary key field maps to the Hash field, and the second maps to the Hash value. Takes effect only when mode is HASHMAP. Requires VVR 8.0.7 or later. |
| ignoreDelete | BOOLEAN | No | false | Whether to ignore retraction messages. When true, retraction messages are dropped. When false, the key and its data are deleted when a retraction message is received. |
| expiration | LONG | No | 0 | Time-to-live (TTL) for inserted keys, in milliseconds. 0 disables TTL. |
| sink.buffer-flush.max-rows | INT | No | 200 | Maximum number of records (append, modify, and delete events) held in the buffer before it flushes. Requires VVR 8.0.9 or later for clusterMode = false; VVR 11.4.0 or later for clusterMode = true. |
| sink.buffer-flush.interval | DURATION | No | 1000ms | Interval at which the buffer flushes asynchronously. Requires VVR 8.0.9 or later for clusterMode = false; VVR 11.4.0 or later for clusterMode = true. |
Dimension table options
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| mode | STRING | No | STRING | Data type to read from the dimension table. STRING reads STRING data. HASHMAP reads nested Hash data (Key → Map\<Field, Value\>): declare multiple non-primary key columns, where the primary key maps to the Redis key, each non-primary key column name maps to a Hash field, and its value maps to the field value. Requires VVR 8.0.7 or later. To read HASHMAP data in single-value mode, set hashName instead. |
| hashName | STRING | No | — | Fixed Hash key used when reading HASHMAP data in single-value mode. When set, declare two fields: the primary key maps to the Hash field, and the non-primary key maps to the Hash value. |
| cache | STRING | No | None | Cache policy. None disables caching. LRU caches a subset of data; on a cache miss, the connector queries the dimension table. ALL loads the full dimension table into cache before the deployment runs; all subsequent lookups use the cache, and the cache reloads after entries expire. See Usage notes for the cache option. |
| cacheSize | LONG | No | 10000 | Maximum number of rows to cache. Required when cache is LRU. |
| cacheTTLMs | LONG | No | — | Cache timeout in milliseconds. For LRU, sets the per-entry expiration (no expiration by default). For ALL, sets the cache reload interval (no reload by default). Has no effect when cache is None. |
| cacheEmpty | BOOLEAN | No | true | Whether to cache empty (no-match) results. |
| cacheReloadTimeBlackList | STRING | No | — | Time periods during which the ALL cache policy does not reload. Useful during high-traffic events. Format: use -> to separate start and end times, and , to separate multiple periods. Examples: single-day 2017-10-24 14:00 -> 2017-10-24 15:00; cross-day 2017-11-10 23:30 -> 2017-11-11 08:00; daily recurring 12:00 -> 14:00, 22:00 -> 2:00 (requires VVR 11.1 or later). |
| async | BOOLEAN | No | false | Whether to enable asynchronous lookup. When true, results are returned out of order. |
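The LRU policy can be pictured as a bounded cache with per-entry TTL sitting in front of the Redis lookup: hits skip Redis, misses query it and store the result (including empty results when cacheEmpty is true). A rough sketch under those assumptions; the class and its methods are hypothetical, not the connector's code:

```python
import time
from collections import OrderedDict

class LruLookupCache:
    """Sketch of cache=LRU: bounded size (cacheSize), optional per-entry
    TTL (cacheTTLMs), and optional caching of empty results (cacheEmpty)."""

    def __init__(self, lookup, cache_size=10000, cache_ttl_ms=None,
                 cache_empty=True):
        self.lookup = lookup            # function: key -> row, or None on no match
        self.cache_size = cache_size
        self.cache_ttl_ms = cache_ttl_ms
        self.cache_empty = cache_empty
        self._entries = OrderedDict()   # key -> (row, inserted_at)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is not None:
            row, inserted = entry
            if self.cache_ttl_ms is None or \
               (time.monotonic() - inserted) * 1000 < self.cache_ttl_ms:
                self._entries.move_to_end(key)    # mark as recently used
                return row
            del self._entries[key]                # entry expired
        row = self.lookup(key)                    # cache miss: query Redis
        if row is not None or self.cache_empty:
            self._entries[key] = (row, time.monotonic())
            if len(self._entries) > self.cache_size:
                self._entries.popitem(last=False)  # evict least recently used
        return row

calls = []
cache = LruLookupCache(lambda k: calls.append(k) or f"row-{k}", cache_size=2)
cache.get("a"); cache.get("a"); cache.get("b"); cache.get("c"); cache.get("a")
print(calls)  # "a" is re-fetched after eviction: ['a', 'b', 'c', 'a']
```

With cache_size=2, inserting "c" evicts "a" (the least recently used entry), so the final lookup of "a" goes back to Redis. This is why cacheSize and cacheTTLMs should be tuned together with the key distribution of the stream.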
Usage notes for the cache option
- ALL requires VVR 8.0.3 or later.
- For VVR 8.0.3 to VVR 11.1 (exclusive), cache = ALL reads HASHMAP in single-value mode only. In the DDL WITH clause, set hashName to the key name; declare Field as the primary key and Value as a non-primary key column.
- From VVR 11.1 onward, cache = ALL supports HASHMAP multi-value mode. Specify the Redis key as the primary key and declare multiple non-primary key columns for each Hash field. Set mode = HASHMAP in the WITH clause.
- The cache option must be used together with cacheSize and cacheTTLMs.
Data structures for sink tables
Each Redis data structure corresponds to a specific DDL schema and write command.
| Data structure | DDL schema | Write command |
|---|---|---|
| STRING | Two columns: key (STRING), value (STRING) | set key value |
| LIST | Two columns: key (STRING), value (STRING) | lpush key value |
| SET | Two columns: key (STRING), value (STRING) | sadd key value |
| HASHMAP (single-value mode, default) | Three columns: key (STRING), field (STRING), value (STRING) | hmset key field value |
| HASHMAP (multi-value mode, flattenHash = true) | Multiple columns: key (STRING), then one column per Hash field; each column name is the field name and its value is the field value | hmset key col1 value1 col2 value2 ... |
| SORTEDSET | Three columns: key (STRING), score (DOUBLE), value (STRING) | zadd key score value |
The ignoreDelete option controls how retraction messages are handled. When set to true, delete operations are skipped.
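The table above can be summarized as one row-to-command mapping per mode. The sketch below renders the command each mode would issue for a given row; it is illustrative only (the connector issues these through Jedis, not as command text):

```python
def to_command(mode, row, flatten_hash=False):
    """Render the Redis write command for one row, following the DDL
    schemas in the table above. `row` lists column values with the
    primary key first; in multi-value HASHMAP mode the second element
    is a list of (column name, value) pairs."""
    key, rest = row[0], row[1:]
    if mode == "STRING":
        return f"set {key} {rest[0]}"
    if mode == "LIST":
        return f"lpush {key} {rest[0]}"
    if mode == "SET":
        return f"sadd {key} {rest[0]}"
    if mode == "HASHMAP":
        if flatten_hash:  # multi-value mode: one Hash field per declared column
            pairs = " ".join(f"{name} {value}" for name, value in rest[0])
            return f"hmset {key} {pairs}"
        return f"hmset {key} {rest[0]} {rest[1]}"  # single-value: field, value
    if mode == "SORTEDSET":
        return f"zadd {key} {rest[0]} {rest[1]}"   # score, then value
    raise ValueError(f"unsupported mode: {mode}")

print(to_command("STRING", ["user:1", "2024-01-01"]))
print(to_command("HASHMAP",
                 ["order:9", [("quantity", "3"), ("amount", "42.0")]],
                 flatten_hash=True))
```

The first call prints set user:1 2024-01-01; the second prints hmset order:9 quantity 3 amount 42.0, matching the multi-value HASHMAP row in the table.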
Data type mappings
| Scope | Tair (ApsaraDB for Redis) type | Flink type |
|---|---|---|
| All table types | STRING | STRING |
| Sink tables only | SCORE | DOUBLE |
The SCORE type is used with SORTEDSET data. Each value in a sorted set requires a DOUBLE score, and values are sorted in ascending order of their scores.
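To illustrate the ascending-score ordering, here is a tiny stand-in for a sorted set (a plain dict plus a sort, not Redis):

```python
# Model a SORTEDSET sink: each member carries a DOUBLE score, and members
# read back in ascending score order (as a ZRANGE over the full set would).
scores = {}

def zadd(key_scores, member, score):
    """Like ZADD: re-adding an existing member updates its score."""
    key_scores[member] = score

zadd(scores, "bronze", 3.0)
zadd(scores, "gold", 1.0)
zadd(scores, "silver", 2.0)

ordered = sorted(scores, key=scores.get)
print(ordered)  # ['gold', 'silver', 'bronze']
```

Because ZADD overwrites the score of an existing member, writing the same (value, score) row twice leaves the sorted set unchanged, which fits the idempotency requirement noted under Limitations.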
Examples
Sink table examples
All sink table examples read from a Kafka source and write to a Tair sink.
Write STRING data
This example uses user_id as the Redis key and login_time as the Redis value.
CREATE TEMPORARY TABLE kafka_source (
user_id STRING, -- User ID
login_time STRING -- Login time (Unix timestamp)
) WITH (
'connector' = 'kafka',
'topic' = 'user_logins',
'properties.bootstrap.servers' = '<yourKafkaBroker>',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TEMPORARY TABLE redis_sink (
user_id STRING, -- Redis key
login_time STRING, -- Redis value
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'mode' = 'STRING',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>'
);
INSERT INTO redis_sink
SELECT * FROM kafka_source;
Write HASHMAP data in multi-value mode
This example uses order_id as the Redis key and writes product_name, quantity, and amount as separate Hash fields.
CREATE TEMPORARY TABLE kafka_source (
order_id STRING, -- Order ID
product_name STRING, -- Product name
quantity STRING, -- Product quantity
amount STRING -- Order amount
) WITH (
'connector' = 'kafka',
'topic' = 'orders_topic',
'properties.bootstrap.servers' = '<yourKafkaBroker>',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TEMPORARY TABLE redis_sink (
order_id STRING, -- Redis key
product_name STRING, -- Hash field: product_name
quantity STRING, -- Hash field: quantity
amount STRING, -- Hash field: amount
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'mode' = 'HASHMAP',
'flattenHash' = 'true',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>'
);
INSERT INTO redis_sink
SELECT * FROM kafka_source;
Write HASHMAP data in single-value mode
This example uses order_id as the Redis key, product_name as the Hash field, and quantity as the Hash value.
CREATE TEMPORARY TABLE kafka_source (
order_id STRING, -- Order ID
product_name STRING, -- Product name
quantity STRING -- Product quantity
) WITH (
'connector' = 'kafka',
'topic' = 'orders_topic',
'properties.bootstrap.servers' = '<yourKafkaBroker>',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TEMPORARY TABLE redis_sink (
order_id STRING, -- Redis key
product_name STRING, -- Redis Hash field
quantity STRING, -- Redis Hash value
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'mode' = 'HASHMAP',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>'
);
INSERT INTO redis_sink
SELECT * FROM kafka_source;
Dimension table examples
All dimension table examples look up user information from a Tair dimension table and join it with a Kafka stream.
Read STRING data
This example uses user_id as the Redis key and retrieves user_name as the Redis value.
CREATE TEMPORARY TABLE kafka_source (
user_id STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'user_clicks',
'properties.bootstrap.servers' = '<yourKafkaBroker>',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TEMPORARY TABLE redis_dim (
user_id STRING, -- Redis key
user_name STRING, -- Redis value
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>',
'mode' = 'STRING'
);
CREATE TEMPORARY TABLE blackhole_sink (
user_id STRING,
redis_user_id STRING,
user_name STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
t1.user_id,
t2.user_id,
t2.user_name
FROM kafka_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.user_id = t2.user_id;
Read HASHMAP data in multi-value mode
This example uses user_id as the Redis key and retrieves multiple Hash fields (user_name, email, and register_time) in a single lookup.
CREATE TEMPORARY TABLE kafka_source (
user_id STRING,
click_time TIMESTAMP(3),
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'user_clicks',
'properties.bootstrap.servers' = '<yourKafkaBroker>',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TEMPORARY TABLE redis_dim (
user_id STRING, -- Redis key
user_name STRING, -- Hash field: user_name
email STRING, -- Hash field: email
register_time STRING, -- Hash field: register_time
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>',
'mode' = 'HASHMAP'
);
CREATE TEMPORARY TABLE blackhole_sink (
user_id STRING,
user_name STRING,
email STRING,
register_time STRING,
click_time TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
t1.user_id,
t2.user_name,
t2.email,
t2.register_time,
t1.click_time
FROM kafka_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.user_id = t2.user_id;
Read HASHMAP data in single-value mode
This example uses a fixed Hash key (testkey) set via hashName. The user_id column maps to the Hash field, and user_name maps to the Hash value.
CREATE TEMPORARY TABLE kafka_source (
user_id STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'user_clicks',
'properties.bootstrap.servers' = '<yourKafkaBroker>',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TEMPORARY TABLE redis_dim (
user_id STRING, -- Hash field
user_name STRING, -- Hash value
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>',
'hashName' = 'testkey' -- Fixed Hash key
);
CREATE TEMPORARY TABLE blackhole_sink (
user_id STRING,
redis_user_id STRING,
user_name STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT
t1.user_id,
t2.user_id,
t2.user_name
FROM kafka_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.user_id = t2.user_id;