すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:TSDB for InfluxDB® (非推奨)

最終更新日:Mar 25, 2026

このトピックでは、TSDB for InfluxDB コネクタの使用方法について説明します。

重要

TSDB for InfluxDB コネクタは、将来のリリースで非推奨になります。非推奨になると、コンソールから削除され、機能更新やメンテナンスは行われなくなります。非推奨のタイムラインについては、「TSDB for InfluxDB コネクタのサポート終了に関するお知らせ」をご参照ください。本番ジョブの中断を避けるために、できるだけ早くワークロードを移行することを推奨します。

背景情報

TSDB for InfluxDB は、高い書き込みおよびクエリの負荷を処理するように設計された時系列データベースサービスです。これにより、DevOps モニタリング、アプリケーションメトリック、IoT センサーなどのソースから得られる大規模な時系列データを保存し、リアルタイム分析を実行できます。 TSDB for InfluxDB の詳細については、「InfluxDB の概要」をご参照ください。

次の表に、TSDB for InfluxDB コネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

結果テーブル

実行モード

ストリーミングモード

データフォーマット

Point

メトリック

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

説明

これらのメトリックの詳細については、「モニタリングメトリック」をご参照ください。

API タイプ

SQL

結果テーブルでのデータの更新または削除

サポートされていません

前提条件

TSDB for InfluxDB でデータベースを作成済みであること。詳細については、「ユーザーアカウントとデータベースの管理」をご参照ください。

制限事項

TSDB for InfluxDB コネクタは、Ververica Runtime (VVR) 2.1.5 以降を使用する Realtime Compute for Apache Flink のデプロイメントでのみサポートされます。

構文

CREATE TABLE stream_test_influxdb(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `tag_value1` VARCHAR,
 `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='300',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

結果テーブルのデフォルトスキーマは次のとおりです。

  • 列 0: metric (VARCHAR)。この列は必須です。

  • 列 1: timestamp (BIGINT)。この列は必須です。単位はミリ秒である必要があります。

  • 列 2: tag_value1 (VARCHAR)。これはタグ列です。少なくとも 1 つのタグ列が必要です。

  • 列 3: field_fieldValue1 (DOUBLE)。これはフィールド列です。少なくとも 1 つのフィールド列が必要です。

    複数のフィールド列にデータを書き込むには、次のフォーマットを使用します。

    field_fieldValue1 DataType,
    field_fieldValue2 DataType,
    ...  
    field_fieldValueN DataType

    例:

    field_fieldValue1 DOUBLE,
    field_fieldValue2 INTEGER,
    ...   
    field_fieldValueN INTEGER
説明

結果テーブルは、metrictimestamptag_*、および field_* のみをサポートします。

WITH パラメーター

パラメーター

説明

必須

備考

connector

結果テーブルのタイプ。

はい

値は influxdb である必要があります。

url

TSDB for InfluxDB インスタンスのサービス URL。

はい

InfluxDB では、URL は VPC エンドポイントです。例: https://localhost:8086 または http://localhost:3242

HTTP と HTTPS の両方がサポートされています。

database

ご利用の TSDB for InfluxDB インスタンス内のデータベースの名前。

はい

例: db-flink

username

データベースへのアクセスに使用するユーザー名。

はい

ユーザーは、ターゲットデータベースに対する書き込み権限を持っている必要があります。詳細については、「ユーザーアカウントとデータベースの管理」をご参照ください。

password

指定されたユーザーのパスワード。

はい

詳細については、「ユーザーアカウントとデータベースの管理」をご参照ください。

batchSize

1 回のバッチで書き込むレコード数。

いいえ

デフォルト値: 300。

retentionPolicy

保持ポリシー。

いいえ

このパラメーターが指定されていない場合、データベースのデフォルトの保持ポリシーである autogen が使用されます。保持ポリシーの詳細については、「ユーザーアカウントとデータベースの管理」をご参照ください。

ignoreErrorData

書き込みに失敗したデータを無視するかどうかを指定します。

いいえ

有効な値:

  • true: 書き込みエラーを無視します。

  • false (デフォルト): 書き込みエラーを無視しません。書き込みエラーが発生した場合、ジョブは失敗します。

データ型のマッピング

InfluxDB 型

Flink 型

BOOLEAN

BOOLEAN

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DECIMAL

DECIMAL

DOUBLE

DOUBLE

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

CREATE TEMPORARY TABLE datahub_source(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `fieldvalue` DOUBLE,
 `tagvalue` VARCHAR
) WITH (
  'connector' = 'datagen',
  'fields.metric.length' = '3',
  'fields.tagvalue.length' = '3',
  'fields.timestamp.min' = '1587539547000',
  'fields.timestamp.max' = '1619075547000',
  'fields.fieldvalue.min' = '1',
  'fields.fieldvalue.max' = '100000',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE influxdb_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1` VARCHAR
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='100',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

INSERT INTO influxdb_sink
SELECT 
  `metric`,
  `timestamp`,
  `fieldvalue`,
  `tagvalue`
FROM datahub_source;