すべてのプロダクト
Search
ドキュメントセンター

Lindorm:CREATE ETL

最終更新日:Oct 29, 2025

CREATE ETL 文を使用して、ストリームエンジンに抽出・変換・書き出し (ETL) タスクを作成します。

エンジンとバージョン

CREATE ETL はストリームエンジンにのみ適用されます。バージョン 3.1.8 以降が必要です。

説明

コンソールでエンジンバージョンを表示し、マイナーバージョンを更新できます。

構文

create_etl_statement ::= CREATE ETL [IF NOT EXISTS] etl_name
                        [WITH etl_properties]
                        AS INSERT INTO [[catalog_name.]db_name.]table_name column_list 
                        select_statement

etl_properties       ::= '(' property_definition (',' property_definition)* ')'
property_definition  ::= property_name '=' property_value  
column_list          ::= '(' column_name (',' column_name)* ')'

使用上の注意

ETL 名 (etl_name)

必須。ETL 名は、次のルールに従う必要があります。

  • 名前に使用できるのは、数字、大文字、小文字、ピリオド (.)、ハイフン (-)、アンダースコア (_) です。

  • 名前の先頭にピリオド (.) またはハイフン (-) を使用することはできません。

  • 長さは 1~255 文字である必要があります。

ETL プロパティ (etl_properties)

WITH キーワードを使用して、次の ETL プロパティを追加します。

重要

プロパティ名はバックティック (`) で、プロパティ値は一重引用符 (') で囲みます。たとえば、`parallelism` = '2' です。

プロパティ

データ型

説明

デフォルト値

parallelism

INTEGER

タスクの並列処理の次数。

1

sink.ignore-update-before

BOOLEAN

シンク操作中に -U を無視するかどうかを指定します。

false

sink.ignore-delete

BOOLEAN

シンク操作中に -D を無視するかどうかを指定します。

false

sink.null-mode

STRING

シンク操作中に null 値を書き込むかどうかを指定します。有効な値:

  • NO_OP: ソースデータの null 値を保持して書き込みます。

  • SKIP: null 値をスキップして書き込みません。

NO_OP

udf.xxxx

STRING

ユーザー定義関数 (UDF) を設定します。このプロパティを使用する前に、UDF JAR ファイルをアップロードする必要があります。パラメーターは udf.<udfFunction> = <jarName>#<className> のフォーマットを使用します。ここで、`udfFunction` は関数名、`jarName` は JAR パッケージ名、`className` はクラス名です。

なし

stream.xxx

ANY

ストリームエンジンタスクのパラメーター。たとえば、execution.checkpointing.interval です。

なし

結果テーブルの指定

パラメーター

必須

説明

catalog_name

いいえ

結果テーブルのカタログ。

db_name

いいえ

結果テーブルが配置されているデータベース。

table_name

はい

結果テーブルの名前。

column_name

はい

結果テーブルの列名。

SQL クエリ文 (select_statement)

SQL クエリ文はデータのフィルター処理に使用されます。たとえば、 SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10; です。

LindormTable のソーステーブル source と結果テーブル sink が次の構造を持つと仮定します。

-- ソーステーブル: source
CREATE TABLE source(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
-- 結果テーブル: sink
CREATE TABLE sink(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
  • 例 1: `filter1` という名前の ETL タスクを作成します。このタスクは、指定された条件を満たすデータをソーステーブル source から結果テーブル sink に挿入します。

    CREATE ETL IF NOT EXISTS filter1
    AS
      INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
      SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
  • 例 2: `filter2` という名前の ETL タスクを作成します。このタスクは、指定された条件を満たすデータをソーステーブル source から結果テーブル sink に挿入し、プロパティを追加します。

    CREATE ETL IF NOT EXISTS filter2
    WITH (
    `parallelism` = '2',
    `stream.execution.checkpointing.interval` = '30000'
    )
    AS
      INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1)
      SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;