This topic describes how to create an ApsaraDB for Redis result table in Realtime Compute for Apache Flink. This topic also describes the parameters in the WITH clause, the mappings between the field data types of ApsaraDB for Redis and Realtime Compute for Apache Flink, and the attribute fields. You may need to use these items when you create the table.
- This topic applies to only Blink V3.2.0 and later.
- User-created Redis databases can be used to store the result tables in the ApsaraDB for Redis databases in Realtime Compute for Apache Flink.
What is ApsaraDB for Redis?
ApsaraDB for Redis is a database service that is compatible with the protocols of the open source Redis system. It supports a hybrid of memory and hard disks for storage. ApsaraDB for Redis provides a hot standby architecture to ensure high availability. Based on the scalable cluster architecture, ApsaraDB for Redis can meet the business requirements for high throughputs, low-latency operations, and flexible configuration changes. Realtime Compute for Apache Flink allows you to store the output streaming data in ApsaraDB for Redis.
Syntax
- STRING type
Each DDL statement for the STRING data type contains two columns. The first column specifies the key and the second column specifies the value. To insert data into an ApsaraDB for Redis result table, run the
set key value
command.create table resik_output ( a varchar, b varchar, primary key(a) ) with ( type = 'redis', mode = 'string', host = '${redisHost}', -- An example value is '127.0.0.1'. port = '${redisPort}', -- An example value is '6379'. dbNum = '${dbNum}', -- The default value is 0. ignoreDelete = 'true' -- Specifies whether to delete the previously inserted data when the retraction message is returned. The default value is false. );
- LIST type
Each DDL statement for the STRING data type contains two columns. The first column specifies the key and the second column specifies the value. To insert data into an ApsaraDB for Redis result table, run the
lpush key value
command.create table resik_output ( a varchar, b varchar, primary key(a) ) with ( type = 'redis', mode = 'list', host = '${redisHost}', -- An example value is '127.0.0.1'. port = '${redisPort}', -- An example value is '6379'. dbNum = '${dbNum}', -- The default value is 0. ignoreDelete = 'true' -- Specifies whether to delete the previously inserted data when the retraction message is returned. The default value is false. );
- SET type
Each DDL statement for the STRING data type contains two columns. The first column specifies the key and the second column specifies the value. To insert data into an ApsaraDB for Redis result table, run the
sadd key value
command.create table resik_output ( a varchar, b varchar, primary key(a) ) with ( type = 'redis', mode = 'set', host = '${redisHost}', -- An example value is '127.0.0.1'. port = '${redisPort}', -- An example value is '6379'. dbNum = '${dbNum}', -- The default value is 0. ignoreDelete = 'true' -- Specifies whether to delete the previously inserted data when the retraction message is returned. The default value is false. );
- HASHMAP type
A DDL statement has three columns. The first column is the key, the second column is the hash key, and the third column is the hash value. To insert data into an ApsaraDB for Redis result table, run the
hmset key hash_key hash_value
command.create table resik_output ( a varchar, b varchar, c varchar, primary key(a) ) with ( type = 'redis', mode = 'hashmap', host = '${redisHost}', -- An example value is '127.0.0.1'. port = '${redisPort}', -- An example value is '6379'. dbNum = '${dbNum}', -- The default value is 0. ignoreDelete = 'true' -- Specifies whether to delete the previously inserted data when the retraction message is returned. The default value is false. );
- SORTEDSET type
A DDL statement has three columns. The first column is the key, the second column is the score, and the third column is the value. To insert data into an ApsaraDB for Redis result table, run the
add key score value
command.create table resik_output ( a varchar, b double, -- The data must be of the DOUBLE type. c varchar, primary key(a) ) with ( type = 'redis', mode = 'sortedset', host = '${redisHost}', -- An example value is '127.0.0.1'. port = '${redisPort}', -- An example value is '6379'. dbNum = '${dbNum}', -- The default value is 0. ignoreDelete = 'true' -- Specifies whether to delete the previously inserted data when the retraction message is returned. The default value is false. );
Parameters in the WITH clause
Parameter | Description | Required | Value |
---|---|---|---|
type | The type of the result table. | Yes | Set the value to redis .
|
mode | The data type of the ApsaraDB for Redis result table. | Valid values:
|
|
host | The endpoint of ApsaraDB for Redis. | Example: 127.0.0.1 .
|
|
port | The port of the ApsaraDB for Redis database. | No | Default value: 6379. |
dbNum | The sequence number of the ApsaraDB for Redis database. | Default value: 0. | |
ignoreDelete | Specifies whether to ignore the retraction message. | Valid values: true and false. Default value: false. If this parameter is set to false, the inserted data and the keys of the data are deleted when a retraction message is received. | |
password | The password that is used to access the ApsaraDB for Redis database. | By default, this parameter is empty. This indicates that permission verification is not required. |
Field type mappings
The following table describes the mappings between the field data types of ApsaraDB for Redis and Realtime Compute for Apache Flink. We recommend that you declare the mappings in DDL statements.
ApsaraDB for Redis field type | Realtime Compute for Apache Flink field type |
---|---|
STRING | VARCHAR |
SCORE | DOUBLE |
Sample code
CREATE TABLE random_stream (
v VARCHAR,
p VARCHAR) with (
type = 'random'
);
create table resik_output (
a VARCHAR,
b VARCHAR,
primary key(a)
) with (
type = 'redis',
mode = 'string',
host = '<yourRedisHost>',
password = '<yourRedisPassword>'
);
INSERT INTO resik_output
SELECT v, p
FROM random_stream;