LindormTSDB シンクコネクタを使用すると、リアルタイムで処理された Flink の結果をストリーミングし、時系列ストレージおよびモニタリング用に LindormTSDB へ書き込むことができます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
Realtime Compute for Apache Flink を有効化するか、自社管理型の Flink サービスをセットアップします。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
説明Realtime Compute for Apache Flink で使用される Ververica Runtime (VVR) のバージョンは、4.0.13 以降である必要があります。VVR 4.0.13 は Apache Flink v1.13 をベースとしています。
Lindorm インスタンスと Realtime Compute for Apache Flink のワークスペースが、同一の仮想プライベートクラウド (VPC) 内にあること。
説明Realtime Compute for Apache Flink はデフォルトでインターネット経由でのアクセスが許可されていません。インターネット経由でデータを書き込む場合は、「Realtime Compute for Apache Flink によるインターネットへのアクセス方法」をご参照ください。
Lindorm インスタンス上で LindormTSDB が有効化されていること。
LindormTSDB バージョン 3.4.7 以降。バージョンの確認またはアップグレードについては、「LindormTSDB のリリースノート」および「Lindorm インスタンスのマイナーエンジンバージョンをアップグレードする」をご参照ください。
Flink サービスの IP アドレスまたは CIDR ブロックが、Lindorm インスタンスのホワイトリストに追加されていること。Realtime Compute for Apache Flink の vSwitch の CIDR ブロックを取得するには、「ホワイトリストの設定方法」をご参照ください。Lindorm インスタンスのホワイトリストに IP アドレスまたは CIDR ブロックを追加するには、「ホワイトリストの設定」をご参照ください。
エンドツーエンドのフロー
Flink の結果を LindormTSDB に書き込む手順は以下のとおりです。
LindormTSDB シンクコネクタの JAR パッケージをダウンロードし、Realtime Compute for Apache Flink コンソールにアップロードします。
Flink SQL でソーステーブルおよび結果テーブルを定義します。結果テーブルの `WITH` 句を構成し、LindormTSDB を指すように設定します。
INSERT INTO文を作成して、ソーステーブルのフィールドを結果テーブルにマップします。Flink ジョブを送信します。このとき、コネクタが指定された LindormTSDB の時系列テーブルにデータポイントを書き込みます。
コネクタのセットアップ
LindormTSDB シンクコネクタ JAR パッケージをダウンロードし、Realtime Compute for Apache Flink コンソールにアップロードします。アップロード手順については、「JAR ドラフトの開発」をご参照ください。
結果テーブルの定義
Flink SQL で結果テーブルを作成し、`WITH` 句を使用して LindormTSDB の時系列テーブルにマップします。
CREATE TEMPORARY TABLE tsdb_sink(
`timestamp` BIGINT,
tag_<tagname> VARCHAR,
field_<fieldname1> DOUBLE,
field_<fieldname2> VARCHAR,
field_<fieldname3> BIGINT,
field_<fieldname4> BOOLEAN
-- table VARCHAR (任意)
)
WITH (
'connector' = 'lindormtsdb',
'url' = '<lindormTSDBHttpUrl>',
'table' = '<yourTableName>',
'defaultDatabase' = '<yourDatabaseName>',
'schemaPolicy' = '<schemaPolicy>',
'sink.parallelism' = '<sinkParallelism>',
'ignoreErrorData' = '<ignoreErrorData>',
'maxRetries' = '<maxRetries>',
'batchSize' = '<batchSize>',
'connectTimeoutMs' = '<connectTimeoutMs>',
'sync' = '<sync>',
'debug' = '<debug>'
);テーブルスキーマのパラメーター
| パラメーター | 型 | 必須 | 説明 |
|---|---|---|---|
timestamp | BIGINT | はい | データポイントのタイムスタンプです。必ず timestamp という名前を付けてください — これは予約キーワードであり、バックティックで囲む必要があります。単位はミリ秒(13 桁)。10 桁のタイムスタンプは、書き込み時に自動的に 13 桁に変換されます。 |
tag_<tagname> | VARCHAR | はい | タグ列です。tag_ プレフィックスは必須であり、変更できません。<tagname> を実際のタグ名(例: tag_deviceid)に置き換えてください。複数のタグ列をサポートしています。 |
field_<fieldname> | DOUBLE、VARCHAR、BIGINT、または BOOLEAN | はい | フィールド(メトリック値)列です。field_ プレフィックスは必須であり、変更できません。<fieldname> を実際のフィールド名(例: field_humidity)に置き換えてください。複数のフィールド列をサポートしています。 |
table | VARCHAR | いいえ | 対象の時系列テーブルです。動的に複数のテーブルに書き込む場合、`WITH` 句ではなくテーブルスキーマ内にこの列を定義します。 |
WITH 句のパラメーター
| パラメーター | 必須 | デフォルト | 例 | 説明 |
|---|---|---|---|---|
connector | はい | — | lindormtsdb | lindormtsdb に設定する必要があります。 |
url | はい | — | http://ld-xxx-proxy-tsdb.lindorm.rds.aliyuncs.com:8242 | LindormTSDB の HTTP エンドポイントです。「エンドポイントの表示」をご参照ください。 |
table | いいえ | — | mytable | 対象の時系列テーブルです。単一テーブルに書き込む場合に使用します。複数のテーブルに書き込む場合は、代わりにテーブルスキーマ内の table 列を使用します。 |
username | 条件付き | — | admin | LindormTSDB への接続に使用するユーザー名です。ユーザー認証および権限検証が有効な場合にのみ必要です。 |
password | 条件付き | — | — | 指定されたユーザー名のパスワードです。ユーザー認証および権限検証が有効な場合にのみ必要です。 |
defaultDatabase | いいえ | default | mydb | 対象のデータベースです。 |
schemaPolicy | いいえ | strong | weak | スキーマ制約ポリシーです。「スキーマポリシー」をご参照ください。 |
sink.parallelism | いいえ | 1 | 4 | 並列書き込みスレッド数です。高スループットのワークロードでは、この値を増加させることを推奨します。 |
ignoreErrorData | いいえ | false | true | 書き込みエラーをスキップして継続するかどうかを指定します。false の場合、ジョブはエラーで停止します。 |
maxRetries | いいえ | 3 | 5 | サーバーまたはネットワークエラーにより失敗した書き込みに対する最大リトライ回数です。 |
batchSize | いいえ | 500 | 1000 | 1 バッチあたりの書き込みデータポイント数です。 |
connectTimeoutMs | いいえ | 90000 | 30000 | HTTP 接続タイムアウトです。単位はミリ秒です。 |
sync | いいえ | false | false | 同期書き込みを行うかどうかを指定します。スループット向上のため、デフォルト値 (false) を維持することを推奨します。厳密な書き込み順序が必要な場合にのみ、true に設定します。 |
debug | いいえ | false | true | 書き込まれたデータポイントについてデバッグログを有効化するかどうかを指定します。 |
ユーザー認証および権限検証はデフォルトで無効化されています。データセキュリティを向上させるために、有効化することを推奨します。
スキーマポリシー
schemaPolicy パラメーターは、LindormTSDB が対象テーブルのスキーマをどのように処理するかを制御します。
| ポリシー | 動作 | 使用タイミング |
|---|---|---|
strong (デフォルト) | LindormTSDB は、事前に定義されたスキーマに対して、テーブル名、フィールド名、およびデータの型を厳密に検証します。対象テーブルは、書き込み前に手動で作成しておく必要があります。 | 厳密なスキーマ適用が必要な場合 |
weak | 対象テーブルが存在しない場合、LindormTSDB が自動的に作成します。 | 迅速なプロトタイピングや、テーブルスキーマが事前に定義されていない場合 |
none | 対象テーブルが存在しない場合、エラーは発生せず、テーブルも作成されません。データは書き込まれますが、SQL を使用したクエリはできません。 | 高度なユースケースのみ |
詳細については、「スキーマの制約ポリシー」をご参照ください。
サンプル
以下のサンプルでは、DataGen ソースコネクタからデータを読み取り、mytable という名前の LindormTSDB 時系列テーブルに書き込みます。ソーステーブルは、id、score、name フィールドに対してランダム値を生成します。
-- ソーステーブル:DataGen コネクタを使用してランダムデータを生成
CREATE TEMPORARY TABLE datagen_source (
id INTEGER,
score DOUBLE,
name STRING
)
WITH (
'connector' = 'datagen'
);
-- 結果テーブル:LindormTSDB にマップ
-- schemaPolicy='weak' を指定すると、対象テーブルが存在しない場合に LindormTSDB が自動的に作成します。
-- 高スループットのジョブでは、sink.parallelism および batchSize の値を増加させることを推奨します。
CREATE TEMPORARY TABLE tsdb_sink(
tag_tagk VARCHAR,
field_score DOUBLE,
field_name STRING,
`timestamp` BIGINT
)
WITH (
'connector' = 'lindormtsdb',
'url' = 'http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
'table' = 'mytable',
'schemaPolicy' = 'weak'
-- 'sink.parallelism' = '4', -- 高スループットのワークロード向けに増加
-- 'batchSize' = '1000' -- 書き込みパフォーマンスを調整するためにバッチサイズを調整
);
-- ソーステーブルから LindormTSDB へデータを書き込み
INSERT INTO tsdb_sink
SELECT
CAST(id AS STRING) AS tag_tagk,
score AS field_score,
name AS field_name,
UNIX_TIMESTAMP(now()) * 1000 AS `timestamp`
FROM datagen_source;結果の確認
Flink ジョブを送信した後、LindormTSDB へのデータ書き込みが正しく行われているかを確認します。
Realtime Compute for Apache Flink コンソールで、ジョブのステータスが 実行中 であることを確認します。
LindormTSDB に SQL コンソールまたはクライアントツールで接続し、以下のクエリを実行します。
SELECT * FROM mytable LIMIT 10;クエリ結果として行が返された場合、書き込みパイプラインは正常に動作しています。
行が返されない場合は、Flink ジョブのログを確認し、接続エラー、スキーマ検証失敗、またはタイムアウトメッセージがないかを確認します。