A window trigger decides when to compute and emit results for a window. Flink's built-in triggers cover time- and count-based conditions, but complex business logic often requires a custom trigger — for example, firing early when a specific event arrives, or combining time and event-count conditions.
How it works
When events flow into a windowed stream, the trigger is called on each arrival. It returns one of four TriggerResult values that tell Flink what to do next:
| Result | Effect |
|---|---|
| CONTINUE | Do nothing. Keep collecting events. |
| FIRE | Compute the window now. Retain all state. |
| FIRE_AND_PURGE | Compute the window and clear its contents. |
| PURGE | Clear window contents without computing. |
FIRE_AND_PURGE removes the events stored in the window, but it does not clear trigger state (such as ValueState counters). Clear trigger state explicitly in the clear() method.
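The difference between window contents and trigger state can be modelled in a few lines. The sketch below is plain Java with no Flink dependency: `TriggerResultModel`, its `Result` enum, and the `triggerCounter` field are hypothetical stand-ins for the window buffer, `TriggerResult`, and a trigger-owned `ValueState`. They are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (no Flink dependency) of how each trigger result affects a window. */
final class TriggerResultModel {

    /** Hypothetical stand-in for TriggerResult: each value is a (fire, purge) pair. */
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    final List<String> windowContents = new ArrayList<>(); // elements buffered by the window
    final List<Integer> emittedSizes = new ArrayList<>();  // size of each emitted result
    int triggerCounter = 0;                                // stands in for trigger-owned state

    /** Buffers one element, then applies the result the trigger returned for it. */
    void onElement(String element, Result result) {
        windowContents.add(element);
        triggerCounter++;
        if (result.fire) {
            emittedSizes.add(windowContents.size()); // the window function sees the whole buffer
        }
        if (result.purge) {
            windowContents.clear(); // trigger state is deliberately left untouched
        }
    }

    public static void main(String[] args) {
        TriggerResultModel w = new TriggerResultModel();
        w.onElement("a", Result.CONTINUE);
        w.onElement("b", Result.FIRE);
        w.onElement("c", Result.FIRE_AND_PURGE);
        // prints: [2, 3] [] 3
        System.out.println(w.emittedSizes + " " + w.windowContents + " " + w.triggerCounter);
    }
}
```

After the purge the buffer is empty, but the counter still reads 3, which is why a real trigger has to reset its own state in clear().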
Built-in triggers
Flink ships several built-in triggers. The most commonly used are:
| Trigger | Description |
|---|---|
| EventTimeTrigger | Fires when the watermark exceeds the window's end time. Default for event-time windows. |
| ProcessingTimeTrigger | Fires when processing time reaches the window's end time. Default for processing-time windows. |
| CountTrigger | Fires when the number of events in the window reaches the specified threshold. |
| PurgingTrigger | Wraps another trigger and automatically purges the window after it fires. |
Calling .trigger() on a window replaces its default trigger entirely. For example, attaching a CountTrigger to a TumblingEventTimeWindows disables time-based firing — the window only fires by count. Write a custom trigger when you need both conditions to apply simultaneously.
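To make the replacement effect concrete, here is a plain-Java model with no Flink dependency. The `Trigger` interface and the helper names are hypothetical and reduced to the two decisions that matter here: what happens per element and what happens at the end of the window.

```java
/** Plain-Java model (no Flink dependency): the attached trigger alone decides when a window fires. */
final class ReplacedTriggerModel {

    enum Result { CONTINUE, FIRE }

    /** Hypothetical, reduced trigger interface for this model. */
    interface Trigger {
        Result onElement(int elementsSoFar);
        Result onWindowEnd();
    }

    /** Behaves like the default event-time trigger: fires only at the end of the window. */
    static Trigger endOfWindowOnly() {
        return new Trigger() {
            public Result onElement(int elementsSoFar) { return Result.CONTINUE; }
            public Result onWindowEnd() { return Result.FIRE; }
        };
    }

    /** Behaves like a count trigger: fires on every n-th element and ignores the window end. */
    static Trigger countOnly(int n) {
        return new Trigger() {
            public Result onElement(int elementsSoFar) {
                return elementsSoFar % n == 0 ? Result.FIRE : Result.CONTINUE;
            }
            public Result onWindowEnd() { return Result.CONTINUE; }
        };
    }

    /** Counts how often one window fires when it receives the given number of elements. */
    static int firings(Trigger trigger, int elements) {
        int fired = 0;
        for (int i = 1; i <= elements; i++) {
            if (trigger.onElement(i) == Result.FIRE) fired++;
        }
        if (trigger.onWindowEnd() == Result.FIRE) fired++;
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(firings(endOfWindowOnly(), 4)); // 1: fires once at the window end
        System.out.println(firings(countOnly(5), 4));      // 0: threshold never reached
        System.out.println(firings(countOnly(5), 7));      // 1: elements 6 and 7 are never emitted
    }
}
```

In the count-only case, elements that arrive after the last full batch are never emitted, because nothing fires at the end of the window.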
Built-in triggers are not enough when you need to:
Combine multiple conditions (for example, fire after 5 events or after 1 minute, whichever comes first)
React to a specific event type, such as a user logout or order completion
Control the window lifecycle precisely — early firing, delayed firing, or multiple firings per window
Prevent late data from re-triggering a window that already fired
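As an illustration of the first item, the decision logic for "fire after 5 events or after 1 minute, whichever comes first" can be sketched in plain Java. `CountOrTimeoutModel` is a hypothetical name and the class does not use the Flink API. In a real trigger the counter would live in `ValueState` and the deadline would be a registered timer.

```java
/** Plain-Java model (no Flink dependency) of a "5 events or 1 minute" firing rule. */
final class CountOrTimeoutModel {
    private final int maxCount;
    private final long timeoutMs;
    private int count = 0;       // elements since the last firing
    private long deadline = -1L; // pending timeout, or -1 when none is set

    CountOrTimeoutModel(int maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Handles one element; returns true when this element causes a firing. */
    boolean onElement(long nowMs) {
        if (count == 0) {
            deadline = nowMs + timeoutMs; // the first element of a batch starts the clock
        }
        count++;
        if (count >= maxCount) {
            reset();
            return true;
        }
        return false;
    }

    /** Handles the passage of time; returns true when the timeout causes a firing. */
    boolean onTime(long nowMs) {
        if (deadline >= 0 && nowMs >= deadline) {
            reset();
            return true;
        }
        return false;
    }

    private void reset() {
        count = 0;
        deadline = -1L;
    }

    public static void main(String[] args) {
        CountOrTimeoutModel trigger = new CountOrTimeoutModel(5, 60_000L);
        for (long t = 0; t < 5; t++) {
            System.out.println("element at " + t + " fires: " + trigger.onElement(t));
        }
        trigger.onElement(100L);
        System.out.println("timeout fires: " + trigger.onTime(60_100L));
    }
}
```

Whichever condition is met first resets both the counter and the deadline, so the other condition cannot fire a second time for the same batch.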
Create a custom trigger
Extend the abstract class Trigger<T, W>:
T: the event type in the window
W: the window type, such as TimeWindow
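public abstract class Trigger<T, W extends Window> implements Serializable {
    // Called for every element added to the window
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
    // Called when a registered processing-time timer fires
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
    // Called when a registered event-time timer fires
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
    // Called when the window is removed; release trigger state here
    public abstract void clear(W window, TriggerContext ctx) throws Exception;
    // Optional: override both methods to support merging windows
    public boolean canMerge() { return false; }
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
}
Methods: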
| Method | Called when | Typical use |
|---|---|---|
| onElement() | A new event enters the window | Check whether a trigger condition is met (count threshold, specific event type) |
| onEventTime() | A registered event-time timer fires | Close the window when the watermark hits the end timestamp |
| onProcessingTime() | A registered processing-time timer fires | Processing-time window operations (less common) |
| clear() | The window is cleared | Clean up ValueState to prevent memory leaks |
| canMerge() / onMerge() | Windows are merged (for example, session windows) | Update timers correctly during a merge |
Two things to note:
onElement(), onEventTime(), and onProcessingTime() each return a TriggerResult. The return value determines whether Flink fires, purges, or continues for that specific invocation.
All three methods can register timers, via ctx.registerEventTimeTimer(timestamp) or ctx.registerProcessingTimeTimer(timestamp), to schedule future callbacks.
Manage state and timers with `TriggerContext`:
Read and write per-key, per-window state: ctx.getPartitionedState(StateDescriptor)
Register event-time timers: ctx.registerEventTimeTimer(timestamp)
Register processing-time timers: ctx.registerProcessingTimeTimer(timestamp)
Clean up state in clear(): call state.clear()
Timer cleanup: a timer is removed when it fires, so timers set at or before the window end rarely need manual deletion. Delete any timer that may still be pending in clear() with ctx.deleteEventTimeTimer(timestamp) or ctx.deleteProcessingTimeTimer(timestamp).
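The timer behaviour described above can be pictured as a sorted set of timestamps per key and window. The following plain-Java sketch is a model, not the Flink timer service. `TimerModel` and its method names are hypothetical. It assumes that registering the same timestamp twice yields a single callback and that an event-time timer fires once the watermark reaches its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java model (no Flink dependency) of event-time timers for one key and one window. */
final class TimerModel {
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();

    /** Registering the same timestamp twice still yields a single timer. */
    void register(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    /** Removes a pending timer; a no-op if it was never registered or has already fired. */
    void delete(long timestamp) {
        eventTimeTimers.remove(timestamp);
    }

    /** Advances the watermark and returns the timers that fire, in ascending order. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            fired.add(eventTimeTimers.pollFirst()); // a fired timer is removed automatically
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerModel timers = new TimerModel();
        timers.register(59_999L);
        timers.register(59_999L); // duplicate registration
        timers.register(30_000L);
        System.out.println(timers.advanceWatermark(29_999L)); // []
        System.out.println(timers.advanceWatermark(59_999L)); // [30000, 59999]
        System.out.println(timers.advanceWatermark(70_000L)); // []
    }
}
```

This is why calling registerEventTimeTimer(window.maxTimestamp()) on every element is harmless: repeated registrations collapse into one callback.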
Example: early firing on the fifth event
This example uses a 1-minute tumbling event-time window. In addition to the normal end-of-window firing, it fires early when a user's fifth event arrives within the window. The early firing happens at most once per window.
public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {
    // Per-key, per-window event counter
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Flag to ensure early firing happens only once per window
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // A custom trigger replaces EventTimeTrigger, so register the end-of-window timer explicitly
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = Boolean.TRUE.equals(flagState.value());
        count += 1;
        countState.update(count);
        // Fire early on the fifth event, but only once per window
        if (count >= 5 && !flag) {
            flagState.update(true);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE; // Not used in this trigger
    }
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Normal end-of-window firing; clears window contents. Ignore any other timer.
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}
Attach the trigger to a windowed stream:
DataStream<UserEvent> source = ...; // Existing input stream
source.keyBy(event -> event.getUser_id())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .trigger(new CustomCountTrigger())
    .process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {
        @Override
        public void process(Integer key, Context context, Iterable<UserEvent> elements, Collector<String> out) {
            int count = 0;
            for (UserEvent event : elements) {
                // Count events of type "1"; event_type is a String in UserEvent
                if ("1".equals(event.getEvent_type())) count++;
            }
            out.collect("Key: " + key + ", Access count: " + count);
        }
    })
    .print();
Verify the output: If 8 events of type 1 arrive for the same user within one minute, you get two output records:
Key: 101, Access count: 5 // Early firing at the fifth event; window state is retained
Key: 101, Access count: 8 // End-of-window firing; window contents are cleared
Advanced use cases
Fire multiple times before the window closes
Remove flagStateDesc, together with the !flag check in onElement(), so that the trigger fires on every event once the count reaches 5. This is useful for streaming alerts where you want output on each new qualifying event.
With flagStateDesc removed, the same 8-event scenario produces 5 records:
Key: 101, Access count: 5 // count >= 5, early firing, state retained
Key: 101, Access count: 6 // count >= 5, early firing
Key: 101, Access count: 7 // count >= 5, early firing
Key: 101, Access count: 8 // count >= 5, early firing
Key: 101, Access count: 8 // End-of-window firing; window contents cleared
The trigger controls when to fire. The actual output content (which events to include, how to aggregate) is determined in the .process() function. Use state inside .process() to differentiate early results from final results.
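Both firing patterns can be checked with a small plain-Java simulation that needs no Flink cluster. `EarlyFiringSimulation` and its parameters are hypothetical names. The model covers one key and one window, and it assumes that every event is counted by the window function and that at least one event arrives.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Flink dependency) of the early-firing logic for one key and window. */
final class EarlyFiringSimulation {

    /**
     * Returns the "Access count" value of every record the window would emit.
     *
     * @param events    number of events that arrive before the window ends
     * @param threshold count at which early firing starts
     * @param fireOnce  true models the flag state; false models the variant without it
     */
    static List<Integer> run(int events, int threshold, boolean fireOnce) {
        List<Integer> emitted = new ArrayList<>();
        int count = 0;         // mirrors the "count" state
        boolean fired = false; // mirrors the "flag" state
        for (int i = 0; i < events; i++) {
            count++;
            if (count >= threshold && !(fireOnce && fired)) {
                fired = true;
                emitted.add(count); // FIRE: contents are retained, so all events so far are visible
            }
        }
        emitted.add(count); // end-of-window FIRE_AND_PURGE
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(8, 5, true));  // [5, 8]
        System.out.println(run(8, 5, false)); // [5, 6, 7, 8, 8]
    }
}
```

The two lists match the two-record and five-record outputs shown above.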
Handle idle sources: ensure timely window closure
Event-time window closure depends on watermark advancement. If no events arrive for a long period, watermarks stop advancing and windows never close. The following approaches address this:
| Approach | Watermark dependency | Timely firing | Handles out-of-order data | When to use |
|---|---|---|---|---|
| Processing-time windows | None | Yes | No | Event-time semantics not required |
| withIdleness() | Yes | Depends on watermark interval | Yes | Simple scenarios with idle partitions |
| Custom watermark generator | Yes | Yes (periodic update) | Yes | Standard approach for event-time semantics |
| Register a processing-time timer as fallback | Optional | Yes (fault-tolerant) | No (forces window closure) | Enhanced reliability required |
| Heartbeat messages from external systems | None | Yes | Yes | Kafka or workflow-orchestrated pipelines |
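To see why a single quiet input can stall every event-time window, consider a simplified model in which the operator's watermark is the minimum over all inputs that are not marked idle. The sketch below is plain Java with hypothetical names (`CombinedWatermarkModel`, `combined`, `windowCloses`). It deliberately leaves the all-idle case undefined rather than guessing what the runtime does.

```java
import java.util.OptionalLong;

/** Plain-Java model (no Flink dependency) of combining watermarks from several inputs. */
final class CombinedWatermarkModel {

    /** Returns the minimum watermark over all non-idle inputs, or empty if every input is idle. */
    static OptionalLong combined(long[] watermarks, boolean[] idle) {
        OptionalLong min = OptionalLong.empty();
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold the watermark back
            }
            if (!min.isPresent() || watermarks[i] < min.getAsLong()) {
                min = OptionalLong.of(watermarks[i]);
            }
        }
        return min;
    }

    /** A window closes once the watermark reaches its maximum timestamp (end minus 1 ms). */
    static boolean windowCloses(long windowMaxTimestamp, OptionalLong watermark) {
        return watermark.isPresent() && watermark.getAsLong() >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long[] watermarks = {70_000L, 10_000L}; // the second input stopped receiving events
        OptionalLong stalled = combined(watermarks, new boolean[] {false, false});
        OptionalLong released = combined(watermarks, new boolean[] {false, true});
        System.out.println(windowCloses(59_999L, stalled));  // false: held back at 10000
        System.out.println(windowCloses(59_999L, released)); // true: idle input is ignored
    }
}
```

The approaches below either remove the dependency on watermarks or keep the combined watermark moving.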
Approach 1: Use processing-time windows
If event-time semantics are not needed, switch to a processing-time window:
.keyBy(keySelector)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
    .process(new MyProcessWindowFunction())
Approach 2: Mark idle sources with `withIdleness()`
WatermarkStrategy.withIdleness() marks a source as idle after a period of inactivity. Idle sources are excluded from the global minimum watermark calculation, so they no longer hold back active sources.
WatermarkStrategy
    .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
    .withIdleness(Duration.ofMinutes(1)); // Mark source as idle after 1 minute with no events
Approach 3: Use a custom watermark generator
Implement WatermarkGenerator to decide for yourself when a source counts as idle:
In onEvent(): record the latest event timestamp.
In onPeriodicEmit(): if the elapsed time since the last event exceeds the threshold, skip watermark emission and treat the source as idle; otherwise, emit a watermark based on the latest timestamp.
public class IdleAwareWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private long lastEventTimestamp = Long.MIN_VALUE; // Highest event timestamp seen so far
    private long lastArrivalTime = Long.MIN_VALUE;    // Wall-clock time of the most recent event
    private final long maxIdleTimeMs;
    public IdleAwareWatermarkGenerator(long maxIdleTimeMs) {
        this.maxIdleTimeMs = maxIdleTimeMs;
    }
    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        lastEventTimestamp = Math.max(lastEventTimestamp, eventTimestamp);
        lastArrivalTime = System.currentTimeMillis();
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (lastEventTimestamp == Long.MIN_VALUE) {
            return; // No events yet; nothing to emit
        }
        // Compare wall-clock times only; event timestamps can lag far behind the system clock
        if (System.currentTimeMillis() - lastArrivalTime > maxIdleTimeMs) {
            output.markIdle(); // Source is idle; skip emission so it no longer holds back downstream watermarks
            return;
        }
        output.emitWatermark(new Watermark(lastEventTimestamp));
    }
}
WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
    .forGenerator(ctx -> new IdleAwareWatermarkGenerator(60_000)) // 60-second idle threshold
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());
Approach 4: Register a processing-time timer as fallback
Register both an event-time timer and a processing-time timer in onElement(). The processing-time timer fires regardless of watermark state, acting as a safety net:
@Override
public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.registerEventTimeTimer(window.maxTimestamp());
    // Fallback: fires even without new events. This compares an event-time value with the
    // system clock, so it assumes event timestamps track wall-clock time. For replayed or
    // delayed data the timer is already due and fires immediately.
    ctx.registerProcessingTimeTimer(window.maxTimestamp() + 1000);
    return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.FIRE_AND_PURGE; // Force window closure
}
Approach 5: Send heartbeat messages from external systems
Have an external system, such as a Kafka producer or a scheduled workflow, periodically write synthetic heartbeat events to the source. These heartbeats advance the watermark and close idle windows without changing your trigger logic.
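How late a heartbeat has to be stamped depends on the watermark strategy. The plain-Java sketch below assumes a bounded-out-of-orderness strategy whose watermark trails the highest timestamp seen by the bound plus 1 ms, and a window that closes once the watermark reaches its end minus 1 ms. `HeartbeatWatermarkModel` and its methods are hypothetical names, not Flink API.

```java
/** Plain-Java model (no Flink dependency) of when a heartbeat closes an event-time window. */
final class HeartbeatWatermarkModel {

    /** Watermark produced after seeing the given timestamp, under the stated assumption. */
    static long watermarkAfter(long maxTimestampSeen, long outOfOrdernessMs) {
        return maxTimestampSeen - outOfOrdernessMs - 1;
    }

    /** A window [start, end) closes once the watermark reaches end - 1. */
    static boolean closesWindow(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L; // window [0, 60000)
        long bound = 2_000L;      // 2-second out-of-orderness bound
        for (long heartbeat : new long[] {60_000L, 61_999L, 62_000L}) {
            long watermark = watermarkAfter(heartbeat, bound);
            System.out.println(heartbeat + " -> " + closesWindow(windowEnd, watermark));
        }
        // prints: 60000 -> false, 61999 -> false, 62000 -> true
    }
}
```

Under these assumptions a heartbeat closes the window only when its timestamp is at least the window end plus the out-of-orderness bound, so the heartbeat interval should be chosen with that delay in mind.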
Key points
| Topic | Guidance |
|---|---|
| FIRE vs FIRE_AND_PURGE | FIRE retains window contents for multiple firings. FIRE_AND_PURGE clears them. Neither clears trigger state; do that in clear(). |
| State management | Use ValueState for per-key, per-window counters and flags. Always clean up in clear() to avoid memory leaks. |
| Required methods | Implement onElement(), onEventTime(), onProcessingTime(), and clear(). |
| Session window merging | For mergeable windows such as session windows, implement canMerge() and onMerge() to keep timer state consistent across merged windows. |
| Controlling firing frequency | Use a boolean flag (ValueState<Boolean>) to cap the number of early firings per window when events continue arriving after FIRE. |
Complete code
CustomCountTrigger
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {
    // Per-key, per-window event counter
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Flag to ensure early firing happens only once per window
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // A custom trigger replaces EventTimeTrigger, so register the end-of-window timer explicitly
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = Boolean.TRUE.equals(flagState.value());
        count += 1;
        countState.update(count);
        // Fire early on the fifth event, but only once per window
        if (count >= 5 && !flag) {
            flagState.update(true);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE; // Not used in this trigger
    }
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Normal end-of-window firing; clears window contents. Ignore any other timer.
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}
KafkaTriggerTest
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public class KafkaTriggerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setTopics("trigger")
                .setGroupId("trigger")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
        // Example record format: 101,alice,1,2025-06-10 10:02:00
        DataStream<UserEvent> userEventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(new MapFunction<String, UserEvent>() {
                    @Override
                    public UserEvent map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new UserEvent(
                                Integer.parseInt(fields[0]),
                                fields[1],
                                fields[2],
                                fields[3]
                        );
                    }
                });
        WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
                .<UserEvent>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(2))
                .withTimestampAssigner((event, timestamp) -> {
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    return java.time.LocalDateTime.parse(event.getEvent_time(), formatter)
                            .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                });
        DataStream<UserEvent> timestampedStream = userEventStream.assignTimestampsAndWatermarks(watermarkStrategy);
        timestampedStream
                .keyBy(UserEvent::getUser_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .trigger(new CustomCountTrigger())
                .process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer userId, Context context, Iterable<UserEvent> userEvents, Collector<String> collector) throws Exception {
                        int count = 0;
                        for (UserEvent event : userEvents) {
                            if (event.getEvent_type().equals("1")) count++;
                        }
                        collector.collect("User ID: " + userId + " | Count: " + count + " | Window: " + context.window());
                    }
                }).print();
        env.execute("Kafka trigger test");
    }
}
UserEvent
public class UserEvent {
    private int user_id;
    private String username;
    private String event_type;
    private String event_time;
    public UserEvent(int user_id, String username, String event_type, String event_time) {
        this.user_id = user_id;
        this.username = username;
        this.event_type = event_type;
        this.event_time = event_time;
    }
    @Override
    public String toString() {
        return "user_id:" + user_id + " username:" + username + " event_type:" + event_type + " event_time:" + event_time;
    }
    public int getUser_id() { return user_id; }
    public void setUser_id(int user_id) { this.user_id = user_id; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getEvent_type() { return event_type; }
    public void setEvent_type(String event_type) { this.event_type = event_type; }
    public String getEvent_time() { return event_time; }
    public void setEvent_time(String event_time) { this.event_time = event_time; }
}