すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:TSDB for InfluxDBコネクタ

最終更新日:Jan 08, 2025

このトピックでは、Time Series Database (TSDB) for InfluxDBコネクタの使用方法について説明します。

背景情報

TSDB for InfluxDBは、大量の書き込みリクエストとクエリリクエストを処理できる時系列データベースサービスです。このサービスは、DevOps監視データ、アプリケーションメトリックデータ、IoTセンサーから収集されたデータなど、大量の時系列データをリアルタイムで保存および分析するために使用されます。 TSDB for InfluxDBの詳細については、TSDB for InfluxDBをご参照ください。

次の表に、TSDB for InfluxDBコネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

シンクテーブル

実行モード

ストリーミングモード

データ形式

ポイント

メトリック

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

説明

メトリックの詳細については、メトリックをご参照ください。

APIタイプ

SQL API

シンクテーブルでのデータの更新または削除

サポートされていません

前提条件

TSDB for InfluxDBデータベースが作成されていること。 詳細については、ユーザーアカウントとデータベースの管理をご参照ください。

制限事項

Ververica Runtime (VVR) 2.1.5以降を使用するRealtime Compute for Apache Flinkのみが、TSDB for InfluxDBコネクタをサポートしています。

構文

CREATE TABLE stream_test_influxdb(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `tag_value1` VARCHAR,
 `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='300',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

作成されたテーブルのデフォルト形式:

  • 列 0: metric (VARCHAR)。 この列は必須です。

  • 列 1: timestamp (BIGINT)。 この列は必須です。 単位: ミリ秒。

  • 列 2: tag_value1 (VARCHAR)。 この列は必須です。 この列には少なくとも 1 つの値を入力する必要があります。

  • 列 3: field_fieldValue1 (DOUBLE)。 この列は必須です。 この列には少なくとも 1 つの値を入力する必要があります。

    複数のfield_fieldValue値を指定するには、次の形式を使用します。

    field_fieldValue1 <Data type>,
    field_fieldValue2 <Data type>,
    ...  
    field_fieldValueN <Data type>

    例:

    field_fieldValue1 DOUBLE,
    field_fieldValue2 INTEGER,
    ...   
    field_fieldValueNINTEGER
説明

TSDB for InfluxDBシンクテーブルには、metrictimestamptag_*、および field_* のフィールドのみを含めることができます。

WITH句のパラメータ

パラメータ

説明

必須

備考

connector

シンクテーブルのタイプ。

はい

値をinfluxdbに設定します。

url

TSDB for InfluxDBデータベースの URL。

はい

TSDB for InfluxDBデータベースの URL は、TSDB for InfluxDBデータベースの仮想プライベートクラウド (VPC) エンドポイントです。 たとえば、このパラメータを https://localhost:8086 または http://localhost:3242 に設定できます。

HTTPとHTTPSがサポートされています。

database

TSDB for InfluxDBデータベースの名前。

はい

例: db-flink。

username

データベースへのアクセスに使用するアカウントのユーザー名。

はい

TSDB for InfluxDBデータベースに対する書き込み権限が必要です。 ユーザー名の詳細については、ユーザーアカウントとデータベースの管理をご参照ください。

password

データベースへのアクセスに使用するパスワード。

はい

パスワードの詳細については、ユーザーアカウントとデータベースの管理をご参照ください。

batchSize

同時に送信されるデータレコードの数。

いいえ

デフォルトでは、300 レコードが同時に送信されます。

retentionPolicy

保存ポリシー。

いいえ

このパラメータを設定しない場合、各データベースのデフォルトの保存ポリシーautogenが使用されます。 保存ポリシーの詳細については、ユーザーアカウントとデータベースの管理をご参照ください。

ignoreErrorData

異常なデータを無視するかどうかを指定します。

いいえ

有効な値:

  • true: システムは異常なデータを無視します。

  • false: システムは異常なデータを無視しません。 これがデフォルト値です。

データ型のマッピング

TSDB for InfluxDBのデータ型

Flinkのデータ型

BOOLEAN

BOOLEAN

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DECIMAL

DECIMAL

DOUBLE

DOUBLE

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

サンプルコード

CREATE TEMPORARY TABLE datahub_source(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `filedvalue` DOUBLE,
 `tagvalue` VARCHAR
) WITH (
  'connector' = 'datagen',
  'fields.metric.length' = '3',
  'fields.tagvalue.length' = '3',
  'fields.timestamp.min' = '1587539547000',
  'fields.timestamp.max' = '1619075547000',
  'fields.filedvalue.min' = '1',
  'fields.filedvalue.max' = '100000',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE influxdb_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1` VARCHAR
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='100',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

INSERT INTO influxdb_sink
SELECT 
  `metric`,
  `timestamp`,
  `filedvalue`,
  `tagvalue`
FROM datahub_source;