全部產品
Search
文件中心

Lindorm:CREATE ETL

更新時間:Oct 14, 2025

CREATE ETL語句用於在流引擎中建立ETL任務。

引擎與版本

CREATE ETL僅適用於流引擎。要求3.1.8及以上版本。

說明

您可以通過控制台查看並升級小版本

文法

create_etl_statement ::= CREATE ETL [IF NOT EXISTS] etl_name
                        [WITH etl_properties]
                        AS INSERT INTO [[catalog_name.]db_name.]table_name column_list 
                        select_statement

etl_properties       ::= '(' property_definition (',' property_definition)* ')'
property_definition  ::= property_name '=' property_value  
column_list          ::= '(' column_name (',' column_name)* ')'

使用說明

ETL名稱(etl_name

必填參數。ETL名稱的設定需遵循以下規則:

  • 可包含數字、大寫英文字元、小寫英文字元、半形句號(.)、中劃線(-)和底線(_)。

  • 不能以半形句號(.)或中劃線(-)開頭。

  • 長度為1~255字元。

ETL屬性(etl_properties)

您可以通過WITH關鍵字添加以下ETL屬性:

重要

設定時,屬性名稱前後需添加反引號(`),屬性值前後需添加單引號(')。例如`parallelism` = '2'

屬性

資料類型

說明

預設值

parallelism

INTEGER

任務並行度。

1

sink.ignore-update-before

BOOLEAN

Sink時是否忽略-U

false

sink.ignore-delete

BOOLEAN

Sink時是否忽略-D

false

sink.null-mode

STRING

Sink時是否寫入Null值,取值如下:

  • NO_OP(保留原資料Null值,直接寫入)

  • SKIP(跳過Null值,不寫入)

NO_OP

udf.xxxx

STRING

配置UDF,需先上傳UDF jar。參數格式如下:udf.<udfFunction> = <jarName>#<className>,其中udfFunction是使用udf的函數名,jarName是該udf的jar包名,className是具體的類名。

stream.xxx

ANY

流引擎作業參數,例如:execution.checkpointing.interval

指定結果表

參數

是否必填

說明

catalog_name

結果表的Catalog。

db_name

結果表所在資料庫。

table_name

結果表的名稱。

column_name

結果表的列名。

SQL查詢語句(select_statement)

用於篩選資料的SQL語句,例如 SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;

樣本

假設寬表引擎中的源表source和結果表sink的結構如下:

-- 源表source
CREATE TABLE source(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
-- 結果表1:sink
CREATE TABLE sink(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
  • 樣本一:建立ETL filter1,將源表source中合格資料插入到結果表sink中。

    CREATE ETL IF NOT EXISTS filter1
    AS
      INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
      SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
  • 樣本二:建立ETL filter2,將源表source中合格資料插入到結果表sink中,同時添加屬性。

    CREATE ETL IF NOT EXISTS filter2
    WITH (
    `parallelism` = '2',
    `stream.execution.checkpointing.interval` = '30000'
    )
    AS
      INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
      SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;