You can use Flink SQL statements in a Flink job (FJOB) to filter, transform, enrich, or aggregate streaming data and write the results to Lindorm. To submit a Flink job to the Lindorm streaming engine, execute the CREATE FJOB statement.
Syntax
CREATE FJOB fjob_name (
    flink_sqls
);
Parameters
| Parameter | Required | Description |
| --- | --- | --- |
| fjob_name | Yes | The name of the Flink job that you want to create. |
| flink_sqls | Yes | The Flink SQL statements that implement the computing logic. For more information, see related documentation. |
Examples
CREATE FJOB order_compute(
SET 'parallelism.default' = '1'; -- Specify the number of concurrent tasks.
SET 'execution.checkpointing.interval' = '60000'; -- Specify the checkpoint interval in milliseconds.
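-- Define the Kafka source table that reads raw order events in JSON format from the order_topic topic.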
CREATE TABLE order_detail(
`biz` VARCHAR,
`order_id` VARCHAR,
`price` DOUBLE,
`detail` VARCHAR,
`timestamp` BIGINT,
`time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
WATERMARK FOR `time_ltz` AS `time_ltz` - INTERVAL '5' SECOND
) WITH (
'connector'='kafka',
'topic'='order_topic',
'properties.group.id' = 'order_group',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers'='Lindorm Stream Kafka endpoint (xxx:30080)',
'format'='json');
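-- Define the Lindorm sink table that stores the aggregated results in the order_stat table.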
CREATE TABLE order_stat(
`biz` VARCHAR,
`window_start` TIMESTAMP(3),
`window_end` TIMESTAMP(3),
`total_order_price` DOUBLE,
`count` BIGINT,
PRIMARY KEY (biz, window_start) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tableName'='order_stat',
'seedServer'='LindormTable endpoint (xxx:30020)',
'namespace'='default',
'userName' = 'root',
'password' = 'root');
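-- Aggregate the total order price and order count for each business over 1-hour tumbling event-time windows, and write the results to the sink table.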
INSERT INTO order_stat
SELECT biz, window_start, window_end, SUM(price) AS total_order_price, COUNT(*) AS `count`
FROM TABLE(TUMBLE(TABLE order_detail, DESCRIPTOR(`time_ltz`), INTERVAL '1' HOUR))
GROUP BY biz, window_start, window_end;
);
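For reference, the following is a minimal sketch of the data flow, using hypothetical order events whose field names follow the order_detail schema above:
{"biz": "book", "order_id": "o1001", "price": 30.5, "detail": "novel", "timestamp": 1690880400000}
{"biz": "book", "order_id": "o1002", "price": 19.5, "detail": "comic", "timestamp": 1690881000000}
If these two messages are written to the order_topic topic, both events fall into the same 1-hour tumbling window. After that window closes, the job writes a single row to the order_stat table with biz = 'book', total_order_price = 50.0, and count = 2, where window_start and window_end are the boundaries of that window.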