All Products
Search
Document Center

Realtime Compute for Apache Flink:Tair (Enterprise)

Last Updated:Mar 26, 2026

The Tair (Enterprise Edition) connector writes Flink streaming data into a Tair (Enterprise Edition) instance. Tair is a Redis-compatible database that supports hybrid memory and disk storage, hot standby for high availability, and a scalable cluster architecture for high-throughput, low-latency workloads.

Supported features

Category Details
Supported types Sink table
Running mode Stream mode
Data format STRING
API type SQL
Data update or deletion in a sink table Yes
Unique metrics numBytesSend, numBytesSendPerSecond, numRecordsSend, numRecordsSendPerSecond, numRecordSendErrors, currentSendTime

For details on metrics, see Metrics.

Prerequisites

Before you begin, ensure that you have:

Limitations

  • Realtime Compute for Apache Flink requires Ververica Runtime (VVR) 6.0.6 or later to use the Tair (Enterprise Edition) connector.

  • TairTs, TairCpc, TairRoaring, TairVector, and TairGis require VVR 8.0.1 or later.

  • The connector does not support configuring multiple hosts.

Syntax

The following DDL statement creates a Tair sink table. PRIMARY KEY is required.

CREATE TABLE tair_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'host'      = '<yourHost>',
  'mode'      = '<dataStructure>'
);

Tair supports all Redis data structures (STRING, LIST, SET, HASHMAP, SORTEDSET) and its own proprietary data structures. For syntax examples for Redis-compatible data structures, see Tair (Redis OSS-compatible) connector.

Connector options

Required parameters

Parameter Type Description
connector String Set to tair.
host String Endpoint of the Tair server. Use an internal endpoint to avoid the latency and bandwidth limitations of public network connections.
mode STRING Data structure to write to. See Supported data structures for valid values.

Connection parameters

Parameter Type Default Description
port INT 6379 Port number of the Tair server.
password String Empty string Password for the Tair database. Leave blank to disable password verification.
dbNum INT 0 ID of the destination database.
clusterMode BOOLEAN false Set to true to use cluster architecture. Set to false for standalone mode.

Write behavior parameters

Parameter Type Default Description
ignoreDelete BOOLEAN false Controls behavior when a retraction message is received. false: delete the inserted data and its key. true: retain the inserted data and its key.
incrMode STRING None Sink write mode. None: insert operation. int: INCRBY with a fixed increment defined by incrValue. float: INCRBYFLOAT with a fixed increment defined by incrValue. dynamic_int: INCRBY with the increment read from the column named by incrValue. dynamic_float: INCRBYFLOAT with the increment read from the column named by incrValue.
incrValue STRING None When incrMode is int or float, this is the fixed increment value. When incrMode is dynamic_int or dynamic_float, this is the DDL column name that contains the increment value. Not used when incrMode is None.

Key expiration parameters

Parameter Type Default Description
expiration LONG 0 Relative time-to-live (TTL) for inserted keys, in milliseconds. 0 disables TTL.
expirationAt LONG 0 Absolute expiration timestamp for inserted keys, in milliseconds. Takes effect only when expiration is set to 0. 0 disables absolute expiration.

Field expiration parameters (TairHash and TairTs only)

Parameter Type Default Description
fieldExpireMode String None Expiration mode for fields in TairHash, or skeys in TairTs. None: no expiration. millisecond: relative expiration set to the value of fieldExpireValue. unixtime: absolute expiration set to the value of fieldExpireValue. dynamic_millisecond: relative expiration read from the column named by fieldExpireValue. dynamic_unixtime: absolute expiration read from the column named by fieldExpireValue.
Important

TairTs skeys must use millisecond.

fieldExpireValue String None When fieldExpireMode is millisecond or unixtime, this is the fixed expiration value. When fieldExpireMode is dynamic_millisecond or dynamic_unixtime, this is the DDL column name that contains the expiration value.

Data type mappings

Flink type Tair type
VARCHAR STRING
DOUBLE DOUBLE

Supported data structures

The mode parameter accepts the following values. Each data structure has specific DDL column requirements that vary by incrMode.

Redis-compatible data structures

For DDL formats of STRING, LIST, SET, HASHMAP, and SORTEDSET, see Tair (Redis OSS-compatible) connector.

Tair proprietary data structures

Data structure incrMode DDL columns Write command
TairString None 2: key (STRING), value (STRING) exset key value
TairString int or float 1: key (STRING) exincrby/exincrbyfloat key incrValue
TairString dynamic_int or dynamic_float 2: key (STRING), incrValue (STRING) exincrby/exincrbyfloat key incrValue
TairHash None 3: key (STRING), field (STRING), value (STRING) exhset key field value
TairHash int or float 2: key (STRING), field (STRING) exhincrby/exincrbyfloat key field incrValue
TairHash dynamic_int or dynamic_float 3: key (STRING), field (STRING), incrValue (STRING) exhincrby/exincrbyfloat key field incrValue
TairZset None 3–258: key (STRING), member (STRING), score×N (DOUBLE, up to 256 dimensions) exzadd key score member
TairZset int or float 2: key (STRING), member (STRING) exzincyby key member incrValue
TairZset dynamic_int or dynamic_float 3: key (STRING), member (STRING), incrValue (STRING) exzincyby key member incrValue
TairBloom Must be None 2: key (STRING), item (STRING) BF.ADD key item
TairDoc Must be None 3: key (STRING), path (STRING), json (STRING) JSON.SET key path json
TairSearch None 4: index (STRING), doc_id (STRING), document (STRING, JSON), mapping (STRING) TFT.ADDDOC index document docid
TairSearch int or float 4: index (STRING), doc_id (STRING), field (STRING), mapping (STRING) TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
TairSearch dynamic_int or dynamic_float 5: index (STRING), doc_id (STRING), field (STRING), mapping (STRING), incrValue (STRING) TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
TairCpc Must be None 2: key (STRING), item (STRING) CPC.UPDATE key item
TairGis Must be None 3: key (STRING), polygon_name (STRING), polygon_wkt (STRING) GIS.ADD area polygonName polygonWkt
TairRoaring Must be None 3: key (STRING), offset (BIGINT), value (BIGINT, 0 or 1) TR.SETBIT key offset value
TairVector Must be None 6: index_name (STRING), pk (STRING), vector_data (STRING), dims (INT), algorithm (STRING), distance_method (STRING) TVS.HSET index_name key VECTOR vector_data
TairTs None 4: pkey (STRING, timeline group), skey (STRING, single timeline), timestamp (STRING), value (STRING) EXTS.S.RAW_MODIFY Pkey Skey timestamp value
TairTs float 3: pkey (STRING), skey (STRING), timestamp (STRING) EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue
TairTs dynamic_float 4: pkey (STRING), skey (STRING), timestamp (STRING), incrValue (STRING) EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue
TairSearch and TairVector require an index and mapping to be created before data is inserted. For TairSearch, run TFT.CREATEINDEX index mappings. For TairVector, run TVS.CREATEINDEX index_name dims algorithm distance_method. TairBloom creates a key with a default capacity of 100 elements and an error rate of 0.01 on first insert. For multi-dimensional TairZset sorting, all score dimensions must use the same format.

Examples

Write data to a TairSearch sink table

This example uses index_name as the TairSearch index, doc_id as the document ID, doc as the JSON document body, and mapping as the index mapping definition.

CREATE TEMPORARY TABLE datagen_stream (
  v STRING,   -- document content
  p STRING    -- mapping definition
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tair_output (
  index_name STRING,  -- TairSearch index name
  doc_id     STRING,  -- document ID
  doc        STRING,  -- document body (JSON)
  mapping    STRING,  -- index mapping
  PRIMARY KEY (index_name) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode'      = 'tairsearch',
  'host'      = '${tairHost}',
  'port'      = '${tairPort}',
  'password'  = '${password}'
);

INSERT INTO tair_output
SELECT
  'index' AS index_name,
  v AS doc_id,
  p AS doc,
  '{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' AS mapping
FROM datagen_stream;

Write data using dynamic increment (TairString with incrMode=dynamic_float)

This example uses key as the TairString key and step as the per-record increment value, read from the step column.

CREATE TEMPORARY TABLE datagen_stream (
  v STRING,   -- key
  p STRING    -- increment value per record
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tair_output (
  key  STRING,  -- TairString key
  step STRING,  -- increment value (column referenced by incrValue)
  PRIMARY KEY (key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode'      = 'tairstring',
  'host'      = '${tairHost}',
  'port'      = '${tairPort}',
  'password'  = '${password}',
  'incrMode'  = 'dynamic_float',
  'incrValue' = 'step'
);

INSERT INTO tair_output
SELECT *
FROM datagen_stream;

Write data using a fixed increment (TairString with incrMode=float)

This example uses a fixed increment of 11.11 applied to each key on every write.

CREATE TEMPORARY TABLE datagen_stream (
  v STRING,
  p STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tair_output (
  key STRING,  -- TairString key
  PRIMARY KEY (key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode'      = 'tairstring',
  'host'      = '${tairHost}',
  'port'      = '${tairPort}',
  'password'  = '${password}',
  'incrMode'  = 'float',
  'incrValue' = '11.11'
);

INSERT INTO tair_output
SELECT v
FROM datagen_stream;

Write data with field-level TTL (TairHash with fieldExpireMode=millisecond)

This example writes to a TairHash sink table where each field expires 1000 milliseconds after it is written.

CREATE TEMPORARY TABLE datagen_stream (
  v STRING,
  p STRING,
  s STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tair_output (
  key   STRING,  -- TairHash key
  field STRING,  -- hash field
  value STRING,  -- field value
  PRIMARY KEY (key) NOT ENFORCED
) WITH (
  'connector'        = 'tair',
  'mode'             = 'tairhash',
  'host'             = '${tairHost}',
  'port'             = '${tairPort}',
  'password'         = '${password}',
  'fieldExpireMode'  = 'millisecond',
  'fieldExpireValue' = '1000'
);

INSERT INTO tair_output
SELECT v, p, s
FROM datagen_stream;

What's next