CREATE ETL 文を使用して、ストリームエンジンに抽出・変換・書き出し (ETL) タスクを作成します。
エンジンとバージョン
CREATE ETL はストリームエンジンにのみ適用されます。バージョン 3.1.8 以降が必要です。
コンソールでエンジンバージョンを表示し、マイナーバージョンを更新できます。
構文
create_etl_statement ::= CREATE ETL [IF NOT EXISTS] etl_name
[WITH etl_properties]
AS INSERT INTO [[catalog_name.]db_name.]table_name column_list
select_statement
etl_properties ::= '(' property_definition (',' property_definition)* ')'
property_definition ::= property_name '=' property_value
column_list ::= '(' column_name (',' column_name)* ')'使用上の注意
ETL 名 (etl_name)
必須。ETL 名は、次のルールに従う必要があります。
名前に使用できるのは、数字、大文字、小文字、ピリオド (.)、ハイフン (-)、アンダースコア (_) です。
名前の先頭にピリオド (.) またはハイフン (-) を使用することはできません。
長さは 1~255 文字である必要があります。
ETL プロパティ (etl_properties)
WITH キーワードを使用して、次の ETL プロパティを追加します。
プロパティ名はバックティック (`) で、プロパティ値は一重引用符 (') で囲みます。たとえば、`parallelism` = '2' です。
プロパティ | データ型 | 説明 | デフォルト値 |
parallelism | INTEGER | タスクの並列処理の次数。 | 1 |
sink.ignore-update-before | BOOLEAN | シンク操作中に -U を無視するかどうかを指定します。 | false |
sink.ignore-delete | BOOLEAN | シンク操作中に -D を無視するかどうかを指定します。 | false |
sink.null-mode | STRING | シンク操作中に null 値を書き込むかどうかを指定します。有効な値:
| NO_OP |
udf.xxxx | STRING | ユーザー定義関数 (UDF) を設定します。このプロパティを使用する前に、UDF JAR ファイルをアップロードする必要があります。パラメーターは | なし |
stream.xxx | ANY | ストリームエンジンタスクのパラメーター。たとえば、 | なし |
結果テーブルの指定
パラメーター | 必須 | 説明 |
catalog_name | いいえ | 結果テーブルのカタログ。 |
db_name | いいえ | 結果テーブルが配置されているデータベース。 |
table_name | はい | 結果テーブルの名前。 |
column_name | はい | 結果テーブルの列名。 |
SQL クエリ文 (select_statement)
SQL クエリ文はデータのフィルター処理に使用されます。たとえば、 SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10; です。
例
LindormTable のソーステーブル source と結果テーブル sink が次の構造を持つと仮定します。
-- ソーステーブル: source
CREATE TABLE source(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
-- 結果テーブル: sink
CREATE TABLE sink(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));例 1: `filter1` という名前の ETL タスクを作成します。このタスクは、指定された条件を満たすデータをソーステーブル
sourceから結果テーブルsinkに挿入します。CREATE ETL IF NOT EXISTS filter1 AS INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1) SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;例 2: `filter2` という名前の ETL タスクを作成します。このタスクは、指定された条件を満たすデータをソーステーブル
sourceから結果テーブルsinkに挿入し、プロパティを追加します。CREATE ETL IF NOT EXISTS filter2 WITH ( `parallelism` = '2', `stream.execution.checkpointing.interval` = '30000' ) AS INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1) SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;