すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:グループ集約

最終更新日:Jan 08, 2025

このトピックでは、ジョブのグループ集約クエリを変更した後に、ジョブとジョブの開始に使用される状態データとの互換性について説明します。グループ集約クエリでは、集約関数は GROUP BY 句と共に使用されます。

互換性のある変更

  • DISTINCT キーワードを含まない集約フィールドを追加、削除、または変更します。集約フィールドは、データレコードのグループに集約関数を適用することによって生成されます。

    • 部分的に互換性があります: 集約フィールドを追加します。ジョブの開始時に、追加されたフィールドの値が増加します。

    • 完全に互換性があります: 既存の集約フィールドを削除します。削除されたフィールドの状態データは破棄されます。

    • 部分的に互換性があります: 集約フィールドを追加し、既存の集約フィールドを同時に削除します。ジョブの開始時に、追加されたフィールドの値が増加します。削除されたフィールドの状態データは破棄されます。

    • 部分的に互換性があります: 既存の集約フィールドを変更します。この変更は、次の操作に分割されます。元の集約フィールドを削除し、新しい集約フィールドを追加します。ジョブの開始時に、追加されたフィールドの値が増加します。削除されたフィールドの状態データは破棄されます。

    説明

    集約フィールドを変更しない場合、フィールドの計算結果は、状態データを使用するかどうかに関係なく同じです。

    -- オリジナルの SQL ステートメント:
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 部分的に互換性があります: 集約フィールドである count(c) を追加します。
    -- sum(b) と max(c) の計算結果には影響しません。ジョブの開始時に count(c) の値は 0 から増加します。
    SELECT a, SUM(b), MAX(c), COUNT(c) FROM MyTable GROUP BY a;
    
    -- 完全に互換性があります: sum(b) を削除します。
    -- max(c) の計算結果には影響しません。
    SELECT a, MAX(c) FROM MyTable GROUP BY a;
    
    -- 部分的に互換性があります: 集約フィールドを max(c) から min(c) に変更します。
    -- sum(b) の計算結果には影響しません。max(c) フィールドは削除されたと見なされ、その状態データは破棄されます。
    -- min(c) フィールドは新しいフィールドと見なされ、ジョブの開始時にその値が増加します。
    SELECT a, SUM(b), MIN(c) FROM MyTable GROUP BY a;
  • DISTINCT キーワードを含まない集約フィールドの順序を変更します(完全に互換性があります)。

    -- オリジナルの SQL ステートメント:
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 完全に互換性があります: sum(b) と max(c) の順序を変更します。
    -- sum(b) と max(c) の計算結果には影響しません。
    SELECT a, MAX(c), SUM(b) FROM MyTable GROUP BY a;
  • DISTINCT キーワードを含まない集約フィールドの計算ロジックを変更します(部分的に互換性があります)。この場合、集約フィールドは変更されたと見なされます。

    -- オリジナルの SQL ステートメント:
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 部分的に互換性があります: 集約フィールドを max(c) から max(substring(c, 1, 5)) に変更します。これは、集約前に substring 関数を適用することで計算ロジックを変更します。
    -- sum(b) の計算結果には影響しません。max(c) フィールドは削除されたと見なされ、その状態データは破棄されます。
    -- max(substring(c, 1, 5)) フィールドは新しい集約フィールドと見なされ、ジョブの開始時にその値が増加します。
    SELECT a, SUM(b), MAX(c) FROM (
      SELECT a, b, SUBSTRING(c, 1, 5) AS c FROM MyTable
    ) GROUP BY a;
  • すべての集約関数の順序を変更するときに、DISTINCT キーワードを含む集約関数の順序を維持します(完全に互換性があります)。

    -- オリジナルの SQL ステートメント:
    INSERT INTO MySink
    SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a;
    
    -- 完全に互換性があります: 変更後も SUM(DISTINCT b) は COUNT(DISTINCT b) の前にあります。
    INSERT INTO MySink
    SELECT a, SUM(DISTINCT b), COUNT(DISTINCT b), MAX(b)  FROM MyTable GROUP BY a;
  • 変更の前後で集約フィールドを使用しません(完全に互換性があります)。

  • 取り消し操作後に、取り消しをサポートする集約関数を削除します(完全に互換性があります)。

    -- オリジナルの SQL ステートメント:
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MAX(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;
    
    -- 完全に互換性があります: 取り消しをサポートする集約関数を削除します。
    SELECT c/2, AVG(avg_a) AS avg_avg_a FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;

互換性のない変更

  • GROUP BY 句のフィールドを追加、削除、または変更するか、フィールドの計算ロジックを変更します。

    -- オリジナルの SQL ステートメント:
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 互換性がありません: GROUP BY 句に d を追加します。
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a, d;
    
    -- 互換性がありません: GROUP BY 句を削除することで、GROUP BY 句から a を削除します。
    SELECT SUM(b), MAX(c) FROM MyTable;
    
    -- 互換性がありません: GROUP BY 句のフィールドを a から d に変更します。
    SELECT d, SUM(b), MIN(c) FROM MyTable GROUP BY d;
    
    -- 互換性がありません: GROUP BY 句のフィールドを a から a + 1 に変更します。
    SELECT a, SUM(b), MAX(c) FROM (
      SELECT a + 1 AS a, b, c FROM MyTable 
    ) GROUP BY a;
  • DISTINCT キーワードを含む集約フィールドを追加、削除、または変更するか、フィールドの計算ロジックを変更します。

    -- オリジナルの SQL ステートメント:
    SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 互換性がありません: count(distinct b) を追加します。
    SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 互換性がありません: sum(distinct b) を削除します。
    SELECT a, SUM(b), MAX(c), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 互換性がありません: sum(distinct b) を avg(distinct b) に変更します。
    SELECT a, SUM(b), MAX(c), AVG(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;
    
    -- 互換性がありません: count(distinct c) を count(distinct avg(c)) に変更します。
    SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT c) FROM (
         SELECT a, b, AVG(c) AS c from MyTable GROUP BY a, b
    ) GROUP BY a;
  • 複数レベルの集約に集約フィールドを追加します。複数レベルの集約では取り消しが発生するため、追加されたフィールドの計算結果は予測できません。その結果、この変更は互換性がありません。

    -- オリジナルの SQL ステートメント:
    SELECT a/2, AVG(b), MIN(c) FROM (
        SELECT a, SUM(b) AS b, MAX(c) AS c FROM MyTable GROUP BY a
    ) GROUP BY a/2;
    
    -- 互換性がありません: count(c) を追加します。
    SELECT a/2, AVG(b), MIN(c), COUNT(c) FROM (
        SELECT a, SUM(b) AS b, MAX(c) AS c FROM MyTable GROUP BY a
    ) GROUP BY a/2;
  • すべての集約フィールドを削除します。この変更は、フィールドの状態データが破棄され、状態データが再利用されないため、互換性がありません。

    -- オリジナルの SQL ステートメント:
    SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 互換性がありません: sum(b) と max(c) を削除します。
    SELECT a FROM MyTable GROUP BY a;
  • DISTINCT キーワードを含む集約関数の順序を変更します。

    -- オリジナルの SQL ステートメント:
    INSERT INTO MySink
    SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a;
    
    -- 互換性がありません: SUM(DISTINCT b) と COUNT(DISTINCT b) の順序を逆にします。
    INSERT INTO MySink
    SELECT COUNT(DISTINCT b), a, MAX(b), SUM(DISTINCT b) FROM MyTable GROUP BY a;
  • 集約フィールドを含まないジョブに集約フィールドを追加します。

    -- オリジナルの SQL ステートメント:
    INSERT INTO MySink
    SELECT a, b FROM MyTable GROUP BY a,b;
    
    -- 互換性がありません: 集約フィールドを追加します。
    INSERT INTO MySink
    SELECT a, b, SUM(b) FROM MyTable GROUP BY a,b;
  • 集約フィールドを 1 つだけ保持し、そのフィールドの計算ロジックを変更します。

    -- オリジナルの SQL ステートメント:
    INSERT INTO MySink
    SELECT a, SUM(b), MAX(b), MAX(c) FROM MyTable GROUP BY a;
    
    -- 互換性がありません: MAX(c) フィールドのみを保持し、その計算ロジックを変更します。
    INSERT INTO MySink 
    SELECT a, MAX(c) FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable) GROUP BY a;
  • ジョブ内のすべての集約フィールドを変更します。

    -- オリジナルの SQL ステートメント:
    INSERT INTO MySink
    SELECT a, b, MAX(c) FROM MyTable GROUP BY a,b;
    
    -- 互換性がありません: 唯一の集約フィールドを変更します。
    INSERT INTO MySink
    SELECT a, b, MIN(c) FROM MyTable GROUP BY a,b;
  • 取り消し操作後に、取り消しをサポートする集約関数を追加または変更します。

    -- オリジナルの SQL ステートメント:
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MAX(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;
    
    -- 互換性がありません: 取り消しをサポートする集約関数を追加します。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MIN(max_b) min_max_b, MAX(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;
    
    -- 互換性がありません: 取り消しをサポートする集約関数を変更します。
    SELECT c/2, AVG(avg_a) AS avg_avg_a, MIN(max_b) max_max_b FROM
        (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
    GROUP BY c/2;

互換性が不明な変更

変更の前後で Python ユーザー定義集約関数(UDAF)を使用する場合、システムは互換性を判断できません。

-- 互換性が不明: 変更の前後で、次のコードに示す weighted_avg 関数などの Python UDAF を使用します。
SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b), weighted_avg(a, b) 
FROM MyTable GROUP BY a, b;