This topic describes how to use the Tair (Redis OSS-compatible) connector.
Background information
Alibaba Cloud Tair is a database service compatible with open source Redis protocols and provides hybrid memory-and-disk storage. Tair uses a high availability (HA) active-active architecture and a scalable cluster architecture to meet business requirements for high throughput, low latency, and flexible scaling up or down. For more information, see What is Tair (Redis OSS-compatible)?.
The following table describes the features supported by the Redis connector.
Category | Details |
Supported types | Dimension table and sink table |
Supported modes | Streaming mode |
Data format | String |
Metrics | Specific monitoring metrics. Note For more information about the metrics, see Monitoring metrics. |
API types | SQL |
Supports data updates or deletions in sink tables | Yes |
Prerequisites
You have created a Tair (Redis OSS-compatible) instance. For more information, see Step 1: Create an instance.
You have configured a whitelist. For more information, see Step 2: Configure a whitelist.
Limits
The Redis connector provides best-effort semantics and does not guarantee exactly-once delivery. You must ensure idempotence in your application.
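Keyed writes such as the Redis SET command are naturally idempotent, which is why idempotence is enough to tolerate replayed records. A minimal Python sketch (a plain dict stands in for Redis; this is illustrative only, not the connector's implementation):

```python
# Sketch: under best-effort (at-least-once) delivery, a record may be
# replayed. A keyed SET-style write is idempotent, so a replay leaves
# the store in the same final state.

def apply_writes(records):
    """Apply (key, value) records to an in-memory store; last write wins,
    mirroring the overwrite semantics of the Redis SET command."""
    store = {}
    for key, value in records:
        store[key] = value
    return store

once = apply_writes([("user:1", "a"), ("user:2", "b")])
replayed = apply_writes([("user:1", "a"), ("user:2", "b"), ("user:1", "a")])
assert once == replayed  # the replayed record changed nothing
```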
The following limits apply to dimension tables:
Only STRING and HASHMAP data types can be read from Redis.
All fields in the dimension table must be of the STRING type. You must declare exactly one primary key.
When you join a dimension table, the ON clause must include an equivalent condition for the primary key.
Known issues and solutions
The cache feature in Ververica Runtime (VVR) 8.0.9 has a known issue. To work around it, disable the cache by adding 'sink.buffer-flush.max-rows' = '0' to the WITH clause of the sink table.
Syntax
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- Required.
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- Required for sink tables.
);
WITH parameters
General
Parameter | Description | Data type | Required | Default | Remarks |
connector | The table type. | String | Yes | None | The value must be redis. |
host | The connection address of the Redis server. | String | Yes | None | Use an internal network endpoint. Note Connections to public network endpoints may be unstable because of factors such as network latency and bandwidth limits. |
port | The connection port of the Redis server. | Int | No | 6379 | None. |
password | The password of the Redis database. | String | No | Empty string, which means no validation is performed. | None. |
dbNum | The number of the database to operate on. | Int | No | 0 | None. |
clusterMode | Specifies whether the Redis cluster is in cluster mode. | Boolean | No | false | None. |
hostAndPorts | The hosts and port numbers of the Redis cluster. | String | No | None | This parameter takes precedence over the individual host and port parameters. Note If cluster mode is enabled and HA is not required, you can configure a single node by using the host and port parameters, or configure only this parameter. |
key-prefix | The prefix for the table's primary key value. | String | No | None | After this parameter is configured, a prefix is automatically added to the primary key field's value when you query or write data. The prefix consists of the key prefix (key-prefix) and the prefix separator (key-prefix-delimiter). Note This parameter is supported only in VVR 8.0.7 and later. |
key-prefix-delimiter | The separator between the primary key value and its prefix. | String | No | None | |
connection.pool.max-total | The maximum number of connections that the connection pool can allocate. | Int | No | 8 | Note This parameter is supported only in VVR 8.0.9 and later. |
connection.pool.max-idle | The maximum number of idle connections in the connection pool. | Int | No | 8 | |
connection.pool.min-idle | The minimum number of idle connections in the connection pool. | Int | No | 0 | |
connect.timeout | The timeout for establishing a connection. | Duration | No | 3000 ms | |
socket.timeout | The timeout for receiving data from the Redis server (socket timeout). | Duration | No | 3000 ms | |
cacert.filepath | The full path of the SSL/TLS certificate file. The file must be in the JKS format. | String | No | None, which indicates that SSL/TLS encryption is disabled. | Download the CA certificate as described in Enable TLS encryption, and upload the certificate in the Additional Dependency Files section of the job. After the upload, the CA certificate is stored in the /flink/usrlib directory. For more information about how to upload a file in the Additional Dependency Files section, see Deploy a job. Note This parameter is supported only in VVR 11.1 and later. |
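The key-prefix and key-prefix-delimiter parameters together determine the actual Redis key that is read or written. The following Python sketch illustrates that composition; compose_key and its arguments are hypothetical names for illustration, not connector API:

```python
def compose_key(pk_value, key_prefix=None, delimiter=""):
    """Build the Redis key as described for key-prefix/key-prefix-delimiter:
    <key-prefix><key-prefix-delimiter><primary key value>.
    Without a prefix, the primary key value is used as-is."""
    if key_prefix is None:
        return pk_value
    return f"{key_prefix}{delimiter}{pk_value}"

assert compose_key("1001") == "1001"
assert compose_key("1001", key_prefix="user", delimiter=":") == "user:1001"
```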
Sink-specific
Parameter | Description | Data type | Required | Default | Remarks |
mode | The Redis data type to use. | String | Yes | None | Tair sink tables support five Redis data types. The DDL must be defined in the specified format, and a primary key must be defined. For more information, see Data type formats for Redis sink tables. |
flattenHash | Specifies whether to write HASHMAP data in multi-value mode. | Boolean | No | false | Valid values: true: HASHMAP data is written in multi-value mode. The DDL can declare multiple non-primary key columns, and each column is written as a separate hash field. false (default): HASHMAP data is written in single-value mode. Note This parameter takes effect only when mode is set to HASHMAP. |
ignoreDelete | Specifies whether to ignore retraction messages. | Boolean | No | false | Valid values: true: Retraction messages are ignored. false (default): When a retraction message is received, the previously written data is deleted. |
expiration | Sets a TTL for the key of the written data. | Long | No | 0, which means no TTL is set. | If the value of this parameter is greater than 0, a corresponding TTL is set for the key of the written data. The unit is milliseconds. |
sink.buffer-flush.max-rows | The maximum number of records that can be saved in the cache. | Int | No | 200 | Cached records include all append, update, and delete events. The cache is flushed when the number of cached records exceeds this value. Note Set this parameter to 0 to disable caching. This parameter is supported only in VVR 8.0.9 and later. |
sink.buffer-flush.interval | The cache flush interval. | Duration | No | 1000 ms | The cache is flushed asynchronously when this interval elapses, even if the number of cached records does not reach sink.buffer-flush.max-rows. Note This parameter is supported only in VVR 8.0.9 and later. |
Dimension table-specific
Parameter | Description | Data type | Required | Default | Remarks |
mode | The Redis data type to read. | String | No | STRING | Valid values: STRING (default): Reads data as the STRING type. HASHMAP: Reads HASHMAP data in multi-value mode. In this case, the DDL must declare multiple non-primary key fields. Note This parameter is supported only in VVR 8.0.7 and later. To read HASHMAP data in single-value mode, configure the hashName parameter instead. |
hashName | The key to use when reading HASHMAP data in single-value mode. | String | No | None | If you do not specify the mode parameter but want to read HASHMAP data in single-value mode, you must configure hashName. In this case, the DDL needs to declare only two fields: the first field, which is the primary key, maps to the hash field, and the second, non-primary key field maps to the hash value. |
cache | The cache policy. | String | No | None | Tair dimension tables support the following cache policies: None (default): No cache. LRU: Caches a portion of the dimension table data. For each record from the source table, the system first searches the cache. If the data is not found, the system queries the physical dimension table. ALL: Caches all data from the dimension table. Before the job runs, the system loads all data from the dimension table into the cache. All subsequent lookups are performed against the cache. If a key is not found in the cache, the key does not exist in the dimension table. The full cache expires after the period specified by cacheTTLMs and is then reloaded. |
cacheSize | The cache size. | Long | No | 10000 | This parameter takes effect only when the cache parameter is set to LRU. |
cacheTTLMs | The cache timeout duration, in milliseconds. | Long | No | None | The configuration of cacheTTLMs depends on the cache setting: If cache is set to None, you do not need to configure cacheTTLMs. This means the cache does not time out. If cache is set to LRU, cacheTTLMs is the cache timeout period. By default, the cache does not expire. If cache is set to ALL, cacheTTLMs is the cache reload time. By default, the cache is not reloaded. |
cacheEmpty | Specifies whether to cache empty results. | Boolean | No | true | None. |
cacheReloadTimeBlackList | The periods during which the cache is not refreshed. This parameter takes effect only when cache is set to ALL. Use it to prevent the cache from being refreshed during critical periods, for example, during a Double 11 shopping festival. | String | No | None | Specify the value in a format such as '2024-11-10 23:00 -> 2024-11-11 01:00, 2024-12-11 10:00 -> 2024-12-12 11:00'. Use separators as follows: Separate multiple time periods with commas (,). Separate the start time and end time of each period with an arrow (->). |
async | Specifies whether to return data asynchronously. | Boolean | No | false | Valid values: true: Data is returned asynchronously. false (default): Data is returned synchronously. |
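The LRU policy together with cacheSize and cacheTTLMs behaves like a bounded cache with least-recently-used eviction and optional entry expiry. A stdlib-only Python sketch of those semantics (illustrative; LruTtlCache is a hypothetical name, not the connector's code):

```python
import time
from collections import OrderedDict

class LruTtlCache:
    """Sketch of the LRU cache policy: at most cache_size entries,
    least-recently-used eviction, and optional per-entry TTL (cacheTTLMs)."""

    def __init__(self, cache_size=10000, ttl_ms=None, clock=time.monotonic):
        self.cache_size = cache_size
        self.ttl_ms = ttl_ms          # None models "the cache does not expire"
        self.clock = clock
        self.entries = OrderedDict()  # key -> (value, inserted_at)

    def get(self, key):
        item = self.entries.get(key)
        if item is None:
            return None               # miss: caller queries the physical table
        value, inserted_at = item
        if self.ttl_ms is not None and (self.clock() - inserted_at) * 1000 >= self.ttl_ms:
            del self.entries[key]     # entry expired
            return None
        self.entries.move_to_end(key)  # mark as recently used
        return value

    def put(self, key, value):
        self.entries[key] = (value, self.clock())
        self.entries.move_to_end(key)
        while len(self.entries) > self.cache_size:
            self.entries.popitem(last=False)  # evict least recently used

cache = LruTtlCache(cache_size=2)
cache.put("u1", "Alice")
cache.put("u2", "Bob")
cache.get("u1")            # u1 is now the most recently used entry
cache.put("u3", "Carol")   # capacity exceeded: u2 is evicted
assert cache.get("u2") is None
assert cache.get("u1") == "Alice"
```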
Data type formats for Redis sink tables
Type | Format | Redis insert command |
STRING type | Two-column DDL: the first column is the key, and the second column is the value. | SET key value |
LIST type | Two-column DDL: the first column is the key, and the second column is the value. | RPUSH key value |
SET type | Two-column DDL: the first column is the key, and the second column is the value. | SADD key value |
HASHMAP type | By default, a three-column DDL: the first column is the key, the second column is the hash field, and the third column is the hash value. If flattenHash is set to true, the DDL supports multiple columns: the first column is the key, and each remaining column is written as a hash field named after that column. | HSET key field value |
SORTEDSET type | Three-column DDL: the first column is the key, the second column is the score, and the third column is the value. | ZADD key score value |
Type mappings
Type | Redis field type | Flink field type |
General | STRING | STRING |
For sink tables only | SCORE | DOUBLE |
The Redis SCORE type applies to SORTEDSETs (sorted sets). You must manually assign a DOUBLE score to each value. The values are then sorted in ascending order based on their scores.
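The SCORE column is what orders a SORTEDSET: each value is stored with a DOUBLE score and read back in ascending score order. A small Python sketch of that ordering rule (sorted_set_order is a hypothetical helper, not connector API):

```python
def sorted_set_order(members):
    """Return SORTEDSET values in ascending score order, the way a
    ZRANGE over ZADD-style (score, value) writes would see them."""
    return [value for score, value in sorted(members, key=lambda m: m[0])]

# (score, value) pairs as a SORTEDSET sink table would write them
assert sorted_set_order([(2.5, "b"), (1.0, "a"), (3.0, "c")]) == ["a", "b", "c"]
```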
Usage examples
Sink table
Write STRING data: In this example, the value from the user_id column in the redis_sink sink table is written to Redis as the key, and the value from the login_time column is written as the value.

CREATE TEMPORARY TABLE kafka_source (
  user_id STRING,      -- User ID
  login_time STRING    -- Logon timestamp
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_logins',                              -- Kafka topic
  'properties.bootstrap.servers' = 'yourKafkaBroker',   -- Kafka broker address
  'format' = 'json',                                    -- Data format is JSON
  'scan.startup.mode' = 'earliest-offset'               -- Consume from the earliest offset
);

CREATE TEMPORARY TABLE redis_sink (
  user_id STRING,      -- Redis key
  login_time STRING,   -- Redis value
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'STRING',   -- Use STRING mode
  'host' = 'yourHost',
  'port' = 'yourPort',
  'password' = 'yourPassword'
);

INSERT INTO redis_sink SELECT * FROM kafka_source;

Write HASHMAP data in multi-value mode: In this example, the value from the order_id column in the redis_sink sink table is written to Redis as the key. The values from the product_name, quantity, and amount columns are written to the product_name, quantity, and amount hash fields, respectively.

CREATE TEMPORARY TABLE kafka_source (
  order_id STRING,       -- Order ID
  product_name STRING,   -- Product name
  quantity STRING,       -- Quantity
  amount STRING          -- Order amount
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_topic',                             -- Kafka topic
  'properties.bootstrap.servers' = 'yourKafkaBroker',   -- Kafka broker address
  'format' = 'json',                                    -- Data format is JSON
  'scan.startup.mode' = 'earliest-offset'               -- Consume from the earliest offset
);

CREATE TEMPORARY TABLE redis_sink (
  order_id STRING,       -- Order ID, used as the Redis key
  product_name STRING,   -- Product name, used as a Redis hash field
  quantity STRING,       -- Quantity, used as a Redis hash field
  amount STRING,         -- Order amount, used as a Redis hash field
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'HASHMAP',    -- Use HASHMAP mode
  'flattenHash' = 'true',
  'host' = 'yourHost',
  'port' = 'yourPort',
  'password' = 'yourPassword'
);

INSERT INTO redis_sink SELECT * FROM kafka_source;

Write HASHMAP data in single-value mode: In this example, the value from the order_id column in the redis_sink sink table is written to Redis as the key. The value from the product_name column is written as the hash field, and the value from the quantity column is written as the hash value.

CREATE TEMPORARY TABLE kafka_source (
  order_id STRING,       -- Order ID
  product_name STRING,   -- Product name
  quantity STRING        -- Quantity
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_topic',                             -- Kafka topic
  'properties.bootstrap.servers' = 'yourKafkaBroker',   -- Kafka broker address
  'format' = 'json',                                    -- Data format is JSON
  'scan.startup.mode' = 'earliest-offset'               -- Consume from the earliest offset
);

CREATE TEMPORARY TABLE redis_sink (
  order_id STRING,       -- Redis key
  product_name STRING,   -- Redis hash field
  quantity STRING,       -- Redis hash value
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'HASHMAP',
  'host' = 'yourHost',
  'port' = 'yourPort',
  'password' = 'yourPassword'
);

INSERT INTO redis_sink SELECT * FROM kafka_source;
Dimension table
Read STRING data: In this example, the value from the user_id column in the redis_dim dimension table corresponds to the key, and the value from the user_name column corresponds to the value.

CREATE TEMPORARY TABLE kafka_source (
  user_id STRING,          -- User ID
  proctime AS PROCTIME()   -- Processing time
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_clicks',                              -- Kafka topic
  'properties.bootstrap.servers' = 'yourKafkaBroker',   -- Kafka broker address
  'format' = 'json',                                    -- Data format is JSON
  'scan.startup.mode' = 'earliest-offset'               -- Consume from the earliest offset
);

CREATE TEMPORARY TABLE redis_dim (
  user_id STRING,     -- User ID (Redis key)
  user_name STRING,   -- Username (Redis value)
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'host' = 'yourHost',          -- Redis host address
  'port' = 'yourPort',          -- Redis port
  'password' = 'yourPassword',  -- Redis password
  'mode' = 'STRING'             -- Use STRING mode
);

CREATE TEMPORARY TABLE blackhole_sink (
  user_id STRING,         -- User ID
  redis_user_id STRING,   -- User ID from Redis
  user_name STRING        -- Username
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  t1.user_id,    -- User ID (from Kafka)
  t2.user_id,    -- User ID (from Redis)
  t2.user_name   -- Username (from Redis)
FROM kafka_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.user_id = t2.user_id;

Read HASHMAP data in multi-value mode: In this example, the value from the user_id column in the redis_dim dimension table is the key. The values from the user_name, email, and register_time columns map to the user_name, email, and register_time hash fields, respectively.

CREATE TEMPORARY TABLE kafka_source (
  user_id STRING,            -- User ID
  click_time TIMESTAMP(3),   -- Click time
  proctime AS PROCTIME()     -- Processing time
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_clicks',                              -- Kafka topic
  'properties.bootstrap.servers' = 'yourKafkaBroker',   -- Kafka broker address
  'format' = 'json',                                    -- Data format is JSON
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TEMPORARY TABLE redis_dim (
  user_id STRING,         -- User ID (Redis key)
  user_name STRING,       -- Username (part of a Redis field-value pair)
  email STRING,           -- Email (part of a Redis field-value pair)
  register_time STRING,   -- Registration time (part of a Redis field-value pair)
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'host' = 'yourHost',
  'port' = 'yourPort',
  'password' = 'yourPassword',
  'mode' = 'HASHMAP'   -- Use HASHMAP mode
);

CREATE TEMPORARY TABLE blackhole_sink (
  user_id STRING,           -- User ID
  user_name STRING,         -- Username
  email STRING,             -- Email
  register_time STRING,     -- Registration time
  click_time TIMESTAMP(3)   -- Click time
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  t1.user_id,         -- User ID
  t2.user_name,       -- Username
  t2.email,           -- Email
  t2.register_time,   -- Registration time
  t1.click_time       -- Click time
FROM kafka_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.user_id = t2.user_id;

Read HASHMAP data in single-value mode: In this example, the value of the hashName parameter, testKey, is used as the key. The value from the user_id column in the redis_dim dimension table is the hash field, and the value from the user_name column is the hash value.

CREATE TEMPORARY TABLE kafka_source (
  user_id STRING,          -- User ID
  proctime AS PROCTIME()   -- Processing time
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_clicks',                              -- Kafka topic
  'properties.bootstrap.servers' = 'yourKafkaBroker',   -- Kafka broker address
  'format' = 'json',                                    -- Data format is JSON
  'scan.startup.mode' = 'earliest-offset'               -- Consume from the earliest offset
);

CREATE TEMPORARY TABLE redis_dim (
  user_id STRING,     -- User ID (Redis hash field)
  user_name STRING,   -- Username (Redis hash value)
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'host' = 'yourHost',          -- Redis host address
  'port' = 'yourPort',          -- Redis port
  'password' = 'yourPassword',  -- Redis password
  'hashName' = 'testKey'        -- Fixed Redis hash name
);

CREATE TEMPORARY TABLE blackhole_sink (
  user_id STRING,         -- User ID
  redis_user_id STRING,   -- User ID from Redis
  user_name STRING        -- Username
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT
  t1.user_id,    -- User ID (from Kafka)
  t2.user_id,    -- User ID (from Redis)
  t2.user_name   -- Username (from Redis)
FROM kafka_source AS t1
JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.user_id = t2.user_id;