Realtime Compute for Apache Flink: Tair connector

Last Updated: Sep 08, 2023

This topic describes how to use the Tair connector.

Background information

Tair is a cloud-native in-memory database developed by Alibaba Cloud. Tair is compatible with open source Redis and provides rich data models and enterprise-class capabilities to support real-time online scenarios. Tair also introduces persistent memory-optimized instances that are based on the new non-volatile memory (NVM) storage medium. The instances can help reduce costs by more than 30%, ensure data persistence, and provide approximately the same performance as in-memory databases.

The following table describes the capabilities supported by the Tair connector.

Table type: Result table

Running mode: Streaming mode

Data format: STRING

Metrics:

  • numBytesSend

  • numBytesSendPerSecond

  • numRecordsSend

  • numRecordsSendPerSecond

  • numRecordSendErrors

  • currentSendTime

Note

For more information about the metrics and how to view them, see View metrics.

API type: SQL API

Data update or deletion in a result table: Supported

Prerequisites

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later supports the Tair connector.

  • The Tair connector does not allow you to configure multiple hosts.

Syntax

Tair is compatible with the Redis data structures STRING, LIST, SET, HASHMAP, and SORTEDSET, and also supports all self-developed Tair data structures.

To create a Tair result table, execute the following DDL statement:

CREATE TABLE tair_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- Required.
) WITH (
  'connector' = 'tair',
  'mode' = '<yourMode>', -- Required. See the mode parameter below.
  'host' = '<yourHost>'
);
Note

Tair is compatible with Redis data structures. For syntax examples of the Redis data structures, see ApsaraDB for Redis connector.
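
The DDL statement above only declares the result table. As a minimal sketch of a complete job, the following statements read from a source and write to a Tair result table in string mode. The table and column names are hypothetical, the datagen source is used only to produce test rows, and the two-column layout (key, then value) for string mode is an assumption based on the ApsaraDB for Redis connector.

```sql
-- Hypothetical test source: the datagen connector produces random rows.
CREATE TEMPORARY TABLE kv_source (
  k STRING,
  v STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Tair result table. The primary key is required.
CREATE TEMPORARY TABLE tair_string_sink (
  k STRING,   -- assumed to be written as the key
  v STRING,   -- assumed to be written as the value
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'string',
  'host' = '<yourHost>'
);

INSERT INTO tair_string_sink
SELECT k, v
FROM kv_source;
```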

Parameters in the WITH clause

Each parameter below is listed with its description, data type, whether it is required, its default value, and remarks, in that order.

connector

The type of the table.

STRING

Yes

No default value

Set the value to tair.

host

The endpoint of the Tair server.

STRING

Yes

No default value

We recommend that you use an internal endpoint.

Note

If you access the Tair database over the Internet, the connection may be unstable because of network latency and bandwidth limits.

mode

The data structure of Tair.

STRING

Yes

No default value

Valid values:

  • string

  • list

  • set

  • hashmap

  • sortedset

  • tairstring

  • tairhash

  • tairzset

  • tairbloom

  • tairdoc

  • tairsearch

  • tairts

  • taircpc

  • tairroaring

  • tairgis

  • tairvector

Tair supports Redis data structures and self-developed Tair data structures. For more information about the valid values, see Data structures supported by ApsaraDB for Redis and Formats of Tair data structures.

Note
  • Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports TairTs, TairCpc, TairRoaring, TairVector, and TairGis.

  • Tair result tables support self-developed Tair data structures. The DDL statement that creates a Tair result table must define its columns in the format required by the selected data structure and must specify a primary key. For more information, see Formats of Tair data structures.

port

The port number of the Tair server.

INT

No

6379

N/A.

password

The password that is used to access the Tair database.

STRING

No

An empty string

An empty string indicates that no password authentication is performed.

dbNum

The ID of the destination database.

INT

No

0

N/A.

clusterMode

Specifies whether to use the cluster architecture.

BOOLEAN

No

false

Valid values:

  • true: uses the cluster architecture.

  • false: uses the standalone architecture.

ignoreDelete

Specifies whether to ignore retraction messages.

BOOLEAN

No

false

Valid values:

  • false: When a retraction message is received, the inserted data and the key of the data are deleted. This is the default value.

  • true: When a retraction message is received, the inserted data and the key of the data are retained.

expiration

The time-to-live (TTL) that is specified for the keys of the inserted data.

LONG

No

0

The value 0 indicates that the TTL is not configured. If the value of this parameter is greater than 0, the TTL is configured for the key of the inserted data. Unit: milliseconds.

expirationAt

The absolute expiration time that is specified for the keys of the inserted data.

LONG

No

0

Unit: milliseconds. Default value: 0. The default value indicates that no expiration time is specified.

If the value of this parameter is greater than 0 and the expiration parameter is set to 0, an absolute expiration time is specified for the keys of the inserted data.

incrMode

The sink mode of the Tair database.

STRING

No

None

Valid values:

  • None: indicates the insert operation. This is the default value.

  • int: indicates the INCRBY operation. The increment is the fixed value of the incrValue parameter.

  • float: indicates the INCRBYFLOAT operation. The increment is the fixed value of the incrValue parameter.

  • dynamic_int: indicates the INCRBY operation. The increment is read from the column in the DDL statement whose name is specified by the incrValue parameter.

  • dynamic_float: indicates the INCRBYFLOAT operation. The increment is read from the column in the DDL statement whose name is specified by the incrValue parameter.

incrValue

The increment, or the name of the column that holds the increment, depending on the value of the incrMode parameter.

STRING

No

No default value

The meaning of the incrValue parameter depends on the value of the incrMode parameter.

  • If the incrMode parameter is set to None, you do not need to configure the incrValue parameter.

  • If the incrMode parameter is set to int or float, the value of the incrValue parameter is the increment itself.

  • If the incrMode parameter is set to dynamic_int or dynamic_float, the value of the incrValue parameter is the name of the column in the DDL statement that holds the increment.

fieldExpireMode

The expiration mode of the fields in TairHash or skeys in TairTS.

STRING

No

None

Valid values:

  • None: No expiration time is specified.

  • millisecond: A relative expiration time is specified. The expiration time is fixed to the value of the fieldExpireValue parameter.

    Note

    The expiration mode of the skeys in TairTS must be millisecond.

  • unixtime: An absolute expiration time is specified. The expiration time is fixed to the value of the fieldExpireValue parameter.

  • dynamic_millisecond: A relative expiration time is specified. The expiration time is read from the column in the DDL statement whose name is specified by the fieldExpireValue parameter.

  • dynamic_unixtime: An absolute expiration time is specified. The expiration time is read from the column in the DDL statement whose name is specified by the fieldExpireValue parameter.

fieldExpireValue

The expiration time of the fields in TairHash or skeys in TairTS.

STRING

No

No default value

The value of the fieldExpireValue parameter varies based on the value of the fieldExpireMode parameter.

  • If the fieldExpireMode parameter is set to None, no expiration time is specified.

  • If the fieldExpireMode parameter is set to millisecond or unixtime, the value of the fieldExpireValue parameter is the expiration time.

  • If the fieldExpireMode parameter is set to dynamic_millisecond or dynamic_unixtime, the value of the fieldExpireValue parameter is the name of the column in the DDL statement that holds the expiration time.
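
As an illustration of how several optional parameters combine, the following sketch declares a TairString result table whose keys expire after one hour and are retained when a retraction message arrives. The table and column names are hypothetical and the values are examples only, not recommendations.

```sql
CREATE TEMPORARY TABLE tair_session_sink (
  session_id STRING,
  payload STRING,
  PRIMARY KEY (session_id) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairstring',
  'host' = '<yourHost>',
  'port' = '6379',             -- default port, shown for clarity
  'password' = '<yourPassword>',
  'dbNum' = '1',               -- write to database 1 instead of the default 0
  'clusterMode' = 'false',     -- standalone architecture
  'ignoreDelete' = 'true',     -- retain keys when a retraction message is received
  'expiration' = '3600000'     -- relative TTL in milliseconds (one hour)
);
```

Because expiration is greater than 0 in this sketch, expirationAt is left unset. According to the parameter description, expirationAt applies only when expiration is 0.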

Data type mappings

  • The Flink data type VARCHAR maps to the Tair data type STRING.

  • The Flink data type DOUBLE maps to the Tair data type DOUBLE.
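
Because only these two mappings are listed, columns of other Flink types may need to be converted before they are written. The following sketch casts a BIGINT column to STRING in the INSERT statement. The table and column names are hypothetical.

```sql
-- Hypothetical source with a numeric column.
CREATE TEMPORARY TABLE page_views (
  page_id STRING,
  view_count BIGINT
) WITH (
  'connector' = 'datagen'
);

-- The result table declares the value column as STRING.
CREATE TEMPORARY TABLE tair_views_sink (
  page_id STRING,
  view_count STRING,
  PRIMARY KEY (page_id) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairstring',
  'host' = '<yourHost>'
);

-- Convert the BIGINT column to STRING before writing.
INSERT INTO tair_views_sink
SELECT page_id, CAST(view_count AS STRING)
FROM page_views;
```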

Formats of Tair data structures

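For each data structure, the following entries describe the column layout that the DDL statement must use, followed by the command that is used to insert data into the Tair result table.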

TairString

If the incrMode parameter is set to None, a DDL statement has two columns.

  • The first column lists keys of the STRING type.

  • The second column lists values of the STRING type.

exset key value

If the incrMode parameter is set to int or float, a DDL statement has only one column and the column lists keys of the STRING type.

exincrby/exincrbyfloat key incrValue

If the incrMode parameter is set to dynamic_int or dynamic_float, a DDL statement has two columns.

  • The first column lists keys of the STRING type.

  • The second column lists incrValue values of the STRING type.

exincrby/exincrbyfloat key incrValue

TairHash

If the incrMode parameter is set to None, a DDL statement has three columns.

  • The first column lists keys of the STRING type.

  • The second column lists fields of the STRING type.

  • The third column lists field values of the STRING type.

exhset key field value

If the incrMode parameter is set to int or float, a DDL statement has two columns.

  • The first column lists keys of the STRING type.

  • The second column lists fields of the STRING type.

exhincrby/exhincrbyfloat key field incrValue

If the incrMode parameter is set to dynamic_int or dynamic_float, a DDL statement has three columns.

  • The first column lists keys of the STRING type.

  • The second column lists fields of the STRING type.

  • The third column lists incrValue values of the STRING type that correspond to fields.

exhincrby/exhincrbyfloat key field incrValue

TairZset

If the incrMode parameter is set to None, TairZset supports multi-dimensional sorting. TairZset allows you to sort data by scores of the DOUBLE type in up to 256 dimensions. Therefore, a DDL statement has 3 to 258 columns.

  • The first column lists keys of the STRING type.

  • The second column lists members of the STRING type.

  • The remaining columns list scores of the DOUBLE type.

exzadd key score member
Note

If you want to sort data by multiple dimensions, make sure that the score formats of all dimensions are the same.

If the incrMode parameter is set to int or float, a DDL statement has two columns.

  • The first column lists keys of the STRING type.

  • The second column lists members of the STRING type.

exzincrby key member incrValue

If the incrMode parameter is set to dynamic_int or dynamic_float, a DDL statement has three columns.

  • The first column lists keys of the STRING type.

  • The second column lists members of the STRING type.

  • The third column lists incrValue values of the STRING type.

exzincrby key member incrValue

TairBloom

The incrMode parameter must be set to None.

When data is inserted into a Tair result table for the first time, a TairBloom key that has a default capacity of 100 elements and an error rate of 0.01 is created. A DDL statement has two columns.

  • The first column lists keys of the STRING type.

  • The second column lists items of the STRING type.

BF.ADD key item

TairDoc

The incrMode parameter must be set to None. A DDL statement has three columns.

  • The first column lists keys of the STRING type.

  • The second column lists paths of the STRING type.

  • The third column lists JSON elements of the STRING type.

JSON.SET key path json

TairSearch

If the incrMode parameter is set to None, a DDL statement has four columns.

  • The first column lists indexes of the STRING type.

  • The second column lists document IDs of the STRING type.

  • The third column lists documents of the STRING type. The documents must be in the JSON format.

  • The fourth column lists mappings of the STRING type.

TFT.ADDDOC index document docid
Note

Before you insert data into a Tair result table, you must create an index and add a mapping. Sample command:

TFT.CREATEINDEX index mappings

If the incrMode parameter is set to int or float, a DDL statement has four columns.

  • The first column lists indexes of the STRING type.

  • The second column lists document IDs of the STRING type.

  • The third column lists fields of the STRING type.

  • The fourth column lists mappings of the STRING type.

Sample command for document operations:

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
Note

Before you insert data into a Tair result table, you must create an index and add a mapping. Sample command:

TFT.CREATEINDEX index mappings

If the incrMode parameter is set to dynamic_int or dynamic_float, a DDL statement has five columns.

  • The first column lists indexes of the STRING type.

  • The second column lists document IDs of the STRING type.

  • The third column lists fields of the STRING type.

  • The fourth column lists mappings of the STRING type.

  • The fifth column lists incrValue values of the STRING type.

Sample command for document operations:

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
Note

Before you insert data into a Tair result table, you must create an index and add a mapping. Sample command:

TFT.CREATEINDEX index mappings

TairCpc

The incrMode parameter must be set to None. A DDL statement has two columns.

  • The first column lists keys of the STRING type.

  • The second column lists items of the STRING type.

CPC.UPDATE key item

TairGis

The incrMode parameter must be set to None. A DDL statement has three columns.

  • The first column lists keys of the STRING type.

  • The second column lists polygon names of the STRING type.

  • The third column lists Well-known Text (WKT) values of polygons. The values are of the STRING type.

GIS.ADD area polygonName polygonWkt

TairRoaring

The incrMode parameter must be set to None. A DDL statement has three columns.

  • The first column lists keys of the STRING type.

  • The second column lists the specified offsets of the BIGINT type.

  • The third column lists values of the BIGINT type. Valid values: 0 and 1.

TR.SETBIT key offset value

TairVector

The incrMode parameter must be set to None. A DDL statement has six columns.

  • The first column lists index names of the STRING type.

  • The second column lists the primary keys of records. The values are of the STRING type.

  • The third column lists vector data of the STRING type.

  • The fourth column lists vector dimensions of the INT type.

  • The fifth column lists algorithms that are used to build and query indexes. The values are of the STRING type.

  • The sixth column lists distance methods that are used to calculate the vector distance. The values are of the STRING type.

TVS.HSET index_name key VECTOR vector_data
Note

Before you insert data into a Tair result table, you must create an index. Sample command:

TVS.CREATEINDEX index_name dims algorithm distance_method

TairTs

If the incrMode parameter is set to None, a DDL statement has four columns.

  • The first column lists pkeys of the STRING type. A pkey indicates a group of timelines.

  • The second column lists skeys of the STRING type. An skey indicates a timeline.

  • The third column lists timestamps of the STRING type.

  • The fourth column lists values of the STRING type.

EXTS.S.RAW_MODIFY Pkey Skey timestamp value

If the incrMode parameter is set to float, a DDL statement has three columns.

  • The first column lists pkeys of the STRING type.

  • The second column lists skeys of the STRING type.

  • The third column lists timestamps of the STRING type.

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

If the incrMode parameter is set to dynamic_float, a DDL statement has four columns.

  • The first column lists pkeys of the STRING type.

  • The second column lists skeys of the STRING type.

  • The third column lists timestamps of the STRING type.

  • The fourth column lists incrValue values of the STRING type. The incrValue values correspond to the timestamps that are listed in the third column.

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue
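
The column layouts above translate directly into DDL statements. As a sketch, the following two result tables follow the TairZset layout without incrMode (key, member, then one DOUBLE column per score dimension) and the TairHash layout with incrMode set to dynamic_int (key, field, then the increment as STRING). All table and column names are hypothetical.

```sql
-- tairzset without incrMode: key, member, and two score dimensions.
CREATE TEMPORARY TABLE tair_leaderboard (
  board STRING,        -- key
  player STRING,       -- member
  wins DOUBLE,         -- score, first dimension
  playtime DOUBLE,     -- score, second dimension
  PRIMARY KEY (board) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairzset',
  'host' = '<yourHost>'
);

-- tairhash with a per-row increment: key, field, and the increment column.
CREATE TEMPORARY TABLE tair_counters (
  counter_key STRING,    -- key
  counter_field STRING,  -- field
  delta STRING,          -- increment for each row
  PRIMARY KEY (counter_key) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairhash',
  'host' = '<yourHost>',
  'incrMode' = 'dynamic_int',
  'incrValue' = 'delta'  -- name of the column that holds the increment
);
```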

Sample code

  • Sample code for inserting data into a Tair result table in common mode

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      index_name STRING,
      doc_id STRING,
      doc STRING,
      mapping STRING,
      PRIMARY KEY(index_name) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairsearch',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}'
    );
    
    INSERT INTO tair_output
    SELECT 'index' AS index_name, v, p, '{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' AS mapping
    FROM datagen_stream;
  • Sample code for inserting data into a Tair result table when the incrMode parameter is configured

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      `key` STRING, -- KEY is listed as a reserved keyword in Flink SQL, so it is quoted.
      step STRING,
      PRIMARY KEY (`key`) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'dynamic_float',
      'incrValue' = 'step'
    );
    
    INSERT INTO tair_output
    SELECT *
    FROM datagen_stream;

    -- A separate example: a fixed increment (incrMode = 'float').
    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      `key` STRING, -- KEY is listed as a reserved keyword in Flink SQL, so it is quoted.
      PRIMARY KEY (`key`) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'float',
      'incrValue' = '11.11'
    );
    
    INSERT INTO tair_output
    SELECT v
    FROM datagen_stream;
  • Sample code for inserting data into a Tair result table when the fieldExpireMode parameter is configured

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING,
      s STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      `key` STRING,
      field STRING,
      `value` STRING,
      PRIMARY KEY (`key`) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairhash',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'fieldExpireMode' = 'millisecond',
      'fieldExpireValue' = '1000'
    );
    
    INSERT INTO tair_output
    SELECT v, p, s
    FROM datagen_stream;
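
The same pattern applies to data structures that use non-STRING columns. As a sketch, the following job writes to a TairRoaring result table, which requires VVR 8.0.1 or later and takes a STRING key followed by a BIGINT offset and a BIGINT value of 0 or 1. The table and column names and the datagen value ranges are hypothetical.

```sql
-- Test source: bit_value is restricted to 0 and 1, as TairRoaring requires.
CREATE TEMPORARY TABLE bitmap_stream (
  k STRING,
  bit_offset BIGINT,
  bit_value BIGINT
) WITH (
  'connector' = 'datagen',
  'fields.bit_offset.min' = '0',
  'fields.bit_offset.max' = '1000',
  'fields.bit_value.min' = '0',
  'fields.bit_value.max' = '1'
);

CREATE TEMPORARY TABLE tair_bitmap_output (
  k STRING,            -- key
  bit_offset BIGINT,   -- offset of the bit to set
  bit_value BIGINT,    -- 0 or 1
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'tair',
  'mode' = 'tairroaring',
  'host' = '${tairHost}',
  'port' = '${tairPort}',
  'password' = '${password}'
);

INSERT INTO tair_bitmap_output
SELECT k, bit_offset, bit_value
FROM bitmap_stream;
```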