All Products
Search
Document Center

Lindorm:Use Flink to write data to LindormTSDB

Last Updated:Mar 28, 2026

The LindormTSDB sink connector lets you stream real-time Flink processing results into LindormTSDB for time series storage and monitoring.

Prerequisites

Before you begin, ensure that you have:

End-to-end flow

To write Flink results to LindormTSDB:

  1. Download the LindormTSDB sink connector JAR package and upload it to the Realtime Compute for Apache Flink console.

  2. In Flink SQL, define a source table and a result table. Configure the result table's WITH clause to point to LindormTSDB.

  3. Write an INSERT INTO statement to map fields from the source table to the result table.

  4. Submit the Flink job. The connector writes data points to the specified LindormTSDB time series table.

Set up the connector

Download the LindormTSDB sink connector JAR package and upload it to the Realtime Compute for Apache Flink console. For upload instructions, see Develop a JAR draft.

Define the result table

Create a result table in Flink SQL and use the WITH clause to map it to a LindormTSDB time series table.

CREATE TEMPORARY TABLE tsdb_sink(
  `timestamp` BIGINT,
  tag_<tagname> VARCHAR,
  field_<fieldname1> DOUBLE,
  field_<fieldname2> VARCHAR,
  field_<fieldname3> BIGINT,
  field_<fieldname4> BOOLEAN
  -- table VARCHAR (optional)
)
WITH (
    'connector' = 'lindormtsdb',
    'url' = '<lindormTSDBHttpUrl>',
    'table' = '<yourTableName>',
    'defaultDatabase' = '<yourDatabaseName>',
    'schemaPolicy' = '<schemaPolicy>',
    'sink.parallelism' = '<sinkParallelism>',
    'ignoreErrorData' = '<ignoreErrorData>',
    'maxRetries' = '<maxRetries>',
    'batchSize' = '<batchSize>',
    'connectTimeoutMs' = '<connectTimeoutMs>',
    'sync' = '<sync>',
    'debug' = '<debug>'
);

Table schema parameters

ParameterTypeRequiredDescription
timestampBIGINTYesThe timestamp of the data point. Must be named timestamp — it is a reserved keyword and must be enclosed in backticks. Unit: milliseconds (13-digit). A 10-digit timestamp is automatically converted to 13-digit on write.
tag_<tagname>VARCHARYesA tag column. The tag_ prefix is required and cannot be changed. Replace <tagname> with the actual tag name, for example, tag_deviceid. Multiple tag columns are supported.
field_<fieldname>DOUBLE, VARCHAR, BIGINT, or BOOLEANYesA field (metric value) column. The field_ prefix is required and cannot be changed. Replace <fieldname> with the actual field name, for example, field_humidity. Multiple field columns are supported.
tableVARCHARNoThe target time series table. Use this column in the table schema (rather than in the WITH clause) when writing to multiple tables dynamically.

WITH clause parameters

ParameterRequiredDefaultExampleDescription
connectorYeslindormtsdbMust be set to lindormtsdb.
urlYeshttp://ld-xxx-proxy-tsdb.lindorm.rds.aliyuncs.com:8242The HTTP endpoint of LindormTSDB. See View endpoints.
tableNomytableThe target time series table. Use this parameter when writing to a single table. To write to multiple tables, use the table column in the table schema instead.
usernameConditionaladminThe username for connecting to LindormTSDB. Required only if user authentication and permission verification is enabled.
passwordConditionalThe password for the specified username. Required only if user authentication and permission verification is enabled.
defaultDatabaseNodefaultmydbThe target database.
schemaPolicyNostrongweakThe schema constraint policy. See Schema policies.
sink.parallelismNo14The number of parallel write threads. Increase this value for high-throughput workloads.
ignoreErrorDataNofalsetrueWhether to skip write errors and continue. If false, the job stops on error.
maxRetriesNo35The maximum number of retry attempts for failed writes caused by server or network errors.
batchSizeNo5001000The number of data points written per batch.
connectTimeoutMsNo9000030000The HTTP connection timeout. Unit: milliseconds.
syncNofalsefalseWhether to write synchronously. Keep the default (false) for better throughput. Set to true only when strict write ordering is required.
debugNofalsetrueWhether to enable debug logging for written data points.
Note

User authentication and permission verification is disabled by default. Enable it to improve data security.

Schema policies

The schemaPolicy parameter controls how LindormTSDB handles the target table schema:

PolicyBehaviorWhen to use
strong (default)LindormTSDB strictly validates the table name, field names, and data types against the predefined schema. The target table must be created manually before writing.When you need strict schema enforcement
weakIf the target table does not exist, LindormTSDB creates it automatically.For quick prototyping or when the table schema is not predefined
noneIf the target table does not exist, no error is raised and no table is created. Data is written but cannot be queried using SQL.Advanced use cases only

For more information, see Constraint policies for schemas.

Example

The following example reads data from a DataGen source connector and writes it to a LindormTSDB time series table named mytable. The source table generates random values for id, score, and name fields.

-- Source table: generate random data using the DataGen connector
CREATE TEMPORARY TABLE datagen_source (
  id INTEGER,
  score DOUBLE,
  name STRING
)
WITH (
  'connector' = 'datagen'
);

-- Result table: map to LindormTSDB
-- schemaPolicy='weak' lets LindormTSDB create the table automatically if it does not exist.
-- For high-throughput jobs, increase sink.parallelism and batchSize.
CREATE TEMPORARY TABLE tsdb_sink(
  tag_tagk VARCHAR,
  field_score DOUBLE,
  field_name STRING,
  `timestamp` BIGINT
)
WITH (
    'connector' = 'lindormtsdb',
    'url' = 'http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
    'table' = 'mytable',
    'schemaPolicy' = 'weak'
    -- 'sink.parallelism' = '4',   -- Increase for high-throughput workloads
    -- 'batchSize' = '1000'        -- Adjust batch size to tune write performance
);

-- Write data from the source table to LindormTSDB
INSERT INTO tsdb_sink
SELECT
  CAST(id AS STRING) AS tag_tagk,
  score AS field_score,
  name AS field_name,
  UNIX_TIMESTAMP(now()) * 1000 AS `timestamp`
FROM datagen_source;

Verify the results

After submitting the Flink job, confirm that data is being written to LindormTSDB:

  1. In the Realtime Compute for Apache Flink console, check that the job status is Running.

  2. Connect to LindormTSDB using the SQL console or a client tool and run:

    SELECT * FROM mytable LIMIT 10;

    If the query returns rows, the write pipeline is working correctly.

  3. If no rows are returned, check the Flink job logs for connection errors, schema validation failures, or timeout messages.

What's next