Flink可以處理即時資料流,並將處理結果寫入Lindorm時序引擎,以實現即時資料監控等情境。本文介紹如何將Flink上即時的資料處理結果寫入到時序引擎。
前提條件
已開通Realtime ComputeFlink版或者已有自建Flink。Realtime ComputeFlink版的開通,請參見開通Realtime ComputeFlink版。
說明Realtime ComputeFlink版需要VVR 4.0.13及以上版本,VVR 4.0.13版本是基於Apache Flink 1.13。
為了保證網路的連通性,確保Lindorm執行個體和Realtime ComputeFlink使用相同的專用網路。
說明Realtime ComputeFlink版預設不具備訪問公網的能力,如需通過公網將資料寫入時序引擎,請參見如何訪問公網?。
已開通時序引擎。
已將Flink的IP地址添加到Lindorm白名單。如果您使用的是Realtime ComputeFlink版,查看IP地址的操作,請參見如何設定白名單?。添加Lindorm白名單的操作,請參見設定白名單。
背景資訊
時序引擎Sink連接器用於串連其他系統與Lindorm時序引擎,負責從各種資料來源接收資料並寫入到時序引擎。Realtime ComputeFlink版通過Flink SQL定義源表、維表和結果表,通過定義時序引擎Sink連接器的參數,將結果表映射到Lindorm時序表,從而將Flink處理後的結果資料寫入Lindorm時序引擎。使用時序引擎SINK外掛程式,需要先擷取時序引擎SINK外掛程式,再將JAR包上傳至Realtime ComputeFlink版控制台,上傳方法請參見JAR作業開發。
文法
在Realtime ComputeFlink上建立結果表,並配置時序引擎Sink連接器參數,實現Flink結果表到Lindorm時序表的映射。
CREATE TEMPORARY TABLE tsdb_sink(
`timestamp` BIGINT,
tag_<tagname> VARCHAR,
field_<fieldname1> DOUBLE,
field_<fieldname2> VARCHAR,
field_<fieldname3> BIGINT,
field_<fieldname4> BOOLEAN
-- table VARCHAR(可選欄位)
)
WITH (
'connector' = 'lindormtsdb',
'url'='<lindormTSDBHttpUrl>',
'table'='<yourTableName>',
'defaultDatabase'='<yourDatabaseName>',
'schemaPolicy'='<schemaPolicy>',
'sink.parallelism'='<sinkParallelism>'
'ignoreErrorData'='<ignoreErrorData>',
'maxRetries'='<maxRetries>',
'batchSize'='<batchSize>',
'connectTimeoutMs'='<connectTimeoutMs>',
'sync'='<sync>',
'debug'='<debug>'
);參數說明
結果表結構參數說明
欄位名 | 資料類型 | 是否必選 | 說明 |
timestamp | BIGINT | 是 | 欄位名必須為 單位為毫秒(ms)。 說明
|
tag_tagname | VARCHAR | 是 | 指定時序資料的標籤(Tag)。 樣本:tag_deviceid。 說明 tag_tagname可以為一列或者多列。 |
field_fieldname | DOUBLE、VARCHAR、BIGINT、BOOLEAN | 是 | 指定時序資料量測值(Field)。 樣本:field_humidity。 說明 field_fieldname可以為一列或者多列。 |
table | VARCHAR | 否 | 指定時序資料表。
|
WITH參數說明
參數 | 是否必選 | 說明 |
connector | 是 | 固定值lindormtsdb,指定時序引擎SINK外掛程式。 |
url | 是 | Lindorm時序引擎的HTTP串連地址,擷取方法請參見查看串連地址。 |
table | 否 | 指定時序資料表。
|
username | 條件必選 | 串連時序引擎的使用者名稱和密碼。 如已開啟使用者認證與許可權校正,則必須輸入使用者名稱和密碼。否則無需輸入。 說明 時序引擎預設未開啟使用者認證與許可權校正。為了資料安全,建議您開啟時序引擎的使用者認證與許可權校正。 |
password | 條件必選 | |
defaultDatabase | 否 | 寫入資料的資料庫。預設值為default。 |
schemaPolicy | 否 | Schema約束策略。
說明 更多資訊請參見Schema約束。 |
sink.parallelism | 否 | 寫入並發度,當寫入資料量較大時可適當增加並發度,預設值為1 。 |
ignoreErrorData | 否 | 是否忽略寫入錯誤。
|
maxRetries | 否 | 寫入時遇到服務端內部錯誤或者網路錯誤時最大重試次數,預設值為3。 |
batchSize | 否 | 批處理大小,即每次寫入資料庫的資料量,預設值為500個資料點。 |
connectTimeoutMs | 否 | HTTP連線逾時時間,預設值為90000。單位為毫秒(ms)。 |
debug | 否 | 是否開啟debug模式,用來列印詳細資料點日誌。
|
sync | 否 | 是否同步寫入,建議使用false。
|
使用樣本
以datagen_source隨機資料產生器為例,將產生的資料寫入Lindorm時序表mytable中。範例程式碼如下:
CREATE TEMPORARY TABLE datagen_source (
id INTEGER,
score DOUBLE,
name STRING
)
WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE tsdb_sink(
tag_tagk VARCHAR,
field_score DOUBLE,
field_name STRING,
`timestamp` BIGINT
)
WITH (
'connector' = 'lindormtsdb',
'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
'table'= 'mytable',
'schemaPolicy'='weak'
);
INSERT INTO tsdb_sink
SELECT
CAST(id as STRING) as tag_tagk,
score as field_score,
name as field_name,
UNIX_TIMESTAMP(now()) * 1000 as `timestamp`
FROM datagen_source;