すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:GROUP BY で使用されるウィンドウ集計関数

最終更新日:Jan 08, 2025

このトピックでは、デプロイメントの SQL ステートメントで GROUP BY で使用されるウィンドウ集計関数を変更した後、デプロイメントと状態データ間の互換性がどのように影響を受けるかについて説明します。

互換性に影響を与えない、または部分的に影響を与える変更

  • 非 DISTINCT 統計メトリックを追加、削除、および変更します。統計メトリックは、集計関数を使用して計算されます。

    • デプロイメントの SQL ステートメントに統計メトリックを追加すると、デプロイメントは状態データと部分的に互換性を持つようになります。追加された統計メトリックの値は、デプロイメントの開始時にインクリメントされます。

    • デプロイメントの SQL ステートメントから統計メトリックを削除すると、デプロイメントは状態データと完全な互換性を維持します。削除された統計メトリックの状態データは破棄されます。

    • デプロイメントの SQL ステートメントに統計メトリックを追加し、ステートメントから統計メトリックも削除すると、デプロイメントは状態データと部分的に互換性を持つようになります。追加された統計メトリックの値は、デプロイメントの開始時にインクリメントされます。削除された統計メトリックの状態データは破棄されます。

    • デプロイメントの SQL ステートメントで統計メトリックを変更すると、変更中に元の統計メトリックが削除され、新しい統計メトリックが追加されます。デプロイメントは状態データと部分的に互換性を持つようになります。追加された統計メトリックの値は、デプロイメントの開始時にインクリメントされます。削除された統計メトリックの状態データは破棄されます。

    説明

    変更しない統計メトリックの場合、デプロイメントの状態データが再利用された後のデプロイメントの計算結果は、履歴データに基づいて実行されるデプロイメントの計算結果と同じです。

    -- 元の SQL ステートメント:
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 統計メトリック count(c) を追加します。この変更後、デプロイメントは状態データと部分的に互換性を持つようになります。
    -- sum(b) と max(c) の計算結果には影響ありません。 count(c) の値は、デプロイメントの開始時に 0 からインクリメントされます。
    select a, sum(b), max(c), count(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 統計メトリック sum(b) を削除します。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    -- max(c) の計算結果には影響ありません。
    select a, max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 統計メトリック max(c) を min(c) に変更します。この変更後、デプロイメントは状態データと部分的に互換性を持つようになります。
    -- sum(b) の計算結果には影響ありません。 max(c) は削除されたと見なされ、その状態データは破棄されます。
    -- min(c) は新しいメトリックと見なされ、その値はデプロイメントの開始時に計算されます。
    select a, sum(b), min(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
  • 非 DISTINCT 統計メトリックの位置を変更します。この変更後、デプロイメントは状態データと完全な互換性を維持します。

    -- 元の SQL ステートメント:
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 統計メトリック sum(b) と max(c) の位置を変更します。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    -- 統計メトリック sum(b) と max(c) の計算結果には影響ありません。
    select a, max(c), sum(b)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
  • 非 DISTINCT 統計メトリックのフィールドの計算ロジックが変更された場合、統計メトリックは変更されたと見なされます。この変更後、デプロイメントは状態データと部分的に互換性を持つようになります。

    -- 元の SQL ステートメント:
    select a, sum(b), max(c)
    from MyTable
    group by a, tumble(ts, interval '1' minute);
    
    -- 統計メトリック max(c) を max(substring(c,1, 5)) に変更します。この変更後、デプロイメントは状態データと部分的に互換性を持つようになります。
    -- sum(b) の計算結果には影響ありません。 max(c) は削除されたと見なされ、その状態データは破棄されます。
    -- max(substring(c, 1, 5)) は新しい統計メトリックと見なされます。 max(substring(c, 1, 5)) の値は、デプロイメントの開始時にインクリメントされます。
    select a, sum(b), max(c)
    from (select a, b, substring(c, 1, 5) as c from MyTable)
    group by a, tumble(ts, interval '1' minute);
  • ウィンドウ属性フィールドを追加または削除します。この変更後、デプロイメントは状態データと完全な互換性を維持します。

    -- 元の SQL ステートメント:
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- ウィンドウ終了属性を追加します。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start,
      tumble_end(ts, interval '1' minute) as window_end
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- ウィンドウ開始属性を削除します。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    select a,
      sum(b),
      max(c)
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • GROUP BY 句のキーの順序を変更します。ウィンドウ関数のキーの順序のみが変更され、他のキーの順序が変更されない場合、デプロイメントは状態データと完全な互換性を維持します。

    -- 元の SQL ステートメント:
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable
    group by a, b, tumble(rowtime, interval '15' minute);
    
    -- GROUP BY 句のキーの順序を変更します。ウィンドウ関数のキーの順序のみが変更され、他のキーの順序は変更されていません。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable
    group by a, tumble(rowtime, interval '15' minute), b;
  • デプロイメントへの変更の前後で、SQL ステートメントに統計メトリックが関係していません。この変更後、デプロイメントは状態データと完全な互換性を維持します。

完全な非互換性を引き起こす変更

  • ウィンドウタイプ、ウィンドウサイズ、時間関連属性などのウィンドウ関連属性を変更すると、デプロイメントは状態データと互換性がなくなります。

    -- 元の SQL ステートメント:
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- ウィンドウタイプを TUMBLE から HOP に変更します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      max(c),
      hop_start(ts, interval '1' minute, interval '2' minute) as window_start
    from MyTable
      group by a, hop(ts, interval '1' minute, interval '2' minute);
    
    -- ウィンドウサイズを 1 分から 2 分に変更すると、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '2' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '2' minute);
    
    -- 時間関連属性を ts から proctime に変更します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(proctime, interval '1' minute);
  • デプロイメントの SQL ステートメントで統計ディメンションを追加、削除、または変更するか、統計ディメンションに関連するフィールドの計算ロジックを変更すると、デプロイメントは状態データと互換性がなくなります。統計ディメンションは、GROUP BY 句のキーによって指定されます。

    -- 元の SQL ステートメント:
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 統計ディメンション d を追加します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, d, tumble(ts, interval '1' minute);
    
    -- その他の例については、「GROUP BY で使用される集計関数」のキー変更の例を参照してください。

  • デプロイメントの SQL ステートメントで DISTINCT 統計メトリックを追加、削除、または変更するか、DISTINCT 統計メトリックに関連するフィールドの計算ロジックを変更すると、デプロイメントは状態データと互換性がなくなります。 DISTINCT 統計メトリックは、集計関数を使用して計算されます。

    -- 元の SQL ステートメント:
    select a,
      sum(b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- DISTINCT 統計メトリック count(distinct b) を追加します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    -- その他の例については、「GROUP BY で使用される集計関数」の DISTINCT 統計メトリック変更の例を参照してください。

  • デプロイメントの SQL ステートメントからすべての統計メトリックを削除すると、デプロイメントは状態データと互換性がなくなります。すべての統計メトリックの状態データは破棄され、状態データは再利用されません。

    -- 元の SQL ステートメント:
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- すべての統計メトリック sum(b) と max(c) を削除します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • デプロイメントの SQL ステートメントでウィンドウの早期起動または遅延起動を追加、削除、または変更すると、デプロイメントは状態データと互換性がなくなります。

  • GROUP BY 句のキーの順序を変更します。ウィンドウ関数のキー以外の GROUP BY 句のキーの順序が変更された場合、デプロイメントは状態データと互換性がなくなります。

    -- 元の SQL ステートメント:
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable
    group by a, b, tumble(rowtime, interval '15' minute);
    
    -- GROUP BY 句のキーの順序を変更します。ウィンドウ関数のキー以外の GROUP BY 句のキーの順序が変更されています。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a, sum(distinct b), max(distinct c), count(c)
      from MyTable
    group by b, a, tumble(rowtime, interval '15' minute);
  • 統計メトリックのないデプロイメントの SQL ステートメントに統計メトリックを追加すると、デプロイメントは状態データと互換性がなくなります。

    -- 元の SQL ステートメント:
    select a, b, c
      from MyTable
    group by a, b, c, tumble(rowtime, interval '15' minute);
    
    -- 統計メトリックを追加します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a, b, c, count(c)
      from MyTable
    group by a, b, c, tumble(rowtime, interval '15' minute);
  • SQL ステートメントに統計メトリックが 1 つだけ存在し、デプロイメントへの変更後に計算ロジックが変更された場合、デプロイメントは状態データと互換性がなくなります。

    -- 元の SQL ステートメント:
    select a, sum(b), max(b), max(c)
      from MyTable
    group by b, a, tumble(ts, interval '15' minute);
    
    -- デプロイメントに統計メトリックが 1 つだけ存在し、計算ロジックが変更されています。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a, max(c)
      from (select a, b, c + 1 as c, ts from MyTable)
    group by b, a, tumble(ts, interval '15' minute);
  • デプロイメントへの変更前後の統計メトリックが異なる場合、デプロイメントは状態データと互換性がなくなります。

    -- 元の SQL ステートメント:
    select a, sum(b), max(b), max(c) from MyTable
    group by a, tumble(ts, interval '1' second);
    
    -- 変更前後の統計メトリックが異なります。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a, min(b), avg(b) from MyTable
    group by a, tumble(ts, interval '1' second);
  • デプロイメントへの変更の前後で、デプロイメントの SQL ステートメントに 'table.exec.emit.early-fire.enabled' = 'true' または 'table.exec.emit.late-fire.enabled' = 'true' 設定が含まれている場合、デプロイメントは状態データと互換性がなくなります。

    -- 元の SQL ステートメント:
    select a, max(c)
    from (select a, b, c + 1 as c, ts from MyTable)
    group by b, a, TUMBLE(ts, interval '15' minute);
    
    -- 設定を追加します。この変更後、デプロイメントは状態データと互換性がなくなります。
    set 'table.exec.emit.early-fire.enabled' = 'true';
    set 'table.exec.emit.early-fire.delay' = '500ms';
    -- または
    set 'table.exec.emit.late-fire.enabled' = 'true';
    set 'able.exec.emit.late-fire.delay' = '1s';
    set 'table.exec.emit.allow-lateness' = '5s';
    
    select a, max(c)
    from (select a, b, c + 1 as c, ts from MyTable)
    group by b, a, TUMBLE(ts, interval '15' minute);
  • デプロイメントへの変更の前後で、デプロイメントの SQL ステートメントに Python ユーザー定義集計関数(UDAF)が含まれている場合、デプロイメントと状態データ間の互換性は不明です。

    -- 変更の前後で、SQL ステートメントに weighter_avg 関数などの Python UDAF が含まれています。変更後、デプロイメントと状態データ間の互換性は不明です。
    SELECT COUNT(DISTINCT b), a,  SUM(DISTINCT b),weighted_avg(a, b)
    FROM MyTable GROUP BY a, c;