すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:オーバー集計

最終更新日:Jan 08, 2025

このトピックでは、ジョブのオーバー集計クエリを変更した後に、ジョブとジョブの開始に使用される状態データとの互換性について説明します。オーバー集計クエリでは、集計関数に OVER 句が使用されます。

互換性のある変更

  • 非重複集計フィールドを追加、削除、または変更します。集計フィールドは、集計関数によって返される結果です。

    • 部分的に互換性があります: 集計フィールドを追加します。ジョブの開始時に、追加されたフィールドの値が増加します。

    • 完全に互換性があります: 既存の集計フィールドを削除します。削除されたフィールドの状態データは破棄されます。

    • 部分的に互換性があります: 集計フィールドを追加し、既存の集計フィールドを同時に削除します。ジョブの開始時に、追加されたフィールドの値が増加します。削除されたフィールドの状態データは破棄されます。

    • 部分的に互換性があります: 既存の集計フィールドを変更します。この変更は、元の集計フィールドを削除し、新しいフィールドを追加するという操作に分割されます。ジョブの開始時に、追加されたフィールドの値が増加します。削除されたフィールドの状態データは破棄されます。

    説明
    • 集計フィールドを変更しない場合、状態データを使用するかどうかに関係なく、フィールドの計算結果は同じです。

    • オーバー集計クエリは、集計値と対応する入力データを返します。入力スキーマが変更されると、ジョブは状態データと互換性がなくなります。

    -- 元の SQL ステートメント:
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 部分的に互換性があります: 集計フィールドである count(c) を追加します。
    -- sum(b) と max(c) の計算結果は影響を受けません。ジョブの開始時に count(c) の値は 0 から増加します。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts),
      count(c) over (partition by a order by ts)
    from MyTable;
    
    -- 完全に互換性があります: sum(b) を削除します。
    -- max(c) の計算結果は影響を受けません。
    select a, b, c,
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 部分的に互換性があります: 集計フィールドを max(c) から min(c) に変更します。
    -- sum(b) の計算結果は影響を受けません。max(c) フィールドは削除されたと見なされ、その状態データは破棄されます。
    -- min(c) フィールドは新しいフィールドと見なされ、ジョブの開始時にその値が増加します。
    select a, b, c,
      sum(b) over (partition by a order by ts),
      min(c) over (partition by a order by ts)
    from MyTable;
  • 非重複集計フィールドの順序を変更します(完全に互換性があります)。

    -- 元の SQL ステートメント:
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable
    
    -- 完全に互換性があります: sum(b) と max(c) の順序を変更します。
    -- sum(b) と max(c) の計算結果は影響を受けません。
    select a, b, c,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from MyTable;
  • パーティションキーの順序を変更します(完全に互換性があります)。

    -- 元の SQL ステートメント:
    select a, b, c,
      sum(b) over (partition by a,b order by ts),
      max(c) over (partition by a,b order by ts) 
    from MyTable;
    
    -- 完全に互換性があります: パーティションキーの順序を変更します。
    select a, b, c,
      sum(b) over (partition by b,a order by ts),
      max(c) over (partition by b,a order by ts) 
    from MyTable;
  • スキーマのフィールドの順序を変更するか、スキーマを変更せずにフィールドを追加または削除します(完全に互換性があります)。

    -- 元の SQL ステートメント:
    select a, b, c, count(b) 
    over (partition by a order by ts) from MyTable;
    
    -- 完全に互換性があります: OVER 句のフィールド b と c の順序を変更します。
    select a, c, b, count(b) 
    over (partition by a order by ts) from MyTable;
    
    -- 元の SQL ステートメント:
    select a, b, c, cnt from (select a, b, c, d, count(b) 
    over (partition by a order by proctime) as cnt from src);
    
    -- 完全に互換性があります: 集計に使用されていないフィールドを削除します。
    select a, b, c, cnt from (select a, b, c, count(b) 
    over (partition by a order by proctime) as cnt from src);

互換性のない変更

  • オーバー集計クエリの入力スキーマを変更します。

    -- 元の SQL ステートメント:
    select a, b, c,
      sum(b) over (partition by a order by ts),
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 互換性がありません: 入力フィールドとして d を追加します。
    select a, b, c, d,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from MyTable;
    
    -- 互換性がありません: 入力フィールド c を変更します。
    select a, b, c,
      max(c) over (partition by a order by ts),
      sum(b) over (partition by a order by ts)
    from (
      select a, b, substring(c, 1, 5) as c from MyTable
    );
  • OVER 句で指定された属性(ウィンドウのパーティションキー、ウィンドウの並べ替え順序、ウィンドウフレームなど)を変更します。

    -- 元の SQL ステートメント:
    select a, b, c,
      max(c) over (partition by a order by ts asc rows between unbounded preceding and current row)
    from MyTable;
    
    -- 互換性がありません: パーティションキーを a から b に変更します。
    select a, b, c,
      max(c) over (partition by b order by ts asc rows between unbounded preceding and current row)
    from MyTable;
    
    -- 互換性がありません: 並べ替え順序を昇順から降順に変更します。
    select a, b, c,
      max(c) over (partition by a order by ts desc rows between unbounded preceding and current row)
    from MyTable;
    
    -- 互換性がありません: 境界定義を unbounded preceding から 2 preceding に変更します。
    select a, b, c,
      max(c) over (partition by a order by ts asc rows between 2 preceding and current row)
    from MyTable;
  • 重複集計フィールドを追加、削除、または変更します。

    -- 元の SQL ステートメント:
    select a, b, c,
      max(c) over (partition by a order by ts)
    from MyTable;
    
    -- 互換性がありません: 重複集計フィールドである count(distinct b) を追加します。
    select a, b, c,
      max(c) over (partition by b order by ts),
      count(distinct b) over (partition by b order by ts)
    from MyTable;
  • すべての集計フィールドを変更します。

    -- 元の SQL ステートメント:
    select a, ts, min(b) over (partition by a order by ts) 
    from MyTable;
    
    -- 互換性がありません: 唯一の集計フィールドを変更します。
    select a, ts, max(b) over (partition by a order by ts) 
    from MyTable;

互換性が不明な変更

変更の前後で Python ユーザー定義集計関数(UDAF)を使用する場合、システムは互換性を判断できません。

-- 互換性が不明: 変更の前後で、次のコードに示す weighted_avg 関数などの Python UDAF を使用します。
select min(a), min(b), min(c), weighted_avg(a, b), min(cnt) 
from (select a, b, c, count(b) 
over (partition by a, b order by ts) as cnt from MyTable);