このトピックでは、Realtime Compute for Apache Flink でサポートされているクエリについて説明します。
Realtime Compute for Apache Flink は、Apache Flink のネイティブクエリと互換性があります。次のバッカスナウア記法(BNF)は、サポートされているストリーミング SQL クエリとバッチ SQL クエリのスーパーセットを示しています。
query:
values
| WITH withItem [ , withItem ]* query
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
withItem:
name
[ '(' column [, column ]* ')' ]
AS '(' query ')'
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| [ LATERAL ] '(' query ')'
| UNNEST '(' expression ')'
tablePath:
[ [ catalogName . ] databaseName . ] tableName
systemTimePeriod:
FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
/*+ OPTIONS(key=val [, key=val]*) */
key:
stringLiteral
val:
stringLiteral
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'識別子
Java と同様に、Flink SQL は、テーブル名、列名、関数名などの識別子に次の構文ポリシーを使用します。
識別子の定義は、バッククォート(`)で囲まれているかどうかに関係なく、大文字と小文字が区別されます。
識別子の一致は大文字と小文字が区別されます。
Java とは異なり、Flink SQL では識別子に英数字以外の文字を使用できます。例:
SELECT a AS `my field` FROM t文字列定数
文字列定数は、二重引用符(")ではなく、単一引用符(')で囲む必要があります。例:
SELECT 'Hello World' エスケープするには、文字列内の単一引用符(')を複製します。例:
Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
| EXPR$0 | EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set文字列定数では Unicode 値がサポートされています。文字列定数に Unicode 値を含めるには、次のいずれかの方法を使用します。
Unicode 値の前にバックスラッシュ(\)を追加して、デフォルトのエスケープ文字を使用します。
SELECT U&'\263A'カスタムエスケープ文字を使用します。例:
SELECT U&'#263A' UESCAPE '#' -- シャープ記号(#)はエスケープ文字として使用されます。
次の表に、Apache Flink 1.15 でサポートされているクエリと、対応するリファレンスを示します。
別のバージョンの Apache Flink のリファレンスを表示する場合は、Apache Flink の Web サイトでバージョンを切り替えることができます。
クエリ | リファレンス |
ヒント | |
WITH 句 | |
SELECT 句と WHERE 句 | |
SELECT DISTINCT | |
ウィンドウ関数 | |
ウィンドウ集計 | |
グループ集計 | |
Over 集計 | |
結合 | |
ウィンドウ結合 | |
集合演算 | |
ORDER BY 句 | |
LIMIT 句 | |
Top-N | |
ウィンドウ Top-N | |
重複除外 | |
ウィンドウ重複除外 | |
パターン認識 |
クエリの実行
ストリーミングモードでは、入力ストリームは更新ストリームと非更新ストリームに分類できます。非更新ストリームには INSERT タイプのイベントのみが含まれますが、更新ストリームには他のタイプのイベントが含まれます。たとえば、Change Data Capture(CDC)ソースは、外部システムから更新ストリームを生成します。グループ集計や Top-N 計算などの Flink 内の特定の操作も、更新イベントを生成します。ほとんどの場合、更新イベントを生成する操作はステートフル演算子によって実行されます。ステートフル演算子は、状態を使用して更新を管理します。ただし、すべてのステートフル演算子が更新ストリームを使用できるわけではありません。たとえば、Over 集計とインターバル結合の演算子は、入力として更新ストリームをサポートしていません。
次の表に、サポートされている各クエリの実行時情報(演算子の名前、演算子がステートフルかどうか、演算子が更新ストリームを使用できるかどうか、更新イベントが生成されるかどうかなど)を示します。この情報は、Ververica Runtime(VVR) 6.0.X 以降に適用されます。
クエリ | ランタイム演算子 | 状態データの使用 | 更新ストリームの使用 | 更新イベントの生成 | 注記 |
SELECT と WHERE | Calc | いいえ | はい | いいえ | 該当なし |
ルックアップ結合 | LookupJoin | いいえ* | はい | いいえ | VVR 8.0.1 以降では、table.optimizer.non-deterministic-update.strategy パラメーターを TRY_RESOLVE に設定し、クエリで 非決定性問題 が検出された場合、状態データが自動的に使用されて問題が解決されます。状態データの使用を無効にする場合は、table.optimizer.non-deterministic-update.strategy パラメーターを IGNORE に設定します。このパラメーターの値を変更した後、非互換性が発生する可能性があることに注意してください。この場合、クエリを再実行する必要があります。 |
テーブル関数 | Correlate | いいえ | はい | いいえ | 該当なし |
SELECT DISTINCT | GroupAggregate | はい | はい | はい | 該当なし |
グループ集計 | GroupAggregate LocalGroupAggregate GlobalGroupAggregate IncrementalGroupAggregate | はい* | はい | はい | 事前集計演算子 LocalGroupAggregate は状態データを使用しません。 |
Over 集計 | OverAggregate | はい | いいえ | いいえ | 該当なし |
ウィンドウ集計 | GroupWindowAggregate WindowAggregate LocalWindowAggregate GlobalWindowAggregate | はい* | はい* | いいえ* |
|
2 つのストリームの結合(標準結合) | Join | はい | はい | はい* | LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN などの外部結合を使用する場合、更新イベントが生成されます。 |
インターバル結合 | IntervalJoin | はい | いいえ | いいえ | 該当なし |
テンポラル結合 | TemporalJoin | はい | はい | いいえ | 該当なし |
ウィンドウ結合 | WindowJoin | はい | いいえ | いいえ | 該当なし |
Top-N | Rank | はい | はい | はい | Top-N クエリは、処理時間に基づくランキングをサポートしていません。CURRENT_TIMESTAMP など、ランキングには他の組み込み関数を使用できます。 警告 Top-N クエリの ORDER BY 句で処理時間フィールドを指定すると、データエラーが発生する可能性があります。この問題は、VVR 8.0.7 以前の構文チェックでは報告されません。CURRENT_TIMESTAMP など、他の組み込み関数を使用することをお勧めします。 |
ウィンドウ Top-N | WindowRank | はい | いいえ | いいえ | 該当なし |
重複除外 | Deduplicate | はい | いいえ | はい* | Deduplicate Keep FirstRow ポリシーを使用して処理時間(Proctime)に基づいてデータを重複除外する場合、更新イベントは生成されません。 |
ウィンドウ重複除外 | WindowDeduplicate | はい | いいえ | いいえ | 該当なし |
ステートレス演算子はイベントタイプのみを渡します。これにより、出力イベントが入力イベントと同じタイプになります。ステートレス演算子は、入力が更新ストリームかどうかに関係なく、更新イベントを生成しません。