全部产品
Search
文档中心

Lindorm:Gunakan Flink untuk Menulis Data ke LindormTSDB

更新时间:Jun 24, 2025

Anda dapat menggunakan Flink untuk memproses aliran data real-time dan menulis hasilnya ke LindormTSDB guna pemantauan data secara real-time. Topik ini menjelaskan cara menulis hasil pemrosesan data real-time di Flink ke LindormTSDB.

Prasyarat

  • Anda telah mengaktifkan Realtime Compute for Apache Flink atau membuat layanan Flink yang dikelola sendiri. Untuk informasi lebih lanjut tentang cara mengaktifkan Realtime Compute for Apache Flink, lihat Aktifkan Realtime Compute for Apache Flink.

    null

    Versi Ververica Runtime (VVR) yang digunakan oleh Realtime Compute for Apache Flink harus 4.0.13 atau lebih baru. VVR 4.0.13 dikembangkan berdasarkan Apache Flink V1.13.

  • Instansi Lindorm dan ruang kerja Realtime Compute for Apache Flink berada dalam VPC yang sama untuk memastikan konektivitas jaringan.

    null

    Realtime Compute for Apache Flink tidak dapat diakses melalui Internet secara default. Jika Anda ingin menulis data dari Realtime Compute for Apache Flink ke LindormTSDB melalui Internet, lihat Bagaimana Realtime Compute for Apache Flink mengakses Internet?

  • LindormTSDB diaktifkan untuk instansi Lindorm Anda.

  • Versi LindormTSDB adalah 3.4.7 atau lebih baru. Untuk informasi lebih lanjut tentang cara melihat atau meningkatkan versi LindormTSDB, lihat Catatan rilis LindormTSDB dan Tingkatkan versi mesin minor dari instansi Lindorm.

  • Alamat IP atau blok CIDR Flink ditambahkan ke daftar putih instance Lindorm. Untuk informasi lebih lanjut tentang cara memperoleh blok CIDR dari vSwitch Realtime Compute for Apache Flink, lihat Bagaimana cara mengonfigurasi daftar putih? Untuk informasi lebih lanjut tentang cara menambahkan alamat IP atau blok CIDR ke daftar putih instance Lindorm, lihat Konfigurasikan daftar putih.

Informasi latar belakang

Konektor sink LindormTSDB digunakan untuk menerima data dari berbagai sumber data dan menulis data tersebut ke LindormTSDB. Realtime Compute for Apache Flink menggunakan Flink SQL untuk mendefinisikan tabel sumber, tabel dimensi, dan tabel hasil. Anda dapat mengonfigurasi parameter untuk konektor sink LindormTSDB untuk memetakan tabel hasil ke tabel di LindormTSDB. Dengan cara ini, hasil pemrosesan data Flink ditulis ke tabel tertentu di LindormTSDB. Untuk menggunakan konektor sink LindormTSDB, Anda perlu mendapatkan paket JAR konektor sink LindormTSDB dan kemudian mengunggah paket JAR tersebut ke konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Kembangkan draf JAR.

Sintaks

Buat tabel hasil di Realtime Compute for Apache Flink. Kemudian, konfigurasikan parameter konektor sink LindormTSDB untuk memetakan tabel hasil ke tabel seri waktu di LindormTSDB.

CREATE TEMPORARY TABLE tsdb_sink(
  `timestamp` BIGINT,
  tag_<tagname> VARCHAR,
  field_<fieldname1> DOUBLE,
  field_<fieldname2> VARCHAR,
  field_<fieldname3> BIGINT,
  field_<fieldname4> BOOLEAN
  -- table VARCHAR (opsional)
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='<lindormTSDBHttpUrl>',
    'table'='<yourTableName>',
    'defaultDatabase'='<yourDatabaseName>',
    'schemaPolicy'='<schemaPolicy>',
    'sink.parallelism'='<sinkParallelism>'
    'ignoreErrorData'='<ignoreErrorData>',
    'maxRetries'='<maxRetries>',
    'batchSize'='<batchSize>',
    'connectTimeoutMs'='<connectTimeoutMs>',
    'sync'='<sync>',
    'debug'='<debug>'
);

Parameter

Parameter dalam tabel hasil

Parameter

Tipe data

Diperlukan

Deskripsi

timestamp

BIGINT

Ya

Nama parameter harus timestamp. Tipe data bidang ini harus BIGINT.

Satuan: milidetik.

null
  • Kurung nama parameter timestamp dengan tanda backtick ('') karena ini adalah kata cadangan.

  • Nilai parameter adalah timestamp 13 digit. Jika Anda menentukan timestamp 10 digit untuk parameter ini, maka akan dikonversi menjadi timestamp 13 digit saat data ditulis ke LindormTSDB.

tag_tagname

VARCHAR

Ya

Tag dari data seri waktu. tag_ adalah awalan tag dan tidak dapat dihilangkan atau diubah. tagname menunjukkan nama tag.

Contoh: tag_deviceid.

null

Parameter ini dapat menentukan satu kolom atau beberapa kolom.

field_fieldname

DOUBLE, VARCHAR, BIGINT, dan BOOLEAN

Ya

Bidang dari data seri waktu. field_ adalah awalan bidang dan tidak dapat dihilangkan atau diubah. fieldname menunjukkan nama bidang.

Contoh: field_humidity.

null

Parameter ini dapat menentukan satu kolom atau beberapa kolom.

table

VARCHAR

Tidak

Tabel seri waktu tempat Anda ingin menulis data.

  • Jika Anda ingin menulis data ke tabel seri waktu tunggal, kami sarankan Anda mengonfigurasi tabel dalam parameter klausa WITH.

  • Jika Anda ingin menulis data ke beberapa tabel seri waktu, kami sarankan Anda mengonfigurasi tabel dalam bidang tabel di skema tabel.

Parameter dalam klausa WITH

Parameter

Diperlukan

Deskripsi

connector

Ya

Atur parameter ini ke lindormtsdb, yang menunjukkan konektor sink LindormTSDB.

url

Ya

Titik akhir LindormTSDB untuk HTTP. Untuk informasi lebih lanjut tentang cara mendapatkan titik akhir, lihat Lihat titik akhir.

table

Tidak

Tabel seri waktu tempat Anda ingin menulis data.

  • Jika Anda ingin menulis data ke tabel seri waktu tunggal, kami sarankan Anda mengonfigurasi tabel dalam parameter klausa WITH.

  • Jika Anda ingin menulis data ke beberapa tabel seri waktu, kami sarankan Anda mengonfigurasi tabel dalam bidang tabel di skema tabel.

username

Ya (dalam skenario tertentu)

Nama pengguna dan kata sandi yang digunakan untuk terhubung ke LindormTSDB.

Jika fitur autentikasi pengguna dan verifikasi izin diaktifkan, Anda harus menentukan nama pengguna dan kata sandi. Jika tidak, kedua parameter ini tidak diperlukan.

null

Secara default, fitur autentikasi pengguna dan verifikasi izin tidak diaktifkan. Kami sarankan Anda mengaktifkan fitur autentikasi pengguna dan verifikasi izin untuk LindormTSDB untuk memastikan keamanan data.

password

Ya (dalam skenario tertentu)

defaultDatabase

Tidak

Database tempat Anda ingin menulis data. Nilai default: default.

schemaPolicy

Tidak

Kebijakan batasan untuk skema.

  • strong (default): Tentukan kebijakan batasan kuat untuk skema. LindormTSDB memverifikasi secara ketat nama tabel tempat data ditulis, nama bidang data yang ditulis, dan jenis data yang ditulis berdasarkan skema tabel yang telah ditentukan sebelumnya. Jika Anda mengatur parameter ini ke strong, Anda harus membuat tabel tempat Anda ingin menulis data terlebih dahulu. Jika tidak, data gagal ditulis ke tabel.

  • weak: Tentukan kebijakan batasan lemah untuk skema. Jika Anda mengatur parameter ini ke weak, ketika tabel tempat data ditulis tidak ada, LindormTSDB tidak melaporkan kesalahan tetapi secara otomatis membuat tabel.

  • none: Jangan tentukan kebijakan batasan untuk skema. Jika Anda mengatur parameter ini ke none, ketika tabel tempat data ditulis tidak ada, LindormTSDB tidak melaporkan kesalahan dan tidak membuat tabel. Jika Anda tidak membuat tabel secara manual, Anda masih dapat menulis data. Namun, data yang ditulis tidak dapat diquery menggunakan Pernyataan SQL.

null

Untuk informasi lebih lanjut, lihat Kebijakan batasan untuk skema.

sink.parallelism

Tidak

Jumlah utas konkuren yang dapat digunakan untuk menulis data secara paralel. Anda dapat meningkatkan nilai parameter ini ketika Anda perlu menulis sejumlah besar data. Nilai default: 1.

ignoreErrorData

Tidak

Menentukan apakah akan mengabaikan kesalahan penulisan. Nilai default: false. Nilai valid:

  • false (default): tidak mengabaikan kesalahan penulisan. Operasi penulisan dibatalkan jika terjadi kesalahan.

  • true: mengabaikan kesalahan penulisan. Operasi penulisan terus berlanjut meskipun terjadi kesalahan.

maxRetries

Tidak

Jumlah maksimum kali mencoba mengirim ulang permintaan tulis yang gagal karena kesalahan server internal atau kesalahan jaringan. Nilai default: 3.

batchSize

Tidak

Jumlah titik data yang dapat ditulis ke database oleh operasi tulis tunggal. Nilai default: 500.

connectTimeoutMs

Tidak

Periode timeout koneksi HTTP. Nilai default: 90000. Satuan: milidetik.

debug

Tidak

Menentukan apakah akan mengaktifkan mode debug untuk menampilkan log titik data yang ditulis ke LindormTSDB.

  • false (default): tidak mengaktifkan mode debug.

  • true: mengaktifkan mode debug.

sync

Tidak

Menentukan apakah akan menulis data secara sinkron. Nilai default: false. Kami sarankan Anda mempertahankan nilai default untuk parameter ini.

  • false: menulis data secara asinkron. Data dapat ditulis lebih efisien jika Anda menggunakan metode ini.

  • true: menulis data secara sinkron.

Contoh

Kode berikut memberikan contoh tentang cara menulis data yang dihasilkan oleh generator data acak datagen_source ke tabel seri waktu Lindorm bernama mytable:

CREATE TEMPORARY TABLE datagen_source (
  id INTEGER,
  score DOUBLE,
  name STRING
)
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tsdb_sink(
  tag_tagk VARCHAR,
  field_score DOUBLE,
  field_name STRING,
  `timestamp` BIGINT
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
    'table'= 'mytable',
    'schemaPolicy'='weak'
);

INSERT INTO tsdb_sink
SELECT
  CAST(id as STRING) as tag_tagk,
  score as field_score,
  name as field_name,
  UNIX_TIMESTAMP(now()) * 1000  as `timestamp`
FROM datagen_source;