CREATE FJOB

Last Updated: Jan 23, 2025

You can use Flink SQL statements in a Flink job (FJOB) to filter, transform, enrich, or aggregate stream data and write the results to Lindorm. To submit a Flink job to the Lindorm streaming engine, execute the CREATE FJOB statement.

Syntax

CREATE FJOB fjob_name (
    flink_sqls
);

Parameters

| Parameter  | Required | Description |
|------------|----------|-------------|
| fjob_name  | Yes      | The name of the Flink job that you want to create. |
| flink_sqls | Yes      | The Flink SQL statements that implement the computing logic of the job. For more information, see related documentation. |

Examples

CREATE FJOB order_compute (
  SET 'parallelism.default' = '1';                  -- Default parallelism of the job.
  SET 'execution.checkpointing.interval' = '60000'; -- Checkpoint interval, in milliseconds.

  -- Source table: order events read from a Kafka topic in JSON format.
  CREATE TABLE order_detail (
    `biz` VARCHAR,
    `order_id` VARCHAR,
    `price` DOUBLE,
    `detail` VARCHAR,
    `timestamp` BIGINT,
    -- Event time derived from the epoch timestamp in milliseconds.
    `time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
    -- Tolerate events that arrive up to 5 seconds out of order.
    WATERMARK FOR `time_ltz` AS `time_ltz` - INTERVAL '5' SECOND
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'order_topic',
    'properties.group.id' = 'order_group',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'Lindorm Stream Kafka endpoint (xxx:30080)',
    'format' = 'json'
  );

  -- Sink table: hourly statistics per business line, written to Lindorm.
  CREATE TABLE order_stat (
    `biz` VARCHAR,
    `window_start` TIMESTAMP(3),
    `window_end` TIMESTAMP(3),
    `total_order_price` DOUBLE,
    `count` BIGINT,
    PRIMARY KEY (biz, window_start) NOT ENFORCED
  ) WITH (
    'connector' = 'lindorm',
    'tableName' = 'order_stat',
    'seedServer' = 'LindormTable endpoint (xxx:30020)',
    'namespace' = 'default',
    'userName' = 'root',
    'password' = 'root'
  );

  -- Sum the price and count the orders of each business line in 1-hour tumbling windows.
  INSERT INTO order_stat
  SELECT biz, window_start, window_end, SUM(price) AS total_order_price, COUNT(*) AS `count`
  FROM TABLE(TUMBLE(TABLE order_detail, DESCRIPTOR(`time_ltz`), INTERVAL '1' HOUR))
  GROUP BY biz, window_start, window_end;
);
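
The preceding example aggregates data. A job that only filters data could look like the following minimal sketch. It reuses the Kafka source and the connector options from the preceding example. The job name large_order_filter, the sink table large_order, the consumer group large_order_group, and the price threshold of 100 are hypothetical values chosen for illustration, and the sketch assumes that the large_order table can be written to in Lindorm in the same way as order_stat.

```sql
-- Hypothetical filter-only job. The job name, sink table, consumer group,
-- and price threshold are illustrative assumptions.
CREATE FJOB large_order_filter (
  SET 'parallelism.default' = '1';

  -- Source table: the same Kafka topic as in the preceding example.
  -- Only the JSON fields that the job needs are declared.
  CREATE TABLE order_detail (
    `biz` VARCHAR,
    `order_id` VARCHAR,
    `price` DOUBLE
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'order_topic',
    'properties.group.id' = 'large_order_group',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'Lindorm Stream Kafka endpoint (xxx:30080)',
    'format' = 'json'
  );

  -- Sink table: assumed Lindorm table that stores the filtered orders.
  CREATE TABLE large_order (
    `order_id` VARCHAR,
    `biz` VARCHAR,
    `price` DOUBLE,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'lindorm',
    'tableName' = 'large_order',
    'seedServer' = 'LindormTable endpoint (xxx:30020)',
    'namespace' = 'default',
    'userName' = 'root',
    'password' = 'root'
  );

  -- Keep only the orders whose price exceeds the assumed threshold.
  INSERT INTO large_order
  SELECT order_id, biz, price
  FROM order_detail
  WHERE price > 100;
);
```

Because this job does not use windows, the sketch omits the event-time column and the watermark. A separate consumer group is used so that the job reads the topic independently of order_compute.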