CREATE ETL creates an extract, transform, and load (ETL) task in the Lindorm stream engine. The task continuously reads data from a source table, applies a SQL filter, and writes matching rows to a sink table.
Engine and version
CREATE ETL requires the stream engine at version 3.1.8 or later.
To check the current engine version or update the minor version, use the console.
Syntax
CREATE ETL [IF NOT EXISTS] etl_name
[WITH (
`property_name` = 'property_value'
[, `property_name` = 'property_value'] ...
)]
AS INSERT INTO [[catalog_name.]db_name.]table_name (column_name [, column_name] ...)
SELECT column_name [, column_name] ...
FROM [[catalog_name.]db_name.]source_table_name
[WHERE condition]
Enclose property names in backticks (`) and property values in single quotation marks ('). For example: `parallelism` = '2'.
Parameters
ETL name
The etl_name parameter is required and must follow these rules:
Allowed characters: digits, letters (uppercase and lowercase), periods (.), hyphens (-), and underscores (_).
Cannot start with a period (.) or a hyphen (-).
Length: 1–255 characters.
ETL properties
Use the WITH clause to configure the following properties:
| Property | Type | Default | Description |
|---|---|---|---|
| parallelism | INTEGER | 1 | The degree of parallelism for the task. |
| sink.ignore-update-before | BOOLEAN | false | Whether to ignore -U records during the sink operation. |
| sink.ignore-delete | BOOLEAN | false | Whether to ignore -D records during the sink operation. Set to true to skip delete records at the sink. |
| sink.null-mode | STRING | NO_OP | How to handle null values during the sink operation. Valid values: NO_OP (retain and write null values from the source) and SKIP (skip and do not write null values). |
| udf.xxxx | STRING | None | Configures a user-defined function (UDF). Format: `udf.<udfFunction>` = '<jarName>#<className>', where udfFunction is the function name, jarName is the JAR file name, and className is the class name. Upload the JAR file before using this property. |
| stream.xxx | ANY | None | A stream engine job parameter passed directly to the underlying engine. For example, `stream.execution.checkpointing.interval` = '30000' sets the checkpointing interval. |
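As a rough sketch of how the sink properties might be combined, the statement below sets `sink.ignore-delete` and `sink.null-mode` in the WITH clause. The task name filter_skip_nulls is hypothetical, and the statement assumes that source and sink tables with columns p1 and c1 already exist in `lindorm_table`.`default`.

```sql
-- Hypothetical task name; assumes the source and sink tables already exist.
-- `sink.ignore-delete` = 'true' skips -D (delete) records at the sink.
-- `sink.null-mode` = 'SKIP' skips null values instead of writing them.
CREATE ETL IF NOT EXISTS filter_skip_nulls
WITH (
  `sink.ignore-delete` = 'true',
  `sink.null-mode` = 'SKIP'
)
AS
INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
```

Both values are written in single quotation marks, including the BOOLEAN one, following the quoting rule given for the WITH clause.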
Sink table
| Parameter | Required | Description |
|---|---|---|
| catalog_name | No | The catalog of the sink table. |
| db_name | No | The database where the sink table is located. |
| table_name | Yes | The name of the sink table. |
| column_name | Yes | A column in the sink table to receive data. Specify one column_name entry for each column. |
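Because catalog_name and db_name are optional, the sink table can presumably be referenced with fewer qualifiers. The sketch below omits the catalog for both tables. It assumes that the current catalog is the one that contains the `default` database, which the syntax does not guarantee, and the task name filter_db_qualified is hypothetical.

```sql
-- Hypothetical task name; assumes the current catalog contains the
-- `default` database with the source and sink tables (columns p1 and c1).
-- The sink column list (p1, c1) names one column for each selected column.
CREATE ETL IF NOT EXISTS filter_db_qualified
AS
INSERT INTO `default`.`sink` (p1, c1)
SELECT p1, c1 FROM `default`.`source` WHERE c1 > 10;
```

When it is unclear which catalog is in effect, the fully qualified form `catalog_name`.`db_name`.`table_name` removes the ambiguity.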
SELECT statement
The SELECT statement filters and maps data from the source table. For example:
SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10
Examples
The following examples use these source and sink tables:
-- Source table
CREATE TABLE source (p1 INT, c1 DOUBLE, PRIMARY KEY (p1));
-- Sink table
CREATE TABLE sink (p1 INT, c1 DOUBLE, PRIMARY KEY (p1));
Example 1: Create a basic ETL task
Filter rows where c1 > 10 from source and insert them into sink.
CREATE ETL IF NOT EXISTS filter1
AS
INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
Example 2: Create an ETL task with properties
Filter rows where c1 > 10 from source and insert them into sink, using the WITH clause to set the parallelism to 2 and the checkpointing interval to 30000.
CREATE ETL IF NOT EXISTS filter2
WITH (
`parallelism` = '2',
`stream.execution.checkpointing.interval` = '30000'
)
AS
INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;