This topic describes how to use the Lindorm connector.
Background
Lindorm is a cloud-native hyper-converged database service that is developed and optimized to store and process multimodal data in various scenarios such as IoT, Internet, and Internet of Vehicles (IoV). Lindorm is suitable for various scenarios, such as logging, monitoring, billing, advertising, social networking, travel, and risk management. Lindorm is also one of the database services that support the core business of Alibaba Group.
Lindorm provides the following features:
Supports unified access and processing of various data, such as wide tables, time series, text, objects, streams, and spaces.
Is compatible with multiple standard interfaces, such as SQL, Apache HBase, Apache Cassandra, Amazon S3, Time Series Database (TSDB), Hadoop Distributed File System (HDFS), Apache Solr, and Kafka. Lindorm can also be seamlessly integrated with third-party ecosystem tools.
The following table describes the capabilities supported by the Lindorm connector.
Item | Description |
Table type | Dimension table and sink table |
Running mode | Streaming mode |
Data format | N/A |
Metric | |
API type | SQL API |
Lindorm engine | LindormTable |
Data update or deletion in a sink table | Supported |
Usage notes
Lindorm HBase tables are not supported.
A Lindorm wide table engine and a Lindorm table must be created in advance. For more information, see Create an instance.
A network connection must be established between the Lindorm cluster and the Flink workspace. For example, the Lindorm cluster and Realtime Compute for Apache Flink reside in the same virtual private cloud (VPC).
Syntax
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);
Connector options
General
Option | Description | Type | Required | Default value | Remarks |
connector | The table type. | String | Yes | No default value | Set the value to lindorm. |
seedserver | The endpoint of the Lindorm server. | String | Yes | No default value | Realtime Compute for Apache Flink calls the ApsaraDB for HBase API for Java to access Lindorm and use LindormTable. The endpoint of the Lindorm server is in the host:port format. For more information, see Use Flink to connect to and use LindormTable. |
namespace | The namespace of the Lindorm database. | String | Yes | No default value | N/A |
username | The username that is used to access the Lindorm database. | String | Yes | No default value | N/A |
password | The password that is used to access the Lindorm database. | String | Yes | No default value | N/A |
tableName | The name of the Lindorm table. | String | Yes | No default value | N/A |
columnFamily | The name of the column family of the Lindorm table. | String | Yes | No default value | If the column family name is not specified when you create the Lindorm table, enter the default column family name f. |
retryIntervalMs | The interval at which a read operation is retried when data reading fails. | Integer | No | 1000 | Unit: milliseconds. |
maxRetryTimes | The maximum number of retries for reading or writing data. | Integer | No | 5 | N/A |
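If the default retry behavior is not sufficient, the optional retry options can be added to the WITH clause alongside the required options. The following is a minimal sketch; the table name and the retry values are illustrative assumptions.
CREATE TABLE lindorm_retry_example (
  id varchar,
  name varchar,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  -- Optional: retry a failed read every 2 seconds, up to 10 times.
  'retryIntervalMs' = '2000',
  'maxRetryTimes' = '10'
);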
Sink-specific
Option | Description | Type | Required | Default value | Remarks |
bufferSize | The number of data records that can be written at a time. | Integer | No | 500 | N/A |
flushIntervalMs | The interval at which data is written to the table when the amount of data is small. | Integer | No | 2000 | Unit: milliseconds. |
ignoreDelete | Specifies whether to skip delete operations. | Boolean | No | false | Valid values: true and false (default). |
dynamicColumnSink | Specifies whether to enable the dynamic table feature. For more information about the dynamic table feature, see the Dynamic table section of this topic. | Boolean | No | false | Valid values: true and false (default). |
excludeUpdateColumns | The fields not to be updated. Updated values of the specified fields are not inserted into the sink table. | String | No | No default value | Separate multiple fields with commas (,). For example, if you set the excludeUpdateColumns option to a,b,c, updates on the a, b, and c fields are ignored. Note: Only VVR 8.0.9 or later supports this option. |
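The following sketch shows how the sink-specific options can be combined in a result table declaration. The table name, field names, and option values are illustrative assumptions, not recommended settings.
CREATE TABLE lindorm_sink_example (
  id varchar,
  a varchar,
  b varchar,
  c varchar,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  -- Buffer up to 1,000 records and flush at least every second.
  'bufferSize' = '1000',
  'flushIntervalMs' = '1000',
  -- Skip delete operations from the upstream changelog.
  'ignoreDelete' = 'true',
  -- Ignore updates to the a and b fields (VVR 8.0.9 or later).
  'excludeUpdateColumns' = 'a,b'
);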
Dimension table-specific
Option | Description | Type | Required | Default value | Remarks |
partitionedJoin | Specifies whether to use the JoinKey for partitioning. | Boolean | No | false | Valid values: true: The JoinKey is used for partitioning. Data is distributed to each JOIN node to improve the cache hit rate. false (default): The JoinKey is not used for partitioning. |
shuffleEmptyKey | Specifies whether to randomly shuffle empty upstream keys to downstream nodes. | Boolean | No | false | Valid values: true: The system randomly shuffles empty upstream keys to downstream nodes. false (default): The system shuffles empty upstream keys to the first parallel thread of each downstream node. The first parallel thread is numbered 0. |
cache | The cache policy. | String | No | None | Valid values: None (default): No data is cached. LRU: Only recently accessed data in the dimension table is cached. If you set the cache option to LRU, you must configure the cacheSize and cacheTTLMs options. |
cacheSize | The number of rows of data that can be cached. | Integer | No | 1000 | If you set the cache option to LRU, you can configure the cacheSize option. |
cacheTTLMs | The cache timeout period. | Integer | No | No default value | Unit: milliseconds. If you set the cache option to LRU, you can configure the cacheTTLMs option. By default, cache entries do not expire. |
cacheEmpty | Specifies whether to cache the JOIN queries whose return value is empty. | Boolean | No | true | Note: The Lindorm connector supports one-to-many lookup joins. Pay close attention to caching strategies and throughput for optimal performance. |
async | Specifies whether to enable data synchronization in asynchronous mode. | Boolean | No | false | Valid values: true and false (default). |
asyncLindormRpcTimeoutMs | The timeout period when data is requested in asynchronous mode. | Integer | No | 300000 | Unit: milliseconds. |
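A dimension table that enables the LRU cache and asynchronous lookups might be declared as in the following sketch. The cache size, time-to-live, and timeout values are illustrative assumptions.
CREATE TEMPORARY TABLE lindorm_dim_example (
  id varchar,
  name varchar,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  -- Cache up to 10,000 recently accessed rows for 10 minutes each.
  'cache' = 'LRU',
  'cacheSize' = '10000',
  'cacheTTLMs' = '600000',
  -- Query Lindorm asynchronously with a 60-second RPC timeout.
  'async' = 'true',
  'asyncLindormRpcTimeoutMs' = '60000'
);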
Dynamic table
The dynamic table feature is suitable for scenarios in which the columns of a table are not fixed in advance but are dynamically created and written while the deployment runs. For example, you use days as the primary key and hours as columns to calculate the hourly transaction volume per day. Data for each hour is dynamically generated. The following table shows the dynamic table.
Primary key | Column name: 00:00 | Column name: 01:00 |
2025-06-01 | 45 | 32 |
2025-06-02 | 76 | 34 |
The dynamic table must comply with the following DDL rules: the first several columns are defined as the primary key, and the last two columns must be of the VARCHAR data type. The value of the second-to-last column is used as the column name, and the value of the last column is used as the value of that column. Sample code:
CREATE TABLE lindorm_dynamic_output(
pk1 varchar,
pk2 varchar,
pk3 varchar,
c1 varchar,
c2 varchar,
PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);
In the preceding example, pk1, pk2, and pk3 are used as the primary key. c1 and c2 are the two columns that are required for the dynamic table and must be the last two columns. Except for c1 and c2, columns that are not used as the primary key are not allowed. Each time data is written to the Lindorm sink table, a column is added to or modified in the data record that corresponds to the values <pk1, pk2, pk3> of the primary key. The value of c1 is used as the name of the column, and the value of c2 is used as the value of the column. Each time a data record is received, only the value in one column is added or changed. The values of other columns remain unchanged.
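To map the hourly transaction volume example onto these rules, the day can serve as the primary key, the hour string as the dynamic column name, and the volume as the column value. The following sketch assumes a hypothetical upstream table named hourly_stats with stat_day, stat_hour, and hourly_volume fields.
-- Hypothetical dynamic sink: one Lindorm row per day, one column per hour.
CREATE TEMPORARY TABLE transaction_by_hour (
  stat_day varchar,     -- primary key, for example '2025-06-01'
  stat_hour varchar,    -- becomes the dynamic column name, for example '01:00'
  volume varchar,       -- becomes the value of that column, for example '32'
  PRIMARY KEY (stat_day) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  -- Enable the dynamic table feature.
  'dynamicColumnSink' = 'true'
);

-- hourly_stats is a hypothetical upstream table; the CAST keeps the last two columns VARCHAR.
INSERT INTO transaction_by_hour
SELECT stat_day, stat_hour, CAST(hourly_volume AS VARCHAR)
FROM hourly_stats;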
Data type mappings
All data in Lindorm is in the binary format. The following table shows how to convert data into bytes of binary data or parse bytes of binary data based on the data type of a field in Realtime Compute for Apache Flink.
Data type of Flink SQL | Method used to convert data into bytes for Lindorm | Method used to parse bytes from Lindorm |
CHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
VARCHAR | ||
BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean) | com.alibaba.lindorm.client.core.utils.Bytes::toBoolean |
BINARY | Directly convert data into bytes. | Directly return bytes. |
VARBINARY | ||
DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
TINYINT | Directly encapsulate data into the first byte of byte[]. | Directly return bytes[0]. |
SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short) | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
INT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float) | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double) | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
DATE | Call com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) after the number of days since January 1, 1970 is obtained. | Call com.alibaba.lindorm.client.core.utils.Bytes::toInt to obtain the number of days since January 1, 1970. |
TIME | Call com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) after the number of milliseconds since 00:00:00 of the current day is obtained. | Call com.alibaba.lindorm.client.core.utils.Bytes::toInt to obtain the number of milliseconds since 00:00:00 of the current day. |
TIMESTAMP | Call com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) after the number of milliseconds since 00:00:00 on January 1, 1970 is obtained. | Call com.alibaba.lindorm.client.core.utils.Bytes::toLong to obtain the number of milliseconds since 00:00:00 on January 1, 1970. |
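As a quick orientation, the following declaration pairs several of the mapped data types with the storage behavior listed above; the table and field names are illustrative assumptions.
CREATE TEMPORARY TABLE lindorm_typed_sink (
  id VARCHAR,               -- stored via StringData::toBytes
  is_active BOOLEAN,        -- stored via Bytes::toBytes(boolean)
  score DOUBLE,             -- stored via Bytes::toBytes(double)
  birth_date DATE,          -- stored as the number of days since January 1, 1970
  updated_at TIMESTAMP(3),  -- stored as the number of milliseconds since 00:00:00 on January 1, 1970
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>'
);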
Sample code
CREATE TEMPORARY TABLE example_source(
id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.id.kind' = 'sequence',
'fields.id.start' = '0',
'fields.id.end' = '9'
);
CREATE TEMPORARY TABLE lindorm_hbase_dim(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tableName'='${lindorm_dim_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_password}'
);
CREATE TEMPORARY TABLE lindorm_hbase_sink(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tableName'='${lindorm_sink_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_password}'
);
INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF example_source.proc_time ON example_source.id = lindorm_hbase_dim.id;