このトピックでは、TSDB for InfluxDB コネクタの使用方法について説明します。
TSDB for InfluxDB コネクタは、将来のリリースで非推奨になります。非推奨になると、コンソールから削除され、機能更新やメンテナンスは行われなくなります。非推奨のタイムラインについては、「TSDB for InfluxDB コネクタのサポート終了に関するお知らせ」をご参照ください。本番ジョブの中断を避けるために、できるだけ早くワークロードを移行することを推奨します。
背景情報
TSDB for InfluxDB は、高い書き込みおよびクエリの負荷を処理するように設計された時系列データベースサービスです。これにより、DevOps モニタリング、アプリケーションメトリック、IoT センサーなどのソースから得られる大規模な時系列データを保存し、リアルタイム分析を実行できます。 TSDB for InfluxDB の詳細については、「InfluxDB の概要」をご参照ください。
次の表に、TSDB for InfluxDB コネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | 結果テーブル |
実行モード | ストリーミングモード |
データフォーマット | Point |
メトリック |
説明 これらのメトリックの詳細については、「モニタリングメトリック」をご参照ください。 |
API タイプ | SQL |
結果テーブルでのデータの更新または削除 | サポートされていません |
前提条件
TSDB for InfluxDB でデータベースを作成済みであること。詳細については、「ユーザーアカウントとデータベースの管理」をご参照ください。
制限事項
TSDB for InfluxDB コネクタは、Ververica Runtime (VVR) 2.1.5 以降を使用する Realtime Compute for Apache Flink のデプロイメントでのみサポートされます。
構文
CREATE TABLE stream_test_influxdb(
`metric` VARCHAR,
`timestamp` BIGINT,
`tag_value1` VARCHAR,
`field_fieldValue1` DOUBLE
) WITH (
'connector' = 'influxdb',
'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='300',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);結果テーブルのデフォルトスキーマは次のとおりです。
列 0: metric (VARCHAR)。この列は必須です。
列 1: timestamp (BIGINT)。この列は必須です。単位はミリ秒である必要があります。
列 2: tag_value1 (VARCHAR)。これはタグ列です。少なくとも 1 つのタグ列が必要です。
列 3: field_fieldValue1 (DOUBLE)。これはフィールド列です。少なくとも 1 つのフィールド列が必要です。
複数のフィールド列にデータを書き込むには、次のフォーマットを使用します。
field_fieldValue1 DataType, field_fieldValue2 DataType, ... field_fieldValueN DataType例:
field_fieldValue1 DOUBLE, field_fieldValue2 INTEGER, ... field_fieldValueN INTEGER
結果テーブルは、metric、timestamp、tag_*、および field_* のみをサポートします。
WITH パラメーター
パラメーター | 説明 | 必須 | 備考 |
connector | 結果テーブルのタイプ。 | はい | 値は |
url | TSDB for InfluxDB インスタンスのサービス URL。 | はい | InfluxDB では、URL は VPC エンドポイントです。例: https://localhost:8086 または http://localhost:3242。 HTTP と HTTPS の両方がサポートされています。 |
database | ご利用の TSDB for InfluxDB インスタンス内のデータベースの名前。 | はい | 例: |
username | データベースへのアクセスに使用するユーザー名。 | はい | ユーザーは、ターゲットデータベースに対する書き込み権限を持っている必要があります。詳細については、「ユーザーアカウントとデータベースの管理」をご参照ください。 |
password | 指定されたユーザーのパスワード。 | はい | 詳細については、「ユーザーアカウントとデータベースの管理」をご参照ください。 |
batchSize | 1 回のバッチで書き込むレコード数。 | いいえ | デフォルト値: 300。 |
retentionPolicy | 保持ポリシー。 | いいえ | このパラメーターが指定されていない場合、データベースのデフォルトの保持ポリシーである |
ignoreErrorData | 書き込みに失敗したデータを無視するかどうかを指定します。 | いいえ | 有効な値:
|
データ型のマッピング
InfluxDB 型 | Flink 型 |
BOOLEAN | BOOLEAN |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
例
CREATE TEMPORARY TABLE datahub_source(
`metric` VARCHAR,
`timestamp` BIGINT,
`fieldvalue` DOUBLE,
`tagvalue` VARCHAR
) WITH (
'connector' = 'datagen',
'fields.metric.length' = '3',
'fields.tagvalue.length' = '3',
'fields.timestamp.min' = '1587539547000',
'fields.timestamp.max' = '1619075547000',
'fields.fieldvalue.min' = '1',
'fields.fieldvalue.max' = '100000',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE influxdb_sink(
`metric` VARCHAR,
`timestamp` BIGINT,
`field_fieldValue1` DOUBLE,
`tag_value1` VARCHAR
) WITH (
'connector' = 'influxdb',
'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='100',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
INSERT INTO influxdb_sink
SELECT
`metric`,
`timestamp`,
`fieldvalue`,
`tagvalue`
FROM datahub_source;