このトピックでは、Time Series Database (TSDB) for InfluxDBコネクタの使用方法について説明します。
背景情報
TSDB for InfluxDBは、大量の書き込みリクエストとクエリリクエストを処理できる時系列データベースサービスです。このサービスは、DevOps監視データ、アプリケーションメトリックデータ、IoTセンサーから収集されたデータなど、大量の時系列データをリアルタイムで保存および分析するために使用されます。 TSDB for InfluxDBの詳細については、TSDB for InfluxDBをご参照ください。
次の表に、TSDB for InfluxDBコネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | シンクテーブル |
実行モード | ストリーミングモード |
データ形式 | ポイント |
メトリック |
説明 メトリックの詳細については、メトリックをご参照ください。 |
APIタイプ | SQL API |
シンクテーブルでのデータの更新または削除 | サポートされていません |
前提条件
TSDB for InfluxDBデータベースが作成されていること。 詳細については、ユーザーアカウントとデータベースの管理をご参照ください。
制限事項
Ververica Runtime (VVR) 2.1.5以降を使用するRealtime Compute for Apache Flinkのみが、TSDB for InfluxDBコネクタをサポートしています。
構文
CREATE TABLE stream_test_influxdb(
`metric` VARCHAR,
`timestamp` BIGINT,
`tag_value1` VARCHAR,
`field_fieldValue1` DOUBLE
) WITH (
'connector' = 'influxdb',
'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='300',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);作成されたテーブルのデフォルト形式:
列 0: metric (VARCHAR)。 この列は必須です。
列 1: timestamp (BIGINT)。 この列は必須です。 単位: ミリ秒。
列 2: tag_value1 (VARCHAR)。 この列は必須です。 この列には少なくとも 1 つの値を入力する必要があります。
列 3: field_fieldValue1 (DOUBLE)。 この列は必須です。 この列には少なくとも 1 つの値を入力する必要があります。
複数のfield_fieldValue値を指定するには、次の形式を使用します。
field_fieldValue1 <Data type>, field_fieldValue2 <Data type>, ... field_fieldValueN <Data type>例:
field_fieldValue1 DOUBLE, field_fieldValue2 INTEGER, ... field_fieldValueNINTEGER
TSDB for InfluxDBシンクテーブルには、metric、timestamp、tag_*、および field_* のフィールドのみを含めることができます。
WITH句のパラメータ
パラメータ | 説明 | 必須 | 備考 |
connector | シンクテーブルのタイプ。 | はい | 値をinfluxdbに設定します。 |
url | TSDB for InfluxDBデータベースの URL。 | はい | TSDB for InfluxDBデータベースの URL は、TSDB for InfluxDBデータベースの仮想プライベートクラウド (VPC) エンドポイントです。 たとえば、このパラメータを https://localhost:8086 または http://localhost:3242 に設定できます。 HTTPとHTTPSがサポートされています。 |
database | TSDB for InfluxDBデータベースの名前。 | はい | 例: db-flink。 |
username | データベースへのアクセスに使用するアカウントのユーザー名。 | はい | TSDB for InfluxDBデータベースに対する書き込み権限が必要です。 ユーザー名の詳細については、ユーザーアカウントとデータベースの管理をご参照ください。 |
password | データベースへのアクセスに使用するパスワード。 | はい | パスワードの詳細については、ユーザーアカウントとデータベースの管理をご参照ください。 |
batchSize | 同時に送信されるデータレコードの数。 | いいえ | デフォルトでは、300 レコードが同時に送信されます。 |
retentionPolicy | 保存ポリシー。 | いいえ | このパラメータを設定しない場合、各データベースのデフォルトの保存ポリシーautogenが使用されます。 保存ポリシーの詳細については、ユーザーアカウントとデータベースの管理をご参照ください。 |
ignoreErrorData | 異常なデータを無視するかどうかを指定します。 | いいえ | 有効な値:
|
データ型のマッピング
TSDB for InfluxDBのデータ型 | Flinkのデータ型 |
BOOLEAN | BOOLEAN |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
サンプルコード
CREATE TEMPORARY TABLE datahub_source(
`metric` VARCHAR,
`timestamp` BIGINT,
`filedvalue` DOUBLE,
`tagvalue` VARCHAR
) WITH (
'connector' = 'datagen',
'fields.metric.length' = '3',
'fields.tagvalue.length' = '3',
'fields.timestamp.min' = '1587539547000',
'fields.timestamp.max' = '1619075547000',
'fields.filedvalue.min' = '1',
'fields.filedvalue.max' = '100000',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE influxdb_sink(
`metric` VARCHAR,
`timestamp` BIGINT,
`field_fieldValue1` DOUBLE,
`tag_value1` VARCHAR
) WITH (
'connector' = 'influxdb',
'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='100',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
INSERT INTO influxdb_sink
SELECT
`metric`,
`timestamp`,
`filedvalue`,
`tagvalue`
FROM datahub_source;