すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Window TVF

最終更新日:Jan 08, 2025

このトピックでは、デプロイメントの SQL ステートメントでウィンドウテーブル値関数 (TVF) を変更した後、デプロイメントと状態データ間の互換性がどのように影響を受けるかについて説明します。

互換性に影響を与えない、または部分的に影響を与える変更

  • 非重複統計メトリックを追加、削除、および変更します。統計メトリックは、集計関数を使用して計算されます。

    • デプロイメントの SQL ステートメントに統計メトリックの集計関数を追加すると、デプロイメントは状態データと部分的に互換性を持つようになります。追加された統計メトリックの値は、デプロイメントの開始時にインクリメントされます。

    • デプロイメントの SQL ステートメントから統計メトリックを削除すると、デプロイメントは状態データと完全な互換性を維持します。削除された統計メトリックの状態データは破棄されます。

    • デプロイメントの SQL ステートメントに統計メトリックを追加し、ステートメントから統計メトリックを削除すると、デプロイメントは状態データと部分的に互換性を持つようになります。追加された統計メトリックの値は、デプロイメントの開始時にインクリメントされます。削除された統計メトリックの状態データは破棄されます。

    • デプロイメントの SQL ステートメントで統計メトリックを変更すると、元の統計メトリックが削除され、新しい統計メトリックが追加されます。デプロイメントは状態データと部分的に互換性を持つようになります。追加された統計メトリックの値は、デプロイメントの開始時にインクリメントされます。削除された統計メトリックの状態データは破棄されます。

    説明

    変更していない統計メトリックの場合、デプロイメントの状態データが再利用された後のデプロイメントの計算結果は、履歴データに基づいて実行されるデプロイメントの計算結果と同じです。

    -- Original SQL statement: 
    select a, sum(b), max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 統計メトリック count(c) を追加します。この変更後、デプロイメントは状態データと部分的に互換性を持つようになります。
    -- sum(b) と max(c) の計算結果は影響を受けません。 count(c) の値は、デプロイメントの開始時に 0 からインクリメントされます。
    select a, sum(b), max(c), count(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 統計メトリック sum(b) を削除します。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    -- max(c) の計算結果は影響を受けません。
    select a, max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 統計メトリック max(c) を min(c) に変更します。この変更後、デプロイメントは状態データと部分的に互換性を持つようになります。
    -- sum(b) の計算結果は影響を受けません。 max(c) は削除されたと見なされ、その状態データは破棄されます。
    -- min(c) は新しいメトリックと見なされ、その値はデプロイメントの開始時に計算されます。
    select a, sum(b), min(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
  • 非重複統計メトリックの位置を変更します。この変更後、デプロイメントは状態データと完全な互換性を維持します。

    -- Original SQL statement: 
    select a, sum(b), max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 統計メトリック sum(b) と max(c) の位置を変更します。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    -- 統計メトリック sum(b) と max(c) の計算結果は影響を受けません。
    select a, max(c), sum(b)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
  • 非重複統計メトリックのフィールドの計算ロジックが変更された場合、統計メトリックは変更されたと見なされます。この変更後、デプロイメントは状態データと部分的に互換性を持つようになります。

    -- Original SQL statement: 
    select a, sum(b), max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- 統計メトリック max(c) を max(substring(c,1, 5)) に変更します。この変更後、デプロイメントは状態データと部分的に互換性を持つようになります。
    -- sum(b) の計算結果は影響を受けません。 max(c) は削除されたと見なされ、その状態データは破棄されます。
    -- max(substring(c, 1, 5)) は新しい統計メトリックと見なされます。 max(substring(c, 1, 5)) の値は、デプロイメントの開始時にインクリメントされます。
    create temporary view MyView select a, b, substring(c, 1, 5) as c, ts from MyTable;
    select a, sum(b), max(c)
    from table (tumble(table MyView, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
  • ウィンドウ属性フィールドを追加または削除します。この変更後、デプロイメントは状態データと完全な互換性を維持します。

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- ウィンドウ終了属性を追加します。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    select a,
      sum(b),
      max(c),
      window_start,
      window_end
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- ウィンドウ開始属性を削除します。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    select a,
      sum(b),
      max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
  • デプロイメントの変更前後に、SQL ステートメントに統計メトリックが含まれていない場合、デプロイメントは状態データと完全な互換性を維持します。

  • グループキーの順序を変更します。ウィンドウ関数に関連するグループキーの順序のみが変更され、他のグループキーの順序が変更されない場合、デプロイメントは状態データと完全な互換性を維持します。

    -- Original SQL statement: 
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, c, window_end, window_start;
    
    -- ウィンドウ関数に関連するグループキーの順序を変更します。この変更後、デプロイメントは状態データと完全な互換性を維持します。
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_end, c, window_start;

完全な非互換性を引き起こす変更

  • ウィンドウタイプ、ウィンドウサイズ、時間関連属性などのウィンドウ関連属性を変更すると、デプロイメントは状態データと互換性がなくなります。

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- ウィンドウタイプを TUMBLE から HOP に変更します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      max(c),
      hop_start(ts, interval '1' minute, interval '2' minute) as window_start
    from MyTable
      group by a, hop(ts, interval '1' minute, interval '2' minute);
    
    -- ウィンドウサイズを「1」分から「2」分に変更します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '2' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '2' minute);
    
    -- 時間関連属性を ts から proctime に変更します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(proctime, interval '1' minute);
  • デプロイメントの SQL ステートメントで、グループキーを追加、削除、または変更するか、グループキーとして使用されるフィールドの計算ロジックを変更すると、デプロイメントは状態データと互換性がなくなります。

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- グループキー d を追加します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      max(c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, d, tumble(ts, interval '1' minute);
    
    -- その他の例については、「GROUP BY で使用される集計関数」のグループキーの変更例を参照してください。
    
  • デプロイメントの SQL ステートメントで、重複除外統計メトリックを追加、削除、または変更するか、重複除外統計メトリックに関連するフィールドの計算ロジックを変更すると、デプロイメントは状態データと互換性がなくなります。重複除外統計メトリックは、重複除外集計関数を使用して計算されます。

    -- Original SQL statement: 
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- 重複除外統計メトリック count(distinct b) を追加します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      sum(b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- その他の例については、「GROUP BY で使用される集計関数」の重複除外統計メトリックの変更例を参照してください。
    
  • デプロイメントの SQL ステートメントからすべての統計メトリックを削除すると、デプロイメントは状態データと互換性がなくなります。すべての統計メトリックの状態データは破棄され、状態データは再利用されません。

    -- Original SQL statement: 
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
    
    -- sum(b) と max(c) を含むすべての統計メトリックを削除します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a,
      tumble_start(ts, interval '1' minute) as window_start
    from MyTable
      group by a, tumble(ts, interval '1' minute);
  • GROUP BY 句のキーの順序を変更します。ウィンドウ関数に関連するキー以外のグループキーの順序が変更された場合、デプロイメントは状態データと互換性がなくなります。

    -- Original SQL statement: 
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, c, window_end, window_start;
    
    -- ウィンドウ関数に関連するキー以外のグループキーの順序を変更します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by c, a, window_end, window_start;
  • もともと統計メトリックが含まれていないデプロイメントの SQL ステートメントに統計メトリックを追加すると、デプロイメントは状態データと互換性がなくなります。

    -- Original SQL statement: 
    select a, b
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, b, window_end, window_start;
    
    -- 新しい統計メトリックを追加します。この変更後、デプロイメントは状態データと互換性がなくなります。
    select a, b, count(a)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, b, window_end, window_start;
  • SQL ステートメントに統計メトリックが 1 つだけ存在し、その統計メトリックの計算ロジックが変更された場合、デプロイメントは状態データと互換性がなくなります。

    -- Original SQL statement: 
    insert into MySink select a, sum(b) from
        table(tumble(table MyTable, descriptor(ts), interval '1' second)) 
    group by a, window_start, window_end;
    
    -- デプロイメントの SQL ステートメントに統計メトリックが 1 つだけ存在し、その統計メトリックの計算ロジックが変更されています。この変更後、デプロイメントは状態データと互換性がなくなります。
    create temporary view MyView as select a, b + 1 as b, ts from MyTable; 
    insert into MySink select a, sum(b) from
        table(tumble(table MyView, descriptor(ts), interval '1' second)) 
    group by a, window_start, window_end;
  • デプロイメントの変更前後の統計メトリックが異なる場合、デプロイメントは状態データと互換性がなくなります。

    -- Original SQL statement:
    insert into MySink select a, sum(b) from
        table(tumble(table MyTable, descriptor(ts), interval '1' second)) 
    group by a, c, window_start, window_end;
    
    -- デプロイメントの変更前後の統計メトリックが異なります。この変更後、デプロイメントは状態データと互換性がなくなります。
    insert into MySink select a, min(b) from
        table(tumble(table MyTable, descriptor(ts), interval '1' second)) 
    group by a, c, window_start, window_end;
  • ウィンドウ関数と GROUP BY 句の間に、window_start、window_end、および window_time フィールドの計算のための句を追加または削除します。

    • デプロイメントの SQL ステートメントにフィールド計算のための句を追加すると、デプロイメントは状態データと互換性がなくなります。

    • デプロイメントの SQL ステートメントからフィールド計算のための句を削除すると、デプロイメントは状態データと互換性がなくなります。

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- フィールド計算のための句を追加します。
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start + (INTERVAL '1' SECOND) as window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
  • GROUP BY 句に window_start フィールドと window_end フィールドの両方が含まれていません。

    • 変更前にデプロイメントの GROUP BY 句に window_start フィールドと window_end フィールドの両方が含まれており、変更後に両方のフィールドが含まれていない場合、デプロイメントは状態データと互換性がなくなります。

    • 変更前にデプロイメントの GROUP BY 句に window_start フィールドと window_end フィールドの両方が含まれておらず、変更後に両方のフィールドが含まれている場合、デプロイメントは状態データと互換性がなくなります。

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start
      from (select a, b, c, window_start  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start;
      
      -- GROUP BY 句に window_start フィールドと window_end フィールドの両方が含まれています。
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
  • GROUP BY 句で GROUPING SETS、CUBE、または ROLLUP が使用されています。この場合、データは window_start フィールドと window_end フィールドで個別にグループ化されます。

    • 変更後にデプロイメントの GROUP BY 句で GROUPING SETS、CUBE、または ROLLUP が使用されている場合、デプロイメントは状態データと互換性がなくなります。

    • 変更後にデプロイメントの GROUP BY 句から GROUPING SETS、CUBE、または ROLLUP が削除された場合、デプロイメントは状態データと互換性がなくなります。

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start, window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- SQL ステートメントに GROUPING SETS を追加します。
      select a, sum(b), max(c), window_start, window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by GROUPING sets((a), (window_start), (window_end));

互換性が不明な変更

  • ウィンドウ関数と GROUP BY 句の間に、window_start、window_end、および window_time フィールドの計算のための句を追加または削除します。

    変更前後にデプロイメントの SQL ステートメントにフィールド計算のための句が存在する場合、デプロイメントと状態データ間の互換性は不明です。

  • ウィンドウ関数と GROUP BY 句の間で、window_start、window_end、および window_time フィールドをフィルタリングします。

    • デプロイメントの SQL ステートメントにフィルタ句が追加された場合、デプロイメントと状態データ間の互換性は不明です。

    • デプロイメントの SQL ステートメントからフィルタ句が削除された場合、デプロイメントと状態データ間の互換性は不明です。

    • 変更前後にデプロイメントの SQL ステートメントにフィルタ句が存在する場合、デプロイメントと状態データ間の互換性は不明です。

      -- Original SQL statement 
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- SQL ステートメントにフィルタ句を追加します。
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
          WHERE window_start >= TIMESTAMP '2024-04-15 08:06:00.000')
      group by a, window_start, window_end;
  • ウィンドウ関数がユーザー定義テーブル関数 (UDTF) と共に使用されています。

    • 変更後にデプロイメントの SQL ステートメントに UDTF が追加された場合、デプロイメントと状態データ間の互換性は不明です。

    • 変更後にデプロイメントの SQL ステートメントから UDTF が削除された場合、デプロイメントと状態データ間の互換性は不明です。

    • 変更前後にデプロイメントの SQL ステートメントに UDTF が存在する場合、デプロイメントと状態データ間の互換性は不明です。

      -- Original SQL statement: 
      select a, sum(b), length(c), window_start,window_end from (
        select a, b, c, window_start,window_end from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a,c, window_start,window_end;
      
      -- UDTF を追加します。
      select a, sum(b), length(c), window_start, window_end, c1, c2
      from (select a, b, c, window_start, window_end, c1, c2  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)), LATERAL TABLE(split(c)) as T(c1, c2))
      group by a,c, window_start, window_end, c1, c2;
  • Python ユーザー定義集計関数 (UDAF) が使用されています。

    • 変更後にデプロイメントの SQL ステートメントに Python UDAF が追加された場合、デプロイメントと状態データ間の互換性は不明です。

    • 変更後にデプロイメントの SQL ステートメントから Python UDAF が削除された場合、デプロイメントと状態データ間の互換性は不明です。

    • 変更前後にデプロイメントの SQL ステートメントに Python UDAF が存在する場合、デプロイメントと状態データ間の互換性は不明です。

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start
      from (select a, b, c, window_start  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start;
      
      -- Python UDAF を追加します。
      select a, sum(b), c, window_start
      from (select a, b, weighted_avg(c) as c, window_start from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      GROUP by a,b,window_start)
      group by a,c, window_start;
  • 集計関数を使用して、window_start、window_end、および window_time フィールドの集計を実行します。

    • 変更後にデプロイメントの SQL ステートメントに集計が追加された場合、デプロイメントと状態データ間の互換性は不明です。

    • 変更後にデプロイメントの SQL ステートメントから集計が削除された場合、デプロイメントと状態データ間の互換性は不明です。

    • 変更前後にデプロイメントの SQL ステートメントに集計が存在する場合、デプロイメントと状態データ間の互換性は不明です。

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start, window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      - データ計算のための集計関数を追加します。
      select a, sum(b), max(c), MAX(window_start) as ag, window_end 
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by (a, window_start, window_end);
      
  • GROUP BY 句で GROUPING SETS、CUBE、または ROLLUP が使用されています。この場合、データは window_start フィールドと window_end フィールドで個別にグループ化されます。

    変更前後にデプロイメントの GROUP BY 句で GROUPING SETS、CUBE、または ROLLUP が使用されている場合、デプロイメントと状態データ間の互換性は不明です。