全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor TSDB untuk InfluxDB

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan Time Series Database (TSDB) untuk konektor InfluxDB.

Informasi latar belakang

TSDB untuk InfluxDB adalah layanan database time series yang mampu memproses sejumlah besar permintaan tulis dan kueri. Layanan ini digunakan untuk menyimpan dan menganalisis data time series dalam jumlah besar secara real-time, termasuk data pemantauan DevOps, metrik aplikasi, dan data dari sensor IoT. Untuk informasi lebih lanjut tentang TSDB untuk InfluxDB, lihat TSDB untuk InfluxDB.

Tabel berikut menjelaskan kemampuan yang didukung oleh konektor TSDB untuk InfluxDB.

Item

Deskripsi

Jenis tabel

Tabel sink

Mode operasi

Mode streaming

Format data

Point

Metrik

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

Catatan

Untuk informasi lebih lanjut tentang data deret waktu, lihat Data Deret Waktu.

Jenis API

SQL API

Pembaruan atau penghapusan data dalam tabel sink

Tidak didukung

Prasyarat

Database TSDB untuk InfluxDB telah dibuat. Untuk informasi lebih lanjut, lihat Kelola akun pengguna dan database.

Batasan

Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) versi 2.1.5 atau lebih baru yang mendukung konektor TSDB untuk InfluxDB.

Sintaks

CREATE TABLE stream_test_influxdb(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `tag_value1` VARCHAR,
 `field_fieldValue1` DOUBLE
) WITH (
  'connector' = 'influxdb',
  'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='300',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

Format default untuk tabel yang dibuat:

  • Kolom 0: metric (VARCHAR). Kolom ini wajib.

  • Kolom 1: timestamp (BIGINT). Kolom ini wajib. Satuan: milidetik.

  • Kolom 2: tag_value1 (VARCHAR). Kolom ini wajib. Anda harus memasukkan setidaknya satu nilai di kolom ini.

  • Kolom 3: field_fieldValue1 (DOUBLE). Kolom ini wajib. Anda harus memasukkan setidaknya satu nilai di kolom ini.

    Untuk menentukan beberapa nilai field_fieldValue, gunakan format berikut:

    field_fieldValue1 <Tipe data>,
    field_fieldValue2 <Tipe data>,
    ...  
    field_fieldValueN <Tipe data>

    Contoh:

    field_fieldValue1 DOUBLE,
    field_fieldValue2 INTEGER,
    ...   
    field_fieldValueNINTEGER
Catatan

Tabel sink TSDB untuk InfluxDB hanya dapat berisi bidang berikut: metric, timestamp, tag_*, dan field_*.

Parameter dalam klausa WITH

Parameter

Deskripsi

Wajib

Catatan

connector

Jenis tabel sink.

Ya

Tetapkan nilainya menjadi influxdb.

url

URL database TSDB untuk InfluxDB.

Ya

URL database TSDB untuk InfluxDB adalah titik akhir virtual private cloud (VPC) dari database TSDB untuk InfluxDB. Sebagai contoh, Anda dapat mengatur parameter ini ke https://localhost:8086 atau http://localhost:3242.

HTTP dan HTTPS didukung.

database

Nama database TSDB untuk InfluxDB.

Ya

Contoh: db-flink.

username

Nama pengguna akun yang digunakan untuk mengakses database.

Ya

Anda harus memiliki izin tulis pada database TSDB untuk InfluxDB. Untuk informasi lebih lanjut tentang nama pengguna, lihat Kelola akun pengguna dan database.

password

Kata sandi yang digunakan untuk mengakses database.

Ya

Untuk informasi lebih lanjut tentang kata sandi, lihat Kelola akun pengguna dan database.

batchSize

Jumlah catatan data yang dikirimkan secara bersamaan.

Tidak

Secara default, 300 catatan dikirimkan secara bersamaan.

retentionPolicy

Kebijakan retensi.

Tidak

Jika Anda tidak mengonfigurasi parameter ini, kebijakan retensi autogen default untuk setiap database akan digunakan. Untuk informasi lebih lanjut tentang kebijakan retensi, lihat Kelola akun pengguna dan database.

ignoreErrorData

Menentukan apakah akan mengabaikan data abnormal.

Tidak

Nilai valid:

  • true: Sistem mengabaikan data abnormal.

  • false: Sistem tidak mengabaikan data abnormal. Ini adalah nilai default.

Pemetaan tipe data

Tipe data TSDB untuk InfluxDB

Tipe data Flink

BOOLEAN

BOOLEAN

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DECIMAL

DECIMAL

DOUBLE

DOUBLE

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

Contoh kode

CREATE TEMPORARY TABLE datahub_source(
 `metric` VARCHAR,
 `timestamp` BIGINT,
 `filedvalue` DOUBLE,
 `tagvalue` VARCHAR
) WITH (
  'connector' = 'datagen',
  'fields.metric.length' = '3',
  'fields.tagvalue.length' = '3',
  'fields.timestamp.min' = '1587539547000',
  'fields.timestamp.max' = '1619075547000',
  'fields.filedvalue.min' = '1',
  'fields.filedvalue.max' = '100000',
  'rows-per-second' = '50'
);

CREATE TEMPORARY TABLE influxdb_sink(
  `metric` VARCHAR,
  `timestamp` BIGINT,
  `field_fieldValue1` DOUBLE,
  `tag_value1` VARCHAR
) WITH (
  'connector' = 'influxdb',
  'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
  'database' = '<yourDatabaseName>',
  'username' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'batchSize' ='100',
  'retentionPolicy' = 'autogen',
  'ignoreErrorData' = 'false'
);

INSERT INTO influxdb_sink
SELECT 
  `metric`,
  `timestamp`,
  `filedvalue`,
  `tagvalue`
FROM datahub_source;