You can use local-global optimization to resolve data skew issues of an aggregate node.
Background information
When you use local-global optimization, the aggregation process is divided into two phases: local aggregation and global aggregation. They are similar to the combine and reduce phases in MapReduce. In the local aggregation phase, the system buffers input data locally in batches, aggregates each batch, and generates an accumulator for each batch. In the global aggregation phase, the system merges these accumulators to obtain the final aggregation result. Because each batch sends one accumulator per key downstream instead of every raw record, much less data reaches the global aggregation for a hot key, which mitigates data skew.
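The two phases can be illustrated with a small plain-Java simulation. This is a sketch of the idea only, not the engine's implementation: the class name `LocalGlobalCountSketch`, the per-key `Map` used as the accumulator, and the sample input are all hypothetical. It counts records per key, first within each local batch and then globally, and reports how many partial results would be sent downstream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalGlobalCountSketch {

    // Local phase: count records per key within one locally buffered batch.
    // Each map entry plays the role of one accumulator for that key.
    public static Map<String, Long> localAggregate(List<String> batch) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : batch) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Global phase: merge the partial accumulators produced by all local batches.
    public static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical skewed input: key "hot" dominates and is split across two batches.
        List<List<String>> batches = List.of(
                List.of("hot", "hot", "hot", "cold"),
                List.of("hot", "hot", "warm"));

        List<Map<String, Long>> partials = new ArrayList<>();
        int shuffled = 0;
        for (List<String> batch : batches) {
            Map<String, Long> partial = localAggregate(batch);
            shuffled += partial.size(); // one accumulator per key per batch goes downstream
            partials.add(partial);
        }

        Map<String, Long> result = globalAggregate(partials);
        System.out.println(result.get("hot")); // 5
        System.out.println(shuffled);          // 4 partial results instead of 7 raw records
    }
}
```

In this example, the global phase receives only two partial results for the key `hot` instead of five individual records. The saving grows with batch size and with the degree of skew.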

Optimization method
Sample code
import org.apache.flink.table.functions.AggregateFunction;

public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {

    // Define the data structure of the accumulator that stores state data of the COUNT UDAF.
    public static class CountAccum {
        public long total;
    }

    // Initialize the accumulator of the COUNT UDAF.
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    // The getValue method computes the result of the COUNT UDAF from the accumulator that stores state data.
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    // The accumulate method updates the accumulator of the COUNT UDAF based on input data.
    public void accumulate(CountAccum accumulator, Object iValue) {
        accumulator.total++;
    }

    // The merge method merges multiple accumulators into one. Local-global optimization
    // requires this method because the global phase merges the accumulators produced by the local phase.
    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
        for (CountAccum other : its) {
            accumulator.total += other.total;
        }
    }
}
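To show the order in which these methods take part in the two phases, the following sketch copies the method bodies of CountUdaf into a plain Java class so that it compiles without Flink. The class name `CountUdafLifecycleSketch` and the helper methods `local` and `global` are hypothetical, and the calling sequence is a simplified model of the framework's behavior, not its actual code path.

```java
import java.util.List;

public class CountUdafLifecycleSketch {

    // Stand-in for CountUdaf.CountAccum with the same single field.
    public static class CountAccum {
        public long total;
    }

    // Same bodies as CountUdaf, but without extending AggregateFunction,
    // so that this sketch compiles with the JDK alone.
    static CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    static void accumulate(CountAccum acc, Object value) {
        acc.total++;
    }

    static void merge(CountAccum acc, Iterable<CountAccum> others) {
        for (CountAccum other : others) {
            acc.total += other.total;
        }
    }

    static Long getValue(CountAccum acc) {
        return acc.total;
    }

    // Local phase (hypothetical helper): one accumulator per locally buffered batch.
    public static CountAccum local(List<?> batch) {
        CountAccum acc = createAccumulator();
        for (Object row : batch) {
            accumulate(acc, row);
        }
        return acc;
    }

    // Global phase (hypothetical helper): a fresh accumulator absorbs all local
    // accumulators through merge, and getValue produces the final result.
    public static long global(List<CountAccum> locals) {
        CountAccum acc = createAccumulator();
        merge(acc, locals);
        return getValue(acc);
    }

    public static void main(String[] args) {
        CountAccum a = local(List.of("x", "y", "z"));
        CountAccum b = local(List.of("u", "v"));
        System.out.println(global(List.of(a, b))); // 5
    }
}
```

The sketch highlights why merge matters: without it, the global phase would have no way to combine the partial counts that the local phase produces.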