すべてのプロダクト
Search
ドキュメントセンター

Dataphin:Dataphin Flink SQL 構文拡張

最終更新日:Feb 06, 2025

Dataphin は、開発効率の向上を目的として、ANSI-SQL 標準に沿って Flink SQL の構文の改善を導入しました。このトピックでは、新しい構文上の 特徴とその Dataphin Flink SQL 内での適用について概要を説明します。

プロジェクトをまたがる参照をサポート

DDL および DML テーブル名構文の拡張

典型的な Flink SQL タスクでは、完全な DDL および DML ステートメントが必要です。次の例を考えてみましょう。

--DDL
CREATE TABLE input_table (
  	field1 VARCHAR,
    field2 BIGINT
  ) WITH (
		type = 'datagen'
  );
CREATE TABLE output_table (
  	field1 VARCHAR,
    field2 BIGINT
  ) WITH (
		type = 'print'
  );
--DML
INSERT INTO output_table
SELECT field1, field2 FROM input_table;

複数のタスクにわたって繰り返し DDL ステートメント定義する必要性をなくし、開発効率を高めるために、Dataphin は Flink SQL でテーブル名を参照するための構文を改良しました。Dataphin でスキーマを定義すると、以下の構文を使用して任意のタスクで直接参照できるため、繰り返しの宣言を回避し、クロスプロジェクト参照が可能になります。

<compoundTableName> ::= [projectName <DOT>] tableName

例の解説

上記の SQL の例で示されているように、スキーマを定義した後、開発タスクに次のステートメントを含めるだけで済みます。

INSERT INTO output_table -- デフォルトのプロジェクトプレフィックスは現在のプロジェクトです。
SELECT field1, field2 FROM ${project_name}.input_table;

ビルトイン関数と UDX 構文の拡張

通常、Flink SQL タスクでは、UDX の使用を宣言する必要があります。以下の例を参照してください。

CREATE FUNCTION MY_UDX AS 'package.class';
...
CREATE VIEW my_view AS 
SELECT MY_UDX(args)
FROM ...

上記のカスタム UDX 宣言は、多くの場合、複数のタスクで必要になります。この冗長性を回避し、効率を向上させるために、Dataphin はリソースの概念を導入しました。Dataphin に関数を登録すると、以下の構文を使用して任意のタスクで直接参照できるため、繰り返しの宣言が不要になり、クロスプロジェクト参照がサポートされます。

<compoundFunctionName> ::= [projectName <COLON>] functionName

例の解説

上記の SQL の例に続いて、Dataphin に関数を登録した後、開発タスクに次のステートメントを入力するだけで済みます。

CREATE VIEW my_view AS SELECT ${project_name}:MY_UDX(args) -- デフォルトのプロジェクトプレフィックスは現在のプロジェクトです。
FROM ...

特記事項

Dataphin では、ビルトイン関数のオーバーロードまたはオーバーライドが可能で、オーバーロードまたはオーバーライドされたバージョンが優先されます。次の例を考えてみましょう。

  • 例 1: プロジェクトプレフィックスを含む。

SELECT ${project_name}:SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin はデフォルトでオーバーロードされたビルトイン関数 `SUBSTRING` を使用します。`${project_name}` の下にカスタム関数が見つからない場合、プリコンパイル中に失敗します。
  • 例 2: プロジェクトプレフィックスを省略する。

SELECT SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin は、現在のプロジェクトでカスタム `SUBSTRING` 関数を優先的に検索します。見つからない場合は、ビルトイン関数と見なされます。

UpdateStatement と TableHints を使用してタスクレベルの DDL パラメーターを設定する

Dataphin では、既存のスキーマの DDL WITH パラメーターを設定できます。プラットフォームは、Watermark、計算列、プライマリキー、ヘッダーなど、タスクレベルのパラメーターの構成をサポートしています。さらに、Dataphin は SetOption 構文のサポートを導入しました。構文の仕様は次のとおりです。

SET {
	[ projectName <DOT>] tableName <DOT> 
		{ 
			propertyName
			| WATERMARK
			| <computedColumn> 
			| <primaryKey> 
			| <procTime> 
			| {fieldName <DOT> <isHeader>} 
		}
} 	<EQ> {
		@variable = identifier
		| @variable = literal
		| @variable = expression
}

WITH パラメーターを構成する場合は、スキーマで共通のテーブル DDL WITH パラメーターを定義することをお勧めします。タスクレベルのパーソナライズされたパラメーターは、SET ステートメントまたは TableHints を使用してコードで指定でき、複数の SET ステートメントは ; で区切ります。

重要

Flink SQL 予約語を使用する場合は、(` `)記号を追加する必要があります次の例を参照してください。

  • Watermark: SET ステートメントでは、SET [project.]table.`watermark` = ... を使用します。

  • パーティション: SET ステートメントでは、SET [project.]table.`partition` = ... を使用します。

DDL パラメーターの優先順位

タスクレベルのパラメーターの上書き

タスクレベルのパラメーター設定は、メタテーブルで構成されたパラメーター設定を上書きします(上書き可能なパラメーターのみ)。たとえば、メタテーブルでテーブル T のプライマリキーが field1 として設定されている場合、タスクでテーブル T を使用した後、タスクの primary key 設定が優先されます(つまり、タスクでテーブル Tprimary keyfield2 として設定されている場合、そのタスクでは field1 はテーブル T のプライマリキーではなくなります)。

重要

Dataphin のセキュリティポリシーと管理ポリシーにより、スキーマパラメーターの中には、SET ステートメントを使用して SQL タスクで上書きできないものがあります。たとえば、MaxCompute スキーマで定義されたテーブル名は、SET を使用して SQL タスクで変更することはできません。上書きできるパラメーターはスキーマの種類によって異なり、実際の操作によって異なります。

タスク内部 SET パラメーター

内部タスク SET パラメーターはグローバルな優先順位を持ち、TableHints は コンテキスト固有 です。SET パラメーターは TableHints にないものを補完し、同一のパラメーター項目は置き換えられます。

たとえば、バージョン 2.2.5 以降、Blink はディメンションテーブルの INDEXUNIQUE INDEX の設定のサポートを導入しました。現在、JOIN KEY を宣言するためのキーワードは 3 つあります。PRIMARY KEYINDEXUNIQUE INDEX です。Dataphin はこれらに基づいて総合的に判断します。以下の例を参照してください。

SET my_dim_table.primarykey = 'field1';

CREATE VIEW tmp1 AS 
SELECT my_source_table1.*
FROM my_source_table1 JOIN my_dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;

CREATE VIEW tmp2 AS 
SELECT my_source_table2.*
FROM my_source_table2 JOIN my_dim_table WITH (primarykey = 'field3') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;

上記のコードでは、Dataphin は、tmp1 の作成のために my_dim_tableprimary key として field1 を割り当て(パラメーターパディング)、index として field2 を割り当てると解釈します。tmp2 の作成のために、my_dim_tableprimary key として field3 を設定します(パラメーターの置き換え)。

追加の例の解説:

  • 例 1: Flink SQL ソーステーブルパラメーター。

    たとえば、Kafka ソーステーブルを作成する場合:

    -- vvr 構文
    CREATE TEMPORARY TABLE kafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `category_id` BIGINT,
        `behavior` STRING,
        `topic` STRING METADATA VIRTUAL,
        `partition` BIGINT METADATA VIRTUAL
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_excellent_topic',
        'properties.bootstrap.servers' = 'mykafka:9092',
        'properties.group.id' = 'my_excellent_group'
        'format' = 'csv',
        'scan.startup.mode' = 'earliest-offset'
    )

    パーソナライズされたパラメーターは、Dataphin Flink SQL SET 式に変換されます。

    -- テーブルプロパティの名前が SQL キーワードであるか、他の文字が含まれている場合は、バックティックでエスケープできます。
    SET kafkaTable.`properties.group.id` = 'new_group_id';
    SET kafkaTable.`scan.startup.mode` = 'latest-offset';
  • 例 2: Flink SQL シンクテーブルパラメーター。

    たとえば、aliHBase 結果テーブルを作成する場合:

    create table hbase_output(
      rk  varchar,
      rk1 varchar,
      rk2 varchar,
      f bigint,
      PRIMARY KEY(rk)
    ) with (
      type='alihbase',
      diamondKey='xxxxxxx',
      diamondGroup='yyyyyyy',
      columnFamily='cf',
      tableName='blink_hbase_test',
      bufferSize=500;
    );

    タスクレベルでスキーマ構成を上書きするには、次の設定を使用します。

    SET hbase_output.bufferSize = 1000;
  • 例 3: Flink SQL ディメンションテーブルパラメーター。

    たとえば、MaxCompute ディメンションテーブルを作成する場合:

     CREATE TABLE white_list (
      id varchar,
      name varchar,
      age int,
      PRIMARY KEY (id)
      --PERIOD FOR SYSTEM_TIME -- 注: blink3.x 以降、ディメンションテーブル DDL には PERIOD FOR SYSTEM_TIME は必要ありません。
    ) with (
      type = 'odps',
      endPoint = 'your_end_point_name',
      project = 'your_project_name',
      tableName = 'your_table_name',
      accessId = 'your_access_id',
      accessKey = 'your_access_key',
      `partition` = 'ds=20180905',
      cache = 'ALL'
    );

    これらは Dataphin Flink SQL SET 式に変換されます。

    SET white_list.cache='ALL';
    SET white_list.cacheTTLMs=86400000;
    --タスクレベルでこのテーブルのキャッシュ更新時間を具体的に設定する場合は、ここに SET を追加できます。
  • 例 4: WATERMARK、計算列、HEADER、PROCTIME、PRIMARY KEY/(UNIQUE)INDEX の構成。

    現在、スキーマはプライマリキーとヘッダー設定のみをサポートしています。WATERMARK、PROCTIME、計算列などは、タスクレベルで設定する必要があります。構文の仕様は次のとおりです。

    • WATERMARK の設定。

      構文は次のとおりです。

      SET {
      	[projectName <DOT>] tableName <DOT> WATERMARK
      } 	<EQ> {
      		WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
      }

      WATERMARK の標準 Flink SQL タスク DDL の例を次に示します。

      CREATE TABLE sls_stream(
      		a INT,
      		b BIGINT,
      		c VARCHAR,
      		ts TIMESTAMP,
      		WATERMARK FOR ts AS withOffset(ts, 1000)
      	  ) with (
      		type = 'sls',
        ...);

      これは、WATERMARK を設定するための Dataphin Flink SQL メソッドに変換されます。

      バージョン 3.7 以降、一重引用符が必要です。
      SET sls_stream.`watermark`= 'WATERMARK FOR ts AS withOffset(ts, 1000)';
      バージョン 3.6
      SET sls_stream.`watermark`= WATERMARK FOR ts AS withOffset(ts, 1000);
    • 計算列の設定。

      構文は次のとおりです。

      SET {
      	[projectName <DOT>] tableName <DOT> <computedColumn>
      } 	<EQ> {
      		column_name AS computed_column_expression
          |
          {
          	<LPAREN>
          		column_name AS computed_column_expression
          		(<COMMA> column_name AS computed_column_expression)*
            <RPAREN>  
          }
      }

      計算列の標準 Flink SQL タスク DDL の例を次に示します。

      CREATE TABLE sls_stream(
      		a INT,
      		b BIGINT,
      		c VARCHAR,
      		ts AS to_timestamp(c, 'yyyyMMddHHmmss')
      	  ) with (
      		type = 'sls',
        ...);

      これは、計算列を設定するための Dataphin Flink SQL メソッドに変換されます。

      • 単一の計算列のメソッド。

        SET sls_stream.computedColumn= ts AS to_timestamp(c, 'yyyyMMddHHmmss');
      • 複数の計算列のメソッド。

        SET sls_stream.computedColumn= ( ts1 AS to_timestamp(c, 'yyyyMMddHHmmss'), ts2 AS to_timestamp(d, 'yyyy-MM-dd HH:mm:ss') );
    • PROCTIME の設定。

      構文は次のとおりです。

      SET {
      	[projectName <DOT>] tableName <DOT> <procTime>
      } 	<EQ> {
      		columnName AS PROCTIME()
      }

      これは、PROCTIME を設定するための Dataphin Flink SQL メソッドに変換されます。

      SET sls_stream.procTime= d AS PROCTIME();
    • PRIMARY KEY、INDEX、UNIQUE INDEX の設定。

      重要

      Blink 3.6.0 以降、TableHints はコミュニティで受け入れられている Hints メソッドを採用しています。Ververica Flink もオープンソース Flink も INDEX または UNIQUE INDEX をサポートしていません。

      構文は次のとおりです。

      query :
            select /*+ hint_content */
            ...
            from
                table_name1 /*+ hint_content */
                join 
                table_name2 /*+ hint_content */
            ...
      hint_content :
             hint_item[, hint_item]*
      hint_item :
             hint_name
         |   hint_name(k1=v1 [ , k2=v2 ]*)
         |   hint_name(hint_opt [ ,hint_opt ]*)
      k :
            simple_identifier
      v :
            string_literal
      hint_opt :
            simple_identifier
         |  numeric_literal
         |  string_literal

      TableHints を使用した標準 Flink SQL タスクの例を次に示します。

      INSERT INTO table
      SELECT source_table.*
      FROM source_table JOIN dim_table /*+ primarykey(field1) */ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;

      ストリームテーブルを結合する場合、Flink ではユーザーが要件に基づいて JOIN KEY キーワードを指定できます。SET ステートメントはグローバルな優先順位を持ち、TableHints は コンテキスト固有 です。

      SET dim_table.index = 'field1'; -- pk の設定と unique index の設定は同じパターンに従います
      
      -- dim_table の DDL で INDEX (field1) を宣言するのと同じです
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- dim_table の DDL で INDEX (field2) を宣言するのと同じです。
      INSERT INTO table2
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      
      -- dim_table の DDL で PRIMARY KEY (field1)、INDEX (field2) を宣言するのと同じです。
      INSERT INTO table3
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (primarykey = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- dim_table の DDL で PRIMARY KEY (field1)、INDEX (field1) を宣言するのと同じです。
      INSERT INTO table4
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (primarykey = 'field1') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- dim_table の DDL で PRIMARY KEY (field1)、UNIQUE INDEX (field2, field3) を宣言するのと同じです。
      INSERT INTO table5
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (UNIQUEINDEX = 'field2, field3') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      重要

      join with 構文は以前は初期の Blink エンジンで使用されていました。新しいバージョンの Blink、Ververica Flink、およびオープンソース Flink では、TableHint を使用してください。

      TableHint の使用例を次に示します。

      -- dim_table のプライマリキーが id フィールドであることを指定します(blink、vvr、apache-flink でサポートされています)。
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+primarykey(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- dim_table のディメンションテーブルのプライマリキーが id であることを指定します(blink のみでサポートされています)。
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+index(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- dim_table のディメンションテーブルの一意なプライマリキーが id であることを指定します(blink のみでサポートされています)。
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+uniqueindex(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
    • HEADER の構成。

      構文は次のとおりです。

      SET {
      	[projectName <DOT>] tableName <DOT> <isHeader>
      } 	<EQ> {
      		TRUE
      }

      以下の例を参照してください。

      -- blink デモ
      create table kafka_table (
          `message` VARBINARY,
          topic varchar HEADER
      ) WITH (
          type = 'kafka010',
          topic = 'test_kafka_topic',
          `group.id` = 'test_kafka_consumer_group',
          bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3'
      );
      
      -- vvr デモ
      create table kafka_table (
          `message` VARBINARY,
          topic varchar METADATA VIRTUAL
      ) WITH (
          'connector' = 'kafka',
          'topic' = 'my_excellent_topic',
          'properties.bootstrap.servers' = 'mykafka:9092',
          'properties.group.id' = 'my_excellent_group'
          'format' = 'csv',
          'scan.startup.mode' = 'earliest-offset'
      );

      Dataphin では、SET を次の 2 つの構文オプションで使用できます。

      SET kafka_table.topic.isHeader = true; -- 一重引用符なし。
      
      SET kafka_table.topic.isHeader = 'true'; -- 一重引用符あり。
  • 例 5: TimeTunnel 自動サブスクリプション機能(TimeTunnel を入力として使用)。

    サンプルコードは次のとおりです。

    set xx_project.xx_table.accessId='xxxxxx';
    set xx_project.xx_table.accessKey='xxxxxx';
    • スーパーユーザーアカウントと権限のないアカウントは、SQL を使用して手動でサブスクリプションを構成する必要があります。自動サブスクリプション機能を使用することはできません。

    • その他の個人アカウントの場合、SET 情報の構成はオプションです。通常、JobName が指定されていない場合は、デフォルトの JobName default を使用して、Topic のサブスクリプション情報がログに記録されます。異なるジョブ間で同じ Topic の冗長なサブスクリプションを防ぐために、アーカイブレコードが存在する場合は、別のプリコンパイル関数内でサブスクリプションが繰り返されません。送信関数と公開関数は、特定のジョブに初めて使用されたときに自動的に Topic をサブスクライブし、アーカイブレコードが存在する場合は再度サブスクライブしません。

  • 例 6: テーブル名の競合の解決。

    プロジェクトがオフラインコンピューティングソースにリンクされている場合、オフラインテーブルとストリーミングテーブルの名前が競合する可能性があります。これらの競合を解決するアプローチを以下に概説します。

    1. タスクの SQL 内でストリーミングとオフラインの両方のコンテキストでテーブル名が一致する場合は、プリコンパイルプロセス中に次のメッセージでエラーが発生します。構文 SET XXX.XXX.tableType = 'XX' を使用して、テーブル adi_ae_matrix_rt_slr_prim_cate に単一の型を割り当ててください。使用可能な型は次のとおりです。[CLOUD_HBASE, MAX_COMPUTE]

    2. このような場合は、同じ project 内での名前の重複を避けるために、SQL 内でテーブルのデータ型を明示的に定義する必要があります。エラーメッセージに応じて、CLOUD_HBASE または MAX_COMPUTE を選択します。コードの例は次のとおりです。

      -- MaxCompute 物理テーブルの使用を強制します。
      SET xx_project.xx_table.tableType = 'MAX_COMPUTE';
  • 例 7: MaxCompute (odps) データソーステーブルのサポート。

    MaxCompute データソーステーブルを使用するには、パラメーター欠落エラーを回避するために tableType='odps'; を定義してください。

  • 例 8: ディメンションテーブルの列の切り取り。

    列の切り取りを手動で有効にするには、次のように SET ステートメントを使用します。

    SET {project}.{dimtable}.dataphinColumnPruning='true';
    説明

    注: 列の切り取りはディメンションテーブルにのみ適用可能であり、直接結合テーブルのみをサポートし、直接結合内のサブクエリはサポートしていません。

  • 例 9: ストリームバッチ統合のパラメーターのカスタマイズ。

    ストリームバッチ統合タスクでは、ミラーテーブルがよく使用されます。これらのミラーテーブルは、使用するために適切なストリームテーブルまたはバッチテーブルに変換されます。

    さまざまなストリームテーブルとバッチテーブルがあり、データソースが異なる場合があるため、with 句で異なる key パラメーターが必要になる場合があり、batchSize などの固有の設定がある場合があります。TableHints は、対応するストリームテーブルとバッチテーブルをマッピングするために使用されます。構文は次のとおりです。

    set project.table.${mode}.${key}

    バッチタスクの開始時刻と終了時刻を定義するには、以下の例を参照してください。

    set project.table.batch.startTime='2020-11-11 00:00:00';
    set project.table.batch.endTime='2020-11-12 00:00:00';

    ストリームの AccessKey を設定するには、次の例を参照してください。

    set dwd_lux_trd_ord_subpay_mirror.`stream`.accessId='xxxxxx';
    set dwd_lux_trd_ord_subpay_mirror.`stream`.accessKey='xxxxxx';
  • 例 10: 結果テーブルの列の切り取り。

    ヒントメソッドを使用して、結果テーブルで切り取る列を指定できます。

    重要

    注: この機能は HBase シンクテーブルにのみ適用可能です。

    -- HBase DDL ステートメントが次のようになっているとします
    CREATE TABLE hbase_sink(
      rowkey INT,
      family1 ROW<q1 INT>,
      family2 ROW<q2 STRING, q3 BIGINT>
    ) with (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>'
    );
    
    -- エラー: テーブルフィールドが一致しないため
    insert into hbase_sink select key, ROW(f1q1) from ...
    
    -- 成功: ここでは、ヒントメソッドでシンクテーブルの切り取りを指定し、rowkey 列と q1 列への書き込みを指定しています
    insert into hbase_sink/*+dataphincolumns(rowkey,family1.q1)*/
    select key, ROW(f1q1) from ...
    
    -- 成功: ここでは、ヒントメソッドでシンクテーブルの切り取りを指定し、rowkey 列と q2、q3 列への書き込みを指定しています
    insert into hbase_sink/*+dataphincolumns(rowkey,family2.q2,family2.q3)*/
    select key, ROW(f2q2,f2q3) from ...