The Tair (Enterprise Edition) connector writes Flink streaming data into a Tair (Enterprise Edition) instance. Tair is a Redis-compatible database that supports hybrid memory and disk storage, hot standby for high availability, and a scalable cluster architecture for high-throughput, low-latency workloads.
Supported features
| Category | Details |
|---|---|
| Supported types | Sink table |
| Running mode | Stream mode |
| Data format | STRING |
| API type | SQL |
| Data update or deletion in a sink table | Yes |
| Unique metrics | numBytesSend, numBytesSendPerSecond, numRecordsSend, numRecordsSendPerSecond, numRecordSendErrors, currentSendTime |
For details on metrics, see Metrics.
Prerequisites
Before you begin, ensure that you have:
- A Tair (Enterprise Edition) instance. See Step 1: Create an instance.
- An IP address whitelist configured for the instance. See Step 2: Configure whitelists.
Limitations
- The Tair (Enterprise Edition) connector requires Realtime Compute for Apache Flink with Ververica Runtime (VVR) 6.0.6 or later.
- TairTs, TairCpc, TairRoaring, TairVector, and TairGis require VVR 8.0.1 or later.
- The connector does not support configuring multiple hosts.
Syntax
The following DDL statement creates a Tair sink table. PRIMARY KEY is required.
CREATE TABLE tair_table (
a STRING,
b STRING,
PRIMARY KEY (a) NOT ENFORCED
) WITH (
'connector' = 'tair',
'host' = '<yourHost>',
'mode' = '<dataStructure>'
);
Tair supports all Redis data structures (STRING, LIST, SET, HASHMAP, SORTEDSET) and its own proprietary data structures. For syntax examples for Redis-compatible data structures, see Tair (Redis OSS-compatible) connector.
Connector options
Required parameters
| Parameter | Type | Description |
|---|---|---|
| connector | STRING | Set to tair. |
| host | STRING | Endpoint of the Tair server. Use an internal endpoint to avoid the latency and bandwidth limitations of public network connections. |
| mode | STRING | Data structure to write to. See Supported data structures for valid values. |
Connection parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| port | INT | 6379 | Port number of the Tair server. |
| password | STRING | Empty string | Password for the Tair database. Leave blank to disable password verification. |
| dbNum | INT | 0 | ID of the destination database. |
| clusterMode | BOOLEAN | false | Set to true to use cluster architecture. Set to false for standalone mode. |
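As an illustration of the connection parameters above, the following sketch declares a sink against an instance that uses the cluster architecture. The table name, column names, and `${...}` placeholders are assumptions for illustration only; `tairstring` is used as the mode because it needs the fewest columns.

```sql
-- Hypothetical sink table; names and placeholders are illustrative only.
CREATE TEMPORARY TABLE tair_cluster_sink (
  user_id STRING,   -- written as the TairString key
  profile STRING,   -- written as the TairString value
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairstring',
  'host' = '${tairHost}',      -- a single endpoint; multiple hosts are not supported
  'port' = '6379',             -- the default port, shown explicitly
  'password' = '${password}',
  'clusterMode' = 'true'       -- the instance uses the cluster architecture
);
```

Option values are written as quoted string literals in the WITH clause, even for INT and BOOLEAN options.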
Write behavior parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| ignoreDelete | BOOLEAN | false | Controls behavior when a retraction message is received. false: delete the inserted data and its key. true: retain the inserted data and its key. |
| incrMode | STRING | None | Sink write mode. None: insert operation. int: INCRBY with a fixed increment defined by incrValue. float: INCRBYFLOAT with a fixed increment defined by incrValue. dynamic_int: INCRBY with the increment read from the column named by incrValue. dynamic_float: INCRBYFLOAT with the increment read from the column named by incrValue. |
| incrValue | STRING | None | When incrMode is int or float, this is the fixed increment value. When incrMode is dynamic_int or dynamic_float, this is the DDL column name that contains the increment value. Not used when incrMode is None. |
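The effect of ignoreDelete matters most when the upstream query emits retraction messages, as aggregations and some joins do in Flink SQL. The sketch below, with hypothetical table and column names, keeps data in Tair even when a retraction arrives.

```sql
-- Hypothetical sink table; names and placeholders are illustrative only.
CREATE TEMPORARY TABLE tair_hash_sink (
  order_key STRING,    -- TairHash key
  attr_name STRING,    -- hash field
  attr_value STRING,   -- field value
  PRIMARY KEY (order_key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairhash',
  'host' = '${tairHost}',
  'password' = '${password}',
  'ignoreDelete' = 'true'   -- retain inserted data and its key on retraction
);
```

With the default value of false, the same retraction would delete the inserted data and its key.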
Key expiration parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| expiration | LONG | 0 | Relative time-to-live (TTL) for inserted keys, in milliseconds. 0 disables TTL. |
| expirationAt | LONG | 0 | Absolute expiration timestamp for inserted keys, in milliseconds. Takes effect only when expiration is set to 0. 0 disables absolute expiration. |
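A minimal sketch of a relative key TTL, assuming hypothetical table and column names: every key written by this sink expires one hour after it is inserted. Because expirationAt takes effect only when expiration is 0, only one of the two options is set here.

```sql
-- Hypothetical sink table; names and placeholders are illustrative only.
CREATE TEMPORARY TABLE tair_session_sink (
  session_id STRING,   -- TairString key
  payload STRING,      -- TairString value
  PRIMARY KEY (session_id) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairstring',
  'host' = '${tairHost}',
  'password' = '${password}',
  'expiration' = '3600000'   -- 60 * 60 * 1000 ms = 1 hour
);
```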
Field expiration parameters (TairHash and TairTs only)
| Parameter | Type | Default | Description |
|---|---|---|---|
| fieldExpireMode | STRING | None | Expiration mode for fields in TairHash, or skeys in TairTs. None: no expiration. millisecond: relative expiration set to the value of fieldExpireValue. unixtime: absolute expiration set to the value of fieldExpireValue. dynamic_millisecond: relative expiration read from the column named by fieldExpireValue. dynamic_unixtime: absolute expiration read from the column named by fieldExpireValue. Important: TairTs skeys must use |
| fieldExpireValue | STRING | None | When fieldExpireMode is millisecond or unixtime, this is the fixed expiration value. When fieldExpireMode is dynamic_millisecond or dynamic_unixtime, this is the DDL column name that contains the expiration value. |
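The dynamic modes can be sketched as follows for TairHash. The table and column names are hypothetical, and the STRING type and trailing position of the `ttl_ms` column are assumptions made by analogy with the incrValue column; the source does not specify them.

```sql
-- Hypothetical sink table; names and placeholders are illustrative only.
CREATE TEMPORARY TABLE tair_hash_ttl_sink (
  cart_key STRING,     -- TairHash key
  item_field STRING,   -- hash field
  item_value STRING,   -- field value
  ttl_ms STRING,       -- per-record TTL in ms (type and position are assumptions)
  PRIMARY KEY (cart_key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairhash',
  'host' = '${tairHost}',
  'password' = '${password}',
  'fieldExpireMode' = 'dynamic_millisecond',
  'fieldExpireValue' = 'ttl_ms'   -- name of the DDL column that holds the TTL
);
```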
Data type mappings
| Flink type | Tair type |
|---|---|
| VARCHAR | STRING |
| DOUBLE | DOUBLE |
Supported data structures
The mode parameter accepts the following values. Each data structure has specific DDL column requirements that vary by incrMode.
Redis-compatible data structures
For DDL formats of STRING, LIST, SET, HASHMAP, and SORTEDSET, see Tair (Redis OSS-compatible) connector.
Tair proprietary data structures
| Data structure | incrMode | DDL columns (count: name and type) | Write command |
|---|---|---|---|
| TairString | None | 2: key (STRING), value (STRING) | exset key value |
| TairString | int or float | 1: key (STRING) | exincrby/exincrbyfloat key incrValue |
| TairString | dynamic_int or dynamic_float | 2: key (STRING), incrValue (STRING) | exincrby/exincrbyfloat key incrValue |
| TairHash | None | 3: key (STRING), field (STRING), value (STRING) | exhset key field value |
| TairHash | int or float | 2: key (STRING), field (STRING) | exhincrby/exhincrbyfloat key field incrValue |
| TairHash | dynamic_int or dynamic_float | 3: key (STRING), field (STRING), incrValue (STRING) | exhincrby/exhincrbyfloat key field incrValue |
| TairZset | None | 3–258: key (STRING), member (STRING), score×N (DOUBLE, up to 256 dimensions) | exzadd key score member |
| TairZset | int or float | 2: key (STRING), member (STRING) | exzincrby key member incrValue |
| TairZset | dynamic_int or dynamic_float | 3: key (STRING), member (STRING), incrValue (STRING) | exzincrby key member incrValue |
| TairBloom | Must be None | 2: key (STRING), item (STRING) | BF.ADD key item |
| TairDoc | Must be None | 3: key (STRING), path (STRING), json (STRING) | JSON.SET key path json |
| TairSearch | None | 4: index (STRING), doc_id (STRING), document (STRING, JSON), mapping (STRING) | TFT.ADDDOC index document doc_id |
| TairSearch | int or float | 4: index (STRING), doc_id (STRING), field (STRING), mapping (STRING) | TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment |
| TairSearch | dynamic_int or dynamic_float | 5: index (STRING), doc_id (STRING), field (STRING), mapping (STRING), incrValue (STRING) | TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment |
| TairCpc | Must be None | 2: key (STRING), item (STRING) | CPC.UPDATE key item |
| TairGis | Must be None | 3: key (STRING), polygon_name (STRING), polygon_wkt (STRING) | GIS.ADD area polygonName polygonWkt |
| TairRoaring | Must be None | 3: key (STRING), offset (BIGINT), value (BIGINT, 0 or 1) | TR.SETBIT key offset value |
| TairVector | Must be None | 6: index_name (STRING), pk (STRING), vector_data (STRING), dims (INT), algorithm (STRING), distance_method (STRING) | TVS.HSET index_name key VECTOR vector_data |
| TairTs | None | 4: pkey (STRING, timeline group), skey (STRING, single timeline), timestamp (STRING), value (STRING) | EXTS.S.RAW_MODIFY Pkey Skey timestamp value |
| TairTs | float | 3: pkey (STRING), skey (STRING), timestamp (STRING) | EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue |
| TairTs | dynamic_float | 4: pkey (STRING), skey (STRING), timestamp (STRING), incrValue (STRING) | EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue |
- TairSearch and TairVector require an index and mapping to be created before data is inserted. For TairSearch, run TFT.CREATEINDEX index mappings. For TairVector, run TVS.CREATEINDEX index_name dims algorithm distance_method.
- TairBloom creates a key with a default capacity of 100 elements and an error rate of 0.01 on first insert.
- For multi-dimensional TairZset sorting, all score dimensions must use the same format.
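To make the column requirements concrete, the sketch below declares a TairZset sink with two score dimensions. The table and column names are hypothetical, and the mode value `tairzset` is an assumption based on the lowercase mode values used in the examples in this document.

```sql
-- Hypothetical sink table; names and placeholders are illustrative only.
CREATE TEMPORARY TABLE tair_zset_sink (
  board_key STRING,        -- TairZset key
  player STRING,           -- member
  score_points DOUBLE,     -- first score dimension
  score_time DOUBLE,       -- second score dimension, same format as the first
  PRIMARY KEY (board_key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairzset',     -- assumed mode value
  'host' = '${tairHost}',
  'password' = '${password}'
);
```

Up to 256 score columns can follow the member column, which is why the column count for TairZset ranges from 3 to 258.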
Examples
Write data to a TairSearch sink table
This example uses index_name as the TairSearch index, doc_id as the document ID, doc as the JSON document body, and mapping as the index mapping definition.
CREATE TEMPORARY TABLE datagen_stream (
v STRING, -- document ID
p STRING -- document body (JSON)
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE tair_output (
index_name STRING, -- TairSearch index name
doc_id STRING, -- document ID
doc STRING, -- document body (JSON)
mapping STRING, -- index mapping
PRIMARY KEY (index_name) NOT ENFORCED
) WITH (
'connector' = 'tair',
'mode' = 'tairsearch',
'host' = '${tairHost}',
'port' = '${tairPort}',
'password' = '${password}'
);
INSERT INTO tair_output
SELECT
'index' AS index_name,
v AS doc_id,
p AS doc,
'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' AS mapping
FROM datagen_stream;
Write data using dynamic increment (TairString with incrMode=dynamic_float)
This example uses key as the TairString key and step as the per-record increment value, read from the step column.
CREATE TEMPORARY TABLE datagen_stream (
v STRING, -- key
p DOUBLE -- increment value per record; numeric so that the increment command receives a valid number
) WITH (
'connector' = 'datagen',
'fields.p.min' = '0',
'fields.p.max' = '10'
);
CREATE TEMPORARY TABLE tair_output (
key STRING, -- TairString key
step STRING, -- increment value (column referenced by incrValue)
PRIMARY KEY (key) NOT ENFORCED
) WITH (
'connector' = 'tair',
'mode' = 'tairstring',
'host' = '${tairHost}',
'port' = '${tairPort}',
'password' = '${password}',
'incrMode' = 'dynamic_float',
'incrValue' = 'step'
);
INSERT INTO tair_output
SELECT v, CAST(p AS STRING) -- pass the numeric increment as a string
FROM datagen_stream;
Write data using a fixed increment (TairString with incrMode=float)
This example uses a fixed increment of 11.11 applied to each key on every write.
CREATE TEMPORARY TABLE datagen_stream (
v STRING,
p STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE tair_output (
key STRING, -- TairString key
PRIMARY KEY (key) NOT ENFORCED
) WITH (
'connector' = 'tair',
'mode' = 'tairstring',
'host' = '${tairHost}',
'port' = '${tairPort}',
'password' = '${password}',
'incrMode' = 'float',
'incrValue' = '11.11'
);
INSERT INTO tair_output
SELECT v
FROM datagen_stream;
Write data with field-level TTL (TairHash with fieldExpireMode=millisecond)
This example writes to a TairHash sink table where each field expires 1000 milliseconds after it is written.
CREATE TEMPORARY TABLE datagen_stream (
v STRING,
p STRING,
s STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE tair_output (
key STRING, -- TairHash key
field STRING, -- hash field
`value` STRING, -- field value (VALUE is a reserved keyword, so the name is quoted)
PRIMARY KEY (key) NOT ENFORCED
) WITH (
'connector' = 'tair',
'mode' = 'tairhash',
'host' = '${tairHost}',
'port' = '${tairPort}',
'password' = '${password}',
'fieldExpireMode' = 'millisecond',
'fieldExpireValue' = '1000'
);
INSERT INTO tair_output
SELECT v, p, s
FROM datagen_stream;
What's next
- Tair (Redis OSS-compatible) connector: syntax examples for Redis-compatible data structures
- Metrics: full list of connector metrics and their definitions