全部產品
Search
文件中心

Lindorm:使用Flink寫入時序引擎

更新時間:Aug 02, 2025

Flink可以處理即時資料流,並將處理結果寫入Lindorm時序引擎,以實現即時資料監控等情境。本文介紹如何將Flink上即時的資料處理結果寫入到時序引擎。

前提條件

  • 已開通Realtime ComputeFlink版或者已有自建Flink。Realtime ComputeFlink版的開通,請參見開通Realtime ComputeFlink版

    說明

    Realtime ComputeFlink版需要VVR 4.0.13及以上版本,VVR 4.0.13版本是基於Apache Flink 1.13。

  • 為了保證網路的連通性,確保Lindorm執行個體和Realtime ComputeFlink使用相同的專用網路。

    說明

    Realtime ComputeFlink版預設不具備訪問公網的能力,如需通過公網將資料寫入時序引擎,請參見如何訪問公網?

  • 已開通時序引擎。

  • Lindorm時序引擎為3.4.7及以上版本,如何查看或升級時序引擎版本,請參見時序引擎版本說明升級小版本

  • 已將Flink的IP地址添加到Lindorm白名單。如果您使用的是Realtime ComputeFlink版,查看IP地址的操作,請參見如何設定白名單?。添加Lindorm白名單的操作,請參見設定白名單

背景資訊

時序引擎Sink連接器用於串連其他系統與Lindorm時序引擎,負責從各種資料來源接收資料並寫入到時序引擎。Realtime ComputeFlink版通過Flink SQL定義源表、維表和結果表,通過定義時序引擎Sink連接器的參數,將結果表映射到Lindorm時序表,從而將Flink處理後的結果資料寫入Lindorm時序引擎。使用時序引擎SINK外掛程式,需要先擷取時序引擎SINK外掛程式,再將JAR包上傳至Realtime ComputeFlink版控制台,上傳方法請參見JAR作業開發

文法

在Realtime ComputeFlink上建立結果表,並配置時序引擎Sink連接器參數,實現Flink結果表到Lindorm時序表的映射。

CREATE TEMPORARY TABLE tsdb_sink(
  `timestamp` BIGINT,
  tag_<tagname> VARCHAR,
  field_<fieldname1> DOUBLE,
  field_<fieldname2> VARCHAR,
  field_<fieldname3> BIGINT,
  field_<fieldname4> BOOLEAN
  -- table VARCHAR(可選欄位)
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='<lindormTSDBHttpUrl>',
    'table'='<yourTableName>',
    'defaultDatabase'='<yourDatabaseName>',
    'schemaPolicy'='<schemaPolicy>',
    'sink.parallelism'='<sinkParallelism>'
    'ignoreErrorData'='<ignoreErrorData>',
    'maxRetries'='<maxRetries>',
    'batchSize'='<batchSize>',
    'connectTimeoutMs'='<connectTimeoutMs>',
    'sync'='<sync>',
    'debug'='<debug>'
);

參數說明

結果表結構參數說明

欄位名

資料類型

是否必選

說明

timestamp

BIGINT

欄位名必須為timestamp,且欄位類型必須為BIGINT。

單位為毫秒(ms)。

說明
  • timestamp為保留字,請用反引號(``)將timestamp括起來。

  • 欄位值為13位時間戳記,如果是10位時間戳記,寫入會自動轉換為13位。

tag_tagname

VARCHAR

指定時序資料的標籤(Tag)。tag_表示首碼,不能省略和修改。tagname表示時序資料標籤名稱。

樣本:tag_deviceid。

說明

tag_tagname可以為一列或者多列。

field_fieldname

DOUBLE、VARCHAR、BIGINT、BOOLEAN

指定時序資料量測值(Field)。field_表示首碼,不能省略和修改。fieldname表示時序資料量測值名稱。

樣本:field_humidity。

說明

field_fieldname可以為一列或者多列。

table

VARCHAR

指定時序資料表。

  • 如果寫入一張時序表,建議在WITH參數中配置。

  • 如果同時寫入多張時序表,可以在表結構的table欄位中配置。

WITH參數說明

參數

是否必選

說明

connector

固定值lindormtsdb,指定時序引擎SINK外掛程式。

url

Lindorm時序引擎的HTTP串連地址,擷取方法請參見查看串連地址

table

指定時序資料表。

  • 如果寫入一張時序表,建議在WITH參數中配置。

  • 如果同時寫入多張時序表,可以在表結構的table欄位中配置。

username

條件必選

串連時序引擎的使用者名稱和密碼。

如已開啟使用者認證與許可權校正,則必須輸入使用者名稱和密碼。否則無需輸入。

說明

時序引擎預設未開啟使用者認證與許可權校正。為了資料安全,建議您開啟時序引擎的使用者認證與許可權校正。

password

條件必選

defaultDatabase

寫入資料的資料庫。預設值為default。

schemaPolicy

Schema約束策略。

  • Strong:強約束,預設值。時序引擎會嚴格依據預先定義的表結構對寫入資料的表名、欄位名、類型進行校正。選擇Strong,需提前手動建表,否則資料寫入會失敗。

  • Weak:弱約束。寫入資料的表不存在時,不會報錯,而是會自動建立對應的表。

  • None:無約束。寫入資料的表不存在時,不會報錯,也不會自動建表。如果不手動建表,不影響資料寫入,但無法直接通過SQL查詢寫入的資料。

說明

更多資訊請參見Schema約束

sink.parallelism

寫入並發度,當寫入資料量較大時可適當增加並發度,預設值為1 。

ignoreErrorData

是否忽略寫入錯誤。

  • false:不忽略,預設值。如果遇到錯誤,就跳出寫入。

  • true:忽略。如果遇到錯誤就忽略錯誤,繼續寫入。

maxRetries

寫入時遇到服務端內部錯誤或者網路錯誤時最大重試次數,預設值為3。

batchSize

批處理大小,即每次寫入資料庫的資料量,預設值為500個資料點。

connectTimeoutMs

HTTP連線逾時時間,預設值為90000。單位為毫秒(ms)。

debug

是否開啟debug模式,用來列印詳細資料點日誌。

  • false:不開啟,預設值。

  • true:開啟。

sync

是否同步寫入,建議使用false。

  • false:非同步寫入,預設值,寫入效率高。

  • true:同步寫入。

使用樣本

以datagen_source隨機資料產生器為例,將產生的資料寫入Lindorm時序表mytable中。範例程式碼如下:

CREATE TEMPORARY TABLE datagen_source (
  id INTEGER,
  score DOUBLE,
  name STRING
)
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tsdb_sink(
  tag_tagk VARCHAR,
  field_score DOUBLE,
  field_name STRING,
  `timestamp` BIGINT
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
    'table'= 'mytable',
    'schemaPolicy'='weak'
);

INSERT INTO tsdb_sink
SELECT
  CAST(id as STRING) as tag_tagk,
  score as field_score,
  name as field_name,
  UNIX_TIMESTAMP(now()) * 1000  as `timestamp`
FROM datagen_source;