The TSDB for InfluxDB connector will be deprecated in a future release. Once deprecated, it is removed from the console and no longer receives feature updates or maintenance. For the deprecation timeline, see End of support for the TSDB for InfluxDB connector. Migrate your workloads as soon as possible to avoid disrupting your production jobs.
The TSDB for InfluxDB connector writes streaming data from a Flink SQL sink table into a Ververica Runtime (VVR) TSDB for InfluxDB instance. TSDB for InfluxDB is a time series database optimized for high write and query throughput, commonly used for DevOps monitoring, application metrics, and IoT sensor data.
Connector capabilities
| Item | Value |
|---|---|
| Table type | Sink |
| Running mode | Streaming |
| Data format | Point |
| API type | SQL |
| Data update or deletion in sink table | Not supported |
| Metrics | numRecordsOut, numRecordsOutPerSecond, currentSendTime |
For details about these metrics, see Monitoring metrics.
Prerequisites
Before you begin, ensure that you have:
-
A database created in TSDB for InfluxDB. See Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases
Limitations
The TSDB for InfluxDB connector is supported only by Realtime Compute for Apache Flink deployments that use VVR 2.1.5 or later.
Create a sink table
Minimal DDL
The following example shows the minimum required columns to define a sink table:
CREATE TABLE influxdb_sink (
`metric` VARCHAR,
`timestamp` BIGINT,
`tag_value1` VARCHAR,
`field_fieldValue1` DOUBLE
) WITH (
'connector' = 'influxdb',
'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>'
);
Schema column conventions
Sink table columns must follow a fixed naming convention that maps to the InfluxDB data model. The column order is fixed.
| Position | Column name | Type | Required | Maps to |
|---|---|---|---|---|
| 0 | metric |
VARCHAR | Yes | InfluxDB measurement name |
| 1 | timestamp |
BIGINT | Yes | InfluxDB timestamp; the unit must be milliseconds |
| 2+ | tag_<name> |
VARCHAR | At least one | InfluxDB tag (indexed metadata) |
| 3+ | field_<name> |
Any supported type | At least one | InfluxDB field (data value) |
To write to multiple field columns, define them using the following pattern:
`field_fieldValue1` DOUBLE,
`field_fieldValue2` INTEGER,
`field_fieldValueN` INTEGER
Only metric, timestamp, tag_*, and field_* column names are supported. Any other column name causes an error.
Connector options
| Parameter | Required | Default | Type | Description |
|---|---|---|---|---|
connector |
Yes | — | String | Must be influxdb. |
url |
Yes | — | String | VPC endpoint of the TSDB for InfluxDB instance. Both HTTP and HTTPS are supported. Example: https://localhost:8086 or http://localhost:3242. |
database |
Yes | — | String | Name of the database. Example: db-flink. |
username |
Yes | — | String | Username for the database. The user must have write permissions on the target database. See Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases. |
password |
Yes | — | String | Password for the specified user. See Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases. |
batchSize |
No | 300 |
Integer | Number of records to write in a single batch. |
retentionPolicy |
No | autogen |
String | Retention policy of the target database. If not specified, the database's default retention policy (autogen) is used. See Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases. |
ignoreErrorData |
No | false |
Boolean | How to handle write errors. true: ignore write errors and continue. false: fail the job when a write error occurs. |
Data type mappings
| InfluxDB type | Flink type |
|---|---|
| BOOLEAN | BOOLEAN |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
Example
The following example generates random data using the datagen connector and writes it to TSDB for InfluxDB.
CREATE TEMPORARY TABLE datagen_source (
`metric` VARCHAR,
`timestamp` BIGINT,
`fieldvalue` DOUBLE,
`tagvalue` VARCHAR
) WITH (
'connector' = 'datagen',
'fields.metric.length' = '3',
'fields.tagvalue.length' = '3',
'fields.timestamp.min' = '1587539547000',
'fields.timestamp.max' = '1619075547000',
'fields.fieldvalue.min' = '1',
'fields.fieldvalue.max' = '100000',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE influxdb_sink (
`metric` VARCHAR,
`timestamp` BIGINT,
`field_fieldValue1` DOUBLE,
`tag_value1` VARCHAR
) WITH (
'connector' = 'influxdb',
'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' = '100',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
INSERT INTO influxdb_sink
SELECT
`metric`,
`timestamp`,
`fieldvalue`,
`tagvalue`
FROM datagen_source;