
Realtime Compute for Apache Flink: Queries

Last Updated: Mar 26, 2026

Realtime Compute for Apache Flink is compatible with native Apache Flink SQL queries. The following Backus-Naur Form (BNF) grammar defines the superset of supported streaming and batch SQL queries.

query:
    values
  | WITH withItem [ , withItem ]* query
  | {
        select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
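As an illustration of the top-level query production, the following sketch combines a set operation with ordering and a limit. The tables orders_2023 and orders_2024 are hypothetical; in streaming mode, ORDER BY must lead with an ascending time attribute, so this form is typical of batch execution.

```sql
-- Hypothetical tables with identical schemas; batch-mode example.
SELECT order_id, amount FROM orders_2023
UNION ALL
SELECT order_id, amount FROM orders_2024
ORDER BY amount DESC  -- in streaming mode, ORDER BY must lead with a time attribute
LIMIT 10;
```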

withItem:
    name
    [ '(' column [, column ]* ')' ]
    AS '(' query ')'

orderItem:
    expression [ ASC | DESC ]

select:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }
    FROM tableExpression
    [ WHERE booleanExpression ]
    [ GROUP BY { groupItem [, groupItem ]* } ]
    [ HAVING booleanExpression ]
    [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }

projectItem:
    expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
    tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
    ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
    tablePrimary
    [ matchRecognize ]
    [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
    [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | [ LATERAL ] '(' query ')'
  | UNNEST '(' expression ')'
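The UNNEST form of tablePrimary expands a collection-typed column into rows. A minimal sketch, assuming a hypothetical table Orders with an ARRAY&lt;STRING&gt; column named tags:

```sql
-- Each order row is joined with one row per element of its `tags` array.
SELECT o.order_id, t.tag
FROM Orders AS o
CROSS JOIN UNNEST(o.tags) AS t (tag);
```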

tablePath:
    [ [ catalogName . ] databaseName . ] tableName

systemTimePeriod:
    FOR SYSTEM_TIME AS OF dateTimeExpression

dynamicTableOptions:
    /*+ OPTIONS(key=val [, key=val]*) */

key:
    stringLiteral

val:
    stringLiteral
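The systemTimePeriod and dynamicTableOptions productions commonly appear together in temporal joins with per-query connector overrides. A sketch under assumed table names (kafka_orders, currency_rates) and an assumed time attribute order_time:

```sql
-- The OPTIONS hint overrides connector options for this query only;
-- FOR SYSTEM_TIME AS OF joins against the rates version valid at order time.
SELECT o.order_id, o.price * r.rate AS converted_price
FROM kafka_orders /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ AS o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
  ON o.currency = r.currency;
```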

values:
    VALUES expression [, expression ]*

groupItem:
    expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'
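For example, the GROUPING SETS form of groupItem computes several groupings in one statement. The table sales and its columns are hypothetical:

```sql
-- Emits one row per (region, product) pair, one per region, and a grand
-- total; collapsed grouping columns are NULL in the output.
SELECT region, product, SUM(amount) AS total
FROM sales
GROUP BY GROUPING SETS ((region, product), (region), ());
```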

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'
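A window defined with the WINDOW clause can be referenced by name from an OVER aggregation. A sketch following the windowSpec production, with a hypothetical table Orders and an assumed time attribute order_time:

```sql
-- Running sum over the 10 rows preceding (and including) the current row,
-- per customer, ordered by the time attribute.
SELECT order_id,
       SUM(amount) OVER w AS running_total
FROM Orders
WINDOW w AS (
  PARTITION BY customer_id
  ORDER BY order_time
  ROWS 10 PRECEDING
);
```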

matchRecognize:
    MATCH_RECOGNIZE '('
    [ PARTITION BY expression [, expression ]* ]
    [ ORDER BY orderItem [, orderItem ]* ]
    [ MEASURES measureColumn [, measureColumn ]* ]
    [ ONE ROW PER MATCH ]
    [ AFTER MATCH
      ( SKIP TO NEXT ROW
      | SKIP PAST LAST ROW
      | SKIP TO FIRST variable
      | SKIP TO LAST variable
      | SKIP TO variable )
    ]
    PATTERN '(' pattern ')'
    [ WITHIN intervalLiteral ]
    DEFINE variable AS condition [, variable AS condition ]*
    ')'

measureColumn:
    expression AS alias

pattern:
    patternTerm [ '|' patternTerm ]*

patternTerm:
    patternFactor [ patternFactor ]*

patternFactor:
    variable [ patternQuantifier ]

patternQuantifier:
    '*'
  | '*?'
  | '+'
  | '+?'
  | '?'
  | '??'
  | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  | '{' repeat '}'
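Putting the matchRecognize productions together, the following sketch detects a price dip followed by a recovery. The table Ticker and its columns are hypothetical; rowtime is assumed to be a time attribute.

```sql
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
  PARTITION BY symbol
  ORDER BY rowtime
  MEASURES
    A.price       AS start_price,
    LAST(B.price) AS bottom_price,
    LAST(C.price) AS end_price
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A B+ C)
  WITHIN INTERVAL '1' HOUR
  DEFINE
    B AS B.price < PREV(B.price),  -- falling
    C AS C.price > PREV(C.price)   -- recovering
);
```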

Identifiers

Flink SQL identifier rules (for table, column, and function names) differ from Java in one important way: non-alphanumeric characters are allowed when an identifier is enclosed in backticks. In all other respects, the rules are the same as Java:

  • The case of an identifier is preserved, whether or not the identifier is enclosed in backticks.

  • Identifiers are matched case-sensitively.

Example with a non-alphanumeric identifier:

SELECT a AS `my field` FROM t

String constants

String constants must use single quotation marks ('), not double quotation marks (").

SELECT 'Hello World'

Escape a single quotation mark inside a string by doubling it:

Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
|      EXPR$0 |  EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set

Unicode escape sequences

To include Unicode values in a string constant, use the U& prefix followed by an escape sequence.

  • Default escape character (backslash): U&'\<unicode>'. Example: SELECT U&'\263A'

  • Custom escape character: U&'<char><unicode>' UESCAPE '<char>'. Example: SELECT U&'#263A' UESCAPE '#' uses the number sign (#) as the escape character.

Supported queries

The following table lists all queries supported by Apache Flink 1.15. To view the reference for a different Flink version, switch versions on the Apache Flink website.

Query                       Reference
Hints                       SQL Hints
WITH clause                 WITH clause
SELECT and WHERE clauses    SELECT & WHERE clause
SELECT DISTINCT             SELECT DISTINCT
Window functions            Windowing table-valued functions (Windowing TVFs)
Window aggregation          Window aggregation
Group aggregation           Group aggregation
Over aggregation            Over aggregation
Join                        Joins
Window join                 Window join
Set operations              Set operations
ORDER BY clause             ORDER BY clause
LIMIT clause                LIMIT clause
Top-N                       Top-N
Window Top-N                Window Top-N
Deduplication               Deduplication
Window deduplication        Window deduplication

Query execution

In streaming mode, input streams fall into two categories:

  • Non-update streams contain only INSERT-type events.

  • Update streams contain other event types. Change data capture (CDC) sources produce update streams. Certain Flink operations — such as group aggregation and Top-N — also generate update events internally.

Most operations that produce update events rely on stateful operators, which use managed state to track updates. However, not all stateful operators accept update streams as input. For example, Over aggregation and Interval Join do not support update stream inputs.
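For instance, a group aggregation is stateful and produces an update stream, because each incoming row can change a previously emitted aggregate. The table Orders is hypothetical:

```sql
-- Every new order for a customer retracts the old count for that key
-- and emits the updated one, so downstream operators receive update events.
SELECT customer_id, COUNT(*) AS order_cnt
FROM Orders
GROUP BY customer_id;
```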

The following entries describe the runtime characteristics of each supported query. The information applies to Ververica Runtime (VVR) 6.0.X and later.

SELECT and WHERE
  Runtime operator: Calc
  Uses state data: No
  Consumes update streams: Yes
  Generates update events: No

Lookup Join
  Runtime operator: LookupJoin
  Uses state data: No*
  Consumes update streams: Yes
  Generates update events: No
  Note: In VVR 8.0.1 and later, setting table.optimizer.non-deterministic-update.strategy to TRY_RESOLVE enables automatic state-based resolution of non-deterministic update problems. Set it to IGNORE to disable state use. Changing this parameter may cause incompatibility and require re-running the query.

Table function
  Runtime operator: Correlate
  Uses state data: No
  Consumes update streams: Yes
  Generates update events: No

SELECT DISTINCT
  Runtime operator: GroupAggregate
  Uses state data: Yes
  Consumes update streams: Yes
  Generates update events: Yes

Group aggregation
  Runtime operator: GroupAggregate / LocalGroupAggregate / GlobalGroupAggregate / IncrementalGroupAggregate
  Uses state data: Yes*
  Consumes update streams: Yes
  Generates update events: Yes
  Note: The pre-aggregation operator LocalGroupAggregate does not use state data.

Over aggregation
  Runtime operator: OverAggregate
  Uses state data: Yes
  Consumes update streams: No
  Generates update events: No

Window aggregation
  Runtime operator: GroupWindowAggregate / WindowAggregate / LocalWindowAggregate / GlobalWindowAggregate
  Uses state data: Yes*
  Consumes update streams: Yes*
  Generates update events: No*
  Note: LocalWindowAggregate does not use state data. Update stream support differs between VVR and Apache Flink; see the "Comparison of the support for update streams" section of the Window aggregation topic for details. If the early-fire or late-fire feature (experimental) is enabled, update events are generated.

Join (regular join)
  Runtime operator: Join
  Uses state data: Yes
  Consumes update streams: Yes
  Generates update events: Yes*
  Note: Outer joins (LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN) generate update events.

Interval Join
  Runtime operator: IntervalJoin
  Uses state data: Yes
  Consumes update streams: No
  Generates update events: No

Temporal Join
  Runtime operator: TemporalJoin
  Uses state data: Yes
  Consumes update streams: Yes
  Generates update events: No

Window join
  Runtime operator: WindowJoin
  Uses state data: Yes
  Consumes update streams: No
  Generates update events: No

Top-N
  Runtime operator: Rank
  Uses state data: Yes
  Consumes update streams: Yes
  Generates update events: Yes
  Note: Top-N does not support ranking by processing time. Use built-in functions such as CURRENT_TIMESTAMP instead.
  Warning: Specifying a processing time field in the ORDER BY clause may cause data errors. This issue is not reported during syntax checks in VVR 8.0.7 and earlier.

Window Top-N
  Runtime operator: WindowRank
  Uses state data: Yes
  Consumes update streams: No
  Generates update events: No

Deduplication
  Runtime operator: Deduplicate
  Uses state data: Yes
  Consumes update streams: No
  Generates update events: Yes*
  Note: Using the Deduplicate Keep FirstRow policy with processing time (proctime) does not generate update events.

Window deduplication
  Runtime operator: WindowDeduplicate
  Uses state data: Yes
  Consumes update streams: No
  Generates update events: No
Note

Stateless operators pass through event types without modification — output events are the same type as input events. Stateless operators never generate update events, regardless of whether the input is an update stream.