Realtime Compute for Apache Flink is compatible with native Apache Flink SQL queries. The following Backus-Naur Form (BNF) grammar defines the superset of supported streaming and batch SQL queries.
query:
values
| WITH withItem [ , withItem ]* query
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
withItem:
name
[ '(' column [, column ]* ')' ]
AS '(' query ')'
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] tablePath [ dynamicTableOptions ] [ systemTimePeriod ] [ [ AS ] correlationName ]
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| [ LATERAL ] '(' query ')'
| UNNEST '(' expression ')'
tablePath:
[ [ catalogName . ] databaseName . ] tableName
systemTimePeriod:
FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
/*+ OPTIONS(key=val [, key=val]*) */
key:
stringLiteral
val:
stringLiteral
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'
Identifiers
Flink SQL identifier rules differ from Java in one important way: identifiers enclosed in backticks may contain non-alphanumeric characters. In all other respects, the rules are the same as Java:
The case of an identifier is preserved whether or not it is enclosed in backticks.
Identifiers are matched case-sensitively.
Example with a non-alphanumeric identifier:
SELECT a AS `my field` FROM t
String constants
String constants must use single quotation marks ('), not double quotation marks (").
SELECT 'Hello World'
Escape a single quotation mark inside a string by doubling it:
Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
| EXPR$0 | EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set
Unicode escape sequences
To include Unicode values in a string constant, use the U& prefix followed by an escape sequence.
| Method | Syntax | Example |
|---|---|---|
| Default escape (backslash) | U&'\<unicode>' | SELECT U&'\263A' |
| Custom escape character | U&'<char><unicode>' UESCAPE '<char>' | SELECT U&'#263A' UESCAPE '#' — uses # as the escape character |
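Both methods can be sketched in a single query (column aliases are illustrative; both expressions resolve to the same character, U+263A):

```sql
-- Default escape: backslash followed by the 4-digit hexadecimal code point.
SELECT U&'\263A' AS smiley_default,
-- Custom escape character declared with the UESCAPE clause.
       U&'#263A' UESCAPE '#' AS smiley_custom;
```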
Supported queries
The following table lists all queries supported by Apache Flink 1.15. To view the reference for a different Flink version, switch versions on the Apache Flink website.
| Query | Reference |
|---|---|
| Hints | SQL Hints |
| WITH clause | WITH clause |
| SELECT and WHERE clauses | SELECT & WHERE clause |
| SELECT DISTINCT | SELECT DISTINCT |
| Window functions | Windowing table-valued functions (Windowing TVFs) |
| Window aggregation | Window aggregation |
| Group aggregation | Group aggregation |
| Over aggregation | Over aggregation |
| Join | Joins |
| Window join | Window join |
| Set operations | Set operations |
| ORDER BY clause | ORDER BY clause |
| LIMIT clause | LIMIT clause |
| Top-N | Top-N |
| Window Top-N | Window Top-N |
| Deduplication | Deduplication |
| Window deduplication | Window deduplication |
| Pattern recognition | Pattern recognition |
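To make two of the less common grammar clauses concrete, the following sketch shows a dynamic table options hint (dynamicTableOptions) and a system-time period (FOR SYSTEM_TIME AS OF). Table names, column names, and the connector option key are illustrative; consult your connector's documentation for the options it actually supports:

```sql
-- Per-query connector option override via a dynamic table options hint.
SELECT id, name
FROM kafka_source /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

-- Temporal join: look up the version of a table as of the row's event time.
SELECT o.id, o.amount * r.rate AS converted_amount
FROM orders AS o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
  ON o.currency = r.currency;
```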
Query execution
In streaming mode, input streams fall into two categories:
Non-update streams contain only INSERT events.
Update streams also contain UPDATE_BEFORE, UPDATE_AFTER, or DELETE events. Change data capture (CDC) sources produce update streams. Certain Flink operations, such as group aggregation and Top-N, also generate update events internally.
Most operations that produce update events rely on stateful operators, which use managed state to track updates. However, not all stateful operators accept update streams as input. For example, over aggregation and Interval Join do not support update stream inputs.
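For example, a group aggregation turns an insert-only stream into an update stream: each incoming row can revise a previously emitted result, so the operator retracts the old value and emits the new one. A sketch (table and column names are illustrative):

```sql
-- Each new click for a user retracts the previously emitted count and emits
-- the updated one, so downstream operators receive update events.
SELECT user_name, COUNT(*) AS click_cnt
FROM clicks
GROUP BY user_name;
```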
The following table describes the runtime characteristics of each supported query. The information applies to Ververica Runtime (VVR) 6.0.X and later.
| Query | Runtime operator | Use state data | Consume update streams | Generate update events | Notes |
|---|---|---|---|---|---|
| SELECT and WHERE | Calc | No | Yes | No | — |
| Lookup Join | LookupJoin | No* | Yes | No | For VVR 8.0.1 and later: setting table.optimizer.non-deterministic-update.strategy to TRY_RESOLVE enables automatic state-based resolution of non-deterministic update problems. Set it to IGNORE to disable state use. Changing this parameter may cause incompatibility and require re-running the query. |
| Table function | Correlate | No | Yes | No | — |
| SELECT DISTINCT | GroupAggregate | Yes | Yes | Yes | — |
| Group aggregation | GroupAggregate / LocalGroupAggregate / GlobalGroupAggregate / IncrementalGroupAggregate | Yes* | Yes | Yes | The pre-aggregation operator LocalGroupAggregate does not use state data. |
| Over aggregation | OverAggregate | Yes | No | No | — |
| Window aggregation | GroupWindowAggregate / WindowAggregate / LocalWindowAggregate / GlobalWindowAggregate | Yes* | Yes* | No* | LocalWindowAggregate does not use state data. Update stream support differs between VVR and Apache Flink — see the "Comparison of the support for update streams" section of the Window aggregation topic for details. If the early-fire or late-fire feature (experimental) is enabled, update events are generated. |
| Join (regular join) | Join | Yes | Yes | Yes* | Outer joins (LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN) generate update events. |
| Interval Join | IntervalJoin | Yes | No | No | — |
| Temporal Join | TemporalJoin | Yes | Yes | No | — |
| Window join | WindowJoin | Yes | No | No | — |
| Top-N | Rank | Yes | Yes | Yes | Top-N does not support ranking by processing time. Use built-in functions such as CURRENT_TIMESTAMP instead. Warning: Specifying a processing-time field in the ORDER BY clause may cause data errors. This issue is not reported during syntax checks in VVR 8.0.7 and earlier. |
| Window Top-N | WindowRank | Yes | No | No | — |
| Deduplication | Deduplicate | Yes | No | Yes* | Using the Deduplicate Keep FirstRow policy with processing time (Proctime) does not generate update events. |
| Window deduplication | WindowDeduplicate | Yes | No | No | — |
Stateless operators pass through event types without modification — output events are the same type as input events. Stateless operators never generate update events, regardless of whether the input is an update stream.
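As an illustration of this pass-through behavior, a simple projection and filter compiles to a stateless Calc operator: an INSERT event on the input produces an INSERT on the output, and an UPDATE_AFTER produces an UPDATE_AFTER. A sketch over a hypothetical CDC source (names are illustrative):

```sql
-- Stateless filter and projection over a CDC source: the changelog kind of
-- each input row is preserved unchanged on the output.
SELECT order_id, amount * 1.1 AS amount_with_tax
FROM cdc_orders
WHERE amount > 0;
```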