TSDB for InfluxDB コネクタはバージョン 11.7 で非推奨となります。非推奨後はコンソールから削除され、機能アップデートやメンテナンスも提供されなくなります。非推奨スケジュールについては、「TSDB for InfluxDB コネクタのサポート終了について」をご参照ください。本番ジョブが中断されないよう、ワークロードを速やかに移行してください。
TSDB for InfluxDB コネクタは、Flink SQL の結果テーブルからストリーミングデータを Ververica Runtime (VVR) の TSDB for InfluxDB インスタンスに書き込みます。TSDB for InfluxDB は高スループットな書き込みおよびクエリに最適化された時系列データベースであり、DevOps モニタリング、アプリケーションメトリック、IoT センサーデータなどの用途に広く利用されています。
コネクタの機能
| 項目 | 値 |
|---|---|
| テーブルタイプ | シンク |
| 実行モード | ストリーミング |
| データフォーマット | Point |
| API タイプ | SQL |
| 結果テーブルでのデータ更新または削除 | 未対応 |
| メトリック | numRecordsOut、numRecordsOutPerSecond、currentSendTime |
詳細については、「モニタリングメトリック」をご参照ください。
前提条件
作業を開始する前に、以下の準備が完了していることを確認してください。
-
TSDB for InfluxDB で作成されたデータベースです。「ユーザーアカウントとデータベースの管理ユーザーアカウントとデータベースの管理ユーザーアカウントとデータベースの管理ユーザーアカウントとデータベースの管理」をご参照ください。
制限事項
TSDB for InfluxDB コネクタは、VVR 2.1.5 以降を使用する Realtime Compute for Apache Flink のデプロイメントでのみサポートされます。
結果テーブルの作成
最小限の DDL
次の例は、結果テーブルを定義するために必要な最小限のカラムを示しています。
CREATE TABLE influxdb_sink (
`metric` VARCHAR,
`timestamp` BIGINT,
`tag_value1` VARCHAR,
`field_fieldValue1` DOUBLE
) WITH (
'connector' = 'influxdb',
'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>'
);
スキーマカラムの命名規則
結果テーブルのカラムは、InfluxDB データモデルに対応する固定の命名規則に従う必要があります。また、カラムの順序も固定されています。
| 位置 | カラム名 | 型 | 必須 | 対応先 |
|---|---|---|---|---|
| 0 | metric |
VARCHAR | はい | InfluxDB の measurement 名 |
| 1 | timestamp |
BIGINT | はい | InfluxDB のタイムスタンプ(単位はミリ秒) |
| 2+ | tag_<name> |
VARCHAR | 少なくとも 1 つ | InfluxDB のタグ(インデックス付きメタデータ) |
| 3+ | field_<name> |
サポートされる任意の型 | 少なくとも 1 つ | InfluxDB のフィールド(データ値) |
複数のフィールドカラムに書き込むには、次のパターンで定義します。
`field_fieldValue1` DOUBLE,
`field_fieldValue2` INTEGER,
`field_fieldValueN` INTEGER
metric、timestamp、tag_*、および field_* のカラム名のみがサポートされます。その他のカラム名を指定するとエラーが発生します。
コネクタオプション
| パラメーター | 必須 | デフォルト | タイプ | 説明 |
|---|---|---|---|---|
connector |
はい | — | String | influxdb である必要があります。 |
url |
はい | — | String | TSDB for InfluxDB インスタンスの VPC エンドポイントです。HTTP および HTTPS の両方がサポートされています。例:https://localhost:8086 または http://localhost:3242。 |
database |
はい | — | String | データベース名です。例:db-flink。 |
username |
はい | — | String | データベースのユーザー名です。このユーザーには、対象データベースへの書き込み権限が必要です。詳細については、「Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases」をご参照ください。 |
password |
はい | — | String | 指定されたユーザーのパスワードです。詳細については、「Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases」をご参照ください。 |
batchSize |
いいえ | 300 |
Integer | 1 回のバッチで書き込むレコード数です。 |
retentionPolicy |
いいえ | autogen |
String | 対象データベースの保持ポリシーです。指定しない場合、データベースのデフォルト保持ポリシー (autogen) が使用されます。詳細については、「Manage user accounts and databasesManage user accounts and databasesManage user accounts and databasesManage user accounts and databases」をご参照ください。 |
ignoreErrorData |
いいえ | false |
Boolean | 書き込みエラー時の処理方法です。true:書き込みエラーを無視して処理を継続します。false:書き込みエラーが発生した場合、ジョブを失敗とします。 |
データ型のマッピング
| InfluxDB 型 | Flink 型 |
|---|---|
| BOOLEAN | BOOLEAN |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
サンプル
次の例では、datagen コネクタを使用してランダムデータを生成し、それを TSDB for InfluxDB に書き込みます。
CREATE TEMPORARY TABLE datagen_source (
`metric` VARCHAR,
`timestamp` BIGINT,
`fieldvalue` DOUBLE,
`tagvalue` VARCHAR
) WITH (
'connector' = 'datagen',
'fields.metric.length' = '3',
'fields.tagvalue.length' = '3',
'fields.timestamp.min' = '1587539547000',
'fields.timestamp.max' = '1619075547000',
'fields.fieldvalue.min' = '1',
'fields.fieldvalue.max' = '100000',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE influxdb_sink (
`metric` VARCHAR,
`timestamp` BIGINT,
`field_fieldValue1` DOUBLE,
`tag_value1` VARCHAR
) WITH (
'connector' = 'influxdb',
'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' = '100',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
INSERT INTO influxdb_sink
SELECT
`metric`,
`timestamp`,
`fieldvalue`,
`tagvalue`
FROM datagen_source;