Flink を使用してリアルタイム データストリームを処理し、結果を LindormTSDB に書き込んでリアルタイム データ監視を行うことができます。このトピックでは、Flink でのリアルタイム データ処理結果を LindormTSDB に書き込む方法について説明します。
前提条件
Realtime Compute for Apache Flink を有効化するか、セルフマネージド Flink サービスを作成済みであること。Realtime Compute for Apache Flink の有効化方法の詳細については、「Realtime Compute for Apache Flink を有効化する」をご参照ください。
説明Realtime Compute for Apache Flink で使用する Ververica Runtime(VVR)のバージョンは 4.0.13 以降である必要があります。VVR 4.0.13 は Apache Flink V1.13 に基づいて開発されています。
Lindorm インスタンスと Realtime Compute for Apache Flink ワークスペースが同じ VPC に配置されていること。ネットワーク接続を確保するためです。
説明Realtime Compute for Apache Flink は、デフォルトではインターネット経由でアクセスできません。インターネット経由で LindormTSDB にデータを書き込む場合は、「Realtime Compute for Apache Flink はどのようにインターネットにアクセスしますか?」をご参照ください。
Lindorm インスタンスで LindormTSDB が有効化されていること。
LindormTSDB のバージョンが 3.4.7 以降であること。LindormTSDB バージョンの表示またはアップグレード方法の詳細については、「LindormTSDB のリリースノート」および「Lindorm インスタンスのマイナーエンジンバージョンをアップグレードする」をご参照ください。
Flink の IP アドレスまたは CIDR ブロックが Lindorm インスタンスのホワイトリストに追加されます。Realtime Compute for Apache Flink の vSwitch の CIDR ブロックを取得する方法の詳細については、「ホワイトリストを構成する方法」をご参照ください。Lindorm インスタンスのホワイトリストに IP アドレスまたは CIDR ブロックを追加する方法の詳細については、「ホワイトリストを構成する」をご参照ください。
背景情報
LindormTSDB シンク コネクタは、さまざまなデータソースからデータを受信し、LindormTSDB にデータを書き込むために使用されます。Realtime Compute for Apache Flink は、Flink SQL を使用してソーステーブル、ディメンションテーブル、および結果テーブルを定義します。LindormTSDB シンク コネクタのパラメーターを設定して、結果テーブルを LindormTSDB のテーブルにマッピングできます。このようにして、Flink のデータ処理結果が LindormTSDB の指定されたテーブルに書き込まれます。LindormTSDB シンクコネクタを使用するには、LindormTSDB シンク コネクタの JAR パッケージを取得し、Realtime Compute for Apache Flink コンソールにアップロードする必要があります。詳細については、「JAR ドラフトを開発する」をご参照ください。
構文
Realtime Compute for Apache Flink で結果テーブルを作成します。次に、LindormTSDB シンク コネクタのパラメーターを設定して、結果テーブルを LindormTSDB の時系列テーブルにマッピングします。
CREATE TEMPORARY TABLE tsdb_sink(
`timestamp` BIGINT,
tag_<tagname> VARCHAR,
field_<fieldname1> DOUBLE,
field_<fieldname2> VARCHAR,
field_<fieldname3> BIGINT,
field_<fieldname4> BOOLEAN
-- table VARCHAR (optional)
)
WITH (
'connector' = 'lindormtsdb',
'url'='<lindormTSDBHttpUrl>',
'table'='<yourTableName>',
'defaultDatabase'='<yourDatabaseName>',
'schemaPolicy'='<schemaPolicy>',
'sink.parallelism'='<sinkParallelism>',
'ignoreErrorData'='<ignoreErrorData>',
'maxRetries'='<maxRetries>',
'batchSize'='<batchSize>',
'connectTimeoutMs'='<connectTimeoutMs>',
'sync'='<sync>',
'debug'='<debug>'
);パラメーター
結果テーブルのパラメーター
パラメーター | データ型 | 必須 | 説明 |
timestamp | BIGINT | はい | パラメーター名は 単位:ミリ秒。 説明
|
tag_tagname | VARCHAR | はい | 時系列データのタグ。 例:tag_deviceid。 説明 このパラメーターには、1 つの列または複数の列を指定できます。 |
field_fieldname | DOUBLE、VARCHAR、BIGINT、および BOOLEAN | はい | 時系列データのフィールド。 例:field_humidity。 説明 このパラメーターには、1 つの列または複数の列を指定できます。 |
table | VARCHAR | いいえ | データを書き込む時系列テーブル。
|
WITH 句のパラメーター
パラメーター | 必須 | 説明 |
connector | はい | このパラメーターを lindormtsdb に設定します。これは LindormTSDB シンク コネクタを示します。 |
url | はい | HTTP 用の LindormTSDB エンドポイント。エンドポイントの取得方法の詳細については、「エンドポイントを表示する」をご参照ください。 |
table | いいえ | データを書き込む時系列テーブル。
|
username | はい(特定のシナリオの場合) | LindormTSDB に接続するために使用するユーザー名とパスワード。 ユーザー認証と権限検証機能が有効になっている場合は、ユーザー名とパスワードを指定する必要があります。そうでない場合、これら 2 つのパラメーターは必須ではありません。 説明 デフォルトでは、ユーザー認証と権限検証機能は有効になっていません。データ セキュリティを確保するために、LindormTSDB のユーザー認証と権限検証機能を有効にすることをお勧めします。 |
はい(特定のシナリオの場合) | LindormTSDB に接続するために使用するパスワード。 | |
defaultDatabase | いいえ | データを書き込むデータベース。デフォルト値:default。 |
schemaPolicy | いいえ | スキーマの制約ポリシー。
説明 詳細については、「スキーマの制約ポリシー」をご参照ください。 |
sink.parallelism | いいえ | データを並行して書き込むために使用できる同時スレッド数。大量のデータを書き込む必要がある場合は、このパラメーターの値を増やすことができます。デフォルト値:1。 |
ignoreErrorData | いいえ | 書き込みエラーを無視するかどうかを指定します。デフォルト値:false。有効な値:
|
maxRetries | いいえ | 内部サーバー エラーまたはネットワーク エラーが原因で失敗した書き込みリクエストを再送信する最大回数。デフォルト値:3。 |
batchSize | いいえ | 1 回の書き込み操作でデータベースに書き込むことができるデータ ポイントの数。デフォルト値:500。 |
connectTimeoutMs | いいえ | HTTP 接続のタイムアウト期間。デフォルト値:90000。単位:ミリ秒。 |
debug | いいえ | デバッグ モードを有効にして、LindormTSDB に書き込まれたデータ ポイントのログを表示するかどうかを指定します。
|
sync | いいえ | データを同期して書き込むかどうかを指定します。デフォルト値:false。このパラメーターにはデフォルト値を維持することをお勧めします。
|
例
次のコードは、datagen_source ランダム データ ジェネレーターによって生成されたデータを mytable という名前の Lindorm 時系列テーブルに書き込む方法の例を示しています。
CREATE TEMPORARY TABLE datagen_source (
id INTEGER,
score DOUBLE,
name STRING
)
WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE tsdb_sink(
tag_tagk VARCHAR,
field_score DOUBLE,
field_name STRING,
`timestamp` BIGINT
)
WITH (
'connector' = 'lindormtsdb',
'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
'table'= 'mytable',
'schemaPolicy'='weak'
);
INSERT INTO tsdb_sink
SELECT
CAST(id as STRING) as tag_tagk,
score as field_score,
name as field_name,
UNIX_TIMESTAMP(now()) * 1000 as `timestamp`
FROM datagen_source;