This topic provides the DDL syntax that is used to create an InfluxDB result table, describes the parameters in the WITH clause, and provides data type mappings and sample code.

What is TSDB for InfluxDB®?

Time Series Database (TSDB) for InfluxDB® is a time series database service that can process large numbers of write and query requests. This service stores and analyzes large amounts of time series data in real time, including DevOps monitoring data, application metric data, and data that is collected from Internet of Things (IoT) sensors. For more information about TSDB for InfluxDB®, see TSDB for InfluxDB®.

Prerequisites

An InfluxDB database is created. For more information, see Manage user accounts and databases.

Limits

Only Flink that uses Ververica Runtime (VVR) 2.1.5 or later supports InfluxDB connectors.

DDL syntax

Flink allows you to use InfluxDB to store output data. The following code shows an example:
CREATE TABLE stream_test_influxdb(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `tag_value1` VARCHAR,
 `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='300',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);
Default format for the created table:
  • Column 0: metric (VARCHAR). This column is required.
  • Column 1: timestamp (BIGINT). This column is required. Unit: milliseconds.
  • Column 2: tag_value1 (VARCHAR). This column is required. You must enter at least one value in this column.
  • Column 3: field_fieldValue1 (DOUBLE). This column is required. You must enter at least one value in this column.
    To specify multiple field_fieldValue values, use the following format:
    field_fieldValue1 <Data type>,
    field_fieldValue2 <Data type>,
    ...      
    field_fieldValueN <Data type>
    Sample format:
    field_fieldValue1 DOUBLE,
    field_fieldValue2 INTEGER,
    ...      
    field_fieldValueN INTEGER
Note An InfluxDB result table can contain only the metric, timestamp, tag_*, and field_* fields.

Parameters in the WITH clause

Parameter Description Required Remarks
connector The type of the result table. Yes Set the value to InfluxDB.
url The URL of the InfluxDB database. Yes

The URL of an InfluxDB database is the virtual private cloud (VPC) endpoint of the InfluxDB database. For example, you can set this parameter to https://localhost:3242 or http://localhost:8086.

HTTP and HTTPS are supported.

database The name of the InfluxDB database. Yes Example: db-flink.
username The username that is used to access the database. Yes You must have write permissions on the InfluxDB database. For more information about usernames, see Manage user accounts and databases.
password The password that is used to access the database. Yes For more information about passwords, see Manage user accounts and databases.
batchSize The number of data records that are submitted at the same time. No By default, 300 records are submitted at the same time.
retentionPolicy The retention policy. No If you do not specify this parameter, the default retention policy autogen for each database is used. For more information about the retention policy, see Manage user accounts and databases.
ignoreErrorData Specifies whether to ignore abnormal data. No Valid values:
  • true: The system ignores abnormal data.
  • false: The system does not ignore abnormal data. This is the default value.

Data type mapping

InfluxDB data type Flink data type
BOOLEAN BOOLEAN
INT INT
BIGINT BIGINT
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR

Sample code

CREATE TEMPORARY TABLE datahub_source(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `filedvalue` DOUBLE,
 `tagvalue` VARCHAR
) WITH (
  'connector' = 'datagen',
  'fields.metric.length' = 3,
  'fields.tagvalue.length' = 3,
  'fields.timestamp.min' = 1587539547000,
  'fields.timestamp.max' = 1619075547000,
  'fields.filedvalue.min' = 1,
  'fields.filedvalue.max' = 100000,
  'rows-per-second' = 50
);

CREATE TEMPORARY TABLE influxdb_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1` VARCHAR
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='100',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

INSERT INTO influxdb_sink
SELECT 
   `metric`,
   `timestamp`,
   `filedvalue`,
   `tagvalue`
FROM datahub_source;