本文为您介绍Flink全托管支持的Queries语句详情。
Flink全托管兼容Apache Flink的Queries语句。以下BNF-grammar描述了支持的流批SQL特性的超集。
query:
values
| WITH withItem [ , withItem ]* query
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
withItem:
name
[ '(' column [, column ]* ')' ]
AS '(' query ')'
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| [ LATERAL ] '(' query ')'
| UNNEST '(' expression ')'
tablePath:
[ [ catalogName . ] databaseName . ] tableName
systemTimePeriod:
FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
/*+ OPTIONS(key=val [, key=val]*) */
key:
stringLiteral
val:
stringLiteral
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'
标识符
对于标识符(表名,列名,函数名),Flink 采用了和Java相似的语法策略:
不管标识符是否被反引号标识,该标识符是大小写敏感的。
标识符的匹配是大小写敏感的。
和Java不同的是,Flink SQL支持标识符包含非英文或数字的字符,例如,以下是符合标准的。
SELECT a AS `my field` FROM t
字符串常量
Flink SQL使用单引号来表示字符串常量,而非使用双引号来表示,例如:
SELECT 'Hello World'
为了在字符串表示单引号,您可以使用两个单引号来转义。例如:
Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
| EXPR$0 | EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set
Flink SQL支持在字符串常量中包含unicode值,您可以通过以下方式声明:
使用反斜杠作为默认转义符
SELECT U&'\263A'
使用自定义符号作为转义符
SELECT U&'#263A' UESCAPE '#' -- 使用'#'作为转义符
Apache Flink V1.15 Queries语句详情如下表所示。
如果您需要查看其它版本Queries语句,请注意切换到对应版本。
Queries语句 | 相关文档 |
Hints | |
WITH子句 | |
SELECT与WHERE子句 | |
SELECT DISTINCT | |
窗口函数 | |
窗口聚合 | |
分组聚合 | |
Over聚合 | |
Join | |
窗口关联 | |
集合操作 | |
ORDER BY语句 | |
LIMIT语句 | |
Top-N | |
窗口Top-N | |
去重 | |
窗口去重 | |
模式检测 |
Query操作运行时信息说明
在流模式下,我们根据是否包含更新消息将处理的流数据分为更新流(包含更新消息)和非更新流(只包含INSERT类型消息的称为非更新流),例如CDC源就是Flink集成自外部的更新流,另外Query内部的一些操作也可能产生更新数据,如分组聚合(Group Aggregation)、Top-N计算等。能产生更新事件的操作通常会使用状态(State),我们一般将这类操作称为状态算子。值得注意的是,并非所有的状态算子都支持处理更新流,例如,Over聚合(Over Aggregation)和Interval Join目前还不支持将更新流作为输入。
以下表格信息基于VVR-6.0.x 及以上版本整理,包括了Query操作对应的运行时算子名称、算子是否使用了状态(State)、是否支持处理更新流、是否产生更新。
Query操作 | 对应运行时算子名称 | 是否使用状态(State) | 是否支持更新流 | 是否产生更新 | 说明 |
SELECT与WHERE | Calc | 否 | 是 | 否 | 无。 |
Lookup Join | LookupJoin | 否* | 是 | 否 | 在VVR-8.0.1及以上版本中设置作业参数‘table.optimizer.non-deterministic-update.strategy’为‘TRY_RESOLVE’且引擎检测到当前作业存在非确定性更新风险时,会自动启用状态(State)来消除非确定性,可以通过设置该参数为'IGNORE'强制关闭使用状态,注意修改该参数改变算子是否使用状态时,会导致作业状态不兼容,需要无状态启动作业。 |
Table Function | Correlate | 否 | 是 | 否 | 无。 |
SELECT DISTINCT | GroupAggregate | 是 | 是 | 是 | 无。 |
分组聚合(Group Aggregation) | GroupAggregate LocalGroupAggregate GlobalGroupAggregate IncrementalGroupAggregate | 是* | 是 | 是 | LocalGroupAggregate预聚合算子不会使用状态(State)。 |
Over聚合(Over Aggregation) | OverAggregate | 是 | 否 | 否 | 无。 |
窗口聚合(Window Aggregation) | GroupWindowAggregate WindowAggregate LocalWindowAggregate GlobalWindowAggregate | 是* | 是* | 否* |
|
双流Join(Regular Join) | Join | 是 | 是 | 是* | 当使用外连接类型时,例如LEFT、RIGHT、FULL OUTER Join会产生更新。 |
Interval Join | IntervalJoin | 是 | 否 | 否 | 无。 |
Temporal Join | TemporalJoin | 是 | 是 | 否 | 无。 |
窗口关联(Window Join) | WindowJoin | 是 | 否 | 否 | 无。 |
Top-N | Rank | 是 | 是 | 是 | 无。 |
窗口Top-N | WindowRank | 是 | 否 | 否 | 无。 |
去重(Deduplication) | Deduplicate | 是 | 否 | 是* | 基于处理时间(Proctime)使用first row去重时不会产生更新。 |
窗口去重(Window Deduplication) | WindowDeduplicate | 是 | 否 | 否 | 无。 |
非状态算子仅会透传消息类型,并不会主动产生更新消息,即输出的消息类型和输入的消息类型保持一致;产生更新是指当输入为非更新流时也可能产生更新消息。
窗口聚合新老语法更新流支持情况
窗口聚合算子分为新老语法对应的两种算子实现,老语法窗口聚合对应GroupWindowAggregation算子,支持TUMBLE、HOP、SESSION窗口类型;基于Window TVF新语法的窗口聚合对应WindowAggregate算子,支持TUMBLE、HOP、CUMULATE、SESSION窗口函数。对更新流的支持情况如下表所示。
窗口函数 | 老语法 (GroupWindowAggregation算子) | 新语法 (WindowAggregate算子) | |
VVR、社区Flink | VVR | 社区Flink | |
TUMBLE | 支持 | 支持 | 不支持 |
HOP | 支持 | 支持 | 不支持 |
SESSION | 支持 | 支持 说明 VVR和社区Flink关于Session窗口区别请参见SESSION窗口函数在Flink社区和VVR中的区别。 | Flink 1.19支持 |
CUMULATE | N/A | 支持 说明 实时计算引擎VVR 8.0.6及以上版本支持。 | 不支持 |
在对更新流的支持上,老语法窗口聚合(GroupWindowAggregation算子)支持更新流(VVR和社区Flink保持一致),新语法(WindowAggregate算子)社区Flink(1.16~1.18)不支持更新流,而VVR实现了新老语法的内部融合,可以自动根据输入流的情况选择支持的算子,实现社区Flink新语法中不支持更新流的TUMBLE、HOP窗口聚合对更新流的支持。
SESSION窗口函数在Flink社区和VVR中的区别
实时计算引擎VVR 8.x(对应于Flink 1.17版本)与社区Flink 1.19版本SESSION窗口函数使用区别详情如下:
参数区别
VVR 8.x不支持PARTITION BY语法,会话窗口分区数据的字段必须为与SESSION窗口函数共同使用的聚合语句中的非window_start、window_end、window_time的GROUP KEY字段。例如下面Flink 1.19中的SQL和VVR中的SQL语法是等价的,都将使用item字段作为SESSION窗口函数的分区字段。
-- tables must have time attribute, e.g. `bidtime` in this table > desc Bid; +-------------+------------------------+------+-----+--------+---------------------------------+ | name | type | null | key | extras | watermark | +-------------+------------------------+------+-----+--------+---------------------------------+ | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | | price | DECIMAL(10, 2) | true | | | | | item | STRING | true | | | | +-------------+------------------------+------+-----+--------+---------------------------------+ -- Flink 1.19 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end; -- VVR 8.x > SELECT window_start, window_end, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
Flink 1.19语法
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
参数含义如下:
data:拥有时间属性列的表。
keycols:列描述符,决定会话窗口应该使用哪些列来分区数据。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
gap:两个事件被认为属于同一个会话窗口的最大时间间隔。
VVR 8.x语法
SESSION(TABLE data, DESCRIPTOR(timecol), gap)
参数含义如下:
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
gap:两个事件被认为属于同一个会话窗口的最大时间间隔。
VVR中的SESSION窗口函数仅支持与聚合语句同时使用,无法单独使用(单独使用会报错),与聚合语句同时使用时,暂时不支持下列场景:
在SESSION窗口函数和聚合语句之间,包含对window_start、window_end和window_time字段的过滤或计算。例如:
-- 包含对window_start的过滤 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) where window_start >= TIMESTAMP '2020-04-15 08:06:00.000') GROUP BY item, window_start, window_end; -- 包含对window_start的计算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end; -- 包含对window_start的计算 > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end;
SESSION窗口函数和UDTF同时使用。例如:
> SELECT window_start, window_end, category, SUM(price) AS total_price FROM (SELECT category, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)), LATERAL TABLE(category_udtf(item)) as T(category)) GROUP BY category, window_start, window_end;
聚合语句的GROUP KEY中未同时包含window_start和window_end。例如:
SELECT window_start, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start;
聚合函数使用python UDAF。
聚合函数使用GROUPING SETS、CUBE和ROLLUP语法,导致window_start和window_end不在同一组GROUP KEY中。例如:
> SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY GROUPING SETS((item), (window_start), (window_end)); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY CUBE (item, window_start, window_end); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY ROLLUP (item, window_start, window_end);
聚合语句中的聚合函数,对窗口列window_start、window_end和window_time进行计算。例如:
> SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;