Lindorm: CREATE ETL

Last Updated: Mar 28, 2026

CREATE ETL creates an extract, transform, and load (ETL) task in the Lindorm stream engine. The task continuously reads data from a source table, applies a SQL filter, and writes matching rows to a sink table.

Engine and version

CREATE ETL requires the stream engine at version 3.1.8 or later.

You can view the current version of the stream engine and update its minor version in the Lindorm console.
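Versions must be compared component by component. A plain string comparison gives the wrong answer for a version such as 3.1.10, because "3.1.10" sorts before "3.1.8" as text. The following Python sketch shows a numeric check for the 3.1.8 minimum. It assumes the engine version is a purely numeric dotted string, and the function names are hypothetical.

```python
def parse_version(version: str) -> tuple:
    """Split a dotted numeric version such as '3.1.8' into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


# Minimum stream engine version stated for CREATE ETL.
MIN_VERSION = "3.1.8"


def supports_create_etl(engine_version: str) -> bool:
    """Return True if engine_version is 3.1.8 or later (numeric comparison)."""
    return parse_version(engine_version) >= parse_version(MIN_VERSION)


# As text, "3.1.10" < "3.1.8"; as integer tuples, (3, 1, 10) > (3, 1, 8).
print(supports_create_etl("3.1.10"))  # True
print(supports_create_etl("3.1.7"))   # False
```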

Syntax

CREATE ETL [IF NOT EXISTS] etl_name
    [WITH (
        `property_name` = 'property_value'
        [, `property_name` = 'property_value'] ...
    )]
    AS INSERT INTO [[catalog_name.]db_name.]table_name (column_name [, column_name] ...)
    SELECT column_name [, column_name] ...
    FROM [[catalog_name.]db_name.]source_table_name
    [WHERE condition]

Enclose property names in backticks (`) and property values in single quotation marks ('). For example: `parallelism` = '2'.
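The following hypothetical Python helper assembles a CREATE ETL statement as text. It illustrates both the quoting rule and the clause order from the syntax above. The helper makes two simplifying assumptions. First, the SELECT list reuses the sink column names. Second, names and values that contain backticks or single quotes are rejected, because this page does not describe an escaping rule. The helper only builds a string and does not submit anything to Lindorm.

```python
def quote_property(name: str, value) -> str:
    """Render one WITH entry: name in backticks, value in single quotes."""
    value = str(value)
    if "`" in name or "'" in value:
        # No escaping rule is documented here, so refuse ambiguous input.
        raise ValueError(f"unsupported character in property {name!r}")
    return f"`{name}` = '{value}'"


def build_create_etl(name, sink, columns, source, condition=None, properties=None):
    """Assemble a CREATE ETL IF NOT EXISTS statement as text (hypothetical helper).

    Assumes the SELECT list uses the same column names as the sink column list.
    """
    parts = [f"CREATE ETL IF NOT EXISTS {name}"]
    if properties:
        entries = ",\n".join("  " + quote_property(k, v) for k, v in properties.items())
        parts.append(f"WITH (\n{entries}\n)")
    cols = ", ".join(columns)
    select = f"  SELECT {cols} FROM {source}"
    if condition:
        select += f" WHERE {condition}"
    parts += ["AS", f"  INSERT INTO {sink} ({cols})", select + ";"]
    return "\n".join(parts)


print(build_create_etl(
    "filter2",
    sink="`lindorm_table`.`default`.`sink`",
    columns=["p1", "c1"],
    source="`lindorm_table`.`default`.`source`",
    condition="c1 > 10",
    properties={"parallelism": 2, "stream.execution.checkpointing.interval": 30000},
))
```

Called with the arguments shown, the helper reproduces the text of Example 2 at the end of this page. If you omit `properties`, it reproduces Example 1.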

Parameters

ETL name

The etl_name parameter is required and must follow these rules:

  • Allowed characters: digits, letters (uppercase and lowercase), periods (.), hyphens (-), and underscores (_)

  • Cannot start with a period (.) or a hyphen (-)

  • Length: 1–255 characters
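You can express the naming rules above as a single regular expression. The following Python sketch is a client-side pre-check only, and the function name is hypothetical. It assumes that "letters" means ASCII letters. The engine itself remains the authority on which names it accepts.

```python
import re

# First character: letter, digit, or underscore (not '.' or '-').
# Remaining characters: letters, digits, '.', '-', '_'. Total length 1-255.
ETL_NAME = re.compile(r"[A-Za-z0-9_][A-Za-z0-9._-]{0,254}")


def is_valid_etl_name(name: str) -> bool:
    """Check an ETL name against the listed rules (ASCII letters assumed)."""
    return ETL_NAME.fullmatch(name) is not None


for candidate in ["filter1", "orders.v2-daily_etl", ".hidden", "-x", "has space", ""]:
    print(repr(candidate), is_valid_etl_name(candidate))
```

Using `fullmatch` ensures that the whole string must satisfy the pattern, so a trailing newline or space causes the check to fail.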

ETL properties

Use the WITH clause to configure the following properties:

Property | Type | Default | Description
parallelism | INTEGER | 1 | The degree of parallelism for the task.
sink.ignore-update-before | BOOLEAN | false | Whether to ignore -U (update-before) records when writing to the sink.
sink.ignore-delete | BOOLEAN | false | Whether to ignore -D (delete) records when writing to the sink. Set to true to skip delete records at the sink.
sink.null-mode | STRING | NO_OP | How null values are handled when writing to the sink. Valid values: NO_OP (write null values from the source as is) and SKIP (do not write null values).
udf.<udfFunction> | STRING | None | Registers a user-defined function (UDF). Format: `udf.<udfFunction>` = '<jarName>#<className>', where udfFunction is the function name, jarName is the name of the JAR file, and className is the class name. Upload the JAR file before you use this property.
stream.<parameter> | ANY | None | A stream engine job parameter that is passed directly to the underlying engine. For example, `stream.execution.checkpointing.interval` = '30000' sets the checkpointing interval.
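The three sink.* properties act on the changelog that the task writes. The following Python sketch is a rough local model of that behavior, not the engine's implementation. It represents each change as a (kind, row) pair and shows which records are still written and how a null column is treated under each sink.null-mode value. Two points are assumptions. The markers +I (insert) and +U (update-after) do not appear on this page; they are borrowed from the usual changelog convention alongside -U and -D. Treating SKIP as "leave null-valued columns out of the write" is an interpretation of the description above.

```python
def records_to_write(records, ignore_update_before=False, ignore_delete=False):
    """Drop -U and/or -D records as sink.ignore-update-before / sink.ignore-delete describe."""
    kept = []
    for kind, row in records:
        if kind == "-U" and ignore_update_before:
            continue  # update-before record ignored
        if kind == "-D" and ignore_delete:
            continue  # delete record ignored
        kept.append((kind, row))
    return kept


def apply_null_mode(row, null_mode="NO_OP"):
    """Return the column values that would be written for one row.

    NO_OP keeps None (null) values; SKIP leaves null columns out of the write
    (an assumed reading of "skip and do not write null values").
    """
    if null_mode == "NO_OP":
        return dict(row)
    if null_mode == "SKIP":
        return {col: val for col, val in row.items() if val is not None}
    raise ValueError(f"unsupported sink.null-mode: {null_mode}")


changelog = [
    ("+I", {"p1": 1, "c1": 12.5}),
    ("-U", {"p1": 1, "c1": 12.5}),
    ("+U", {"p1": 1, "c1": None}),
    ("-D", {"p1": 1, "c1": None}),
]
kept = records_to_write(changelog, ignore_update_before=True, ignore_delete=True)
print([kind for kind, _ in kept])            # ['+I', '+U']
print(apply_null_mode(kept[1][1], "SKIP"))  # {'p1': 1}
```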

Sink table

Parameter | Required | Description
catalog_name | No | The catalog of the sink table.
db_name | No | The database that contains the sink table.
table_name | Yes | The name of the sink table.
column_name | Yes | A sink table column that receives data. To write to multiple columns, list each column, separated by commas.
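In standard SQL, the column list of `INSERT INTO ... (column_name, ...) SELECT ...` is paired with the SELECT list by position, not by name. The following sketch demonstrates this with Python's built-in sqlite3 module. The column names id and val are hypothetical. This page does not state that the Lindorm stream engine applies the same positional rule, so treat that as an assumption and verify it on your instance before you rely on differently named columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Column names deliberately differ between the two tables.
conn.execute("CREATE TABLE source (id INTEGER PRIMARY KEY, val REAL)")
conn.execute("CREATE TABLE sink (p1 INTEGER PRIMARY KEY, c1 REAL)")
conn.execute("INSERT INTO source VALUES (1, 20.0), (2, 3.5)")

# Pairing is positional: sink.p1 <- source.id, sink.c1 <- source.val.
conn.execute("INSERT INTO sink (p1, c1) SELECT id, val FROM source")
print(conn.execute("SELECT p1, c1 FROM sink ORDER BY p1").fetchall())
# [(1, 20.0), (2, 3.5)]

# A column list and a SELECT list of different lengths are rejected.
try:
    conn.execute("INSERT INTO sink (p1) SELECT id, val FROM source")
    mismatch_rejected = False
except sqlite3.Error:
    mismatch_rejected = True
print("length mismatch rejected:", mismatch_rejected)
```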

SELECT statement

The SELECT statement filters and maps data from the source table. For example:

SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10
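The following sketch runs the same projection and predicate against a few sample rows. It uses Python's built-in sqlite3 module as a stand-in engine, so the `lindorm_table`.`default` prefix is dropped, and the ORDER BY clause is added only to make the printed result deterministic. Two details of standard SQL are worth noting, though it is an assumption that the stream engine evaluates them identically. The comparison is strict, so c1 = 10 does not match. A row whose c1 is NULL does not match either, because NULL > 10 is not true.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (p1 INTEGER PRIMARY KEY, c1 REAL)")
conn.executemany(
    "INSERT INTO source VALUES (?, ?)",
    [(1, 5.0), (2, 10.0), (3, 10.5), (4, None), (5, 42.0)],
)

# Same projection and predicate as the example, minus the catalog/database prefix.
matched = conn.execute(
    "SELECT p1, c1 FROM source WHERE c1 > 10 ORDER BY p1"
).fetchall()
print(matched)  # [(3, 10.5), (5, 42.0)]
```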

Examples

The following examples use these source and sink tables:

-- Source table
CREATE TABLE source (p1 INT, c1 DOUBLE, PRIMARY KEY (p1));
-- Sink table
CREATE TABLE sink   (p1 INT, c1 DOUBLE, PRIMARY KEY (p1));

Example 1: Create a basic ETL task

Filter rows where c1 > 10 from source and insert them into sink.

CREATE ETL IF NOT EXISTS filter1
AS
  INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
  SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
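Unlike a one-off INSERT ... SELECT, filter1 keeps running and evaluates rows as they arrive. The following Python sketch is a toy model of that incremental behavior, and the function name is hypothetical. Each arriving source row is checked against c1 > 10. Each matching row is written to the sink keyed by p1, replacing any earlier row with the same key. That upsert-by-primary-key behavior is an assumption based on the PRIMARY KEY (p1) declaration. The model does not cover updates that move a row out of the filter or deletes, which involve the changelog records that the sink.* properties control.

```python
def run_filter1(events, sink=None):
    """Feed source rows to a model of filter1 one at a time.

    Each row is checked against WHERE c1 > 10; a matching row is written to the
    sink keyed by its primary key p1, replacing any earlier row with that key.
    """
    sink = {} if sink is None else sink
    for row in events:
        c1 = row["c1"]
        if c1 is not None and c1 > 10:  # NULL never satisfies c1 > 10
            sink[row["p1"]] = {"p1": row["p1"], "c1": c1}
    return sink


# Rows arriving over time; p1 = 2 arrives twice, both times with a matching c1.
arrivals = [
    {"p1": 1, "c1": 5.0},
    {"p1": 2, "c1": 15.0},
    {"p1": 3, "c1": None},
    {"p1": 2, "c1": 30.0},
]
print(run_filter1(arrivals))  # {2: {'p1': 2, 'c1': 30.0}}
```

Passing the same `sink` dictionary to successive calls models the task picking up new rows over time without reprocessing earlier ones.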

Example 2: Create an ETL task with properties

This task applies the same filter as Example 1 and also sets two properties in the WITH clause: a degree of parallelism of 2 and a checkpointing interval of 30000 for the underlying stream engine job.

CREATE ETL IF NOT EXISTS filter2
WITH (
  `parallelism` = '2',
  `stream.execution.checkpointing.interval` = '30000'
)
AS
  INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
  SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;