
Realtime Compute for Apache Flink:TSDB for InfluxDB® (deprecating)

Last Updated:Mar 26, 2026
Important

The TSDB for InfluxDB connector will be deprecated in a future release. After it is deprecated, it will be removed from the console and will no longer receive feature updates or maintenance. For the deprecation timeline, see End of support for the TSDB for InfluxDB connector. Migrate your workloads as soon as possible to avoid disrupting your production jobs.

The TSDB for InfluxDB connector writes streaming data from a Flink SQL sink table into a TSDB for InfluxDB instance. TSDB for InfluxDB is a time series database optimized for high write and query throughput, commonly used for DevOps monitoring, application metrics, and IoT sensor data.

Connector capabilities

| Item | Value |
| --- | --- |
| Table type | Sink |
| Running mode | Streaming |
| Data format | Point |
| API type | SQL |
| Data update or deletion in sink table | Not supported |
| Metrics | numRecordsOut, numRecordsOutPerSecond, currentSendTime |

For details about these metrics, see Monitoring metrics.

Prerequisites

Before you begin, ensure that a TSDB for InfluxDB instance is created, and that the instance has a database and a user account with write permissions on that database.

Limitations

The TSDB for InfluxDB connector is supported only by Realtime Compute for Apache Flink deployments that use VVR 2.1.5 or later.

Create a sink table

Minimal DDL

The following example shows the minimum required columns to define a sink table:

CREATE TABLE influxdb_sink (
  `metric`            VARCHAR,
  `timestamp`         BIGINT,
  `tag_value1`        VARCHAR,
  `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url'       = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database'  = '<yourDatabaseName>',
  'username'  = '<yourDatabaseUserName>',
  'password'  = '<yourDatabasePassword>'
);

Schema column conventions

Sink table columns must follow a fixed naming convention that maps to the InfluxDB data model. The column order is fixed: metric first, then timestamp, then tag columns, then field columns.

| Position | Column name | Type | Required | Maps to |
| --- | --- | --- | --- | --- |
| 0 | metric | VARCHAR | Yes | InfluxDB measurement name |
| 1 | timestamp | BIGINT | Yes | InfluxDB timestamp; the unit must be milliseconds |
| 2+ | tag_&lt;name&gt; | VARCHAR | At least one | InfluxDB tag (indexed metadata) |
| After tags | field_&lt;name&gt; | Any supported type | At least one | InfluxDB field (data value) |
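Because the connector expects the timestamp in milliseconds, upstream epoch values in seconds must be scaled before they are written. The following sketch shows one way to do this in the INSERT statement; the source table metrics_in_seconds and its column names are hypothetical:

```sql
INSERT INTO influxdb_sink
SELECT
  `metric`,
  `ts_seconds` * 1000 AS `timestamp`,  -- scale epoch seconds to milliseconds
  `host`,                              -- written to the tag column
  `cpu_usage`                          -- written to the field column
FROM metrics_in_seconds;
```

Columns are matched to the sink table by position, so the SELECT list must follow the same metric, timestamp, tag, field order as the sink DDL.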

To write to multiple field columns, define them using the following pattern:

`field_fieldValue1` DOUBLE,
`field_fieldValue2` INTEGER,
`field_fieldValueN` INTEGER
Note

Only metric, timestamp, tag_*, and field_* column names are supported. Any other column name causes an error.
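Putting these conventions together, a sink table with two tags and two fields might look like the following sketch. The tag and field names are illustrative, and the endpoint and credentials are placeholders:

```sql
CREATE TABLE influxdb_multi_sink (
  `metric`       VARCHAR,  -- measurement name
  `timestamp`    BIGINT,   -- epoch time in milliseconds
  `tag_host`     VARCHAR,  -- first tag
  `tag_region`   VARCHAR,  -- second tag
  `field_cpu`    DOUBLE,   -- first field
  `field_memory` BIGINT    -- second field
) WITH (
  'connector' = 'influxdb',
  'url'       = 'http://<yourEndpoint>:<port>',
  'database'  = '<yourDatabaseName>',
  'username'  = '<yourDatabaseUserName>',
  'password'  = '<yourDatabasePassword>'
);
```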

Connector options

| Parameter | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | Yes | None | String | Must be set to influxdb. |
| url | Yes | None | String | VPC endpoint of the TSDB for InfluxDB instance. Both HTTP and HTTPS are supported. Example: https://localhost:8086 or http://localhost:3242. |
| database | Yes | None | String | Name of the database. Example: db-flink. |
| username | Yes | None | String | Username for the database. The user must have write permissions on the target database. For more information, see Manage user accounts and databases. |
| password | Yes | None | String | Password of the specified user. For more information, see Manage user accounts and databases. |
| batchSize | No | 300 | Integer | Number of records to write in a single batch. |
| retentionPolicy | No | autogen | String | Retention policy of the target database. If not specified, the database's default retention policy (autogen) is used. For more information, see Manage user accounts and databases. |
| ignoreErrorData | No | false | Boolean | Specifies how to handle write errors. true: ignore the erroneous data and continue writing. false: fail the deployment when a write error occurs. |
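For example, a write-heavy job that should tolerate occasional malformed rows might raise the batch size and enable error skipping. The values below are illustrative only, not recommendations; tune batchSize to your actual write rate:

```sql
CREATE TABLE influxdb_tuned_sink (
  `metric`            VARCHAR,
  `timestamp`         BIGINT,
  `tag_value1`        VARCHAR,
  `field_fieldValue1` DOUBLE
) WITH (
  'connector'       = 'influxdb',
  'url'             = 'http://<yourEndpoint>:<port>',
  'database'        = '<yourDatabaseName>',
  'username'        = '<yourDatabaseUserName>',
  'password'        = '<yourDatabasePassword>',
  'batchSize'       = '1000',  -- flush every 1000 records instead of the default 300
  'ignoreErrorData' = 'true'   -- skip rows that fail to write instead of failing the job
);
```

Note that with ignoreErrorData set to true, rows that fail to write are silently dropped, so enable it only when occasional data loss is acceptable.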

Data type mappings

| InfluxDB type | Flink type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
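As the mapping shows, field columns are not limited to DOUBLE. For instance, a BOOLEAN field column can record an on/off state alongside a numeric reading; the column names in this sketch are illustrative:

```sql
CREATE TABLE influxdb_status_sink (
  `metric`        VARCHAR,
  `timestamp`     BIGINT,   -- epoch time in milliseconds
  `tag_device`    VARCHAR,
  `field_online`  BOOLEAN,  -- maps to an InfluxDB boolean field
  `field_reading` DOUBLE    -- maps to an InfluxDB float field
) WITH (
  'connector' = 'influxdb',
  'url'       = 'http://<yourEndpoint>:<port>',
  'database'  = '<yourDatabaseName>',
  'username'  = '<yourDatabaseUserName>',
  'password'  = '<yourDatabasePassword>'
);
```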

Example

The following example generates random data using the datagen connector and writes it to TSDB for InfluxDB.

CREATE TEMPORARY TABLE datagen_source (
  `metric`     VARCHAR,
  `timestamp`  BIGINT,
  `fieldvalue` DOUBLE,
  `tagvalue`   VARCHAR
) WITH (
  'connector'                  = 'datagen',
  'fields.metric.length'       = '3',
  'fields.tagvalue.length'     = '3',
  'fields.timestamp.min'       = '1587539547000',
  'fields.timestamp.max'       = '1619075547000',
  'fields.fieldvalue.min'      = '1',
  'fields.fieldvalue.max'      = '100000',
  'rows-per-second'            = '50'
);

CREATE TEMPORARY TABLE influxdb_sink (
  `metric`            VARCHAR,
  `timestamp`         BIGINT,
  `tag_value1`        VARCHAR,
  `field_fieldValue1` DOUBLE
) WITH (
  'connector'        = 'influxdb',
  'url'              = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database'         = '<yourDatabaseName>',
  'username'         = '<yourDatabaseUserName>',
  'password'         = '<yourDatabasePassword>',
  'batchSize'        = '100',
  'retentionPolicy'  = 'autogen',
  'ignoreErrorData'  = 'false'
);

INSERT INTO influxdb_sink
SELECT
  `metric`,
  `timestamp`,
  `tagvalue`,
  `fieldvalue`
FROM datagen_source;
