Realtime Compute for Apache Flink: Window TVF

Last Updated: Mar 26, 2026

When you modify a windowing table-valued function (TVF) query in a deployment's SQL statement and restart the deployment with state data, the compatibility outcome depends on what you changed. Three parts of a Window TVF query are bound to the stored state: the window definition (type, size, and time attribute), the group keys, and the set of distinct aggregate metrics. Adding, deleting, or modifying any of these discards all state. The one exception is reordering the window-related group keys (window_start, window_end) while the other group keys keep their order, which is compatible. Non-distinct aggregate metrics are more flexible: you can add, delete, or reorder them with partial or full state reuse.

Quick reference

| Modification | Outcome |
| --- | --- |
| Add a non-distinct aggregate metric | Partial |
| Delete a non-distinct aggregate metric | Compatible |
| Add and delete non-distinct aggregate metrics simultaneously | Partial |
| Modify a non-distinct aggregate metric | Partial |
| Change the position of a non-distinct aggregate metric | Compatible |
| Change the calculation logic of a field in a non-distinct aggregate metric | Partial |
| Add or delete a window attribute field (window_start, window_end, window_time) | Compatible |
| No aggregate metrics exist before and after the modification | Compatible |
| Change the sequence of window-related group keys (other keys unchanged) | Compatible |
| Change window type, window size, or time attribute | Incompatible |
| Add, delete, or modify a group key | Incompatible |
| Change the sequence of non-window group keys | Incompatible |
| Add, delete, or modify a distinct aggregate metric | Incompatible |
| Delete all aggregate metrics | Incompatible |
| Add an aggregate metric when the original SQL has none | Incompatible |
| Only one aggregate metric exists and its calculation logic changes | Incompatible |
| The set of aggregate metrics changes entirely (different before and after) | Incompatible |
| Add or delete a field calculation clause between the window function and GROUP BY | Incompatible |
| Change whether GROUP BY contains both window_start and window_end | Incompatible |
| Add or delete GROUPING SETS, CUBE, or ROLLUP | Incompatible |
| Field calculation clause exists both before and after the modification | Unknown |
| Add, delete, or retain a filter clause on window fields | Unknown |
| Add, delete, or retain a user-defined table function (UDTF) | Unknown |
| Add, delete, or retain a Python user-defined aggregate function (UDAF) | Unknown |
| Add, delete, or retain aggregation on window_start, window_end, or window_time | Unknown |
| GROUPING SETS, CUBE, or ROLLUP present both before and after | Unknown |

Modifications that preserve full compatibility

These modifications let the deployment resume from existing state data without any impact on aggregate results.

  • Delete a non-distinct aggregate metric. The state data for the deleted metric is discarded, and the remaining metrics continue from their stored values without interruption.

  • Change the position of a non-distinct aggregate metric. Reordering aggregate columns in the SELECT list does not affect state. The following example moves max(c) before sum(b) and the deployment remains fully compatible.

    -- Original
    SELECT a, sum(b), max(c)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end;
    
    -- Reordered — fully compatible
    SELECT a, max(c), sum(b)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end;
  • Add or delete a window attribute field. Adding or removing window_start, window_end, or window_time from the SELECT list does not affect state.

    -- Original
    SELECT a, sum(b), max(c), window_start
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end;
    
    -- Add window_end — fully compatible
    SELECT a, sum(b), max(c), window_start, window_end
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end;
    
    -- Delete window_start — fully compatible
    SELECT a, sum(b), max(c)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end;
  • No aggregate metrics exist before and after the modification. If neither the original nor the updated SQL includes aggregate metrics, state is fully compatible.

  • Change the sequence of window-related group keys. Reordering the window_start or window_end keys within the GROUP BY clause is safe as long as the relative order of all other group keys stays the same.

    -- Original
    SELECT a, sum(b), max(c)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, c, window_end, window_start;
    
    -- Move window_end ahead of c (a and c keep their relative order): fully compatible
    SELECT a, sum(b), max(c)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_end, c, window_start;
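
Two of the compatible cases in this section have no example of their own. The following sketch illustrates them, reusing the MyTable schema from the other examples. The outcomes in the comments restate the rules above and have not been verified against a running deployment.

```sql
-- Original
SELECT a, sum(b), max(c)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end;

-- Delete max(c): expected to be fully compatible.
-- sum(b) continues from stored state. max(c) state is discarded.
SELECT a, sum(b)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end;
```

```sql
-- Original (no aggregate metrics)
SELECT a, b
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_start, window_end;

-- Still no aggregate metrics; only window_start is added to the SELECT list.
-- Expected to be fully compatible.
SELECT a, b, window_start
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_start, window_end;
```

In both sketches the window definition and the group keys are left untouched, which is what keeps the change within the compatible category.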

Modifications that cause partial compatibility

These modifications let the deployment reuse state data for unchanged metrics. Newly added metrics start accumulating from zero at deployment restart; deleted metrics have their state discarded.

Note

For metrics you do not modify, the calculation results after state reuse are identical to what a full historical replay would produce.

  • Add a non-distinct aggregate metric. The new metric counts from zero when the deployment starts. Existing metrics are not affected.

  • Add and delete non-distinct aggregate metrics simultaneously. The added metric starts from zero; the deleted metric's state is discarded; unchanged metrics continue normally.

  • Modify a non-distinct aggregate metric (including changing the aggregate function or the calculation logic of its input field). The original metric is treated as deleted — its state is discarded. The new metric starts from zero.

The following example illustrates these cases (add, delete, and modify a metric). For pure deletion with no additions, see the compatible modification for deleting a non-distinct aggregate metric.

-- Original
SELECT a, sum(b), max(c)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end;

-- Add count(c): partially compatible.
-- sum(b) and max(c) continue from stored state. count(c) starts from 0.
SELECT a, sum(b), max(c), count(c)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end;

-- Add count(c) and delete sum(b) simultaneously: partially compatible.
-- max(c) continues from stored state. sum(b) state is discarded. count(c) starts from 0.
SELECT a, max(c), count(c)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end;

-- Change max(c) to min(c): partially compatible.
-- sum(b) continues from stored state.
-- max(c) state is discarded. min(c) starts from 0.
SELECT a, sum(b), min(c)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end;

Changing the calculation logic of an input field also counts as a modification. The following example changes max(c) to max(substring(c, 1, 5)) using a temporary view. The original max(c) state is discarded, and max(c) in the new query (where c is the first five characters of the original) starts from zero.

-- Original
SELECT a, sum(b), max(c)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end;

-- Change input field logic via a temporary view: partially compatible.
-- sum(b) continues from stored state. max(c) state (original) is discarded.
-- max(c) on the new view starts from 0.
CREATE TEMPORARY VIEW MyView AS SELECT a, b, substring(c, 1, 5) AS c, ts FROM MyTable;
SELECT a, sum(b), max(c)
FROM TABLE(TUMBLE(TABLE MyView, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end;

Modifications that cause full incompatibility

These modifications require the deployment to discard all state data. The deployment restarts from scratch as if no prior state exists.

  • Modify window attributes. Changing the window type (for example, TUMBLE to HOP), window size, or time attribute causes full incompatibility.

    -- Original
    SELECT a, sum(b), max(c), window_start
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end;
    
    -- Change window type to HOP: incompatible
    SELECT a, sum(b), max(c),
      hop_start(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE) AS window_start
    FROM MyTable
    GROUP BY a, hop(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE);
    
    -- Change window size from 1 minute to 2 minutes: incompatible
    SELECT a, sum(b), max(c),
      tumble_start(ts, INTERVAL '2' MINUTE) AS window_start
    FROM MyTable
    GROUP BY a, tumble(ts, INTERVAL '2' MINUTE);
    
    -- Change time attribute from ts to proctime: incompatible
    SELECT a, sum(b), max(c),
      tumble_start(proctime, INTERVAL '1' MINUTE) AS window_start
    FROM MyTable
    GROUP BY a, tumble(proctime, INTERVAL '1' MINUTE);
  • Add, delete, or modify a group key (including changing the calculation logic of a group key field).

    -- Original
    SELECT a, sum(b), max(c),
      tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
    FROM MyTable
    GROUP BY a, tumble(ts, INTERVAL '1' MINUTE);
    
    -- Add group key d: incompatible
    SELECT a, sum(b), max(c),
      tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
    FROM MyTable
    GROUP BY a, d, tumble(ts, INTERVAL '1' MINUTE);
  • Change the sequence of non-window group keys. Reordering group keys that are not related to the window function causes full incompatibility.

    -- Original
    SELECT a, sum(b), max(c)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, c, window_end, window_start;
    
    -- Swap a and c: incompatible
    SELECT a, sum(b), max(c)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY c, a, window_end, window_start;
  • Add, delete, or modify a distinct aggregate metric. Distinct aggregate metrics (calculated using distinct aggregate functions such as COUNT(DISTINCT ...)) always cause full incompatibility when added, removed, or changed.

    -- Original
    SELECT a, sum(b), count(DISTINCT b), max(c), count(DISTINCT c),
      tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
    FROM MyTable
    GROUP BY a, tumble(ts, INTERVAL '1' MINUTE);
    
    -- Delete count(DISTINCT b): incompatible
    SELECT a, sum(b), max(c), count(DISTINCT c),
      tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
    FROM MyTable
    GROUP BY a, tumble(ts, INTERVAL '1' MINUTE);
  • Delete all aggregate metrics. All state is discarded and no state data is reused.

    -- Original
    SELECT a, sum(b), count(DISTINCT b), max(c), count(DISTINCT c),
      tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
    FROM MyTable
    GROUP BY a, tumble(ts, INTERVAL '1' MINUTE);
    
    -- Delete all aggregate metrics: incompatible
    SELECT a, tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
    FROM MyTable
    GROUP BY a, tumble(ts, INTERVAL '1' MINUTE);
  • Add an aggregate metric when the original SQL has none. Going from zero aggregate metrics to one or more causes full incompatibility.

    -- Original (no aggregate metrics)
    SELECT a, b
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, b, window_end, window_start;
    
    -- Add count(a): incompatible
    SELECT a, b, count(a)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, b, window_end, window_start;
  • Only one aggregate metric exists and its calculation logic changes.

    -- Original (single metric)
    INSERT INTO MySink SELECT a, sum(b)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' SECOND))
    GROUP BY a, window_start, window_end;
    
    -- Change input calculation via a temporary view: incompatible
    CREATE TEMPORARY VIEW MyView AS SELECT a, b + 1 AS b, ts FROM MyTable;
    INSERT INTO MySink SELECT a, sum(b)
    FROM TABLE(TUMBLE(TABLE MyView, DESCRIPTOR(ts), INTERVAL '1' SECOND))
    GROUP BY a, window_start, window_end;
  • The set of aggregate metrics is entirely different before and after the modification.

    -- Original
    INSERT INTO MySink SELECT a, sum(b)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' SECOND))
    GROUP BY a, c, window_start, window_end;
    
    -- Replace sum(b) with min(b): incompatible
    INSERT INTO MySink SELECT a, min(b)
    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' SECOND))
    GROUP BY a, c, window_start, window_end;
  • Add or delete a field calculation clause between the window function and GROUP BY. A field calculation clause transforms window_start, window_end, or window_time between the TVF and the GROUP BY step.

    -- Original
    SELECT a, sum(b), max(c), window_start, window_end
    FROM (SELECT a, b, c, window_start, window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY a, window_start, window_end;
    
    -- Add a field calculation clause: incompatible
    SELECT a, sum(b), max(c), window_start, window_end
    FROM (SELECT a, b, c,
                 window_start + (INTERVAL '1' SECOND) AS window_start,
                 window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY a, window_start, window_end;
  • Change whether GROUP BY contains both window_start and window_end. If the GROUP BY clause goes from including both fields to excluding one (or vice versa), the deployment is incompatible.

    -- Original (GROUP BY contains only window_start)
    SELECT a, sum(b), max(c), window_start
    FROM (SELECT a, b, c, window_start
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY a, window_start;
    
    -- Add window_end to GROUP BY: incompatible
    SELECT a, sum(b), max(c), window_start, window_end
    FROM (SELECT a, b, c, window_start, window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY a, window_start, window_end;
  • Add or delete GROUPING SETS, CUBE, or ROLLUP. These clauses group data separately by window_start and window_end, which changes internal state layout.

    -- Original
    SELECT a, sum(b), max(c), window_start, window_end
    FROM (SELECT a, b, c, window_start, window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY a, window_start, window_end;
    
    -- Add GROUPING SETS: incompatible
    SELECT a, sum(b), max(c), window_start, window_end
    FROM (SELECT a, b, c, window_start, window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY GROUPING SETS ((a), (window_start), (window_end));
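
The group key rule above shows only the "add" case. As a hedged sketch, deleting a group key and changing the calculation logic of a group key field might look as follows. The sketch assumes that MyTable has a column d (as in the "add group key d" example) and that a is a string column. The view name MyView follows the pattern of the temporary-view examples elsewhere on this page.

```sql
-- Original
SELECT a, sum(b), max(c)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, d, window_start, window_end;

-- Delete group key d: expected to be incompatible
SELECT a, sum(b), max(c)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end;

-- Change the calculation logic of group key a through a temporary view
-- (assumes a is a STRING column): expected to be incompatible
CREATE TEMPORARY VIEW MyView AS SELECT substring(a, 1, 5) AS a, b, c, d, ts FROM MyTable;
SELECT a, sum(b), max(c)
FROM TABLE(TUMBLE(TABLE MyView, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, d, window_start, window_end;
```

In the second variant the GROUP BY text is unchanged, but the value behind a is different, so by the rule above it counts as a modified group key.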

Modifications with unknown compatibility

For these patterns, the engine cannot determine in advance whether state data is compatible. Treat these modifications as potentially incompatible and validate behavior after restart.

  • A field calculation clause exists both before and after the modification. When a field calculation clause is present in both the original and modified SQL (even if unchanged), compatibility is unknown.

  • Filter clauses on window fields (window_start, window_end, window_time). Adding, deleting, or retaining a filter on window fields all produce unknown compatibility.

    -- Original
    SELECT a, sum(b), max(c), window_start, window_end
    FROM (SELECT a, b, c, window_start, window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY a, window_start, window_end;
    
    -- Add a filter clause: unknown compatibility
    SELECT a, sum(b), max(c), window_start, window_end
    FROM (SELECT a, b, c, window_start, window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
          WHERE window_start >= TIMESTAMP '2024-04-15 08:06:00.000')
    GROUP BY a, window_start, window_end;
  • A user-defined table function (UDTF). Adding, deleting, or keeping a UDTF in the SQL all produce unknown compatibility.

    -- Original
    SELECT a, sum(b), length(c), window_start, window_end
    FROM (SELECT a, b, c, window_start, window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY a, c, window_start, window_end;
    
    -- Add a UDTF: unknown compatibility
    SELECT a, sum(b), length(c), window_start, window_end, c1, c2
    FROM (SELECT a, b, c, window_start, window_end, c1, c2
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)),
               LATERAL TABLE(split(c)) AS T(c1, c2))
    GROUP BY a, c, window_start, window_end, c1, c2;
  • A Python user-defined aggregate function (UDAF). Adding, deleting, or keeping a Python UDAF all produce unknown compatibility.

    -- Original
    SELECT a, sum(b), max(c), window_start
    FROM (SELECT a, b, c, window_start
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY a, window_start;
    
    -- Add a Python UDAF: unknown compatibility
    SELECT a, sum(b), c, window_start
    FROM (SELECT a, b, weighted_avg(c) AS c, window_start
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
          GROUP BY a, b, window_start)
    GROUP BY a, c, window_start;
  • Aggregation on window_start, window_end, or window_time. Adding, removing, or retaining an aggregate function applied to a window field produces unknown compatibility.

    -- Original
    SELECT a, sum(b), max(c), window_start, window_end
    FROM (SELECT a, b, c, window_start, window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY a, window_start, window_end;
    
    -- Add aggregation on window_start: unknown compatibility
    SELECT a, sum(b), max(c), MAX(window_start) AS ag, window_end
    FROM (SELECT a, b, c, window_start, window_end
          FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
    GROUP BY (a, window_start, window_end);
  • GROUPING SETS, CUBE, or ROLLUP present both before and after the modification. If the GROUP BY clause uses GROUPING SETS, CUBE, or ROLLUP both before and after the change, compatibility is unknown.
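
The two "present both before and after" cases in this section have no example. The following sketch shows what they might look like, reusing the field calculation clause and the GROUPING SETS clause from the incompatible examples. In each pair the only change is an added count(c), which would otherwise be partially compatible. The outcomes in the comments restate the rules above and are not verified results.

```sql
-- Original: the subquery already contains a field calculation clause
SELECT a, sum(b), max(c), window_start, window_end
FROM (SELECT a, b, c,
             window_start + (INTERVAL '1' SECOND) AS window_start,
             window_end
      FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
GROUP BY a, window_start, window_end;

-- Add count(c) while the field calculation clause is retained: unknown compatibility
SELECT a, sum(b), max(c), count(c), window_start, window_end
FROM (SELECT a, b, c,
             window_start + (INTERVAL '1' SECOND) AS window_start,
             window_end
      FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
GROUP BY a, window_start, window_end;
```

```sql
-- Original: GROUPING SETS is already present
SELECT a, sum(b), max(c), window_start, window_end
FROM (SELECT a, b, c, window_start, window_end
      FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
GROUP BY GROUPING SETS ((a), (window_start), (window_end));

-- Add count(c) while GROUPING SETS is retained: unknown compatibility
SELECT a, sum(b), max(c), count(c), window_start, window_end
FROM (SELECT a, b, c, window_start, window_end
      FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE)))
GROUP BY GROUPING SETS ((a), (window_start), (window_end));
```

Because the presence of either clause is enough to make the outcome unknown, validate the results after restart even for changes that are otherwise listed as compatible or partially compatible.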