This topic describes three solutions to real-time data aggregation using Realtime Compute for Apache Flink: stateful aggregation, stateless incremental aggregation, and intermediate aggregation table.
Background and challenges
Traditional real-time data aggregation systems face several challenges in real-world use cases, including:
Handling late data: In distributed real-time processing, data often arrives out of order because of network latency, system jitter, or upstream fluctuations. Without a proper late data-handling mechanism, such as watermarks, state rollbacks, or window-based reprocessing, historical data can be overwritten by incorrect results. This leads to statistical bias, affecting monitoring accuracy and decision-making reliability.
Complicated state management: With traditional stateful aggregation, high-dimensional data or data skew can cause the state to grow rapidly. A large state consumes excessive memory, slows down checkpointing, lengthens recovery, compromises system stability, and increases the risk of job failure.
Balancing resource consumption and performance: Real-time aggregation must trade off several factors, including compute overhead, storage usage, and result accuracy. Relying too heavily on in-memory state increases resource costs, while frequent reads and writes to external storage can create I/O bottlenecks that impact throughput and latency.
Solution comparison
Solution | Core advantages | Typical use cases | Development complexity | O&M complexity | Data accuracy | Pressure on downstream storage | Resource efficiency |
Stateful aggregation | | Ideal for workloads with stable datasets and data dimensions, along with low ratios of late data. Example: | Low | Medium to high (state management) | Low (late data) | High | Medium |
Stateless incremental aggregation | | Ideal for use cases where accuracy is essential while late data is common. Example: | Medium | Low | High | Medium to high (reading historical state) | Medium |
Intermediate aggregation table | | Ideal for use cases with any of the characteristics below: | High | Medium to high (component maintenance) | High | Low (batch writes) | Optimal |
Traditional stateful aggregation
This solution uses Flink's built-in mechanism to maintain states in memory for real-time data aggregation.
The SQL snippet below aggregates a log stream (view_source) in real time. It calculates page views (pv) and clicks by time (ts) and cluster, and writes the results to sink_table.
INSERT INTO sink_table
SELECT
    ts,
    cluster,
    SUM(pv) AS pv,
    SUM(click) AS click
FROM view_source
GROUP BY ts, cluster;
Solution description
This is a basic solution to real-time aggregation. It uses Flink's stateful aggregation capabilities, enabling you to develop data aggregation tasks using familiar SQL syntax. As data flows through an operator, the engine finds and updates the state based on a predefined key and then writes the aggregated result to downstream storage.
Benefits and limitations
This solution is simple to implement and delivers high data freshness. However, it has clear limitations:
Late data corrupts aggregations after state expiration: Consider a time window that is correctly aggregated to a final result (e.g., pv=999). When the state for this window expires (e.g., at 09:30), any subsequent late event with a timestamp in that window will not find the existing state. Instead, Flink may initialize a new state and output an incomplete result (e.g., pv=1), which then overwrites the previously correct result in the sink.
State bloat undermines performance: High concurrency on large datasets can cause aggregation state hotspots. This leads to rapid state expansion, high memory consumption, and significantly slowed checkpointing, impacting system stability and recovery time.
Prolonged fault recovery: During system fault recovery, the entire state must be loaded from a checkpoint. The larger the state, the longer the recovery time, which affects system availability.
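The late-data failure described above can be reproduced with a small simulation that does not depend on Flink. This is only a sketch: the class StateExpirySketch, its in-memory maps, and the expireState method are hypothetical stand-ins for keyed state, state expiration, and an upsert sink. They are not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class StateExpirySketch {
    // Hypothetical stand-in for keyed state: running pv per window key.
    private final Map<String, Long> state = new HashMap<>();
    // Hypothetical stand-in for an upsert sink: last emitted pv per window key.
    private final Map<String, Long> sink = new HashMap<>();

    // Process one event: update the state, then upsert the running total.
    public void process(String key, long pv) {
        long updated = state.merge(key, pv, Long::sum);
        sink.put(key, updated);
    }

    // Hypothetical stand-in for state expiration: the state for the key is dropped.
    public void expireState(String key) {
        state.remove(key);
    }

    public long sinkValue(String key) {
        return sink.get(key);
    }

    public static void main(String[] args) {
        StateExpirySketch job = new StateExpirySketch();
        String key = "09:00|cluster-a";
        for (int i = 0; i < 999; i++) {
            job.process(key, 1L);
        }
        System.out.println(job.sinkValue(key)); // prints 999

        job.expireState(key);  // the state for the window expires
        job.process(key, 1L);  // a late event arrives for the same window
        System.out.println(job.sinkValue(key)); // prints 1: the correct result was overwritten
    }
}
```

The sink ends up holding pv=1 because the late event starts from an empty state and the upsert replaces the earlier, correct value.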
Stateless incremental aggregation with UDAFs
This solution pushes complex state management down to the storage system. The stream processing engine performs stateless incremental calculations, and relies on the downstream storage to compute the final aggregated result.
The following user-defined aggregate function (UDAF) implements stateless aggregation. It performs incremental calculations on data in the current batch or window.
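import org.apache.flink.table.functions.AggregateFunction;

public class LongSumAggUDAF extends AggregateFunction<Long, LongSumAggUDAF.LongAccumulator> {
    // Minimal accumulator. Its definition is not shown in the original, so this one is an assumption.
    public static class LongAccumulator {
        public long sum;
        public void add(long value) { sum += value; }
        public long getValue() { return sum; }
    }
    @Override
    public LongAccumulator createAccumulator() {
        return new LongAccumulator();
    }
    // Flink calls accumulate for each input row. Null values are skipped.
    public void accumulate(LongAccumulator acc, Long value) {
        if (value != null) {
            acc.add(value); // Accumulates data in the current micro batch
        }
    }
    @Override
    public Long getValue(LongAccumulator acc) {
        return acc.getValue();
    }
}
Solution description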
UDAF: Unlike standard aggregate functions, a UDAF performs incremental calculations only on data within the current batch or window at the operator level. It does not maintain historical state across batches, focusing only on the current micro-batch's values.
Incremental aggregation via "read-compute-write": The sink connector first reads the last known aggregated value for a given primary key from storage. It then merges the historical value with the incremental result from the current data batch. Finally, it writes the new aggregation result back to storage. This "read-compute-write" pattern ensures correct aggregation, even if the job restarts or late data arrives.
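The "read-compute-write" pattern can be sketched as follows. This is an illustration under stated assumptions, not the connector's actual implementation: ReadComputeWriteSketch and its flush method are hypothetical names, a HashMap stands in for the downstream storage, and concurrency and replay handling are ignored.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReadComputeWriteSketch {
    // Hypothetical stand-in for downstream storage, keyed by primary key.
    private final Map<String, Long> storage = new HashMap<>();

    // Merge one batch of per-key increments into storage.
    public void flush(Map<String, Long> batchIncrements) {
        for (Map.Entry<String, Long> entry : batchIncrements.entrySet()) {
            long historical = storage.getOrDefault(entry.getKey(), 0L); // read
            long merged = historical + entry.getValue();                // compute
            storage.put(entry.getKey(), merged);                        // write
        }
    }

    public long read(String key) {
        return storage.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        ReadComputeWriteSketch sinkWriter = new ReadComputeWriteSketch();
        String key = "09:00|cluster-a";

        // On-time data: the batch-level increment is 999.
        sinkWriter.flush(Collections.singletonMap(key, 999L));
        // Late data for the same key arrives in a later batch.
        sinkWriter.flush(Collections.singletonMap(key, 1L));

        System.out.println(sinkWriter.read(key)); // prints 1000
    }
}
```

Because the historical value lives in storage rather than in engine state, the late increment is added to 999 instead of replacing it.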
Benefits and limitations
This solution handles late data correctly: because historical values are merged in storage, state expiration can no longer cause a result to be overwritten by an incomplete one. Furthermore, because the engine no longer maintains a large state, memory efficiency and checkpoint performance improve substantially. However, each batch or window requires an additional read operation, which increases the load on the storage system and adds processing latency.
Intermediate aggregation table
This solution introduces a data lake table, such as a Paimon table, to store the aggregation results. As shown in the illustration, the aggregation pipeline works like this: Flink performs stateless aggregations on raw data, merges the results in the lake table, and sends them downstream.
The following code creates a Paimon table. When primary keys match, the pv and click columns are aggregated using sum.
-- Create a Paimon aggregation table
CREATE TABLE paimon_agg (
    ts TIMESTAMP(3),
    cluster STRING,
    pv BIGINT,
    click BIGINT,
    PRIMARY KEY (ts, cluster) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.pv.aggregate-function' = 'sum',
    'fields.click.aggregate-function' = 'sum'
);
Solution description
This solution uses the Paimon connector's data merge mechanism to aggregate data. When you set 'merge-engine' = 'aggregation' and specify an aggregate function for each column, the Paimon connector automatically merges new values with historical values for rows that have the same primary key. The workflow includes:
Process data streams: Performs stateless aggregation and writes the results to the Paimon table.
Merge data: Merges aggregate results and manages versions.
Send the results: Sends the results downstream in batches at a regular interval.
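The merge behavior in the workflow above (summing pv and click when primary keys match) can be illustrated with a short simulation. This is a sketch of the semantics only: SumMergeSketch and its methods are hypothetical, it merges eagerly in memory, and it does not reflect how the Paimon table actually stores or merges data.

```java
import java.util.HashMap;
import java.util.Map;

public class SumMergeSketch {
    // Hypothetical stand-in for the aggregation table.
    // Key: primary key (ts, cluster). Value: {pv, click}.
    private final Map<String, long[]> table = new HashMap<>();

    // Write one partial result. Rows with the same primary key are summed.
    public void write(String ts, String cluster, long pv, long click) {
        long[] row = table.computeIfAbsent(ts + "|" + cluster, k -> new long[2]);
        row[0] += pv;
        row[1] += click;
    }

    public long pv(String ts, String cluster) {
        long[] row = table.get(ts + "|" + cluster);
        return row == null ? 0L : row[0];
    }

    public long click(String ts, String cluster) {
        long[] row = table.get(ts + "|" + cluster);
        return row == null ? 0L : row[1];
    }

    public static void main(String[] args) {
        SumMergeSketch paimonAgg = new SumMergeSketch();
        paimonAgg.write("09:00", "cluster-a", 999L, 10L); // on-time partial result
        paimonAgg.write("09:00", "cluster-a", 1L, 2L);    // late partial result, same key
        paimonAgg.write("09:00", "cluster-b", 5L, 0L);    // different key, separate row

        System.out.println(paimonAgg.pv("09:00", "cluster-a"));    // prints 1000
        System.out.println(paimonAgg.click("09:00", "cluster-a")); // prints 12
        System.out.println(paimonAgg.pv("09:00", "cluster-b"));    // prints 5
    }
}
```

The writer only submits partial results. It never reads the historical value, which is the difference from the "read-compute-write" pattern.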
Benefits
Reduced pressure on downstream storage: Batch sync significantly reduces the real-time write load, improving the stability of downstream storage.
Simplified aggregation logic: This pipeline eliminates the need for "read-compute-write". It focuses on data ingestion and incremental processing.
Improved flexibility and maintainability: The tasks of each component are clearly defined. In addition, the Paimon table seamlessly integrates with various downstream systems.
Version management and historical snapshot lookups
Solution selection and considerations
Choosing a solution involves trade-offs among costs, efficiency, complexity, data scale, and technical capabilities. Below are summaries of each solution's benefits and limitations:
Stateful aggregation: Simple to implement and offers high freshness. However, it struggles with late data and with large state.
Stateless incremental aggregation: Effectively handles late data, offers low O&M overhead, and is widely applicable. However, it requires defining a UDAF aligned with your aggregation logic.
Intermediate aggregation table: Capable of handling massive datasets while reducing the load on downstream systems. But it's more complex to implement, requiring significant setup and maintenance.
Note that development and O&M overhead are often inversely related. For example, stateful aggregation is the easiest to develop but requires more state management effort, while stateless incremental aggregation takes more development work but is easier to operate.