
Realtime Compute for Apache Flink:Lindorm connector

Last Updated: Mar 05, 2024

This topic describes how to use the Lindorm connector.

Background information

Lindorm is a cloud-native, multi-model, hyper-converged database service that is designed and optimized for Internet of Things (IoT), Internet, and Internet of Vehicles (IoV) scenarios. Lindorm is suitable for scenarios such as logging, monitoring, billing, advertising, social networking, travel, and risk management, and is also one of the database services that support the core business of Alibaba Cloud. For more information, see What is Lindorm?

Lindorm provides the following features:

  • Supports unified access and processing of multiple data models, such as wide table, time series, text, object, stream, and spatial data.

  • Is compatible with multiple standard interfaces, such as SQL, Apache HBase, Apache Cassandra, Amazon S3, Time Series Database (TSDB), Hadoop Distributed File System (HDFS), Apache Solr, and Kafka. Lindorm can also be seamlessly integrated with third-party ecosystem tools.

The following table describes the capabilities supported by the Lindorm connector.

| Item | Description |
| --- | --- |
| Table type | Dimension table and result table |
| Running mode | Streaming mode |
| Data format | N/A |
| Metric | Metrics for dimension tables: none. Metrics for result tables: numBytesOut, numBytesOutPerSecond, numRecordsOut, and numRecordsOutPerSecond. |
| API type | SQL API |
| Lindorm engine | Wide table engine |
| Data update or deletion in a result table | Supported |

Note: For more information about the metrics and how to view the metrics, see Report metrics of fully managed Flink to other platforms.

Prerequisites

  • A Lindorm wide table engine and a Lindorm table are created. For more information, see Create an instance.

  • A network connection is established between the Lindorm cluster and fully managed Flink. For example, the Lindorm cluster and fully managed Flink reside in the same virtual private cloud (VPC).

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports the Lindorm connector.

Syntax

CREATE TABLE white_list (
  id VARCHAR,
  name VARCHAR,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>'
);

Parameters in the WITH clause

Common parameters

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the table. | STRING | Yes | No default value | Set the value to lindorm. |
| seedserver | The endpoint of the Lindorm server. | STRING | Yes | No default value | Fully managed Flink calls Java APIs to access Lindorm. The endpoint is in the host:port format. For more information, see Use ApsaraDB for HBase API for Java to develop applications. |
| namespace | The namespace of the Lindorm database. | STRING | Yes | No default value | N/A. |
| username | The username that is used to access the Lindorm database. | STRING | Yes | No default value | N/A. |
| password | The password that is used to access the Lindorm database. | STRING | Yes | No default value | N/A. |
| tableName | The name of the Lindorm table. | STRING | Yes | No default value | N/A. |
| columnFamily | The name of the column family of the Lindorm table. | STRING | Yes | No default value | If no column family name was specified when the Lindorm table was created, enter the default column family name f. |
| retryIntervalMs | The interval at which a read operation is retried after a failure. | INTEGER | No | 1000 | Unit: milliseconds. |
| maxRetryTimes | The maximum number of retries for reading or writing data. | INTEGER | No | 5 | N/A. |

Parameters only for result tables

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| bufferSize | The maximum number of data records that are buffered before a batch write is triggered. | INTEGER | No | 500 | N/A. |
| flushIntervalMs | The interval at which buffered data is flushed to the table when the buffer is not full. | INTEGER | No | 2000 | Unit: milliseconds. |
| ignoreDelete | Specifies whether to skip delete operations. | BOOLEAN | No | false | Valid values: true (delete operations are ignored) and false (delete operations are not ignored; this is the default value). |
| isDynamicTable | Specifies whether to enable the dynamic table feature. For more information, see Dynamic table. | BOOLEAN | No | false | Valid values: true (the dynamic table feature is enabled) and false (the dynamic table feature is disabled; this is the default value). Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.1 or later supports this parameter. |

Parameters only for dimension tables

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| partitionedJoin | Specifies whether to partition data by the join key. | BOOLEAN | No | false | Valid values: true (data is partitioned by the join key and distributed to the JOIN nodes, which improves the cache hit rate) and false (data is not partitioned by the join key; this is the default value). |
| shuffleEmptyKey | Specifies whether upstream records whose join key is empty are randomly shuffled to downstream nodes. | BOOLEAN | No | false | Valid values: true (records with an empty join key are randomly shuffled to downstream nodes) and false (records with an empty join key are shuffled to the first parallel subtask, which is numbered 0, of the downstream node; this is the default value). |
| cache | The cache policy. | STRING | No | None | Valid values: None (no data is cached; this is the default value) and LRU (only recently accessed data in the dimension table is cached). If you set this parameter to LRU, you must also configure the cacheSize and cacheTTLMs parameters. |
| cacheSize | The maximum number of rows that can be cached. | INTEGER | No | 1000 | This parameter takes effect only when the cache parameter is set to LRU. |
| cacheTTLMs | The cache timeout period. | INTEGER | No | No default value | Unit: milliseconds. This parameter takes effect only when the cache parameter is set to LRU. By default, cache entries do not expire. |
| cacheEmpty | Specifies whether to cache join queries that return an empty result. | BOOLEAN | No | true | N/A. |
| async | Specifies whether to enable data synchronization in asynchronous mode. | BOOLEAN | No | false | Valid values: true (asynchronous mode is enabled) and false (asynchronous mode is disabled; this is the default value). |
| asyncLindormRpcTimeoutMs | The timeout period of a request in asynchronous mode. | INTEGER | No | 300000 | Unit: milliseconds. |
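
As a reference for how the dimension table parameters fit together, the following declaration enables an LRU cache and asynchronous lookups. This is only a sketch: the table name, columns, and connection values are placeholders, and the cache settings are illustrative rather than recommended values.

CREATE TEMPORARY TABLE lindorm_dim_cached (
  id   VARCHAR,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  -- Cache recently accessed rows; cacheSize and cacheTTLMs take effect only when cache is LRU.
  'cache' = 'LRU',
  'cacheSize' = '10000',
  'cacheTTLMs' = '600000',
  -- Query Lindorm in asynchronous mode.
  'async' = 'true'
);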

Dynamic table

The dynamic table feature is suitable for scenarios in which the columns of a table are not fixed in advance but are created and populated dynamically while the job runs. For example, to calculate the hourly transaction volume per day, you can use the day as the primary key and the hours as column names; the column for each hour is generated dynamically. The following table shows such a dynamic table.

| Primary key | Column name: 00:00 | Column name: 01:00 |
| --- | --- | --- |
| 2025-06-01 | 45 | 32 |
| 2025-06-02 | 76 | 34 |

The dynamic table must comply with the following DDL rules: all columns except the last two are defined as the primary key; the value of the first of the last two columns is used as the column name, and the value of the second is used as the column value; and both of the last two columns must be of the VARCHAR type. Sample statement:

CREATE TABLE lindorm_dynamic_output (
  pk1 VARCHAR,
  pk2 VARCHAR,
  pk3 VARCHAR,
  c1 VARCHAR,
  c2 VARCHAR,
  PRIMARY KEY (pk1, pk2, pk3) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'isDynamicTable' = 'true'
);

In the preceding example, pk1, pk2, and pk3 form the primary key. c1 and c2 are the two columns that the dynamic table requires and must be the last two columns; no other non-primary-key columns are allowed. Each time a data record is written to the Lindorm result table, a column is added to or updated in the row that corresponds to the primary key values <pk1, pk2, pk3>: the value of c1 is used as the column name, and the value of c2 is used as the column value. Each record adds or changes the value of only one column; the values of the other columns remain unchanged.
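
To make the hourly transaction volume scenario concrete, the following sketch writes one column per hour into a dynamic result table. The source table orders, its columns, and the single-column primary key are hypothetical; the WITH options are placeholders.

CREATE TEMPORARY TABLE txn_per_hour (
  stat_day  VARCHAR,  -- primary key: the day, for example 2025-06-01
  stat_hour VARCHAR,  -- its value becomes the column name, for example 00:00
  txn_count VARCHAR,  -- its value becomes the column value
  PRIMARY KEY (stat_day) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>',
  'isDynamicTable' = 'true'
);

-- Each hourly count adds or updates exactly one column of the row for that day.
INSERT INTO txn_per_hour
SELECT
  DATE_FORMAT(order_time, 'yyyy-MM-dd') AS stat_day,
  DATE_FORMAT(order_time, 'HH:00')      AS stat_hour,
  CAST(COUNT(*) AS VARCHAR)             AS txn_count
FROM orders
GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd'), DATE_FORMAT(order_time, 'HH:00');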

Data type mapping method

All data in Lindorm is in the binary format. The following table shows how to convert data into bytes of binary data or parse bytes of binary data based on the data type of a field in Flink.

| Flink SQL data type | Method used to convert data into bytes and write the bytes to Lindorm | Method used to parse bytes from Lindorm |
| --- | --- | --- |
| CHAR, VARCHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
| BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean) | com.alibaba.lindorm.client.core.utils.Bytes::toBoolean |
| BINARY, VARBINARY | The bytes are written as-is. | The bytes are returned as-is. |
| DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
| TINYINT | The value is written as the first byte of byte[]. | bytes[0] is returned directly. |
| SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short) | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
| INT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
| BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
| FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float) | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
| DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double) | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
| DATE | The number of days since January 1, 1970 is obtained and passed to com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int). | com.alibaba.lindorm.client.core.utils.Bytes::toInt is called to obtain the number of days since January 1, 1970. |
| TIME | The number of milliseconds since 00:00:00 of the current day is obtained and passed to com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int). | com.alibaba.lindorm.client.core.utils.Bytes::toInt is called to obtain the number of milliseconds since 00:00:00 of the current day. |
| TIMESTAMP | The number of milliseconds since 00:00:00 on January 1, 1970 is obtained and passed to com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long). | com.alibaba.lindorm.client.core.utils.Bytes::toLong is called to obtain the number of milliseconds since 00:00:00 on January 1, 1970. |
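
As an illustration of the mapping, the following result table declaration mixes several of the types above; each column is serialized to bytes with the method listed for its type. This is only a sketch with placeholder table, column, and connection values.

CREATE TEMPORARY TABLE lindorm_typed_sink (
  id       VARCHAR,        -- written with StringData::toBytes
  is_valid BOOLEAN,        -- written with Bytes::toBytes(boolean)
  amount   DECIMAL(10, 2), -- written with Bytes::toBytes(BigDecimal)
  cnt      BIGINT,         -- written with Bytes::toBytes(long)
  score    DOUBLE,         -- written with Bytes::toBytes(double)
  born     DATE,           -- stored as the number of days since January 1, 1970
  updated  TIMESTAMP(3),   -- stored as the number of milliseconds since January 1, 1970
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'lindorm',
  'seedserver' = '<yourSeedServer>',
  'namespace' = '<yourNamespace>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'columnFamily' = '<yourColumnFamily>'
);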

Sample code

CREATE TEMPORARY TABLE example_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tableName'='${lindorm_dim_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'columnFamily'='f',
 'username'='${lindorm_username}',
 'password'='${lindorm_password}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tableName'='${lindorm_sink_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'columnFamily'='f',
 'username'='${lindorm_username}',
 'password'='${lindorm_password}'
);

INSERT INTO lindorm_hbase_sink
SELECT example_source.id AS id, lindorm_hbase_dim.name AS name, lindorm_hbase_dim.birth AS birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF example_source.proc_time ON example_source.id = lindorm_hbase_dim.id;