すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:TSDB for InfluxDB® (非推奨)

最終更新日:May 22, 2026
重要

TSDB for InfluxDB コネクタはバージョン 11.7 で非推奨となります。非推奨後はコンソールから削除され、機能アップデートやメンテナンスも提供されなくなります。非推奨スケジュールについては、「TSDB for InfluxDB コネクタのサポート終了について」をご参照ください。本番ジョブが中断されないよう、ワークロードを速やかに移行してください。

TSDB for InfluxDB コネクタは、Flink SQL の結果テーブルからストリーミングデータを Ververica Runtime (VVR) の TSDB for InfluxDB インスタンスに書き込みます。TSDB for InfluxDB は高スループットな書き込みおよびクエリに最適化された時系列データベースであり、DevOps モニタリング、アプリケーションメトリック、IoT センサーデータなどの用途に広く利用されています。

コネクタの機能

項目
テーブルタイプ シンク
実行モード ストリーミング
データフォーマット Point
API タイプ SQL
結果テーブルでのデータ更新または削除 未対応
メトリック numRecordsOutnumRecordsOutPerSecondcurrentSendTime

詳細については、「モニタリングメトリック」をご参照ください。

前提条件

作業を開始する前に、以下の準備が完了していることを確認してください。

制限事項

TSDB for InfluxDB コネクタは、VVR 2.1.5 以降を使用する Realtime Compute for Apache Flink のデプロイメントでのみサポートされます。

結果テーブルの作成

最小限の DDL

次の例は、結果テーブルを定義するために必要な最小限のカラムを示しています。

CREATE TABLE influxdb_sink (
  `metric`            VARCHAR,
  `timestamp`         BIGINT,
  `tag_value1`        VARCHAR,
  `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url'       = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database'  = '<yourDatabaseName>',
  'username'  = '<yourDatabaseUserName>',
  'password'  = '<yourDatabasePassword>'
);

スキーマカラムの命名規則

結果テーブルのカラムは、InfluxDB データモデルに対応する固定の命名規則に従う必要があります。また、カラムの順序も固定されています。

位置 カラム名 必須 対応先
0 metric VARCHAR はい InfluxDB の measurement 名
1 timestamp BIGINT はい InfluxDB のタイムスタンプ(単位はミリ秒)
2+ tag_<name> VARCHAR 少なくとも 1 つ InfluxDB のタグ(インデックス付きメタデータ)
3+ field_<name> サポートされる任意の型 少なくとも 1 つ InfluxDB のフィールド(データ値)

複数のフィールドカラムに書き込むには、次のパターンで定義します。

`field_fieldValue1` DOUBLE,
`field_fieldValue2` INTEGER,
`field_fieldValueN` INTEGER
説明

metrictimestamptag_*、および field_* のカラム名のみがサポートされます。その他のカラム名を指定するとエラーが発生します。

コネクタオプション

パラメーター 必須 デフォルト タイプ 説明
connector はい String influxdb である必要があります。
url はい String TSDB for InfluxDB インスタンスの VPC エンドポイントです。HTTP および HTTPS の両方がサポートされています。例:https://localhost:8086 または http://localhost:3242
database はい String データベース名です。例:db-flink
username はい String データベースのユーザー名です。このユーザーには、対象データベースへの書き込み権限が必要です。詳細については、「Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases」をご参照ください。
password はい String 指定されたユーザーのパスワードです。詳細については、「Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases」をご参照ください。
batchSize いいえ 300 Integer 1 回のバッチで書き込むレコード数です。
retentionPolicy いいえ autogen String 対象データベースの保持ポリシーです。指定しない場合、データベースのデフォルト保持ポリシー (autogen) が使用されます。詳細については、「Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases」をご参照ください。
ignoreErrorData いいえ false Boolean 書き込みエラー時の処理方法です。true:書き込みエラーを無視して処理を継続します。false:書き込みエラーが発生した場合、ジョブを失敗とします。

データ型のマッピング

InfluxDB 型 Flink 型
BOOLEAN BOOLEAN
INT INT
BIGINT BIGINT
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR

サンプル

次の例では、datagen コネクタを使用してランダムデータを生成し、それを TSDB for InfluxDB に書き込みます。

CREATE TEMPORARY TABLE datagen_source (
  `metric`     VARCHAR,
  `timestamp`  BIGINT,
  `fieldvalue` DOUBLE,
  `tagvalue`   VARCHAR
) WITH (
  'connector'                  = 'datagen',
  'fields.metric.length'       = '3',
  'fields.tagvalue.length'     = '3',
  'fields.timestamp.min'       = '1587539547000',
  'fields.timestamp.max'       = '1619075547000',
  'fields.fieldvalue.min'      = '1',
  'fields.fieldvalue.max'      = '100000',
  'rows-per-second'            = '50'
);

CREATE TEMPORARY TABLE influxdb_sink (
  `metric`            VARCHAR,
  `timestamp`         BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1`        VARCHAR
) WITH (
  'connector'        = 'influxdb',
  'url'              = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database'         = '<yourDatabaseName>',
  'username'         = '<yourDatabaseUserName>',
  'password'         = '<yourDatabasePassword>',
  'batchSize'        = '100',
  'retentionPolicy'  = 'autogen',
  'ignoreErrorData'  = 'false'
);

INSERT INTO influxdb_sink
SELECT
  `metric`,
  `timestamp`,
  `fieldvalue`,
  `tagvalue`
FROM datagen_source;

次のステップ