Flink timers let operators trigger computation at a specific processing time or event time. Windows in Flink are built on timers internally. For use cases that window APIs cannot cover—custom aggregation logic, heartbeat timeout alerts, or expiring stale state—you can register and manage timers directly through the DataStream API.
How timers work
Timers are a core building block of Flink streaming applications, giving you access to:
Events (stream elements) — the primary driver of computation
State (fault-tolerant, consistent) — persisted across records and restarts
Timers (event time and processing time) — time-driven callbacks, available only on keyed streams
Think of KeyedProcessFunction as a FlatMapFunction with access to keyed state and timers. It processes each incoming record and lets you schedule a callback at a future time.
Supported functions
Timers are available through TimerService, which is accessible from the Context object in these functions:
| Function | Stream type |
|---|---|
KeyedProcessFunction | KeyedStream |
KeyedCoProcessFunction | ConnectedStream |
KeyedBroadcastProcessFunction | BroadcastConnectedStream |
KeyedProcessFunction is the most commonly used.
Timers are saved and restored as part of keyed state. Use them in KeyedProcessFunction, not in ProcessFunction.
KeyedProcessFunction API
KeyedProcessFunction extends AbstractRichFunction and has two key methods:
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
// Called for each incoming record.
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
// Called when a registered timer fires.
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
// Context available in both processElement and onTimer.
public abstract class Context {
// Timestamp of the current record or timer.
public abstract Long timestamp();
// Access TimerService to register or delete timers.
public abstract TimerService timerService();
// Emit a record to a side output.
public abstract <X> void output(OutputTag<X> outputTag, X value);
// Key of the current record.
public abstract K getCurrentKey();
}
// Extended context available only in onTimer.
public abstract class OnTimerContext extends Context {
// Whether this is a processing-time or event-time timer.
public abstract TimeDomain timeDomain();
// Key of the current timer.
public abstract K getCurrentKey();
}
}Both processElement and onTimer receive a Context object with access to TimerService and a Collector for emitting results.
TimerService API
TimerService is the interface for registering and deleting timers. All timestamps are in milliseconds.
public interface TimerService {
// Current processing time.
long currentProcessingTime();
// Current event-time watermark.
long currentWatermark();
// Register a processing-time timer.
void registerProcessingTimeTimer(long time);
// Register an event-time timer.
void registerEventTimeTimer(long time);
// Delete a processing-time timer.
void deleteProcessingTimeTimer(long time);
// Delete an event-time timer.
void deleteEventTimeTimer(long time);
}Timer deduplication
The same key has only one timer at the same time. If you register multiple timers for the same key at the same time, onTimer is called exactly once.
Timers registered in
processElementuse the key of the current record.Timers registered in
onTimeruse the key of the current timer.
Each registered timer fires only once. To repeat logic periodically, register the next timer inside onTimer.
Thread safety
Flink synchronizes invocations of onTimer and processElement. They are never called concurrently for the same key, so you do not need to handle concurrent state modification. However, long-running onTimer logic can delay record processing.
Fault tolerance
Timers are checkpointed along with the application state and restored on failover or savepoint restart.
Processing-time timers: Timers that should have fired before the restore point fire immediately after restart. This can cause many timers to fire in a short window.
Event-time timers: Timers fire after the corresponding watermark is received. After a long pause, a large watermark advance can trigger many timers at once.
Timers on non-keyed streams
Timers are stored in keyed state and require a keyed stream. For streams without a natural key, use one of these approaches:
UUID key: Use a UUID field from the input data as the key in
keyBy. The UUID must exist in the input data — it cannot be generated insidekeyBy.Constant key: Use a constant as the key in
keyByand set parallelism to 1. This is suitable for global aggregation.
Use timers in practice
When to use timers
Use window APIs when they meet your requirements. Reach for KeyedProcessFunction and timers when you need logic that window APIs cannot express, such as:
Custom aggregation with complex state transitions
Heartbeat timeout alerts (fire when no data arrives within a time threshold)
Expiring stale state
Event-time windowing with a timer
The window API and a custom timer can produce equivalent results. The following example calculates the sum of input values for each key over 1-minute event-time windows.
Window API (recommended for standard windowing):
DataStream<Tuple2<String, Long>> sum = inputs
.keyBy(input -> input.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new SumReduceFunction());Equivalent logic with KeyedProcessFunction:
The implementation below does the following:
Stores the running sum in a
ValueState<Long>, scoped per key.On the first record for a key (or the first record after a timer fires), calculates the current 1-minute event-time window and registers a timer at the window boundary.
On subsequent records, accumulates the sum.
When the timer fires, emits the accumulated sum and clears the state.
DataStream<Tuple2<String, Long>> sum = inputs
.keyBy(input -> input.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
private ValueState<Long> sumState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
}
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
if (sumState.value() == null) {
// First record for this key in this window: calculate the window boundary and register a timer.
ctx.timerService().registerEventTimeTimer(getWindowStartWithOffset(ctx.timestamp(), 0, 60 * 1000) + 60 * 1000);
sumState.update(value.f1);
} else {
// Accumulate.
sumState.update(sumState.value() + value.f1);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// Emit the accumulated sum and clear state.
out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
sumState.clear();
}
// Copied from TimeWindow.java. Calculates the start of the window containing the given timestamp.
private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
final long remainder = (timestamp - offset) % windowSize;
if (remainder < 0) {
return timestamp - (remainder + windowSize);
} else {
return timestamp - remainder;
}
}
});Things to consider:
After
onTimerfires,sumStateis cleared. The next record for that key starts a new window and registers a new timer.If no new records arrive for a key after the timer fires, no output is generated for that key. If you need periodic output even when no data arrives, see the next section.
Periodic output with no incoming data
To output a value for every window interval regardless of whether new data arrives, modify onTimer to reset state and register the next timer instead of clearing it:
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// Emit the accumulated sum.
out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
// Reset to 0 instead of clearing, so sumState.value() is never null.
sumState.update(0L);
// Register the timer for the next window. timestamp is the current window end, so add 60s.
ctx.timerService().registerEventTimeTimer(timestamp + 60 * 1000);
}Output timing is determined by the event-time watermark, not by wall clock time.
Processing-time windowing
To base windows on processing time instead of event time, replace the timer registration and time retrieval in processElement:
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
if (sumState.value() == null) {
// Calculate the window boundary based on current processing time and register a timer.
ctx.timerService().registerProcessingTimeTimer(getWindowStartWithOffset(ctx.timerService().currentProcessingTime(), 0, 60 * 1000) + 60 * 1000);
sumState.update(value.f1);
} else {
sumState.update(sumState.value() + value.f1);
}
}When the processing time reaches the registered timestamp, onTimer is called.
Heartbeat timeout alert
The following example sends an alert when no data arrives for a key within 60 seconds. It uses a processing-time timer and deletes the previous timer each time a new record arrives.
The logic:
On each incoming record, delete the previously registered timeout timer (if any).
Register a new timeout timer 60 seconds from now.
If
onTimerfires, no record arrived within 60 seconds — emit an alert.
DataStream<String> alerts = inputs
.keyBy(input -> input)
.process(new KeyedProcessFunction<String, String, String>() {
// Stores the timestamp of the currently registered timeout timer.
private ValueState<Long> lastTimerState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
lastTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (lastTimerState.value() != null) {
// Cancel the previous timeout timer.
ctx.timerService().deleteProcessingTimeTimer(lastTimerState.value());
}
// Register a new timeout 60 seconds from now, and record it for later deletion.
long timeout = ctx.timerService().currentProcessingTime() + 60 * 1000;
ctx.timerService().registerProcessingTimeTimer(timeout);
lastTimerState.update(timeout);
// Pass the record through.
out.collect(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// No data arrived within 60 seconds. Emit an alert.
out.collect("Heartbeat timeout: " + ctx.getCurrentKey());
}
});Things to consider:
To delete a timer, the function must record the registration timestamp in state. Flink provides no API to query whether a timer is currently registered.
Consider using a side output for alert messages to keep them separate from the main data stream.
Usage notes
Avoid timer storms
When many timers for different keys are scheduled at the same timestamp, they all fire at once. This can cause latency spikes and high resource usage. To reduce the risk, adjust the trigger time of timers to be spread across a range of minutes rather than all landing on the same point in time (for example, avoid scheduling all timers on the hour).
Avoid timer accumulation
Registering a new timer on every call to processElement or onTimer without deleting previous ones causes the timer count to grow unboundedly. Track the registered timer in state and delete it before registering a new one, as shown in the heartbeat example.
Monitor checkpoint time and memory
The overhead of individual timers is small, and registering timers for many keys is common. If checkpoint duration or memory usage increases significantly after introducing timers, review the timer registration logic or consider an alternative approach.
Processing-time timers on bounded streams
On a bounded stream, processing-time timers that have not yet reached their trigger time are discarded when data processing ends. Records that depend on these timers are lost. Use event-time timers or handle end-of-stream explicitly if this matters for your use case.