The LindormTSDB sink connector lets you stream real-time Flink processing results into LindormTSDB for time series storage and monitoring.
Prerequisites
Before you begin, ensure that you have:
Activated Realtime Compute for Apache Flink or set up a self-managed Flink service. See Activate Realtime Compute for Apache Flink.
NoteThe Ververica Runtime (VVR) version used by Realtime Compute for Apache Flink must be 4.0.13 or later. VVR 4.0.13 is based on Apache Flink V1.13.
The Lindorm instance and your Realtime Compute for Apache Flink workspace in the same Virtual Private Cloud (VPC).
NoteRealtime Compute for Apache Flink is not accessible over the Internet by default. To write data over the Internet, see How does Realtime Compute for Apache Flink access the Internet?
LindormTSDB activated on your Lindorm instance
LindormTSDB version 3.4.7 or later. To check or upgrade the version, see Release notes of LindormTSDB and Upgrade the minor engine version of a Lindorm instance
The IP address or CIDR block of the Flink service added to the Lindorm instance whitelist. To get the CIDR block of the Realtime Compute for Apache Flink vSwitch, see How do I configure a whitelist? To add IP addresses or CIDR blocks to the Lindorm instance whitelist, see Configure whitelists
End-to-end flow
To write Flink results to LindormTSDB:
Download the LindormTSDB sink connector JAR package and upload it to the Realtime Compute for Apache Flink console.
In Flink SQL, define a source table and a result table. Configure the result table's WITH clause to point to LindormTSDB.
Write an
INSERT INTOstatement to map fields from the source table to the result table.Submit the Flink job. The connector writes data points to the specified LindormTSDB time series table.
Set up the connector
Download the LindormTSDB sink connector JAR package and upload it to the Realtime Compute for Apache Flink console. For upload instructions, see Develop a JAR draft.
Define the result table
Create a result table in Flink SQL and use the WITH clause to map it to a LindormTSDB time series table.
CREATE TEMPORARY TABLE tsdb_sink(
`timestamp` BIGINT,
tag_<tagname> VARCHAR,
field_<fieldname1> DOUBLE,
field_<fieldname2> VARCHAR,
field_<fieldname3> BIGINT,
field_<fieldname4> BOOLEAN
-- table VARCHAR (optional)
)
WITH (
'connector' = 'lindormtsdb',
'url' = '<lindormTSDBHttpUrl>',
'table' = '<yourTableName>',
'defaultDatabase' = '<yourDatabaseName>',
'schemaPolicy' = '<schemaPolicy>',
'sink.parallelism' = '<sinkParallelism>',
'ignoreErrorData' = '<ignoreErrorData>',
'maxRetries' = '<maxRetries>',
'batchSize' = '<batchSize>',
'connectTimeoutMs' = '<connectTimeoutMs>',
'sync' = '<sync>',
'debug' = '<debug>'
);Table schema parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
timestamp | BIGINT | Yes | The timestamp of the data point. Must be named timestamp — it is a reserved keyword and must be enclosed in backticks. Unit: milliseconds (13-digit). A 10-digit timestamp is automatically converted to 13-digit on write. |
tag_<tagname> | VARCHAR | Yes | A tag column. The tag_ prefix is required and cannot be changed. Replace <tagname> with the actual tag name, for example, tag_deviceid. Multiple tag columns are supported. |
field_<fieldname> | DOUBLE, VARCHAR, BIGINT, or BOOLEAN | Yes | A field (metric value) column. The field_ prefix is required and cannot be changed. Replace <fieldname> with the actual field name, for example, field_humidity. Multiple field columns are supported. |
table | VARCHAR | No | The target time series table. Use this column in the table schema (rather than in the WITH clause) when writing to multiple tables dynamically. |
WITH clause parameters
| Parameter | Required | Default | Example | Description |
|---|---|---|---|---|
connector | Yes | — | lindormtsdb | Must be set to lindormtsdb. |
url | Yes | — | http://ld-xxx-proxy-tsdb.lindorm.rds.aliyuncs.com:8242 | The HTTP endpoint of LindormTSDB. See View endpoints. |
table | No | — | mytable | The target time series table. Use this parameter when writing to a single table. To write to multiple tables, use the table column in the table schema instead. |
username | Conditional | — | admin | The username for connecting to LindormTSDB. Required only if user authentication and permission verification is enabled. |
password | Conditional | — | — | The password for the specified username. Required only if user authentication and permission verification is enabled. |
defaultDatabase | No | default | mydb | The target database. |
schemaPolicy | No | strong | weak | The schema constraint policy. See Schema policies. |
sink.parallelism | No | 1 | 4 | The number of parallel write threads. Increase this value for high-throughput workloads. |
ignoreErrorData | No | false | true | Whether to skip write errors and continue. If false, the job stops on error. |
maxRetries | No | 3 | 5 | The maximum number of retry attempts for failed writes caused by server or network errors. |
batchSize | No | 500 | 1000 | The number of data points written per batch. |
connectTimeoutMs | No | 90000 | 30000 | The HTTP connection timeout. Unit: milliseconds. |
sync | No | false | false | Whether to write synchronously. Keep the default (false) for better throughput. Set to true only when strict write ordering is required. |
debug | No | false | true | Whether to enable debug logging for written data points. |
User authentication and permission verification is disabled by default. Enable it to improve data security.
Schema policies
The schemaPolicy parameter controls how LindormTSDB handles the target table schema:
| Policy | Behavior | When to use |
|---|---|---|
strong (default) | LindormTSDB strictly validates the table name, field names, and data types against the predefined schema. The target table must be created manually before writing. | When you need strict schema enforcement |
weak | If the target table does not exist, LindormTSDB creates it automatically. | For quick prototyping or when the table schema is not predefined |
none | If the target table does not exist, no error is raised and no table is created. Data is written but cannot be queried using SQL. | Advanced use cases only |
For more information, see Constraint policies for schemas.
Example
The following example reads data from a DataGen source connector and writes it to a LindormTSDB time series table named mytable. The source table generates random values for id, score, and name fields.
-- Source table: generate random data using the DataGen connector
CREATE TEMPORARY TABLE datagen_source (
id INTEGER,
score DOUBLE,
name STRING
)
WITH (
'connector' = 'datagen'
);
-- Result table: map to LindormTSDB
-- schemaPolicy='weak' lets LindormTSDB create the table automatically if it does not exist.
-- For high-throughput jobs, increase sink.parallelism and batchSize.
CREATE TEMPORARY TABLE tsdb_sink(
tag_tagk VARCHAR,
field_score DOUBLE,
field_name STRING,
`timestamp` BIGINT
)
WITH (
'connector' = 'lindormtsdb',
'url' = 'http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
'table' = 'mytable',
'schemaPolicy' = 'weak'
-- 'sink.parallelism' = '4', -- Increase for high-throughput workloads
-- 'batchSize' = '1000' -- Adjust batch size to tune write performance
);
-- Write data from the source table to LindormTSDB
INSERT INTO tsdb_sink
SELECT
CAST(id AS STRING) AS tag_tagk,
score AS field_score,
name AS field_name,
UNIX_TIMESTAMP(now()) * 1000 AS `timestamp`
FROM datagen_source;Verify the results
After submitting the Flink job, confirm that data is being written to LindormTSDB:
In the Realtime Compute for Apache Flink console, check that the job status is Running.
Connect to LindormTSDB using the SQL console or a client tool and run:
SELECT * FROM mytable LIMIT 10;If the query returns rows, the write pipeline is working correctly.
If no rows are returned, check the Flink job logs for connection errors, schema validation failures, or timeout messages.