您可以在一個作業(JOB)中通過Flink SQL開源文法實現業務的Realtime Compute處理邏輯,包括過濾、轉換、增強和彙總,並將計算結果寫入Lindorm中。
引擎與版本
CREATE JOB僅適用於流引擎。要求3.1.8及以上版本。
說明
您可以通過控制台查看並升級小版本。
文法
delimiter $$
create_job_statement ::= CREATE JOB job_name
'('
flink_sqls
')'
$$
delimiter ;使用說明
Flink任務名稱(job_name)
必填參數。Flink任務名稱的設定需遵循以下規則:
可包含數字、大寫英文字元、小寫英文字元、半形句號(.)、中劃線(-)和底線(_)。
不能以半形句號(.)或中劃線(-)開頭。
長度為1~255字元。
Flink SQL語句(flink_sqls)
必填參數。用於描述計算邏輯。詳細文法,請參見Flink社區文檔。
樣本
列印隨機產生的資料。
delimiter $$
CREATE JOB datagen_job (
SET 'parallelism.default' = '6';
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='50000000',
'fields.f_random.min'='1',
'fields.f_random.max'='500',
'fields.f_random_str.length'='10'
);
CREATE TABLE print_table (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
);
INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
)
$$
delimiter ;結果驗證
您可以執行SHOW JOBS;語句來查看是否建立成功。