全部产品
Search
文档中心

实时计算Flink版:Queries语句

更新时间:Apr 24, 2024

本文为您介绍Flink全托管支持的Queries语句详情。

Flink全托管兼容Apache Flink的Queries语句。以下BNF-grammar描述了支持的流批SQL特性的超集。

query:
    values
  | WITH withItem [ , withItem ]* query
  | {
        select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

withItem:
    name
    [ '(' column [, column ]* ')' ]
    AS '(' query ')'

orderItem:
    expression [ ASC | DESC ]

select:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }
    FROM tableExpression
    [ WHERE booleanExpression ]
    [ GROUP BY { groupItem [, groupItem ]* } ]
    [ HAVING booleanExpression ]
    [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }

projectItem:
    expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
    tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
    ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
    tablePrimary
    [ matchRecognize ]
    [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
    [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | [ LATERAL ] '(' query ')'
  | UNNEST '(' expression ')'

tablePath:
    [ [ catalogName . ] databaseName . ] tableName

systemTimePeriod:
    FOR SYSTEM_TIME AS OF dateTimeExpression

dynamicTableOptions:
    /*+ OPTIONS(key=val [, key=val]*) */

key:
    stringLiteral

val:
    stringLiteral

values:
    VALUES expression [, expression ]*

groupItem:
    expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
    MATCH_RECOGNIZE '('
    [ PARTITION BY expression [, expression ]* ]
    [ ORDER BY orderItem [, orderItem ]* ]
    [ MEASURES measureColumn [, measureColumn ]* ]
    [ ONE ROW PER MATCH ]
    [ AFTER MATCH
      ( SKIP TO NEXT ROW
      | SKIP PAST LAST ROW
      | SKIP TO FIRST variable
      | SKIP TO LAST variable
      | SKIP TO variable )
    ]
    PATTERN '(' pattern ')'
    [ WITHIN intervalLiteral ]
    DEFINE variable AS condition [, variable AS condition ]*
    ')'

measureColumn:
    expression AS alias

pattern:
    patternTerm [ '|' patternTerm ]*

patternTerm:
    patternFactor [ patternFactor ]*

patternFactor:
    variable [ patternQuantifier ]

patternQuantifier:
    '*'
  | '*?'
  | '+'
  | '+?'
  | '?'
  | '??'
  | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  | '{' repeat '}'

标识符

对于标识符(表名,列名,函数名),Flink 采用了和Java相似的语法策略:

  • 不管标识符是否被反引号标识,该标识符是大小写敏感的。

  • 标识符的匹配是大小写敏感的。

和Java不同的是,Flink SQL支持标识符包含非英文或数字的字符,例如,以下是符合标准的。

SELECT a AS `my field` FROM t

字符串常量

Flink SQL使用单引号来表示字符串常量,而非使用双引号来表示,例如:

SELECT 'Hello World' 

为了在字符串表示单引号,您可以使用两个单引号来转义。例如:

Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
|      EXPR$0 |  EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set

Flink SQL支持在字符串常量中包含unicode值,您可以通过以下方式声明:

  • 使用反斜杠作为默认转义符

    SELECT U&'\263A' 
  • 使用自定义符号作为转义符

    SELECT U&'#263A' UESCAPE '#' -- 使用'#'作为转义符

Apache Flink V1.15 Queries语句详情如下表所示。

说明

如果您需要查看其它版本Queries语句,请注意切换到对应版本。

Queries语句

相关文档

Hints

SQL Hints

WITH子句

WITH clause

SELECT与WHERE子句

SELECT & WHERE clause

SELECT DISTINCT

SELECT DISTINCT

窗口函数

Windowing table-valued functions (Windowing TVFs)

窗口聚合

Window Aggregation

分组聚合

Group Aggregation

Over聚合

Over Aggregation

Join

Joins

窗口关联

Window Join

集合操作

Set Operations

ORDER BY语句

ORDER BY clause

LIMIT语句

LIMIT clause

Top-N

Top-N

窗口Top-N

Window Top-N

去重

Deduplication

窗口去重

Window Deduplication

模式检测

Pattern Recognition

Query操作运行时信息说明

在流模式下,我们根据是否包含更新消息将处理的流数据分为更新流(包含更新消息)和非更新流(只包含INSERT类型消息的称为非更新流),例如CDC源就是Flink集成自外部的更新流,另外Query内部的一些操作也可能产生更新数据,如分组聚合(Group Aggregation)、Top-N计算等。能产生更新事件的操作通常会使用状态(State),我们一般将这类操作称为状态算子。值得注意的是,并非所有的状态算子都支持处理更新流,例如,Over聚合(Over Aggregation)和Interval Join目前还不支持将更新流作为输入。

以下表格信息基于VVR-6.0.x 及以上版本整理,包括了Query操作对应的运行时算子名称、算子是否使用了状态(State)、是否支持处理更新流、是否产生更新。

Query操作

对应运行时算子名称

是否使用状态(State)

是否支持更新流

是否产生更新

说明

SELECT与WHERE

Calc

无。

Lookup Join

LookupJoin

否*

在VVR-8.0.1及以上版本中设置作业参数‘table.optimizer.non-deterministic-update.strategy’为‘TRY_RESOLVE’且引擎检测到当前作业存在非确定性更新风险时,会自动启用状态(State)来消除非确定性,可以通过设置该参数为'IGNORE'强制关闭使用状态,注意修改该参数改变算子是否使用状态时,会导致作业状态不兼容,需要无状态启动作业。

Table Function

Correlate

无。

SELECT DISTINCT

GroupAggregate

无。

分组聚合(Group Aggregation)

GroupAggregate

LocalGroupAggregate

GlobalGroupAggregate

IncrementalGroupAggregate

是*

LocalGroupAggregate预聚合算子不会使用状态(State)。

Over聚合(Over Aggregation)

OverAggregate

无。

窗口聚合(Window Aggregation)

GroupWindowAggregate

WindowAggregate

LocalWindowAggregate

GlobalWindowAggregate

是*

是*

否*

  • LocalWindowAggregate预聚合算子不会使用状态(State)。

  • 在更新流的支持上和社区版本不同,详情请参见窗口聚合新老语法更新流支持情况

  • 当开启Early或Late Fire实验特性时会产生更新消息,否则不会产生更新。

双流Join(Regular Join)

Join

是*

当使用外连接类型时,例如LEFT、RIGHT、FULL OUTER Join会产生更新。

Interval Join

IntervalJoin

无。

Temporal Join

TemporalJoin

无。

窗口关联(Window Join)

WindowJoin

无。

Top-N

Rank

无。

窗口Top-N

WindowRank

无。

去重(Deduplication)

Deduplicate

是*

基于处理时间(Proctime)使用first row去重时不会产生更新。

窗口去重(Window Deduplication)

WindowDeduplicate

无。

说明

非状态算子仅会透传消息类型,并不会主动产生更新消息,即输出的消息类型和输入的消息类型保持一致;产生更新是指当输入为非更新流时也可能产生更新消息。

窗口聚合新老语法更新流支持情况

窗口聚合算子分为新老语法对应的两种算子实现,老语法窗口聚合对应GroupWindowAggregation算子,支持TUMBLE、HOP、SESSION窗口类型;基于Window TVF新语法的窗口聚合对应WindowAggregate算子,支持TUMBLEHOPCUMULATE、SESSION窗口函数。对更新流的支持情况如下表所示。

窗口函数

老语法

(GroupWindowAggregation算子)

新语法

(WindowAggregate算子)

VVR、社区Flink

VVR

社区Flink

TUMBLE

支持

支持

不支持

HOP

支持

支持

不支持

SESSION

支持

支持

说明

VVR和社区Flink关于Session窗口区别请参见SESSION窗口函数在Flink社区和VVR中的区别

Flink 1.19支持

CUMULATE

N/A

支持

说明

实时计算引擎VVR 8.0.6及以上版本支持。

不支持

在对更新流的支持上,老语法窗口聚合(GroupWindowAggregation算子)支持更新流(VVR和社区Flink保持一致),新语法(WindowAggregate算子)社区Flink(1.16~1.18)不支持更新流,而VVR实现了新老语法的内部融合,可以自动根据输入流的情况选择支持的算子,实现社区Flink新语法中不支持更新流的TUMBLE、HOP窗口聚合对更新流的支持。

SESSION窗口函数在Flink社区和VVR中的区别

实时计算引擎VVR 8.x(对应于Flink 1.17版本)与社区Flink 1.19版本SESSION窗口函数使用区别详情如下:

  • 参数区别

    VVR 8.x不支持PARTITION BY语法,会话窗口分区数据的字段必须为与SESSION窗口函数共同使用的聚合语句中的非window_start、window_end、window_time的GROUP KEY字段。例如下面Flink 1.19中的SQL和VVR中的SQL语法是等价的,都将使用item字段作为SESSION窗口函数的分区字段。

    -- tables must have time attribute, e.g. `bidtime` in this table
    > desc Bid;
    +-------------+------------------------+------+-----+--------+---------------------------------+
    |        name |                   type | null | key | extras |                       watermark |
    +-------------+------------------------+------+-----+--------+---------------------------------+
    |     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
    |       price |         DECIMAL(10, 2) | true |     |        |                                 |
    |        item |                 STRING | true |     |        |                                 |
    +-------------+------------------------+------+-----+--------+---------------------------------+
    
    -- Flink 1.19
    > SELECT window_start, window_end, item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY item, window_start, window_end;
      
    -- VVR 8.x
    > SELECT window_start, window_end, item, SUM(price) AS total_price
      FROM TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY item, window_start, window_end;
    • Flink 1.19语法

      SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)

      参数含义如下:

      • data:拥有时间属性列的表。

      • keycols:列描述符,决定会话窗口应该使用哪些列来分区数据。

      • timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。

      • gap:两个事件被认为属于同一个会话窗口的最大时间间隔。

    • VVR 8.x语法

      SESSION(TABLE data, DESCRIPTOR(timecol), gap)

      参数含义如下:

      • data:拥有时间属性列的表。

      • timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。

      • gap:两个事件被认为属于同一个会话窗口的最大时间间隔。

  • VVR中的SESSION窗口函数仅支持与聚合语句同时使用,无法单独使用(单独使用会报错),与聚合语句同时使用时,暂时不支持下列场景:

    • 在SESSION窗口函数和聚合语句之间,包含对window_start、window_end和window_time字段的过滤或计算。例如:

      -- 包含对window_start的过滤
      > SELECT window_start, window_end, item, SUM(price) AS total_price
          FROM
          (SELECT item, price, window_start, window_end FROM
          TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
          where window_start >= TIMESTAMP '2020-04-15 08:06:00.000')
          GROUP BY item, window_start, window_end;
        
      -- 包含对window_start的计算
      > SELECT window_start, window_end, item, SUM(price) AS total_price
          FROM
          (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM
          TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
          GROUP BY item, window_start, window_end;
      
      -- 包含对window_start的计算
      > SELECT window_start, window_end, item, SUM(price) AS total_price
          FROM
          (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM
          TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
          GROUP BY item, window_start, window_end;
      
    • SESSION窗口函数和UDTF同时使用。例如:

      > SELECT window_start, window_end, category, SUM(price) AS total_price
          FROM
          (SELECT category, price, window_start, window_end FROM
          TABLE(
          SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)),
          LATERAL TABLE(category_udtf(item)) as T(category))
          GROUP BY category, window_start, window_end;
    • 聚合语句的GROUP KEY中未同时包含window_start和window_end。例如:

       SELECT window_start, item, SUM(price) AS total_price
        FROM TABLE(
            SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
        GROUP BY item, window_start;
    • 聚合函数使用python UDAF。

    • 聚合函数使用GROUPING SETS、CUBE和ROLLUP语法,导致window_start和window_end不在同一组GROUP KEY中。例如:

      > SELECT item, SUM(price) AS total_price
        FROM TABLE(
            SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
        GROUP BY GROUPING SETS((item), (window_start), (window_end));
        
      > SELECT item, SUM(price) AS total_price
        FROM TABLE(
            SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
        GROUP BY CUBE (item, window_start, window_end);
        
      > SELECT item, SUM(price) AS total_price
        FROM TABLE(
            SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
        GROUP BY ROLLUP (item, window_start, window_end);
    • 聚合语句中的聚合函数,对窗口列window_start、window_end和window_time进行计算。例如:

      > SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end
        FROM TABLE(
            SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
        GROUP BY item, window_start, window_end;