Dataphin は、開発効率の向上を目的として、ANSI-SQL 標準に沿って Flink SQL の構文の改善を導入しました。このトピックでは、新しい構文上の 特徴とその Dataphin Flink SQL 内での適用について概要を説明します。
プロジェクトをまたがる参照をサポート
DDL および DML テーブル名構文の拡張
典型的な Flink SQL タスクでは、完全な DDL および DML ステートメントが必要です。次の例を考えてみましょう。
--DDL
CREATE TABLE input_table (
field1 VARCHAR,
field2 BIGINT
) WITH (
type = 'datagen'
);
CREATE TABLE output_table (
field1 VARCHAR,
field2 BIGINT
) WITH (
type = 'print'
);
--DML
INSERT INTO output_table
SELECT field1, field2 FROM input_table;複数のタスクにわたって繰り返し DDL ステートメント定義する必要性をなくし、開発効率を高めるために、Dataphin は Flink SQL でテーブル名を参照するための構文を改良しました。Dataphin でスキーマを定義すると、以下の構文を使用して任意のタスクで直接参照できるため、繰り返しの宣言を回避し、クロスプロジェクト参照が可能になります。
<compoundTableName> ::= [projectName <DOT>] tableName例の解説
上記の SQL の例で示されているように、スキーマを定義した後、開発タスクに次のステートメントを含めるだけで済みます。
INSERT INTO output_table -- デフォルトのプロジェクトプレフィックスは現在のプロジェクトです。
SELECT field1, field2 FROM ${project_name}.input_table;ビルトイン関数と UDX 構文の拡張
通常、Flink SQL タスクでは、UDX の使用を宣言する必要があります。以下の例を参照してください。
CREATE FUNCTION MY_UDX AS 'package.class';
...
CREATE VIEW my_view AS
SELECT MY_UDX(args)
FROM ...上記のカスタム UDX 宣言は、多くの場合、複数のタスクで必要になります。この冗長性を回避し、効率を向上させるために、Dataphin はリソースの概念を導入しました。Dataphin に関数を登録すると、以下の構文を使用して任意のタスクで直接参照できるため、繰り返しの宣言が不要になり、クロスプロジェクト参照がサポートされます。
<compoundFunctionName> ::= [projectName <COLON>] functionName例の解説
上記の SQL の例に続いて、Dataphin に関数を登録した後、開発タスクに次のステートメントを入力するだけで済みます。
CREATE VIEW my_view AS SELECT ${project_name}:MY_UDX(args) -- デフォルトのプロジェクトプレフィックスは現在のプロジェクトです。
FROM ...特記事項
Dataphin では、ビルトイン関数のオーバーロードまたはオーバーライドが可能で、オーバーロードまたはオーバーライドされたバージョンが優先されます。次の例を考えてみましょう。
例 1: プロジェクトプレフィックスを含む。
SELECT ${project_name}:SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin はデフォルトでオーバーロードされたビルトイン関数 `SUBSTRING` を使用します。`${project_name}` の下にカスタム関数が見つからない場合、プリコンパイル中に失敗します。例 2: プロジェクトプレフィックスを省略する。
SELECT SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin は、現在のプロジェクトでカスタム `SUBSTRING` 関数を優先的に検索します。見つからない場合は、ビルトイン関数と見なされます。UpdateStatement と TableHints を使用してタスクレベルの DDL パラメーターを設定する
Dataphin では、既存のスキーマの DDL WITH パラメーターを設定できます。プラットフォームは、Watermark、計算列、プライマリキー、ヘッダーなど、タスクレベルのパラメーターの構成をサポートしています。さらに、Dataphin は SetOption 構文のサポートを導入しました。構文の仕様は次のとおりです。
SET {
[ projectName <DOT>] tableName <DOT>
{
propertyName
| WATERMARK
| <computedColumn>
| <primaryKey>
| <procTime>
| {fieldName <DOT> <isHeader>}
}
} <EQ> {
@variable = identifier
| @variable = literal
| @variable = expression
}WITH パラメーターを構成する場合は、スキーマで共通のテーブル DDL WITH パラメーターを定義することをお勧めします。タスクレベルのパーソナライズされたパラメーターは、SET ステートメントまたは TableHints を使用してコードで指定でき、複数の SET ステートメントは ; で区切ります。
Flink SQL 予約語を使用する場合は、(` `)記号を追加する必要があります。次の例を参照してください。
Watermark: SET ステートメントでは、
SET [project.]table.`watermark` = ...を使用します。パーティション: SET ステートメントでは、
SET [project.]table.`partition` = ...を使用します。
DDL パラメーターの優先順位
タスクレベルのパラメーターの上書き
タスクレベルのパラメーター設定は、メタテーブルで構成されたパラメーター設定を上書きします(上書き可能なパラメーターのみ)。たとえば、メタテーブルでテーブル T のプライマリキーが field1 として設定されている場合、タスクでテーブル T を使用した後、タスクの primary key 設定が優先されます(つまり、タスクでテーブル T の primary key が field2 として設定されている場合、そのタスクでは field1 はテーブル T のプライマリキーではなくなります)。
Dataphin のセキュリティポリシーと管理ポリシーにより、スキーマパラメーターの中には、SET ステートメントを使用して SQL タスクで上書きできないものがあります。たとえば、MaxCompute スキーマで定義されたテーブル名は、SET を使用して SQL タスクで変更することはできません。上書きできるパラメーターはスキーマの種類によって異なり、実際の操作によって異なります。
タスク内部 SET パラメーター
内部タスク SET パラメーターはグローバルな優先順位を持ち、TableHints は コンテキスト固有 です。SET パラメーターは TableHints にないものを補完し、同一のパラメーター項目は置き換えられます。
たとえば、バージョン 2.2.5 以降、Blink はディメンションテーブルの INDEX と UNIQUE INDEX の設定のサポートを導入しました。現在、JOIN KEY を宣言するためのキーワードは 3 つあります。PRIMARY KEY、INDEX、UNIQUE INDEX です。Dataphin はこれらに基づいて総合的に判断します。以下の例を参照してください。
SET my_dim_table.primarykey = 'field1';
CREATE VIEW tmp1 AS
SELECT my_source_table1.*
FROM my_source_table1 JOIN my_dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;
CREATE VIEW tmp2 AS
SELECT my_source_table2.*
FROM my_source_table2 JOIN my_dim_table WITH (primarykey = 'field3') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;上記のコードでは、Dataphin は、tmp1 の作成のために my_dim_table の primary key として field1 を割り当て(パラメーターパディング)、index として field2 を割り当てると解釈します。tmp2 の作成のために、my_dim_table の primary key として field3 を設定します(パラメーターの置き換え)。
追加の例の解説:
例 1: Flink SQL ソーステーブルパラメーター。
たとえば、Kafka ソーステーブルを作成する場合:
-- vvr 構文 CREATE TEMPORARY TABLE kafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `category_id` BIGINT, `behavior` STRING, `topic` STRING METADATA VIRTUAL, `partition` BIGINT METADATA VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'my_excellent_topic', 'properties.bootstrap.servers' = 'mykafka:9092', 'properties.group.id' = 'my_excellent_group' 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' )パーソナライズされたパラメーターは、Dataphin Flink SQL SET 式に変換されます。
-- テーブルプロパティの名前が SQL キーワードであるか、他の文字が含まれている場合は、バックティックでエスケープできます。 SET kafkaTable.`properties.group.id` = 'new_group_id'; SET kafkaTable.`scan.startup.mode` = 'latest-offset';例 2: Flink SQL シンクテーブルパラメーター。
たとえば、aliHBase 結果テーブルを作成する場合:
create table hbase_output( rk varchar, rk1 varchar, rk2 varchar, f bigint, PRIMARY KEY(rk) ) with ( type='alihbase', diamondKey='xxxxxxx', diamondGroup='yyyyyyy', columnFamily='cf', tableName='blink_hbase_test', bufferSize=500; );タスクレベルでスキーマ構成を上書きするには、次の設定を使用します。
SET hbase_output.bufferSize = 1000;例 3: Flink SQL ディメンションテーブルパラメーター。
たとえば、MaxCompute ディメンションテーブルを作成する場合:
CREATE TABLE white_list ( id varchar, name varchar, age int, PRIMARY KEY (id) --PERIOD FOR SYSTEM_TIME -- 注: blink3.x 以降、ディメンションテーブル DDL には PERIOD FOR SYSTEM_TIME は必要ありません。 ) with ( type = 'odps', endPoint = 'your_end_point_name', project = 'your_project_name', tableName = 'your_table_name', accessId = 'your_access_id', accessKey = 'your_access_key', `partition` = 'ds=20180905', cache = 'ALL' );これらは Dataphin Flink SQL SET 式に変換されます。
SET white_list.cache='ALL'; SET white_list.cacheTTLMs=86400000; --タスクレベルでこのテーブルのキャッシュ更新時間を具体的に設定する場合は、ここに SET を追加できます。例 4: WATERMARK、計算列、HEADER、PROCTIME、PRIMARY KEY/(UNIQUE)INDEX の構成。
現在、スキーマはプライマリキーとヘッダー設定のみをサポートしています。WATERMARK、PROCTIME、計算列などは、タスクレベルで設定する必要があります。構文の仕様は次のとおりです。
WATERMARK の設定。
構文は次のとおりです。
SET { [projectName <DOT>] tableName <DOT> WATERMARK } <EQ> { WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset) }WATERMARK の標準 Flink SQL タスク DDL の例を次に示します。
CREATE TABLE sls_stream( a INT, b BIGINT, c VARCHAR, ts TIMESTAMP, WATERMARK FOR ts AS withOffset(ts, 1000) ) with ( type = 'sls', ...);これは、WATERMARK を設定するための Dataphin Flink SQL メソッドに変換されます。
バージョン 3.7 以降、一重引用符が必要です。 SET sls_stream.`watermark`= 'WATERMARK FOR ts AS withOffset(ts, 1000)'; バージョン 3.6 SET sls_stream.`watermark`= WATERMARK FOR ts AS withOffset(ts, 1000);計算列の設定。
構文は次のとおりです。
SET { [projectName <DOT>] tableName <DOT> <computedColumn> } <EQ> { column_name AS computed_column_expression | { <LPAREN> column_name AS computed_column_expression (<COMMA> column_name AS computed_column_expression)* <RPAREN> } }計算列の標準 Flink SQL タスク DDL の例を次に示します。
CREATE TABLE sls_stream( a INT, b BIGINT, c VARCHAR, ts AS to_timestamp(c, 'yyyyMMddHHmmss') ) with ( type = 'sls', ...);これは、計算列を設定するための Dataphin Flink SQL メソッドに変換されます。
単一の計算列のメソッド。
SET sls_stream.computedColumn= ts AS to_timestamp(c, 'yyyyMMddHHmmss');複数の計算列のメソッド。
SET sls_stream.computedColumn= ( ts1 AS to_timestamp(c, 'yyyyMMddHHmmss'), ts2 AS to_timestamp(d, 'yyyy-MM-dd HH:mm:ss') );
PROCTIME の設定。
構文は次のとおりです。
SET { [projectName <DOT>] tableName <DOT> <procTime> } <EQ> { columnName AS PROCTIME() }これは、PROCTIME を設定するための Dataphin Flink SQL メソッドに変換されます。
SET sls_stream.procTime= d AS PROCTIME();PRIMARY KEY、INDEX、UNIQUE INDEX の設定。
重要Blink 3.6.0 以降、TableHints はコミュニティで受け入れられている Hints メソッドを採用しています。Ververica Flink もオープンソース Flink も INDEX または UNIQUE INDEX をサポートしていません。
構文は次のとおりです。
query : select /*+ hint_content */ ... from table_name1 /*+ hint_content */ join table_name2 /*+ hint_content */ ... hint_content : hint_item[, hint_item]* hint_item : hint_name | hint_name(k1=v1 [ , k2=v2 ]*) | hint_name(hint_opt [ ,hint_opt ]*) k : simple_identifier v : string_literal hint_opt : simple_identifier | numeric_literal | string_literalTableHints を使用した標準 Flink SQL タスクの例を次に示します。
INSERT INTO table SELECT source_table.* FROM source_table JOIN dim_table /*+ primarykey(field1) */ FOR SYSTEM_TIME AS OF PROCTIME() ON ...;ストリームテーブルを結合する場合、Flink ではユーザーが要件に基づいて
JOIN KEYキーワードを指定できます。SET ステートメントはグローバルな優先順位を持ち、TableHints はコンテキスト固有です。SET dim_table.index = 'field1'; -- pk の設定と unique index の設定は同じパターンに従います -- dim_table の DDL で INDEX (field1) を宣言するのと同じです INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- dim_table の DDL で INDEX (field2) を宣言するのと同じです。 INSERT INTO table2 SELECT source_table.* FROM source_table JOIN dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- dim_table の DDL で PRIMARY KEY (field1)、INDEX (field2) を宣言するのと同じです。 INSERT INTO table3 SELECT source_table.* FROM source_table JOIN dim_table WITH (primarykey = 'field2') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- dim_table の DDL で PRIMARY KEY (field1)、INDEX (field1) を宣言するのと同じです。 INSERT INTO table4 SELECT source_table.* FROM source_table JOIN dim_table WITH (primarykey = 'field1') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- dim_table の DDL で PRIMARY KEY (field1)、UNIQUE INDEX (field2, field3) を宣言するのと同じです。 INSERT INTO table5 SELECT source_table.* FROM source_table JOIN dim_table WITH (UNIQUEINDEX = 'field2, field3') FOR SYSTEM_TIME AS OF PROCTIME() ON ...;重要join with構文は以前は初期の Blink エンジンで使用されていました。新しいバージョンの Blink、Ververica Flink、およびオープンソース Flink では、TableHint を使用してください。TableHint の使用例を次に示します。
-- dim_table のプライマリキーが id フィールドであることを指定します(blink、vvr、apache-flink でサポートされています)。 INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+primarykey(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- dim_table のディメンションテーブルのプライマリキーが id であることを指定します(blink のみでサポートされています)。 INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+index(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- dim_table のディメンションテーブルの一意なプライマリキーが id であることを指定します(blink のみでサポートされています)。 INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+uniqueindex(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...;HEADER の構成。
構文は次のとおりです。
SET { [projectName <DOT>] tableName <DOT> <isHeader> } <EQ> { TRUE }以下の例を参照してください。
-- blink デモ create table kafka_table ( `message` VARBINARY, topic varchar HEADER ) WITH ( type = 'kafka010', topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3' ); -- vvr デモ create table kafka_table ( `message` VARBINARY, topic varchar METADATA VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'my_excellent_topic', 'properties.bootstrap.servers' = 'mykafka:9092', 'properties.group.id' = 'my_excellent_group' 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' );Dataphin では、SET を次の 2 つの構文オプションで使用できます。
SET kafka_table.topic.isHeader = true; -- 一重引用符なし。 SET kafka_table.topic.isHeader = 'true'; -- 一重引用符あり。
例 5: TimeTunnel 自動サブスクリプション機能(TimeTunnel を入力として使用)。
サンプルコードは次のとおりです。
set xx_project.xx_table.accessId='xxxxxx'; set xx_project.xx_table.accessKey='xxxxxx';スーパーユーザーアカウントと権限のないアカウントは、SQL を使用して手動でサブスクリプションを構成する必要があります。自動サブスクリプション機能を使用することはできません。
その他の個人アカウントの場合、SET 情報の構成はオプションです。通常、JobName が指定されていない場合は、デフォルトの JobName
defaultを使用して、Topic のサブスクリプション情報がログに記録されます。異なるジョブ間で同じ Topic の冗長なサブスクリプションを防ぐために、アーカイブレコードが存在する場合は、別のプリコンパイル関数内でサブスクリプションが繰り返されません。送信関数と公開関数は、特定のジョブに初めて使用されたときに自動的に Topic をサブスクライブし、アーカイブレコードが存在する場合は再度サブスクライブしません。
例 6: テーブル名の競合の解決。
プロジェクトがオフラインコンピューティングソースにリンクされている場合、オフラインテーブルとストリーミングテーブルの名前が競合する可能性があります。これらの競合を解決するアプローチを以下に概説します。
タスクの SQL 内でストリーミングとオフラインの両方のコンテキストでテーブル名が一致する場合は、プリコンパイルプロセス中に次のメッセージでエラーが発生します。構文 SET XXX.XXX.tableType = 'XX' を使用して、テーブル adi_ae_matrix_rt_slr_prim_cate に単一の型を割り当ててください。使用可能な型は次のとおりです。[CLOUD_HBASE, MAX_COMPUTE]。
このような場合は、同じ
project内での名前の重複を避けるために、SQL 内でテーブルのデータ型を明示的に定義する必要があります。エラーメッセージに応じて、CLOUD_HBASE または MAX_COMPUTE を選択します。コードの例は次のとおりです。-- MaxCompute 物理テーブルの使用を強制します。 SET xx_project.xx_table.tableType = 'MAX_COMPUTE';
例 7: MaxCompute (odps) データソーステーブルのサポート。
MaxCompute データソーステーブルを使用するには、パラメーター欠落エラーを回避するために
tableType='odps';を定義してください。例 8: ディメンションテーブルの列の切り取り。
列の切り取りを手動で有効にするには、次のように SET ステートメントを使用します。
SET {project}.{dimtable}.dataphinColumnPruning='true';説明注: 列の切り取りはディメンションテーブルにのみ適用可能であり、直接結合テーブルのみをサポートし、直接結合内のサブクエリはサポートしていません。
例 9: ストリームバッチ統合のパラメーターのカスタマイズ。
ストリームバッチ統合タスクでは、ミラーテーブルがよく使用されます。これらのミラーテーブルは、使用するために適切なストリームテーブルまたはバッチテーブルに変換されます。
さまざまなストリームテーブルとバッチテーブルがあり、データソースが異なる場合があるため、
with句で異なるkeyパラメーターが必要になる場合があり、batchSizeなどの固有の設定がある場合があります。TableHints は、対応するストリームテーブルとバッチテーブルをマッピングするために使用されます。構文は次のとおりです。set project.table.${mode}.${key}バッチタスクの開始時刻と終了時刻を定義するには、以下の例を参照してください。
set project.table.batch.startTime='2020-11-11 00:00:00'; set project.table.batch.endTime='2020-11-12 00:00:00';ストリームの AccessKey を設定するには、次の例を参照してください。
set dwd_lux_trd_ord_subpay_mirror.`stream`.accessId='xxxxxx'; set dwd_lux_trd_ord_subpay_mirror.`stream`.accessKey='xxxxxx';例 10: 結果テーブルの列の切り取り。
ヒントメソッドを使用して、結果テーブルで切り取る列を指定できます。
重要注: この機能は HBase シンクテーブルにのみ適用可能です。
-- HBase DDL ステートメントが次のようになっているとします CREATE TABLE hbase_sink( rowkey INT, family1 ROW<q1 INT>, family2 ROW<q2 STRING, q3 BIGINT> ) with ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>' ); -- エラー: テーブルフィールドが一致しないため insert into hbase_sink select key, ROW(f1q1) from ... -- 成功: ここでは、ヒントメソッドでシンクテーブルの切り取りを指定し、rowkey 列と q1 列への書き込みを指定しています insert into hbase_sink/*+dataphincolumns(rowkey,family1.q1)*/ select key, ROW(f1q1) from ... -- 成功: ここでは、ヒントメソッドでシンクテーブルの切り取りを指定し、rowkey 列と q2、q3 列への書き込みを指定しています insert into hbase_sink/*+dataphincolumns(rowkey,family2.q2,family2.q3)*/ select key, ROW(f2q2,f2q3) from ...