すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:OVER

最終更新日:Mar 10, 2026

OVER ウィンドウは、従来型データベースで使用される標準的なウィンドウ関数です。GROUP BY ウィンドウとは異なり、OVER ウィンドウ内の各要素は一意のウィンドウフレームに対応します。これらのフレームは、要素の行またはタイムスタンプ値に基づいて定義できます。そのため、ストリーム要素が複数のウィンドウに存在することが可能です。

OVER ウィンドウを使用するストリームでは、各要素は一意のウィンドウに対応します。各要素は計算を1回トリガーし、その計算をトリガーする行は、当該要素のウィンドウの最終行です。Realtime Compute の基盤となる実装では、OVER ウィンドウのデータは一元管理され、単一のコピーのみが保存されます。論理的には、各要素に対して OVER ウィンドウが作成されます。Realtime Compute は各 OVER ウィンドウのデータを計算し、計算が完了した後に不要となったデータを削除します。詳細については、「Over 集計」をご参照ください。

構文

SELECT
    agg1(col1) OVER (definition1) AS colName,
    ...
    aggN(colN) OVER (definition1) AS colNameN
FROM Tab1;
  • agg1(col1):col1 列に対して計算を行う集計関数。

  • OVER (definition1):ウィンドウを定義する OVER 句。

  • AS colName:結果列のエイリアス。

説明
  • agg1 から aggN までのすべての集約において、OVER 定義は同一である必要があります。

  • 外部の SQL クエリでは、このエイリアスを使用してデータをクエリできます。

種類

Flink SQL では、OVER ウィンドウの定義は標準 SQL 構文に従います。従来の OVER ウィンドウは、より細かいタイプに分類されません。ただし、計算対象の行の定義方法に基づき、以下の 2 種類に分類できます。

  • ROWS OVER ウィンドウ:各行を新しい計算境界として扱います。つまり、各行が新しいウィンドウを定義します。

  • RANGE OVER ウィンドウ:同じ時間値を持つすべての行を単一の計算境界として扱います。つまり、同じ時間値を持つすべての行が同じウィンドウに属します。

プロパティ

直交プロパティ

説明

proctime

eventtime

ROWS OVER ウィンドウ

要素の実際の行に基づいてウィンドウを定義します。

サポート

サポート

RANGE OVER ウィンドウ

要素の実際の値(タイムスタンプ)に基づいてウィンドウを定義します。

サポート

サポート

ROWS OVER ウィンドウのセマンティクス

  • ウィンドウデータ

    ROWS OVER ウィンドウの各要素は、それぞれ独自のウィンドウを定義します。

  • ウィンドウ構文

    SELECT
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)]
         ORDER BY timeCol
         ROWS 
         BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ...
    FROM Tab1;       
    • value_expression:パーティション分割に使用する式。

    • timeCol:要素をソートするための時間属性フィールド。

    • rowCount:現在の行を基準として、ウィンドウフレームに含める先行行の数。

  • たとえば、有界 ROWS OVER ウィンドウのシナリオを考えてみます。プロダクト ID、プロダクトタイプ、プロダクト掲載時刻、およびプロダクト価格を含む商品一覧テーブルがあると仮定します。目的は、同じタイプの直近 3 商品の中で最も高い価格を求めることです。

    • tmall_item テーブルのサンプルデータ

      itemid (VARCHAR)

      itemtype (VARCHAR)

      eventtime (VARCHAR)

      price (DOUBLE)

      ITEM001

      Electronic

      2024-11-11 10:01:00

      20

      ITEM002

      Electronic

      2024-11-11 10:02:00

      50

      ITEM003

      Electronic

      2024-11-11 10:03:00

      30

      ITEM004

      Electronic

      2024-11-11 10:03:00

      60

      ITEM005

      Electronic

      2024-11-11 10:05:00

      40

      ITEM006

      Electronic

      2024-11-11 10:06:00

      20

      ITEM007

      Electronic

      2024-11-11 10:07:00

      70

      ITEM008

      Clothes

      2024-11-11 10:08:00

      20

    • サンプルコード

      CREATE TEMPORARY TABLE tmall_item(
        itemid VARCHAR,
        itemtype VARCHAR,
        eventtime varchar,                            
        onselltime AS TO_TIMESTAMP(eventtime),
        price DOUBLE,
        WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND  -- rowtime の Watermark を定義します。
      ) WITH (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.bootstrap.servers' = '<brokers>',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
      );
      
      SELECT
          itemid,
          itemtype,
          onselltime,
          price,  
          MAX(price) OVER (
              PARTITION BY itemtype 
              ORDER BY onselltime 
              ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxprice
      FROM tmall_item;
    • 結果

      itemid

      itemtype

      onselltime

      price

      maxprice

      ITEM001

      Electronic

      2024-11-11 10:01:00

      20

      20

      ITEM002

      Electronic

      2024-11-11 10:02:00

      50

      50

      ITEM003

      Electronic

      2024-11-11 10:03:00

      30

      50

      ITEM004

      Electronic

      2024-11-11 10:03:00

      60

      60

      ITEM005

      Electronic

      2024-11-11 10:05:00

      40

      60

      ITEM006

      Electronic

      2024-11-11 10:06:00

      20

      60

      ITEM007

      Electronic

      2024-11-11 10:07:00

      70

      70

      ITEM008

      Clothes

      2024-11-11 10:08:00

      20

      20

RANGE OVER ウィンドウのセマンティクス

  • ウィンドウデータ

    RANGE OVER ウィンドウでは、ソート列(例:タイムスタンプ)の値が同一であるすべての行がウィンドウフレームを定義します。

  • ウィンドウ構文

    SELECT
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)]
         ORDER BY timeCol
         RANGE 
         BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
    ...
    FROM Tab1;
    • value_expression:パーティション分割に使用する式。

    • timeCol:要素をソートするための時間属性フィールド。

    • timeInterval:現在の行を基準として、ウィンドウフレームに含める先行行の時間間隔。

  • この例では、有界 RANGE OVER ウィンドウを示します。プロダクト ID、タイプ、掲載時刻、および価格を含む商品一覧テーブルがあると仮定します。目的は、現在の商品の掲載時刻を含め、その 2 分前に掲載された同じタイプの商品の中で最も高い価格を求めることです。

    • tmall_item テーブルのサンプルデータ

      itemid (VARCHAR)

      itemtype (VARCHAR)

      eventtime (VARCHAR)

      price (DOUBLE)

      ITEM001

      Electronic

      2024-11-11 10:01:00

      20

      ITEM002

      Electronic

      2024-11-11 10:02:00

      50

      ITEM003

      Electronic

      2024-11-11 10:03:00

      30

      ITEM004

      Electronic

      2024-11-11 10:03:00

      60

      ITEM005

      Electronic

      2024-11-11 10:05:00

      40

      ITEM006

      Electronic

      2024-11-11 10:06:00

      20

      ITEM007

      Electronic

      2024-11-11 10:07:00

      70

      ITEM008

      Clothes

      2024-11-11 10:08:00

      20

    • サンプルコード

      CREATE TEMPORARY TABLE tmall_item(
        itemid VARCHAR,
        itemtype VARCHAR,
        eventtime varchar,                            
        onselltime AS TO_TIMESTAMP(eventtime),
        price DOUBLE,
        WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND  -- rowtime の Watermark を定義します。
      ) WITH (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.bootstrap.servers' = '<brokers>',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
      );
      
      SELECT  
          itemid,
          itemtype, 
          onselltime, 
          price,  
          MAX(price) OVER (
              PARTITION BY itemtype 
              ORDER BY onselltime 
              RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxprice
      FROM tmall_item;        
    • 結果

      itemid

      itemtype

      onselltime

      price

      maxprice

      ITEM001

      Electronic

      2024-11-11 10:01:00

      20

      20

      ITEM002

      Electronic

      2024-11-11 10:02:00

      50

      50

      ITEM003

      Electronic

      2024-11-11 10:03:00

      30

      50

      ITEM004

      Electronic

      2024-11-11 10:03:00

      60

      60

      ITEM005

      Electronic

      2024-11-11 10:05:00

      40

      60

      ITEM006

      Electronic

      2024-11-11 10:06:00

      20

      40

      ITEM007

      Electronic

      2024-11-11 10:07:00

      70

      70

      ITEM008

      Clothes

      2024-11-11 10:08:00

      20

      20