
Lindorm: CREATE ETL

Last Updated: Oct 28, 2025

Use the CREATE ETL statement to create an extract, transform, and load (ETL) task in the stream engine.

Engine and version

CREATE ETL applies only to the stream engine. Version 3.1.8 or later is required.

Note

You can view the engine version and update the minor version in the console.

Syntax

create_etl_statement ::= CREATE ETL [IF NOT EXISTS] etl_name
                        [WITH etl_properties]
                        AS INSERT INTO [[catalog_name.]db_name.]table_name column_list 
                        select_statement

etl_properties       ::= '(' property_definition (',' property_definition)* ')'
property_definition  ::= property_name '=' property_value  
column_list          ::= '(' column_name (',' column_name)* ')'
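Putting the grammar together, a minimal statement looks like the following sketch. The task, catalog, database, table, and column names here are placeholders for illustration, not objects that exist by default.

```sql
-- Minimal CREATE ETL statement (placeholder names).
CREATE ETL IF NOT EXISTS my_etl
AS
  INSERT INTO `my_catalog`.`my_db`.`my_sink` (k, v)
  SELECT k, v FROM `my_catalog`.`my_db`.`my_source`;
```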

Usage notes

ETL name (etl_name)

Required. The ETL name must follow these rules:

  • The name can contain digits, uppercase letters, lowercase letters, periods (.), hyphens (-), and underscores (_).

  • The name cannot start with a period (.) or a hyphen (-).

  • The length must be from 1 to 255 characters.

ETL properties (etl_properties)

Use the WITH keyword to add the following ETL properties:

Important

Enclose property names in backticks (`) and property values in single quotation marks ('). For example, `parallelism` = '2'.

| Property | Data type | Description | Default value |
| --- | --- | --- | --- |
| parallelism | INTEGER | The degree of parallelism for the task. | 1 |
| sink.ignore-update-before | BOOLEAN | Specifies whether to ignore -U (update-before) records during the sink operation. | false |
| sink.ignore-delete | BOOLEAN | Specifies whether to ignore -D (delete) records during the sink operation. | false |
| sink.null-mode | STRING | Specifies how null values are handled during the sink operation. Valid values: NO_OP (retains null values from the source data and writes them) and SKIP (skips null values and does not write them). | NO_OP |
| udf.xxxx | STRING | Configures a user-defined function (UDF). You must upload the UDF JAR file before you use this property. The property uses the format udf.<udfFunction> = '<jarName>#<className>', where udfFunction is the function name, jarName is the name of the JAR package, and className is the fully qualified class name. | None |
| stream.xxx | ANY | A parameter for the stream engine job, such as execution.checkpointing.interval. | None |
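As an illustration of the property syntax above, the following sketch declares a task that skips null values, ignores delete records, and registers a UDF. The function name, JAR name, and class name are hypothetical placeholders; substitute the names of the JAR file you actually uploaded.

```sql
CREATE ETL IF NOT EXISTS props_demo
WITH (
  `parallelism` = '2',
  `sink.ignore-delete` = 'true',
  `sink.null-mode` = 'SKIP',
  -- Format: udf.<udfFunction> = '<jarName>#<className>' (placeholder names)
  `udf.myUpper` = 'my-udfs.jar#com.example.MyUpper'
)
AS
  INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
  SELECT p1, c1 FROM `lindorm_table`.`default`.`source`;
```

Note that property names are enclosed in backticks and property values in single quotation marks, even for numeric and Boolean values.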

Specify the sink table

| Parameter | Required | Description |
| --- | --- | --- |
| catalog_name | No | The catalog to which the sink table belongs. |
| db_name | No | The database in which the sink table resides. |
| table_name | Yes | The name of the sink table. |
| column_name | Yes | The name of a column in the sink table. |

SQL query statement (select_statement)

The SELECT statement specifies the source data and the filter conditions to apply. For example: SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;.

Examples

Assume that the source table source and the sink table sink in LindormTable have the following structures:

-- Source table: source
CREATE TABLE source(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
-- Sink table: sink
CREATE TABLE sink(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));

  • Example 1: Create an ETL task named `filter1`. This task inserts data that meets the specified conditions from the source table source into the sink table sink.

    CREATE ETL IF NOT EXISTS filter1
    AS
      INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
      SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
  • Example 2: Create an ETL task named `filter2`. This task inserts data that meets the specified conditions from the source table source into the sink table sink and adds properties.

    CREATE ETL IF NOT EXISTS filter2
    WITH (
    `parallelism` = '2',
    `stream.execution.checkpointing.interval` = '30000'
    )
    AS
      INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
      SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;