In Apache Flink, a window trigger determines when the data in a window is ready for computation and output. Flink provides various built-in triggers for common use cases. In complex business scenarios, however, custom triggers let you implement your business logic flexibly.
Overview
What triggers do
Monitor data elements entering a window.
Determine whether to trigger the processing based on predefined conditions, such as time and event count.
Return one of the following TriggerResult values:
CONTINUE: Do nothing and continue collecting data.
FIRE: Trigger computation and keep the window contents.
FIRE_AND_PURGE: Trigger computation and then clear the window contents.
PURGE: Clear the window contents without triggering computation.
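The four results differ only in whether the window fires and whether its contents are purged. The following sketch is a standalone Java model of that behavior, not Flink code: ModelTriggerResult and ModelWindow are hypothetical names. The enum mirrors the isFire() and isPurge() flags of Flink's TriggerResult, and the buffer applies a result in the same order as Flink's window operator (fire first, then purge).

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of the four results; it mirrors the flags of Flink's TriggerResult
// but is not the Flink class.
enum ModelTriggerResult {
    CONTINUE(false, false),
    FIRE(true, false),
    FIRE_AND_PURGE(true, true),
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    ModelTriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() {
        return fire;
    }

    boolean isPurge() {
        return purge;
    }
}

// Hypothetical window buffer: emit the current contents if the result fires,
// then drop the contents if the result purges.
class ModelWindow {
    final List<Integer> contents = new ArrayList<>();
    final List<List<Integer>> emitted = new ArrayList<>();

    void apply(ModelTriggerResult result) {
        if (result.isFire()) {
            emitted.add(new ArrayList<>(contents)); // the window function sees a copy of the contents
        }
        if (result.isPurge()) {
            contents.clear();
        }
    }
}
```

For example, adding 1 and 2, applying FIRE, adding 3, and applying FIRE_AND_PURGE emits [1, 2] and then [1, 2, 3], and leaves the window empty.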
Built-in triggers
Flink provides the following built-in triggers for time- or count-based window operations:
Trigger type | Description |
EventTimeTrigger | Triggers a window when the watermark's time exceeds the window's end time. It is the default trigger for event-time windows. |
ProcessingTimeTrigger | Triggers a window when the processing time reaches the window's end time. It is the default trigger for processing-time windows. |
CountTrigger | Triggers a window when the number of events in the window reaches the specified threshold. |
PurgingTrigger | Wraps another trigger and turns each FIRE into FIRE_AND_PURGE, so the window contents are purged every time the wrapped trigger fires. |
If you explicitly set a specific trigger for a window, it will replace the default trigger. For example, if you set a CountTrigger for an event-time window, EventTimeTrigger will no longer take effect.
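The following plain-Java sketch illustrates both points: what replacing the default trigger means, and what PurgingTrigger changes. It models, rather than calls, CountTrigger and PurgingTrigger.of(CountTrigger.of(n)) on an event-time window. It assumes the documented CountTrigger behavior: the counter resets after each firing, and nothing fires at the window end because EventTimeTrigger is no longer in effect. CountTriggerModel is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Flink code) of CountTrigger, optionally wrapped in PurgingTrigger,
// on an event-time window.
final class CountTriggerModel {
    private CountTriggerModel() {
    }

    // Returns the number of elements the window function sees at each firing.
    static List<Integer> run(int maxCount, int elements, boolean purging) {
        List<Integer> windowContents = new ArrayList<>();
        List<Integer> emittedSizes = new ArrayList<>();
        int count = 0; // the trigger's own counter state
        for (int i = 1; i <= elements; i++) {
            windowContents.add(i);
            count++;
            if (count >= maxCount) {
                count = 0;                               // CountTrigger clears its counter and returns FIRE
                emittedSizes.add(windowContents.size()); // all retained elements are evaluated
                if (purging) {
                    windowContents.clear();              // PurgingTrigger turns FIRE into FIRE_AND_PURGE
                }
            }
        }
        // Window end: CountTrigger's onEventTime() returns CONTINUE, so nothing is emitted here.
        return emittedSizes;
    }
}
```

With a threshold of 3 and 7 elements, run(3, 7, false) returns [3, 6] and run(3, 7, true) returns [3, 3]. In both cases the 7th element is never emitted, which is the practical consequence of replacing the default event-time trigger.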
In real-world use cases, you often need to:
Combine trigger conditions, such as firing a window after 5 events or after 1 minute, whichever comes first.
Trigger a window based on specific events, such as a user logout or an order completion.
Control a window's lifecycle, such as early firing, delayed firing, and repeated firing.
Avoid side effects of the default behavior, such as a window firing again when late data arrives.
Built-in triggers cannot satisfy these requirements, so you need custom triggers.
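The first requirement, firing after 5 events or after 1 minute, whichever comes first, boils down to a counter plus a timer. The following sketch models that decision in plain Java; CountOrTimeoutModel is hypothetical. In a real trigger, the counter would live in partitioned state, the deadline would be a timer registered through TriggerContext, and resetting would clear the state and delete the timer.

```java
// Hypothetical model of a "count or timeout" trigger for one key and one window:
// fire when maxCount elements have arrived or timeoutMs after the first element, whichever comes first.
final class CountOrTimeoutModel {
    private final int maxCount;
    private final long timeoutMs;
    private int count = 0;                  // would be partitioned state in a real trigger
    private long deadline = Long.MAX_VALUE; // would be a registered timer; MAX_VALUE means none

    CountOrTimeoutModel(int maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    // Plays the role of onElement(): returns true when the window should fire now.
    boolean onElement(long now) {
        if (count == 0) {
            deadline = now + timeoutMs; // first element of a batch: "register" the timeout timer
        }
        count++;
        if (count >= maxCount) {
            reset();
            return true;
        }
        return false;
    }

    // Plays the role of onProcessingTime(): returns true if the pending timeout has expired.
    boolean onTimer(long now) {
        if (count > 0 && now >= deadline) {
            reset();
            return true;
        }
        return false;
    }

    private void reset() {
        count = 0;                 // clear the state
        deadline = Long.MAX_VALUE; // delete the timer
    }
}
```

With a threshold of 5 and a 60-second timeout, five elements within 4 seconds fire on the fifth element, and a single later element at 10,000 ms fires when the timer reaches 70,000 ms.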
Create a custom trigger
Extend the abstract class Trigger<T, W>.
T: the type of the elements in the window.
W: the window type, which extends Window, such as TimeWindow or GlobalWindow.
Override the core methods.
public abstract class Trigger<T, W extends Window> implements Serializable {
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
    public abstract void clear(W window, TriggerContext ctx) throws Exception;
    public boolean canMerge() { ... } // Returns false by default
    public void onMerge(W window, OnMergeContext ctx) throws Exception { ... } // Throws UnsupportedOperationException by default
}
Methods include:
Method | Description | Operation |
onElement() | Called for each element that enters the window. | Checks whether a trigger condition, such as an event count threshold or the arrival of a specific event, is met. |
onProcessingTime() | Called when a registered processing-time timer fires. | Used for processing-time window operations. Event-time triggers rarely need it. |
onEventTime() | Called when a registered event-time timer fires. | Typically fires the window when the watermark reaches the window's end timestamp. |
clear() | Called when the window is removed. | Cleans up trigger states and pending timers to prevent memory leaks. |
canMerge() | Returns whether the trigger supports merging windows. The default is false. | Return true if the trigger is used with mergeable windows such as session windows. |
onMerge() | Called when windows (such as session windows) are merged. | If the trigger supports merging, update its states and timers for the merged window, for example by registering a timer for the merged window's end time. |
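When session windows merge, a trigger that supports merging must combine its per-window state and make sure a timer exists for the merged window's end. The following plain-Java sketch models only that interval and state arithmetic. SessionWindow is a hypothetical class that follows the [start, end) and maxTimestamp() = end - 1 convention of Flink's TimeWindow. In Flink itself, OnMergeContext.mergePartitionedState() merges mergeable state such as ReducingState (a ValueState cannot be merged this way), and the timer is registered with ctx.registerEventTimeTimer(window.maxTimestamp()).

```java
import java.util.List;

// Hypothetical model of the merge step for session windows (not Flink code).
final class SessionWindow {
    final long start;
    final long end;   // exclusive
    final int count;  // per-window trigger state, for example an element counter

    SessionWindow(long start, long end, int count) {
        this.start = start;
        this.end = end;
        this.count = count;
    }

    // Same convention as TimeWindow: the last timestamp that belongs to the window.
    long maxTimestamp() {
        return end - 1;
    }

    // Covers all input windows and adds up their counts.
    // An onMerge() implementation would then register a timer at the result's maxTimestamp().
    static SessionWindow merge(List<SessionWindow> windows) {
        long start = Long.MAX_VALUE;
        long end = Long.MIN_VALUE;
        int count = 0;
        for (SessionWindow w : windows) {
            start = Math.min(start, w.start);
            end = Math.max(end, w.end);
            count += w.count;
        }
        return new SessionWindow(start, end, count);
    }
}
```

Merging [0, 10,000) holding 2 elements with [5,000, 20,000) holding 3 elements yields [0, 20,000) holding 5 elements, whose window-end timer belongs at 19,999 ms.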
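Use TriggerContext to manage states and timers.
State management: Obtain per-window states, such as counters, through ctx.getPartitionedState(StateDescriptor). The state is scoped to the current key and window.
Timer management: Register timers through ctx.registerEventTimeTimer(timestamp) or ctx.registerProcessingTimeTimer(timestamp).
State cleanup: Call state.clear() in clear() to clear states.
Timer deletion: A timer is removed after it fires, so a timer registered for the window's end time usually needs no manual cleanup. Timers that may still be pending when the window is removed should be deleted in clear() by calling ctx.deleteEventTimeTimer(timestamp) or ctx.deleteProcessingTimeTimer(timestamp).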
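Event-time timers are kept per key, window, and timestamp. The following standalone sketch (a hypothetical TimerServiceModel, not Flink's timer service) shows two properties behind the points above: registering the same timestamp twice yields a single timer, and advancing the watermark fires every pending timer at or below it, in timestamp order, and removes it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of event-time timers for one key and one window (not Flink code).
class TimerServiceModel {
    private final TreeSet<Long> timers = new TreeSet<>(); // a set: duplicate timestamps collapse

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    void deleteEventTimeTimer(long time) {
        timers.remove(time);
    }

    // Fires (and removes) every timer whose timestamp is <= the new watermark, in order.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

Registering 59,999 twice, 119,999 once, and 200,000 (then deleting it) fires only 59,999 at watermark 60,000, then 119,999 at watermark 300,000, and nothing afterwards.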
Example
In a 1-minute tumbling event-time window, window computation is triggered when the window ends. In addition, computation is triggered immediately when a user's fifth event within that window arrives. This early firing happens at most once per window.
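To see which window an event falls into and when the window-end timer fires, the following sketch computes tumbling-window boundaries in plain Java for the 60-second windows used in the sample code. TumblingWindowMath is a hypothetical helper. Flink performs the equivalent computation in TimeWindow.getWindowStartWithOffset(), and TimeWindow.maxTimestamp() is the window end minus 1.

```java
// Hypothetical helper that mirrors tumbling-window assignment with a zero offset.
final class TumblingWindowMath {
    private TumblingWindowMath() {
    }

    // Start of the window [start, start + size) that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size); // floorMod also handles negative timestamps
    }

    // Last timestamp that belongs to the window; a window-end timer is registered here.
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }
}
```

An event at 125,000 ms falls into [120,000, 180,000), so that window fires once the watermark reaches 179,999 ms.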
Sample code:
public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {
    // Record the number of elements of each key in a window
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Record whether the early computation has already been triggered
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // Called when a new element enters the window
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Register a timer for the window end so that onEventTime() is called when the watermark passes it.
        // Registering the same timestamp repeatedly creates only one timer.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // Get the states of the current key in the current window
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() != null && flagState.value();
        // Increment the count for each element
        count += 1;
        countState.update(count);
        // When the count reaches 5, trigger window computation immediately, but only once per window
        if (count >= 5 && !flag) {
            flagState.update(true);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // This trigger does not use processing-time timers
        return TriggerResult.CONTINUE;
    }

    // Called when a registered event-time timer fires
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Fire and clear the window only for the window-end timer
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Delete the window-end timer and clean up window states
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}
Trigger usage: Apply the trigger to a keyed, windowed stream by calling .trigger() after .window():
DataStream<UserEvent> source = ...; // Existing input stream
source.keyBy(keySelector) // For example, .keyBy(value -> value.userId) to group elements by user
        .window(TumblingEventTimeWindows.of(Time.seconds(60))) // 60-second tumbling event-time window
        .trigger(new CustomCountTrigger()) // Fire early at the 5th element and again at the window end
        // KeyedType: the key type returned by keySelector
        .process(new ProcessWindowFunction<UserEvent, String, KeyedType, TimeWindow>() {
            @Override
            public void process(KeyedType key, Context context, Iterable<UserEvent> elements, Collector<String> out) {
                int count = 0;
                // Count the elements in the window whose action == 1
                for (UserEvent event : elements) {
                    if (event.action == 1) {
                        count++;
                    }
                }
                // Output the result
                out.collect("Key: " + key + ", Access count: " + count);
            }
        })
        .print();
Result verification: If a user generates 8 access records (action == 1) within one window, two records are output:
Key: 101, Access count: 5 // The 5th element triggers early computation. The window contents are kept.
Key: 101, Access count: 8 // The window end triggers computation and clears the state.
Extensions and summary
Advanced use cases and implementations
Use case 1
Trigger computation more than once before a window ends for alerting.
Remove flagStateDesc to allow the trigger to fire more than once. Alternatively, replace the flag with a counter so that early firing stops after a specified number of firings, for example, to limit the number of alert events.
After you remove flagStateDesc, 5 records are output:
Key: 101, Access count: 5 // count >= 5: early computation is triggered and the state is kept.
Key: 101, Access count: 6 // count >= 5: early computation is triggered.
Key: 101, Access count: 7 // count >= 5: early computation is triggered.
Key: 101, Access count: 8 // count >= 5: early computation is triggered.
Key: 101, Access count: 8 // The window end triggers computation and clears the state.
The trigger only decides when computation happens. The window contents are evaluated in .process(), where you can emit different results depending on the element count and other conditions.
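To check both outputs without a Flink cluster, the following plain-Java sketch replays the trigger logic for one key and one window. CustomCountTriggerSimulation is hypothetical. It assumes that every element has action == 1 and that the watermark passes the window end only after all 8 elements have arrived.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of CustomCountTrigger for one key and one window (not Flink code).
final class CustomCountTriggerSimulation {
    private CustomCountTriggerSimulation() {
    }

    // Returns the "Access count" values printed by the window function.
    // fireOnce = true models the version with flagStateDesc; false models the version without it.
    static List<Integer> run(int elements, int threshold, boolean fireOnce) {
        List<Integer> outputs = new ArrayList<>();
        int windowSize = 0;    // elements buffered by the window operator
        int count = 0;         // countState
        boolean fired = false; // flagState
        for (int i = 0; i < elements; i++) {
            windowSize++;      // the element is added to the window
            count++;           // onElement() increments the counter
            if (count >= threshold && !(fireOnce && fired)) {
                fired = true;
                outputs.add(windowSize); // FIRE: evaluate the window and keep its contents
            }
        }
        // The watermark passes the window end: onEventTime() returns FIRE_AND_PURGE
        outputs.add(windowSize);
        return outputs;
    }
}
```

run(8, 5, true) returns [5, 8], matching the first example, and run(8, 5, false) returns [5, 6, 7, 8, 8], matching the output after flagStateDesc is removed.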
Use case 2
An event-time window closes only when the watermark passes the window's end time. How can you ensure timely window computation if no data arrives for a long time?
Solution | Dependent on watermark? | Timely triggering guaranteed? | Suitable for out-of-order data? | Suitable scenarios |
Use processing-time windows | No | Yes | No | Event-time processing is not required. |
Use the withIdleness method | Yes | No (if all input partitions are idle, the watermark stops advancing) | Yes | Simple scenarios, such as when some input partitions are idle. |
Use a custom watermark generator | Yes | Yes (a watermark is periodically updated) | Yes | This is the standard approach. |
Register a timer | Yes (optional) | Yes (a processing-time timer serves as a fallback) | No (forced window closure requires appropriate timing) | Enhanced reliability is required. |
Send heartbeat messages from external systems | No | Yes | Yes | Kafka requires additional maintenance, while task orchestration does not. |
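The root cause in use case 2 is that an operator's watermark is the minimum over its input watermarks, so one silent partition holds back every window. The following sketch models this in plain Java. WatermarkCombiner is hypothetical, and excluding idle inputs mirrors what withIdleness() does once the idle timeout has elapsed.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical model: combine per-input watermarks the way a downstream operator does.
final class WatermarkCombiner {
    private WatermarkCombiner() {
    }

    // Minimum watermark over the inputs that are not idle.
    // An empty result means every input is idle, so the combined watermark cannot advance.
    static OptionalLong combine(Map<String, Long> watermarks, Map<String, Boolean> idle) {
        OptionalLong result = OptionalLong.empty();
        for (Map.Entry<String, Long> entry : watermarks.entrySet()) {
            if (idle.getOrDefault(entry.getKey(), false)) {
                continue; // idle inputs do not hold back the watermark
            }
            long wm = entry.getValue();
            if (!result.isPresent() || wm < result.getAsLong()) {
                result = OptionalLong.of(wm);
            }
        }
        return result;
    }
}
```

With partition p0 at 120,000 ms and a silent partition p1 stuck at 30,000 ms, the combined watermark is 30,000 ms until p1 is marked idle, after which it becomes 120,000 ms. If every input is idle, nothing advances the watermark.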
Solution 1: Use processing-time windows
If you don't need event time semantics (that is, when the events actually occurred does not matter), you can use processing-time windows:
.keyBy(keySelector)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60))) // Use processing time
.process(new MyProcessWindowFunction())
Solution 2: Use the withIdleness method
Flink's WatermarkStrategy provides the withIdleness method, which marks a source as idle after a specified period of inactivity so that it no longer holds back the watermark.
// An idle source or partition is excluded from the minimum-watermark computation,
// so it does not hold back the watermark of active sources.
WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1)); // Mark a source or partition as idle if it receives no events for 1 minute
Solution 3: Use a custom watermark generator
If you need event time semantics, define a custom watermark generator that keeps the watermark advancing even if no new data arrives:
Record the arrival time of the latest event in the onEvent() method.
In the onPeriodicEmit() method, check the interval between the current time and the arrival time of the latest event.
If the interval exceeds the specified threshold, treat the source as idle and emit a watermark that advances with the elapsed time. Skipping watermark emission alone does not move the watermark forward, so pending windows would remain open.
public class IdleAwareWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxIdleTimeMs;                // Max idle time
    private long maxEventTimestamp = Long.MIN_VALUE; // Largest event timestamp seen so far
    private long lastArrivalTime = Long.MIN_VALUE;   // Processing time at which the latest event arrived

    public IdleAwareWatermarkGenerator(long maxIdleTimeMs) {
        this.maxIdleTimeMs = maxIdleTimeMs;
    }

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
        lastArrivalTime = System.currentTimeMillis();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxEventTimestamp == Long.MIN_VALUE) {
            return; // No events yet
        }
        long idleTime = System.currentTimeMillis() - lastArrivalTime;
        if (idleTime > maxIdleTimeMs) {
            // Idle source: advance the watermark by the elapsed processing time so that pending windows can close.
            // This assumes that event time roughly follows wall-clock time.
            output.emitWatermark(new Watermark(maxEventTimestamp + idleTime));
        } else {
            output.emitWatermark(new Watermark(maxEventTimestamp - 1));
        }
    }
}
WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .<MyEvent>forGenerator(ctx -> new IdleAwareWatermarkGenerator(60_000)) // Set maximum idle time to 60 seconds
        .withTimestampAssigner((event, timestamp) -> event.getEventTime());
Solution 4: Register a timer
Register a processing-time timer in the trigger as a fallback, so that the window fires even if the watermark stops advancing:
@Override
public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.registerEventTimeTimer(window.maxTimestamp()); // Regular event-time timer for the window end
    // Fallback processing-time timer 1 second after the window end. Because window.maxTimestamp()
    // is an event-time value, this assumes that event time roughly follows wall-clock time.
    ctx.registerProcessingTimeTimer(window.maxTimestamp() + 1000);
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.FIRE_AND_PURGE; // Forced firing and cleanup
}
Solution 5: Send heartbeat messages from external systems
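Periodically send heartbeat messages downstream through Kafka or a workflow scheduler. Each heartbeat carries a current timestamp, so it advances the watermark and triggers window closure even when no business data arrives.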
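A heartbeat only needs to carry a timestamp: it advances the watermark like a normal record and is filtered out before the window function. The following plain-Java sketch models that for a single window. HeartbeatModel is hypothetical, and the watermark is simplified to the largest timestamp seen, without the out-of-orderness bound that a real WatermarkStrategy would apply.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical model: records for one key and the single tumbling window [0, windowEnd).
// A heartbeat advances the watermark but is not counted as data.
final class HeartbeatModel {
    static final class Rec {
        final long timestamp;
        final boolean heartbeat;

        Rec(long timestamp, boolean heartbeat) {
            this.timestamp = timestamp;
            this.heartbeat = heartbeat;
        }
    }

    private HeartbeatModel() {
    }

    // Returns the number of data records in the window when it fires, or empty if it never fires.
    static OptionalInt countWhenFired(List<Rec> records, long windowEnd) {
        long watermark = Long.MIN_VALUE;
        int count = 0;
        for (Rec r : records) {
            if (!r.heartbeat && r.timestamp < windowEnd) {
                count++; // data records are assigned to the window; heartbeats are filtered out
            }
            watermark = Math.max(watermark, r.timestamp); // simplified watermark
            if (watermark >= windowEnd - 1) {
                return OptionalInt.of(count); // watermark reached maxTimestamp(): the window fires
            }
        }
        return OptionalInt.empty();
    }
}
```

Three data records in [0, 60,000) leave the window open while no newer timestamp arrives. A heartbeat at 61,000 ms fires the window with a count of 3.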
Summary
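Key points | Description |
Understand a window's lifecycle | A window collects elements and is evaluated whenever its trigger returns FIRE or FIRE_AND_PURGE. An event-time window is removed after the watermark passes its end time plus the allowed lateness, and clear() is called at that point. |
Use state and timers appropriately | Use TriggerContext to access per-window state through ctx.getPartitionedState() and to register timers. Clear the state and delete pending timers in clear(). |
Override methods | Implement the following methods: onElement(), onProcessingTime(), onEventTime(), and clear(). |
Enable window merging (such as session windows) | For mergeable windows like session windows, implement canMerge() to return true and update states and timers in onMerge(). |
Avoid repeated firing | Control the number of times a trigger can fire, especially when events may continue to arrive after the window has fired. |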
Complete code
CustomCountTrigger
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {
    // Record the number of elements of each key in the window
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Record whether the early computation has already been triggered
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // onElement() is called when a new element enters the window
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Register a timer for the window end so that onEventTime() is called when the watermark passes it.
        // Registering the same timestamp repeatedly creates only one timer.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // Get the states of the current key in the current window
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() != null && flagState.value();
        // Increment the count for each new element
        count += 1;
        countState.update(count);
        // If the count reaches 5, trigger window computation immediately, but only once per window
        if (count >= 5 && !flag) {
            flagState.update(true);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // This trigger does not use processing-time timers
        return TriggerResult.CONTINUE;
    }

    // onEventTime() is called when a registered event-time timer fires
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Fire and clear the window only for the window-end timer
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Delete the window-end timer and clean up window states
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}
KafkaTriggerTest
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public class KafkaTriggerTest {
    // Pattern for event_time values; month and day must have two digits, for example 2025-06-10 10:02:00
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setTopics("trigger")
                .setGroupId("trigger")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
        // Example record (user_id,username,event_type,event_time): 101,alie,1,2025-06-10 10:02:00
        DataStream<UserEvent> userEventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(new MapFunction<String, UserEvent>() {
                    @Override
                    public UserEvent map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new UserEvent(
                                Integer.parseInt(fields[0].trim()),
                                fields[1].trim(),
                                fields[2].trim(),
                                fields[3].trim()
                        );
                    }
                });
        // Use event_time as the event timestamp and allow 2 seconds of out-of-orderness
        WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
                .<UserEvent>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(2))
                .withTimestampAssigner((event, timestamp) -> java.time.LocalDateTime
                        .parse(event.getEvent_time(), FORMATTER)
                        .atZone(ZoneId.systemDefault())
                        .toInstant()
                        .toEpochMilli());
        DataStream<UserEvent> timestampedStream = userEventStream.assignTimestampsAndWatermarks(watermarkStrategy);
        timestampedStream
                .keyBy(UserEvent::getUser_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(60))) // 60-second tumbling window
                .trigger(new CustomCountTrigger()) // Fire early at the 5th element and again at the window end
                .process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer userId, ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>.Context context, Iterable<UserEvent> userEvents, Collector<String> collector) throws Exception {
                        int count = 0;
                        // Count the events whose event_type is "1"
                        for (UserEvent event : userEvents) {
                            if ("1".equals(event.getEvent_type())) {
                                count++;
                            }
                        }
                        collector.collect("User ID: " + userId + " | Count: " + count + " | Window: " + context.window());
                    }
                })
                .print();
        env.execute("Kafka Trigger Test");
    }
}
UserEvent
public class UserEvent {
    private int user_id;
    private String username;
    private String event_type;
    private String event_time;

    // Flink's POJO serializer requires a public no-argument constructor
    public UserEvent() {
    }

    public UserEvent(int user_id, String username, String event_type, String event_time) {
        this.user_id = user_id;
        this.username = username;
        this.event_type = event_type;
        this.event_time = event_time;
    }

    @Override
    public String toString() {
        return "user_id:" + user_id + " username:" + username + " event_type:" + event_type + " event_time:" + event_time;
    }

    public int getUser_id() {
        return user_id;
    }

    public void setUser_id(int user_id) {
        this.user_id = user_id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getEvent_type() {
        return event_type;
    }

    public void setEvent_type(String event_type) {
        this.event_type = event_type;
    }

    public String getEvent_time() {
        return event_time;
    }

    public void setEvent_time(String event_time) {
        this.event_time = event_time;
    }
}