This topic describes how to use the ApsaraDB for Redis connector.
Background information
ApsaraDB for Redis is a database service that is compatible with the protocols of the open source Redis system. It supports a hybrid of memory and hard disks for storage. ApsaraDB for Redis provides a hot standby architecture to ensure high availability, and uses the scalable cluster architecture to meet business requirements for high throughputs, low-latency operations, and flexible configuration modifications. For more information, see What is ApsaraDB for Redis?
The following table describes the capabilities supported by the ApsaraDB for Redis connector.
Item | Description |
Table type | Dimension table and sink table |
Running mode | Streaming mode |
Data format | STRING |
Metric |
Note For more information about the metrics, see Metrics. |
API type | SQL API |
Data update or deletion in a sink table | Supported |
Prerequisites
An ApsaraDB for Redis instance is created. For more information, see Step 1: Create an ApsaraDB for Redis instance.
A whitelist is configured for the ApsaraDB for Redis instance. For more information, see Step 2: Configure whitelists.
Limits
The ApsaraDB for Redis connector supports only the best-effort delivery and does not support the exactly-once semantics. You must ensure semantic idempotence.
Limits on only dimension tables
ApsaraDB for Redis dimension tables can read only data of the STRING and HASHMAP type from ApsaraDB for Redis databases.
The fields in ApsaraDB for Redis dimension tables must be of the STRING type. You must declare only one primary key for an ApsaraDB for Redis dimension table.
When you join an ApsaraDB for Redis dimension table with another table, the ON clause must contain the equality conditions for primary key fields.
Syntax
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- Required.
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- This parameter is required for sink tables.
);
Parameters in the WITH clause
Common parameters
Parameter
Description
Data type
Required
Default value
Remarks
connector
The type of the table.
STRING
Yes
No default value
Set the value to redis.
host
The IP address that is used to access the ApsaraDB for Redis database.
STRING
Yes
No default value
We recommend that you use an internal endpoint.
NoteWhen you access the ApsaraDB for Redis database over the Internet, the network may be unstable due to issues, such as network latency and limits on bandwidth.
port
The port number that is used to access the ApsaraDB for Redis database.
INT
No
6379
N/A.
password
The password that is used to access the ApsaraDB for Redis database.
STRING
No
An empty string
N/A.
dbNum
The sequence number of the ApsaraDB for Redis database.
INT
No
0
N/A.
clusterMode
Specifies whether the ApsaraDB for Redis database is in cluster mode.
BOOLEAN
No
false
N/A.
hostAndPorts
The host and port number of the ApsaraDB for Redis database.
NoteIf the ClusterMode parameter is set to true and connections in high availability (HA) mode are not required, you can configure the host and port parameters to specify only one of the hosts. You can also configure only this parameter. The priority of this parameter is higher than the priority of the host and port parameters.
STRING
No
No default value
If the
ClusterMode
parameter is set to true and HA is required for the connection from Jedis to the self-managed Redis database in cluster mode, you must configure this parameter. The value of this parameter must be a string that is in the"host1:port1,host2:port2"
format.key-prefix
The prefix of the primary key value of the table.
STRING
No
No default value
After you configure this parameter, a prefix specified by key-prefix is automatically added to the value of the primary key column when you query data in an ApsaraDB for Redis dimension table or write data to an ApsaraDB for Redis sink table. A delimiter specified by key-prefix-delimiter is used to separate the prefix and the primary key value.
NoteOnly Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.7 or later supports this parameter.
key-prefix-delimiter
The delimiter that is used to separate the prefix and the primary key value.
STRING
No
No default value
Parameters only for sink tables
Parameter
Description
Data type
Required
Default value
Remarks
mode
The data type of the data that is stored in the ApsaraDB for Redis sink table.
STRING
Yes
No default value
ApsaraDB for Redis sink tables support five data structures. When you execute a DDL statement to create an ApsaraDB for Redis sink table, make sure that the DDL statement complies with the specified data structure and a primary key is specified for the ApsaraDB for Redis sink table. For more information, see Data structures supported by an ApsaraDB for Redis sink table.
flattenHash
Specifies whether to write data of the HASHMAP type in multi-value mode.
BOOLEAN
No
false
Valid values:
true: Data of the HASHMAP type is written to a sink table in multi-value mode. In this case, you must declare multiple non-primary key fields in a DDL statement. The value of the primary key field corresponds to the key, the name of each non-primary key field corresponds to a field, and the value of each non-primary key field corresponds to the value of the field.
false: Data of the HASHMAP type is written to a sink table in single-value mode. In this case, you must declare three fields in a DDL statement. The value of the primary key field corresponds to the key, the value of the first non-primary key field corresponds to the field, and the value of the second non-primary key field corresponds to the value.
NoteThis parameter takes effect only when the mode parameter is set to HASHMAP.
Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.
ignoreDelete
Specifies whether to ignore retraction messages.
BOOLEAN
No
false
Valid values:
true: A retraction message is ignored when the retraction message is received.
false: The inserted data and the key of the data are deleted when a retraction message is received.
expiration
Specifies whether to configure the time-to-live (TTL) for the key of the inserted data.
LONG
No
0
The default value 0 indicates that the TTL is not configured for the key of the inserted data. If the value of this parameter is greater than 0, the TTL is configured for the key of the inserted data. Unit: milliseconds.
NoteOnly Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.
Parameters only for dimension tables
Parameter
Description
Data type
Required
Default value
Remarks
mode
The data type of the data that ApsaraDB for Redis dimension tables read.
STRING
No
STRING
Valid values:
STRING: If you do not configure this parameter, ApsaraDB for Redis dimension tables read data of the STRING type by default.
HASHMAP: If you set the mode parameter to HASHMAP, data of the HASHMAP type is read in multi-value mode.
In this case, you must declare multiple non-primary key fields in a DDL statement. The value of the primary key field corresponds to the key, the name of each non-primary key field corresponds to a field, and the value of each non-primary key field corresponds to the value of the field.
NoteOnly Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports this parameter.
If you want ApsaraDB for Redis dimension tables to read data of the HASHMAP type in single-value mode, you must configure the hashName parameter.
hashName
The key that is used when data of the HASHMAP type is read in single-value mode.
STRING
No
No default value
If you do not configure the mode parameter and you want ApsaraDB for Redis dimension tables to read data of the HASHMAP type in single-value mode, you must configure the hashName parameter.
In this case, you must declare only two fields in a DDL statement. The value of the primary key field corresponds to the field and the value of the non-primary key field corresponds to the value.
cache
The cache policy.
STRING
No
None
Valid values:
None: Data is not cached. This is the default value.
LRU: Only specific data in the dimension table is cached. Each time the system receives a data record, the system searches the cache. If the system does not find the record in the cache, the system searches for the data record in the physical dimension table.
ALL: All data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table to the cache. This way, the cache is searched for all subsequent queries in the dimension table. If the data that meets the requirement cannot be found in the cache, the key does not exist. The system reloads all data in the cache after cache entries expire.
ImportantOnly Realtime Compute for Apache Flink that uses VVR 8.0.3 or later supports the ALL cache policy.
You can set the cache parameter to ALL only when the hashName parameter is configured and the mode parameter is left empty. This is because data of the HASHMAP type can be read only in single-value mode if you use the ALL cache policy.
If you configure the cache parameter, you must configure the cacheSize and cacheTTLMs parameters.
cacheSize
The maximum number of rows of data that can be cached.
LONG
No
10000
If you set the cache parameter to LRU, you must configure this parameter.
cacheTTLMs
The cache timeout period. Unit: milliseconds.
LONG
No
No default value
The configuration of the cacheTTLMs parameter varies based on the value of the cache parameter.
If you set the cache parameter to None, the cacheTTLMs parameter can be left empty. This indicates that cache entries do not expire.
If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the timeout period of the cache. By default, cache entries do not expire.
If you set the cache parameter to ALL, the cacheTTLMs parameter specifies the interval at which the system reloads the cache. By default, the cache is not reloaded.
cacheEmpty
Specifies whether to cache empty results.
BOOLEAN
No
true
N/A.
cacheReloadTimeBlackList
The time periods during which cache is not refreshed. This parameter takes effect when the cache parameter is set to ALL. The cache is not refreshed during the periods of time that you specify for this parameter. This parameter is suitable for large-scale online promotional events such as Double 11.
STRING
No
No default value
The following example shows the format of the values: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Use delimiters based on the following rules:
Separate multiple time periods with commas (,).
Separate the start time and end time of each time period with an arrow (->) that is a combination of a hyphen (-) and a closing angle bracket (>).
Data structures supported by an ApsaraDB for Redis sink table
Data type | Data structure | Command used to insert data into an ApsaraDB for Redis sink table |
STRING | A DDL statement has two columns.
|
|
LIST | A DDL statement has two columns.
|
|
SET | A DDL statement has two columns.
|
|
HASHMAP | By default, a DDL statement has three columns.
|
|
If the flattenHash parameter is set to true, a DDL statement can have multiple columns. In this example, a DDL statement has four columns.
|
| |
SORTEDSET | A DDL statement has three columns.
|
|
Data type mappings
Category | Data type of ApsaraDB for Redis | Data type of Realtime Compute for Apache Flink |
Data type mappings for all types of tables | STRING | STRING |
Data type mappings for only sink tables | SCORE | DOUBLE |
The data of the SCORE type is added to the values of the SORTEDSET data type in ApsaraDB for Redis databases. You must manually set a score of the DOUBLE type for each sorted value and sort the values based on their scores in ascending order.
Sample code
Sample code for a sink table
Write data of the STRING type to ApsaraDB for Redis. In this example, the value of the
col1
column in theredis_sink
sink table is used as the key and the value of thecol2
column is used as the value.CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
Write data of the HASHMAP type in single-value mode to ApsaraDB for Redis. In this example, the value of the
col1
column in theredis_sink
sink table is used as the key, the value of thecol2
column is used as the field, and the value of thecol3
column is used as the value.CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, col3 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
Write data of the HASHMAP type in multi-value mode to ApsaraDB for Redis. In this example, the value of the
col1
column in theredis_sink
sink table is used as the key, the value of thecol2
column is used as the value of the field col2, the value of thecol3
column is used as the value of the field col3, and the value of thecol4
column is used as the value of the field col4.CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING, col3 STRING, col4 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, col3 STRING, col4 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'flattenHash' = 'true', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
Sample code for a dimension table
Read data of the STRING type. In this example, the value of the
col1
column in theredis_dim
dimension table is used as the key, and the value of thecol2
column is used as the value.CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col1, t2.col2 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;
Read data of the HASHMAP type in single-value mode. In this example, the value testKey of the
hashName
parameter is used as the key, the value of thecol1
column in theredis_dim
dimension table is used as the field, and the value of thecol2
column is used as the value.CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>', 'hashName' = 'testkey' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col1, t2.col2 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;
Read data of the HASHMAP type in multi-value mode. In this example, the value of the
col1
column in theredis_dim
dimension table is used as the key, the value of thecol2
column is used as the value of the field col2, the value of thecol3
column is used as the value of the field col3, and the value of thecol4
column is used as the value of the field col4.CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, col3 STRING, col4 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>', 'mode' = 'HASHMAP' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING, col4 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col2, t2.col3, t2.col4 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;