Topik ini menjelaskan cara menggunakan Time Series Database (TSDB) untuk konektor InfluxDB.
Informasi latar belakang
TSDB untuk InfluxDB adalah layanan database time series yang mampu memproses sejumlah besar permintaan tulis dan kueri. Layanan ini digunakan untuk menyimpan dan menganalisis data time series dalam jumlah besar secara real-time, termasuk data pemantauan DevOps, metrik aplikasi, dan data dari sensor IoT. Untuk informasi lebih lanjut tentang TSDB untuk InfluxDB, lihat TSDB untuk InfluxDB.
Tabel berikut menjelaskan kemampuan yang didukung oleh konektor TSDB untuk InfluxDB.
Item | Deskripsi |
Jenis tabel | Tabel sink |
Mode operasi | Mode streaming |
Format data | Point |
Metrik |
Catatan Untuk informasi lebih lanjut tentang data deret waktu, lihat Data Deret Waktu. |
Jenis API | SQL API |
Pembaruan atau penghapusan data dalam tabel sink | Tidak didukung |
Prasyarat
Database TSDB untuk InfluxDB telah dibuat. Untuk informasi lebih lanjut, lihat Kelola akun pengguna dan database.
Batasan
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) versi 2.1.5 atau lebih baru yang mendukung konektor TSDB untuk InfluxDB.
Sintaks
CREATE TABLE stream_test_influxdb(
`metric` VARCHAR,
`timestamp` BIGINT,
`tag_value1` VARCHAR,
`field_fieldValue1` DOUBLE
) WITH (
'connector' = 'influxdb',
'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='300',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);Format default untuk tabel yang dibuat:
Kolom 0: metric (VARCHAR). Kolom ini wajib.
Kolom 1: timestamp (BIGINT). Kolom ini wajib. Satuan: milidetik.
Kolom 2: tag_value1 (VARCHAR). Kolom ini wajib. Anda harus memasukkan setidaknya satu nilai di kolom ini.
Kolom 3: field_fieldValue1 (DOUBLE). Kolom ini wajib. Anda harus memasukkan setidaknya satu nilai di kolom ini.
Untuk menentukan beberapa nilai field_fieldValue, gunakan format berikut:
field_fieldValue1 <Tipe data>, field_fieldValue2 <Tipe data>, ... field_fieldValueN <Tipe data>Contoh:
field_fieldValue1 DOUBLE, field_fieldValue2 INTEGER, ... field_fieldValueNINTEGER
Tabel sink TSDB untuk InfluxDB hanya dapat berisi bidang berikut: metric, timestamp, tag_*, dan field_*.
Parameter dalam klausa WITH
Parameter | Deskripsi | Wajib | Catatan |
connector | Jenis tabel sink. | Ya | Tetapkan nilainya menjadi influxdb. |
url | URL database TSDB untuk InfluxDB. | Ya | URL database TSDB untuk InfluxDB adalah titik akhir virtual private cloud (VPC) dari database TSDB untuk InfluxDB. Sebagai contoh, Anda dapat mengatur parameter ini ke https://localhost:8086 atau http://localhost:3242. HTTP dan HTTPS didukung. |
database | Nama database TSDB untuk InfluxDB. | Ya | Contoh: db-flink. |
username | Nama pengguna akun yang digunakan untuk mengakses database. | Ya | Anda harus memiliki izin tulis pada database TSDB untuk InfluxDB. Untuk informasi lebih lanjut tentang nama pengguna, lihat Kelola akun pengguna dan database. |
password | Kata sandi yang digunakan untuk mengakses database. | Ya | Untuk informasi lebih lanjut tentang kata sandi, lihat Kelola akun pengguna dan database. |
batchSize | Jumlah catatan data yang dikirimkan secara bersamaan. | Tidak | Secara default, 300 catatan dikirimkan secara bersamaan. |
retentionPolicy | Kebijakan retensi. | Tidak | Jika Anda tidak mengonfigurasi parameter ini, kebijakan retensi autogen default untuk setiap database akan digunakan. Untuk informasi lebih lanjut tentang kebijakan retensi, lihat Kelola akun pengguna dan database. |
ignoreErrorData | Menentukan apakah akan mengabaikan data abnormal. | Tidak | Nilai valid:
|
Pemetaan tipe data
Tipe data TSDB untuk InfluxDB | Tipe data Flink |
BOOLEAN | BOOLEAN |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
Contoh kode
CREATE TEMPORARY TABLE datahub_source(
`metric` VARCHAR,
`timestamp` BIGINT,
`filedvalue` DOUBLE,
`tagvalue` VARCHAR
) WITH (
'connector' = 'datagen',
'fields.metric.length' = '3',
'fields.tagvalue.length' = '3',
'fields.timestamp.min' = '1587539547000',
'fields.timestamp.max' = '1619075547000',
'fields.filedvalue.min' = '1',
'fields.filedvalue.max' = '100000',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE influxdb_sink(
`metric` VARCHAR,
`timestamp` BIGINT,
`field_fieldValue1` DOUBLE,
`tag_value1` VARCHAR
) WITH (
'connector' = 'influxdb',
'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='100',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
INSERT INTO influxdb_sink
SELECT
`metric`,
`timestamp`,
`filedvalue`,
`tagvalue`
FROM datahub_source;