すべてのプロダクト
Search
ドキュメントセンター

Lindorm:Flink を使用して LindormTSDB にデータを書き込む

最終更新日:Jan 23, 2025

Flink を使用してリアルタイム データストリームを処理し、結果を LindormTSDB に書き込んでリアルタイム データ監視を行うことができます。このトピックでは、Flink でのリアルタイム データ処理結果を LindormTSDB に書き込む方法について説明します。

前提条件

  • Realtime Compute for Apache Flink を有効化するか、セルフマネージド Flink サービスを作成済みであること。Realtime Compute for Apache Flink の有効化方法の詳細については、「Realtime Compute for Apache Flink を有効化する」をご参照ください。

    説明

    Realtime Compute for Apache Flink で使用する Ververica Runtime(VVR)のバージョンは 4.0.13 以降である必要があります。VVR 4.0.13 は Apache Flink V1.13 に基づいて開発されています。

  • Lindorm インスタンスと Realtime Compute for Apache Flink ワークスペースが同じ VPC に配置されていること。ネットワーク接続を確保するためです。

    説明

    Realtime Compute for Apache Flink は、デフォルトではインターネット経由でアクセスできません。インターネット経由で LindormTSDB にデータを書き込む場合は、「Realtime Compute for Apache Flink はどのようにインターネットにアクセスしますか?」をご参照ください。

  • Lindorm インスタンスで LindormTSDB が有効化されていること。

  • LindormTSDB のバージョンが 3.4.7 以降であること。LindormTSDB バージョンの表示またはアップグレード方法の詳細については、「LindormTSDB のリリースノート」および「Lindorm インスタンスのマイナーエンジンバージョンをアップグレードする」をご参照ください。

  • Flink の IP アドレスまたは CIDR ブロックが Lindorm インスタンスのホワイトリストに追加されます。Realtime Compute for Apache Flink の vSwitch の CIDR ブロックを取得する方法の詳細については、「ホワイトリストを構成する方法」をご参照ください。Lindorm インスタンスのホワイトリストに IP アドレスまたは CIDR ブロックを追加する方法の詳細については、「ホワイトリストを構成する」をご参照ください。

背景情報

LindormTSDB シンク コネクタは、さまざまなデータソースからデータを受信し、LindormTSDB にデータを書き込むために使用されます。Realtime Compute for Apache Flink は、Flink SQL を使用してソーステーブル、ディメンションテーブル、および結果テーブルを定義します。LindormTSDB シンク コネクタのパラメーターを設定して、結果テーブルを LindormTSDB のテーブルにマッピングできます。このようにして、Flink のデータ処理結果が LindormTSDB の指定されたテーブルに書き込まれます。LindormTSDB シンクコネクタを使用するには、LindormTSDB シンク コネクタの JAR パッケージを取得し、Realtime Compute for Apache Flink コンソールにアップロードする必要があります。詳細については、「JAR ドラフトを開発する」をご参照ください。

構文

Realtime Compute for Apache Flink で結果テーブルを作成します。次に、LindormTSDB シンク コネクタのパラメーターを設定して、結果テーブルを LindormTSDB の時系列テーブルにマッピングします。

CREATE TEMPORARY TABLE tsdb_sink(
  `timestamp` BIGINT,
  tag_<tagname> VARCHAR,
  field_<fieldname1> DOUBLE,
  field_<fieldname2> VARCHAR,
  field_<fieldname3> BIGINT,
  field_<fieldname4> BOOLEAN
  -- table VARCHAR (optional)
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='<lindormTSDBHttpUrl>',
    'table'='<yourTableName>',
    'defaultDatabase'='<yourDatabaseName>',
    'schemaPolicy'='<schemaPolicy>',
    'sink.parallelism'='<sinkParallelism>',
    'ignoreErrorData'='<ignoreErrorData>',
    'maxRetries'='<maxRetries>',
    'batchSize'='<batchSize>',
    'connectTimeoutMs'='<connectTimeoutMs>',
    'sync'='<sync>',
    'debug'='<debug>'
);

パラメーター

結果テーブルのパラメーター

パラメーター

データ型

必須

説明

timestamp

BIGINT

はい

パラメーター名は timestamp である必要があります。このフィールドのデータ型は BIGINT である必要があります。

単位:ミリ秒。

説明
  • パラメーター名 timestamp は予約語であるため、バックティック(``)で囲みます。

  • パラメーター値は 13 桁のタイムスタンプです。このパラメーターに 10 桁のタイムスタンプを指定すると、データが LindormTSDB に書き込まれるときに 13 桁のタイムスタンプに変換されます。

tag_tagname

VARCHAR

はい

時系列データのタグ。tag_ はタグのプレフィックスであり、省略または変更することはできません。tagname はタグ名を示します。

例:tag_deviceid。

説明

このパラメーターには、1 つの列または複数の列を指定できます。

field_fieldname

DOUBLE、VARCHAR、BIGINT、および BOOLEAN

はい

時系列データのフィールド。field_ はフィールドのプレフィックスであり、省略または変更することはできません。fieldname はフィールド名を示します。

例:field_humidity。

説明

このパラメーターには、1 つの列または複数の列を指定できます。

table

VARCHAR

いいえ

データを書き込む時系列テーブル。

  • 単一の時系列テーブルにデータを書き込む場合は、WITH 句のパラメーターでテーブルを設定することをお勧めします。

  • 複数の時系列テーブルにデータを書き込む場合は、テーブルスキーマの table フィールドでテーブルを設定することをお勧めします。

WITH 句のパラメーター

password

パラメーター

必須

説明

connector

はい

このパラメーターを lindormtsdb に設定します。これは LindormTSDB シンク コネクタを示します。

url

はい

HTTP 用の LindormTSDB エンドポイント。エンドポイントの取得方法の詳細については、「エンドポイントを表示する」をご参照ください。

table

いいえ

データを書き込む時系列テーブル。

  • 単一の時系列テーブルにデータを書き込む場合は、WITH 句のパラメーターでテーブルを設定することをお勧めします。

  • 複数の時系列テーブルにデータを書き込む場合は、テーブルスキーマの table フィールドでテーブルを設定することをお勧めします。

username

はい(特定のシナリオの場合)

LindormTSDB に接続するために使用するユーザー名とパスワード。

ユーザー認証と権限検証機能が有効になっている場合は、ユーザー名とパスワードを指定する必要があります。そうでない場合、これら 2 つのパラメーターは必須ではありません。

説明

デフォルトでは、ユーザー認証と権限検証機能は有効になっていません。データ セキュリティを確保するために、LindormTSDB のユーザー認証と権限検証機能を有効にすることをお勧めします。

はい(特定のシナリオの場合)

LindormTSDB に接続するために使用するパスワード。

defaultDatabase

いいえ

データを書き込むデータベース。デフォルト値:default。

schemaPolicy

いいえ

スキーマの制約ポリシー。

  • strong(デフォルト):スキーマに強力な制約ポリシーを指定します。LindormTSDB は、事前に定義されたテーブル スキーマに基づいて、データが書き込まれるテーブルの名前、書き込まれるデータのフィールド名、および書き込まれるデータの型を厳密に検証します。このパラメーターを strong に設定する場合は、事前にデータを書き込むテーブルを手動で作成する必要があります。そうでない場合、データはテーブルに書き込まれません。

  • weak:スキーマに弱い制約ポリシーを指定します。このパラメーターを weak に設定すると、データが書き込まれるテーブルが存在しない場合、LindormTSDB はエラーを報告せずにテーブルを自動的に作成します。

  • none:スキーマに制約ポリシーを指定しません。このパラメーターを none に設定すると、データが書き込まれるテーブルが存在しない場合、LindormTSDB はエラーを報告せず、テーブルを作成しません。テーブルを手動で作成しなくても、データを書き込むことはできます。ただし、書き込まれたデータは SQL 文を使用してクエリすることはできません。

説明

詳細については、「スキーマの制約ポリシー」をご参照ください。

sink.parallelism

いいえ

データを並行して書き込むために使用できる同時スレッド数。大量のデータを書き込む必要がある場合は、このパラメーターの値を増やすことができます。デフォルト値:1。

ignoreErrorData

いいえ

書き込みエラーを無視するかどうかを指定します。デフォルト値:false。有効な値:

  • false(デフォルト):書き込みエラーを無視しません。エラーが発生した場合、書き込み操作はキャンセルされます。

  • true:書き込みエラーを無視します。エラーが発生した場合でも、書き込み操作は続行されます。

maxRetries

いいえ

内部サーバー エラーまたはネットワーク エラーが原因で失敗した書き込みリクエストを再送信する最大回数。デフォルト値:3。

batchSize

いいえ

1 回の書き込み操作でデータベースに書き込むことができるデータ ポイントの数。デフォルト値:500。

connectTimeoutMs

いいえ

HTTP 接続のタイムアウト期間。デフォルト値:90000。単位:ミリ秒。

debug

いいえ

デバッグ モードを有効にして、LindormTSDB に書き込まれたデータ ポイントのログを表示するかどうかを指定します。

  • false(デフォルト):デバッグ モードを有効にしません。

  • true:デバッグ モードを有効にします。

sync

いいえ

データを同期して書き込むかどうかを指定します。デフォルト値:false。このパラメーターにはデフォルト値を維持することをお勧めします。

  • false:データを非同期で書き込みます。この方法を使用すると、データをより効率的に書き込むことができます。

  • true:データを同期して書き込みます。

次のコードは、datagen_source ランダム データ ジェネレーターによって生成されたデータを mytable という名前の Lindorm 時系列テーブルに書き込む方法の例を示しています。

CREATE TEMPORARY TABLE datagen_source (
  id INTEGER,
  score DOUBLE,
  name STRING
)
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tsdb_sink(
  tag_tagk VARCHAR,
  field_score DOUBLE,
  field_name STRING,
  `timestamp` BIGINT
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
    'table'= 'mytable',
    'schemaPolicy'='weak'
);

INSERT INTO tsdb_sink
SELECT
  CAST(id as STRING) as tag_tagk,
  score as field_score,
  name as field_name,
  UNIX_TIMESTAMP(now()) * 1000  as `timestamp`
FROM datagen_source;