ジョブを使用すると、オープンソースの Flink SQL 構文を使用して、フィルタリング、変換、エンハンスメント、集約などのストリームコンピューティングロジックを実装できます。その後、ジョブは結果を Lindorm に書き込みます。
エンジンとバージョン
CREATE JOB 文はストリームエンジンにのみ適用されます。バージョン 3.1.8 以降が必要です。
説明
コンソールでエンジンのバージョンを確認し、マイナーバージョンの更新を実行できます。
構文
delimiter $$
create_job_statement ::= CREATE JOB job_name
'('
flink_sqls
')'
$$
delimiter ;使用上の注意
Flink ジョブ名 (job_name)
必須。Flink ジョブ名は、次の要件を満たす必要があります:
文字、数字、ピリオド (.)、ハイフン (-)、アンダースコア (_) を使用できます。
ピリオド (.) またはハイフン (-) で始めることはできません。
長さは 1~255 文字である必要があります。
Flink SQL 文 (flink_sqls)
必須。これらの文は計算ロジックを定義します。構文の詳細については、「Flink コミュニティドキュメント」をご参照ください。
例
この例では、ランダムに生成されたデータを出力します。
delimiter $$
CREATE JOB datagen_job (
SET 'parallelism.default' = '6';
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- オプションのオプション --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='50000000',
'fields.f_random.min'='1',
'fields.f_random.max'='500',
'fields.f_random_str.length'='10'
);
CREATE TABLE print_table (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
);
INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
)
$$
delimiter ;結果の検証
SHOW JOBS; 文を実行して、ジョブが作成されたことを確認できます。