CREATE JOB は、1 つ以上の Flink SQL ステートメントを名前付きのジョブとしてパッケージ化し、Lindorm ストリームエンジンに実行を依頼します。フィルター、変換、データ補強、集約などのストリーム処理ロジックを実装し、結果を Lindorm に書き込む際に使用します。
前提条件
作業を開始する前に、以下の要件を満たしていることを確認してください。
ストリームエンジンのバージョンが 3.1.8 以降であること。
CREATE JOBはストリームエンジンでのみ使用できます。バージョンを確認するかマイナーバージョンのスペックアップを行うには、Lindorm コンソールにアクセスしてください。
構文
delimiter $$
CREATE JOB job_name
(
flink_sqls
)
$$
delimiter ;flink_sqls は、計算ロジックを定義するセミコロンで終了する 1 つ以上の Flink SQL ステートメント(SET、CREATE TABLE、INSERT INTO など)です。すべてのステートメントは単一のジョブとしてまとめて実行されます。
パラメーター
| パラメーター | 必須 | タイプ | 制約 |
|---|---|---|---|
job_name | はい | 文字列 | 英字、数字、ピリオド (.)、ハイフン (-)、アンダースコア (_) のみ使用可能。先頭に . または - を使用できません。長さは 1~255 文字。 |
flink_sqls | はい | Flink SQL | セミコロンで終了する 1 つ以上の Flink SQL ステートメント。構文の詳細については、「Flink コミュニティのドキュメント」をご参照ください。 |
例
次のジョブは、datagen コネクタを使用してランダムデータを生成し、print コネクタを使用してコンソールに出力します。
delimiter $$
CREATE JOB datagen_job (
SET 'parallelism.default' = '6';
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='50000000',
'fields.f_random.min'='1',
'fields.f_random.max'='500',
'fields.f_random_str.length'='10'
);
CREATE TABLE print_table (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
);
INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
)
$$
delimiter ;ジョブの確認
SHOW JOBS; を実行して、ジョブが正常に作成されたことを確認します。