Realtime Compute for Apache Flink: TSDB for InfluxDB connector

Last Updated: Apr 07, 2024

This topic describes how to use the Time Series Database (TSDB) for InfluxDB connector.

Background information

TSDB for InfluxDB is a time series database service that can process large numbers of write and query requests. This service is used to store and analyze large amounts of time series data in real time, including DevOps monitoring data, application metric data, and data that is collected from IoT sensors. For more information about TSDB for InfluxDB, see TSDB for InfluxDB.

The TSDB for InfluxDB connector supports the following capabilities:

  • Table type: Result table

  • Running mode: Streaming mode

  • Data format: Point

  • Metrics: numRecordsOut, numRecordsOutPerSecond, and currentSendTime. For more information about the metrics, see Metrics.

  • API type: SQL API

  • Data update or deletion in a result table: Not supported

Prerequisites

A TSDB for InfluxDB database is created. For more information, see Manage user accounts and databases.

Limits

The TSDB for InfluxDB connector is supported only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.1.5 or later.

Syntax

CREATE TABLE stream_test_influxdb(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `tag_value1` VARCHAR,
 `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' = '300',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

Column layout of the result table:

  • Column 0: metric (VARCHAR). Required.

  • Column 1: timestamp (BIGINT). Required. Unit: milliseconds.

  • Column 2: tag_value1 (VARCHAR). Required. The table must contain at least one tag_* column.

  • Column 3: field_fieldValue1 (DOUBLE). Required. The table must contain at least one field_* column.

    To declare multiple field columns, use the following format:

    field_fieldValue1 <Data type>,
    field_fieldValue2 <Data type>,
    ...
    field_fieldValueN <Data type>

    Example:

    field_fieldValue1 DOUBLE,
    field_fieldValue2 INTEGER,
    ...
    field_fieldValueN INTEGER
Note

A TSDB for InfluxDB result table can contain only the following columns: metric, timestamp, tag_*, and field_*.
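A result table is not limited to one tag column and one field column. The following sketch declares two of each. The column names tag_host, tag_region, field_cpuUsage, and field_requestCount are hypothetical, and '<yourVpcEndpoint>' is a placeholder for the VPC endpoint described under the url parameter. Only the required WITH parameters are set, so batchSize, retentionPolicy, and ignoreErrorData keep their defaults.

```sql
-- Sketch: two tag columns and two field columns (column names are hypothetical).
CREATE TEMPORARY TABLE influxdb_multi_sink (
  `metric` VARCHAR,              -- required
  `timestamp` BIGINT,            -- required, in milliseconds
  `tag_host` VARCHAR,            -- first tag_* column
  `tag_region` VARCHAR,          -- second tag_* column
  `field_cpuUsage` DOUBLE,       -- first field_* column
  `field_requestCount` BIGINT    -- second field_* column
) WITH (
  'connector' = 'influxdb',
  'url' = '<yourVpcEndpoint>',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```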

Parameters in the WITH clause

  • connector (required): The type of the result table. Set the value to influxdb.

  • url (required): The URL of the TSDB for InfluxDB database, which is the virtual private cloud (VPC) endpoint of the database. HTTP and HTTPS are supported. Examples: https://localhost:8086 and http://localhost:3242.

  • database (required): The name of the TSDB for InfluxDB database. Example: db-flink.

  • username (required): The username of the account that is used to access the database. The account must have write permissions on the TSDB for InfluxDB database. For more information about the username, see Manage user accounts and databases.

  • password (required): The password of the account that is used to access the database. For more information about the password, see Manage user accounts and databases.

  • batchSize (optional): The number of data records that are submitted in each batch. Default value: 300.

  • retentionPolicy (optional): The retention policy. If you do not configure this parameter, the default retention policy of the database, autogen, is used. For more information about the retention policy, see Manage user accounts and databases.

  • ignoreErrorData (optional): Specifies whether to ignore abnormal data. Valid values:

      • true: The system ignores abnormal data.

      • false: The system does not ignore abnormal data. This is the default value.
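The optional parameters can be omitted to use their defaults (batchSize 300, retentionPolicy autogen, ignoreErrorData false) or set explicitly. The following sketch overrides all three. The values 1000 and true are illustrative, rp_30d is a hypothetical retention policy name that would need to exist in your database already, and '<yourVpcEndpoint>' is a placeholder.

```sql
-- Sketch: a result table that overrides the optional parameters.
-- rp_30d is a hypothetical retention policy; replace it with one that exists.
CREATE TEMPORARY TABLE influxdb_tuned_sink (
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `tag_value1` VARCHAR,
  `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = '<yourVpcEndpoint>',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  -- Submit 1000 records per batch instead of the default 300.
  'batchSize' = '1000',
  -- Write to a retention policy other than the default autogen.
  'retentionPolicy' = 'rp_30d',
  -- Ignore abnormal data (the default is false).
  'ignoreErrorData' = 'true'
);
```

Removing the last three options yields the same table with the default values.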

Data type mappings

Each TSDB for InfluxDB data type in the following list maps to the Flink data type of the same name:

  • BOOLEAN

  • INT

  • BIGINT

  • FLOAT

  • DECIMAL

  • DOUBLE

  • DATE

  • TIME

  • TIMESTAMP

  • VARCHAR
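Field columns can be declared with the mapped Flink types. The following sketch declares BOOLEAN, INT, and FLOAT field columns next to a DOUBLE one. The column names are hypothetical, '<yourVpcEndpoint>' is a placeholder, and the sketch assumes that the connector writes each field value with the type that the column declares.

```sql
-- Sketch: field_* columns that use several mapped data types (names are hypothetical).
CREATE TEMPORARY TABLE influxdb_typed_sink (
  `metric` VARCHAR,
  `timestamp` BIGINT,             -- in milliseconds
  `tag_device` VARCHAR,
  `field_temperature` DOUBLE,     -- DOUBLE maps to DOUBLE
  `field_errorCount` INT,         -- INT maps to INT
  `field_isOnline` BOOLEAN,       -- BOOLEAN maps to BOOLEAN
  `field_loadRatio` FLOAT         -- FLOAT maps to FLOAT
) WITH (
  'connector' = 'influxdb',
  'url' = '<yourVpcEndpoint>',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```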

Sample code

-- Source table: the datagen connector generates random test rows.
CREATE TEMPORARY TABLE datagen_source(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `fieldvalue` DOUBLE,
 `tagvalue` VARCHAR
) WITH (
  'connector' = 'datagen',
  'fields.metric.length' = '3',
  'fields.tagvalue.length' = '3',
  -- Timestamps are epoch values in milliseconds.
  'fields.timestamp.min' = '1587539547000',
  'fields.timestamp.max' = '1619075547000',
  'fields.fieldvalue.min' = '1',
  'fields.fieldvalue.max' = '100000',
  'rows-per-second' = '50'
);

-- Result table: writes the generated rows to TSDB for InfluxDB.
CREATE TEMPORARY TABLE influxdb_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1` VARCHAR
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' = '100',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

-- Columns are matched by position; the aliases only document the mapping.
INSERT INTO influxdb_sink
SELECT
  `metric`,
  `timestamp`,
  `fieldvalue` AS `field_fieldValue1`,
  `tagvalue` AS `tag_value1`
FROM datagen_source;