Anda dapat menggunakan Flink untuk memproses aliran data real-time dan menulis hasilnya ke LindormTSDB guna pemantauan data secara real-time. Topik ini menjelaskan cara menulis hasil pemrosesan data real-time di Flink ke LindormTSDB.
Prasyarat
Anda telah mengaktifkan Realtime Compute for Apache Flink atau membuat layanan Flink yang dikelola sendiri. Untuk informasi lebih lanjut tentang cara mengaktifkan Realtime Compute for Apache Flink, lihat Aktifkan Realtime Compute for Apache Flink.
nullVersi Ververica Runtime (VVR) yang digunakan oleh Realtime Compute for Apache Flink harus 4.0.13 atau lebih baru. VVR 4.0.13 dikembangkan berdasarkan Apache Flink V1.13.
Instansi Lindorm dan ruang kerja Realtime Compute for Apache Flink berada dalam VPC yang sama untuk memastikan konektivitas jaringan.
nullRealtime Compute for Apache Flink tidak dapat diakses melalui Internet secara default. Jika Anda ingin menulis data dari Realtime Compute for Apache Flink ke LindormTSDB melalui Internet, lihat Bagaimana Realtime Compute for Apache Flink mengakses Internet?
LindormTSDB diaktifkan untuk instansi Lindorm Anda.
Versi LindormTSDB adalah 3.4.7 atau lebih baru. Untuk informasi lebih lanjut tentang cara melihat atau meningkatkan versi LindormTSDB, lihat Catatan rilis LindormTSDB dan Tingkatkan versi mesin minor dari instansi Lindorm.
Alamat IP atau blok CIDR Flink ditambahkan ke daftar putih instance Lindorm. Untuk informasi lebih lanjut tentang cara memperoleh blok CIDR dari vSwitch Realtime Compute for Apache Flink, lihat Bagaimana cara mengonfigurasi daftar putih? Untuk informasi lebih lanjut tentang cara menambahkan alamat IP atau blok CIDR ke daftar putih instance Lindorm, lihat Konfigurasikan daftar putih.
Informasi latar belakang
Konektor sink LindormTSDB digunakan untuk menerima data dari berbagai sumber data dan menulis data tersebut ke LindormTSDB. Realtime Compute for Apache Flink menggunakan Flink SQL untuk mendefinisikan tabel sumber, tabel dimensi, dan tabel hasil. Anda dapat mengonfigurasi parameter untuk konektor sink LindormTSDB untuk memetakan tabel hasil ke tabel di LindormTSDB. Dengan cara ini, hasil pemrosesan data Flink ditulis ke tabel tertentu di LindormTSDB. Untuk menggunakan konektor sink LindormTSDB, Anda perlu mendapatkan paket JAR konektor sink LindormTSDB dan kemudian mengunggah paket JAR tersebut ke konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Kembangkan draf JAR.
Sintaks
Buat tabel hasil di Realtime Compute for Apache Flink. Kemudian, konfigurasikan parameter konektor sink LindormTSDB untuk memetakan tabel hasil ke tabel seri waktu di LindormTSDB.
CREATE TEMPORARY TABLE tsdb_sink(
`timestamp` BIGINT,
tag_<tagname> VARCHAR,
field_<fieldname1> DOUBLE,
field_<fieldname2> VARCHAR,
field_<fieldname3> BIGINT,
field_<fieldname4> BOOLEAN
-- table VARCHAR (opsional)
)
WITH (
'connector' = 'lindormtsdb',
'url'='<lindormTSDBHttpUrl>',
'table'='<yourTableName>',
'defaultDatabase'='<yourDatabaseName>',
'schemaPolicy'='<schemaPolicy>',
'sink.parallelism'='<sinkParallelism>'
'ignoreErrorData'='<ignoreErrorData>',
'maxRetries'='<maxRetries>',
'batchSize'='<batchSize>',
'connectTimeoutMs'='<connectTimeoutMs>',
'sync'='<sync>',
'debug'='<debug>'
);Parameter
Parameter dalam tabel hasil
Parameter | Tipe data | Diperlukan | Deskripsi |
timestamp | BIGINT | Ya | Nama parameter harus Satuan: milidetik. null
|
tag_tagname | VARCHAR | Ya | Tag dari data seri waktu. Contoh: tag_deviceid. null Parameter ini dapat menentukan satu kolom atau beberapa kolom. |
field_fieldname | DOUBLE, VARCHAR, BIGINT, dan BOOLEAN | Ya | Bidang dari data seri waktu. Contoh: field_humidity. null Parameter ini dapat menentukan satu kolom atau beberapa kolom. |
table | VARCHAR | Tidak | Tabel seri waktu tempat Anda ingin menulis data.
|
Parameter dalam klausa WITH
Parameter | Diperlukan | Deskripsi |
connector | Ya | Atur parameter ini ke lindormtsdb, yang menunjukkan konektor sink LindormTSDB. |
url | Ya | Titik akhir LindormTSDB untuk HTTP. Untuk informasi lebih lanjut tentang cara mendapatkan titik akhir, lihat Lihat titik akhir. |
table | Tidak | Tabel seri waktu tempat Anda ingin menulis data.
|
username | Ya (dalam skenario tertentu) | Nama pengguna dan kata sandi yang digunakan untuk terhubung ke LindormTSDB. Jika fitur autentikasi pengguna dan verifikasi izin diaktifkan, Anda harus menentukan nama pengguna dan kata sandi. Jika tidak, kedua parameter ini tidak diperlukan. null Secara default, fitur autentikasi pengguna dan verifikasi izin tidak diaktifkan. Kami sarankan Anda mengaktifkan fitur autentikasi pengguna dan verifikasi izin untuk LindormTSDB untuk memastikan keamanan data. |
password | Ya (dalam skenario tertentu) | |
defaultDatabase | Tidak | Database tempat Anda ingin menulis data. Nilai default: default. |
schemaPolicy | Tidak | Kebijakan batasan untuk skema.
null Untuk informasi lebih lanjut, lihat Kebijakan batasan untuk skema. |
sink.parallelism | Tidak | Jumlah utas konkuren yang dapat digunakan untuk menulis data secara paralel. Anda dapat meningkatkan nilai parameter ini ketika Anda perlu menulis sejumlah besar data. Nilai default: 1. |
ignoreErrorData | Tidak | Menentukan apakah akan mengabaikan kesalahan penulisan. Nilai default: false. Nilai valid:
|
maxRetries | Tidak | Jumlah maksimum kali mencoba mengirim ulang permintaan tulis yang gagal karena kesalahan server internal atau kesalahan jaringan. Nilai default: 3. |
batchSize | Tidak | Jumlah titik data yang dapat ditulis ke database oleh operasi tulis tunggal. Nilai default: 500. |
connectTimeoutMs | Tidak | Periode timeout koneksi HTTP. Nilai default: 90000. Satuan: milidetik. |
debug | Tidak | Menentukan apakah akan mengaktifkan mode debug untuk menampilkan log titik data yang ditulis ke LindormTSDB.
|
sync | Tidak | Menentukan apakah akan menulis data secara sinkron. Nilai default: false. Kami sarankan Anda mempertahankan nilai default untuk parameter ini.
|
Contoh
Kode berikut memberikan contoh tentang cara menulis data yang dihasilkan oleh generator data acak datagen_source ke tabel seri waktu Lindorm bernama mytable:
CREATE TEMPORARY TABLE datagen_source (
id INTEGER,
score DOUBLE,
name STRING
)
WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE tsdb_sink(
tag_tagk VARCHAR,
field_score DOUBLE,
field_name STRING,
`timestamp` BIGINT
)
WITH (
'connector' = 'lindormtsdb',
'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
'table'= 'mytable',
'schemaPolicy'='weak'
);
INSERT INTO tsdb_sink
SELECT
CAST(id as STRING) as tag_tagk,
score as field_score,
name as field_name,
UNIX_TIMESTAMP(now()) * 1000 as `timestamp`
FROM datagen_source;