Use the CREATE ETL statement to create an extract, transform, and load (ETL) task in the stream engine.
Engine and version
CREATE ETL applies only to the stream engine. Version 3.1.8 or later is required.
You can view the engine version and update the minor version in the console.
Syntax
create_etl_statement ::= CREATE ETL [IF NOT EXISTS] etl_name
[WITH etl_properties]
AS INSERT INTO [[catalog_name.]db_name.]table_name column_list
select_statement
etl_properties ::= '(' property_definition (',' property_definition)* ')'
property_definition ::= property_name '=' property_value
column_list ::= '(' column_name (',' column_name)* ')'
Usage notes
ETL name (etl_name)
Required. The ETL name must follow these rules:
The name can contain digits, uppercase letters, lowercase letters, periods (.), hyphens (-), and underscores (_).
The name cannot start with a period (.) or a hyphen (-).
The length must be from 1 to 255 characters.
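The naming rules above can be illustrated with a few sample names. This is a minimal sketch: the names are hypothetical and are checked only against the three rules listed here. Whether a name that contains periods or hyphens must be quoted when it appears in a statement is not covered on this page, so the statement itself uses a name made only of letters, digits, and underscores.

```sql
-- Names that satisfy the rules above (hypothetical examples):
--   etl_filter_01     letters, digits, and underscores
--   Filter-Daily.v2   hyphens and periods after the first character
-- Names that break the rules:
--   .filter           starts with a period
--   -filter           starts with a hyphen
-- A task whose name uses only letters, digits, and underscores:
CREATE ETL IF NOT EXISTS etl_filter_01
AS INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
```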
ETL properties (etl_properties)
Use the WITH clause to set ETL properties. Enclose property names in backticks (`) and property values in single quotation marks ('). For example, `parallelism` = '2'. The following properties are supported:
Property | Data type | Description | Default value |
parallelism | INTEGER | The degree of parallelism for the task. | 1 |
sink.ignore-update-before | BOOLEAN | Specifies whether to ignore update-before (-U) records during the sink operation. | false |
sink.ignore-delete | BOOLEAN | Specifies whether to ignore delete (-D) records during the sink operation. | false |
sink.null-mode | STRING | Specifies whether to write null values during the sink operation. Valid values include NO_OP. | NO_OP |
udf.xxxx | STRING | Configures a user-defined function (UDF). You must upload the UDF JAR file before you use this property. The parameter uses the format | None |
stream.xxx | ANY | A parameter for the stream engine job, for example `stream.execution.checkpointing.interval`. | None |
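The sink properties in the table above can be combined in a single WITH clause. The following sketch, with the hypothetical task name `filter_no_delete`, raises the parallelism and tells the sink to skip update-before (-U) and delete (-D) records. It assumes that BOOLEAN properties are written as the quoted strings 'true' and 'false', following the quoting rule above.

```sql
-- Hypothetical task: copy rows with c1 > 10 from source to sink with a
-- parallelism of 4, ignoring update-before (-U) and delete (-D) records
-- when writing to the sink.
CREATE ETL IF NOT EXISTS filter_no_delete
WITH (
  `parallelism` = '4',
  `sink.ignore-update-before` = 'true',
  `sink.ignore-delete` = 'true'
)
AS INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
```

Because delete records are ignored, rows that are deleted from the source table would remain in the sink table under this configuration.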
Specify the sink table
Parameter | Required | Description |
catalog_name | No | The catalog of the sink table. |
db_name | No | The database where the sink table is located. |
table_name | Yes | The name of the sink table. |
column_name | Yes | The name of a sink table column to write to. |
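The sketch below shows how the column list relates to the query. It assumes standard SQL INSERT semantics, in which the columns of the SELECT list are matched to column_list by position rather than by name; this page does not state the mapping rule explicitly. The task name `reorder1` is hypothetical.

```sql
-- c1 comes first in both the column list and the SELECT list, so each
-- source column is written to the sink column with the same name.
CREATE ETL IF NOT EXISTS reorder1
AS INSERT INTO `lindorm_table`.`default`.`sink` (c1, p1)
SELECT c1, p1 FROM `lindorm_table`.`default`.`source`;
```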
SQL query statement (select_statement)
The SQL query statement selects and filters the data that is written to the sink table. For example, SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
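Besides filtering, the query can be where the "transform" step of the task happens. The following sketch assumes that the stream engine accepts arithmetic expressions and compound WHERE conditions in select_statement, as standard SQL does; the task name `transform1` and the scaling factor are hypothetical.

```sql
-- Hypothetical task: keep rows with c1 > 10 and p1 < 1000, and write
-- c1 multiplied by 100 into the sink column c1.
CREATE ETL IF NOT EXISTS transform1
AS INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
SELECT p1, c1 * 100
FROM `lindorm_table`.`default`.`source`
WHERE c1 > 10 AND p1 < 1000;
```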
Examples
Assume that LindormTable contains a source table named source and a sink table named sink with the following schemas:
-- Source table: source
CREATE TABLE source(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
-- Sink table: sink
CREATE TABLE sink(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
Example 1: Create an ETL task named `filter1`. This task inserts data that meets the specified conditions from the source table source into the sink table sink.
CREATE ETL IF NOT EXISTS filter1
AS INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
Example 2: Create an ETL task named `filter2`. This task inserts data that meets the specified conditions from the source table source into the sink table sink and sets ETL properties.
CREATE ETL IF NOT EXISTS filter2
WITH (
  `parallelism` = '2',
  `stream.execution.checkpointing.interval` = '30000'
)
AS INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;