This topic describes how to use timers in the DataStream API and provides suggestions and precautions for using them.
What is a timer?
Flink provides a timer mechanism.
In most cases, Flink deployments compute data in response to incoming events. In specific scenarios, a deployment must instead compute and send data based on the processing time (ProcessingTime) or the event time (EventTime). Timers support these scenarios. An operator can register a timer. When the processing time reaches the specified time, or the event-time watermark reaches the specified event time, the specified computing logic is triggered. Windows in Flink are implemented based on timers.
In most cases, the requirements in the preceding scenarios can be met by using windows in SQL. However, Flink deployments need to meet more complex and customized requirements in specific scenarios. In this case, you can use the timer mechanism that is supported by DataStream APIs.
How do I use timers?
Developers of Flink deployments can use KeyedProcessFunction on KeyedStream, KeyedCoProcessFunction on ConnectedStreams, or KeyedBroadcastProcessFunction on BroadcastConnectedStream. The TimerService that these functions provide allows you to use timers. KeyedProcessFunction is the function most commonly used with timers. The following example describes how to use timers in KeyedProcessFunction.
Similar to RichFlatMapFunction, KeyedProcessFunction can be used to process a single data record and generate zero or any number of data records. However, KeyedProcessFunction can be used only on KeyedStream and provides timers.
Timers are saved and restored as part of KeyedState. Therefore, you can use timers only in functions that run on a keyed stream, such as KeyedProcessFunction. A ProcessFunction that runs on a non-keyed stream cannot use timers.
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {

    // Process one input record.
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    // Called back when a registered timer fires.
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    // The context that is available while a record is processed.
    // It is also the base class of the context that is passed to the timer callback.
    public abstract class Context {
        // Obtain the timestamp of the record that is being processed or of the timer that is firing.
        public abstract Long timestamp();
        // Obtain TimerService to register a new timer or delete an existing timer.
        public abstract TimerService timerService();
        // Emit a record to the side output that is identified by the OutputTag.
        public abstract <X> void output(OutputTag<X> outputTag, X value);
        // Obtain the key of the record that is being processed.
        public abstract K getCurrentKey();
    }

    // The context that is available in the timer callback.
    public abstract class OnTimerContext extends Context {
        // Obtain the TimeDomain of the firing timer. TimeDomain specifies whether the timer
        // is a processing-time timer or an event-time timer.
        public abstract TimeDomain timeDomain();
        // Obtain the key of the firing timer.
        public abstract K getCurrentKey();
    }
}
KeyedProcessFunction.Context provides access to TimerService. In the processElement method or the onTimer method, you can use TimerService to register a new timer or delete an existing timer. Timer timestamps are specified in milliseconds.
public interface TimerService {
    // Obtain the current processing time.
    long currentProcessingTime();
    // Obtain the current event-time watermark.
    long currentWatermark();
    // Register a processing-time timer.
    void registerProcessingTimeTimer(long time);
    // Register an event-time timer.
    void registerEventTimeTimer(long time);
    // Delete the processing-time timer.
    void deleteProcessingTimeTimer(long time);
    // Delete the event-time timer.
    void deleteEventTimeTimer(long time);
}
When you register a timer in the processElement method, the timer is bound to the key of the record that is being processed. When you register a timer in the onTimer method, the timer is bound to the key of the timer that is firing. For each key, at most one timer exists per timestamp. If you register a timer for the same key and timestamp multiple times, the computation is triggered only once. Timers of different keys are triggered independently. Each registered timer is triggered only once. To trigger a computation periodically, register a timer for the next time point in the onTimer method.
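The registration rules above can be illustrated with a small model. The following sketch is not Flink code: TimerModel is a hypothetical class that uses only the JDK. It assumes that a timer is identified by a key and a timestamp and that it fires when the time reaches or passes that timestamp. The sketch shows that a duplicate registration for the same key and timestamp is collapsed into one timer and that each timer fires only once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, simplified model of per-key timers. This is not Flink's implementation.
final class TimerModel {
    // Pending timers: key -> sorted set of timestamps. The set removes duplicates.
    private final Map<String, TreeSet<Long>> pending = new TreeMap<>();

    // Registering the same (key, timestamp) pair twice keeps a single timer.
    void register(String key, long timestamp) {
        pending.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    // Fires every timer whose timestamp is less than or equal to the given time,
    // removes it, and returns the fired timers as "key@timestamp" strings.
    List<String> advanceTo(long time) {
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, TreeSet<Long>> entry : pending.entrySet()) {
            // Copy the due timers first so that they can be removed safely afterwards.
            TreeSet<Long> due = new TreeSet<>(entry.getValue().headSet(time, true));
            for (long timestamp : due) {
                fired.add(entry.getKey() + "@" + timestamp);
            }
            entry.getValue().removeAll(due);
        }
        return fired;
    }
}
```

For example, after register("a", 100L) is called twice, advanceTo(100L) returns a single entry for key a, and a second call to advanceTo(100L) returns an empty list.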
Example
Flink windows are implemented based on timers. This example calculates the sum of input values in event-time windows and generates output data every minute. The following sample code provides an example on how to use windows in the DataStream API to implement the logic.
DataStream<Tuple2<String, Long>> sum = inputs
    .keyBy(input -> input.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(new SumReduceFunction());
The following sample code provides an example on how to use KeyedProcessFunction and the timer mechanism to implement the logic.
DataStream<Tuple2<String, Long>> sum = inputs
    .keyBy(input -> input.f0)
    .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
        // Record the state of the sum in a window.
        private ValueState<Long> sumState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            if (sumState.value() == null) {
                // The data of a key is processed for the first time, or for the first time after the timer is triggered.
                // Calculate the time window based on the event time of the current data and register
                // a timer that is triggered at the end time of the window.
                ctx.timerService().registerEventTimeTimer(getWindowStartWithOffset(ctx.timestamp(), 0, 60 * 1000) + 60 * 1000);
                sumState.update(value.f1);
            } else {
                // Otherwise, accumulate the input value.
                sumState.update(sumState.value() + value.f1);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            // Generate the sum of the input values that are received from the time the timer is registered
            // to the time the onTimer method is called back. Then, delete the accumulated value.
            out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
            sumState.clear();
        }

        // The getWindowStartWithOffset method is copied from TimeWindow.java and is used to calculate
        // the start time of the window to which a specified timestamp belongs.
        private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
            final long remainder = (timestamp - offset) % windowSize;
            // handle both positive and negative cases
            if (remainder < 0) {
                return timestamp - (remainder + windowSize);
            } else {
                return timestamp - remainder;
            }
        }
    });
When data is entered for the key for the first time, KeyedProcessFunction calculates the time window based on the event time of the current data, registers a timer that is triggered at the end time of this window, and starts to accumulate data. After the event-time watermark reaches the specified time, Flink calls the onTimer method to emit the accumulated value and clear the accumulation state. This process is repeated when new data is entered for this key.
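As a worked example of the window arithmetic that is used when the timer is registered, the following sketch repeats the calculation outside Flink. WindowMath and windowEnd are illustrative names that do not exist in Flink. The window size of 60,000 milliseconds matches the preceding example.

```java
// Sketch of the window alignment arithmetic. The class and method names are illustrative.
final class WindowMath {
    // Start of the window that contains the timestamp, for windows of windowSize
    // milliseconds that are shifted by offset milliseconds.
    static long windowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // In Java, the remainder is negative for negative operands, so both cases are handled.
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    // End of the window that contains the timestamp. The preceding example registers the timer at this time.
    static long windowEnd(long timestamp, long windowSize) {
        return windowStart(timestamp, 0L, windowSize) + windowSize;
    }
}
```

Under these assumptions, an event with timestamp 125,000 belongs to the window that starts at 120,000, so the timer is registered at 180,000. An event with timestamp 119,999 belongs to the previous window and yields a timer at 120,000.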
The logic of the preceding two implementations is essentially the same. In both cases, if no new data is entered for a key after a timer is triggered, no further output data is generated for that key. If a deployment has a limited number of input keys and you want to obtain an accumulated value at a regular event-time interval after data is entered for a key once, regardless of whether new data is entered later, you can modify the logic of the onTimer method. The following sample code provides an example on how to modify the logic of the method.
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    // Generate the sum of the values that are received from the time at which the timer is registered
    // to the time at which the onTimer method is called back.
    out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
    // Reset but do not delete the accumulated value.
    sumState.update(0L);
    // Register the timer that is used to generate the next accumulated value. The timestamp is the
    // end time of the current window. Adding 60 seconds gives the end time of the next window.
    ctx.timerService().registerEventTimeTimer(timestamp + 60 * 1000);
}
This way, the value of sumState.value() can never be null after a value is first assigned to sumState. The accumulated value of the key is generated at a regular interval regardless of whether data is entered for the key. If no data is entered for the key, the output value is 0.
The interval at which the accumulated value of the key is generated is measured in event time. A value is generated each time the event-time watermark reaches the time of the next timer.
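Because the timer registers its own successor, the output times depend only on the first timer and on how far the watermark advances. The following sketch models this behavior without Flink. PeriodicTimerModel and firingTimes are hypothetical names, and the model assumes that a timer fires as soon as the watermark is greater than or equal to its timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a timer that registers its successor in the callback.
final class PeriodicTimerModel {
    // Returns the timestamps at which the timer fires when the watermark
    // advances to the given value.
    static List<Long> firingTimes(long firstTimer, long intervalMillis, long watermark) {
        List<Long> fired = new ArrayList<>();
        long next = firstTimer;
        while (next <= watermark) {
            fired.add(next);
            // Corresponds to registering a timer at timestamp + interval in the callback.
            next += intervalMillis;
        }
        return fired;
    }
}
```

With a first timer at 180,000 and an interval of 60,000, a watermark that jumps to 300,000 produces three outputs in a row, at 180,000, 240,000, and 300,000. A watermark of 179,999 produces no output.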
If you want to aggregate data based on the processing time instead of the event time, you can modify the logic that is used to register a timer and obtain the time in the processElement method. The following sample code provides an example on how to modify the logic of the method.
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    if (sumState.value() == null) {
        // Calculate the time window based on the current processing time, and register the timer
        // that is triggered at the end time of the window.
        ctx.timerService().registerProcessingTimeTimer(getWindowStartWithOffset(ctx.timerService().currentProcessingTime(), 0, 60 * 1000) + 60 * 1000);
        sumState.update(value.f1);
    } else {
        sumState.update(sumState.value() + value.f1);
    }
}
When the processing time reaches the specified time, the onTimer method is called.
You can modify the computing logic of states and the logic of output data based on the preceding logic to meet other similar computing requirements.
Timers are also required for business logic such as heartbeat timeout alerts, which cannot be implemented by using only windows. In this example, an alert message is sent if no new data is entered for a key within 1 minute after data was last entered for the key. For simplicity, the input data consists of keys only. The following sample code provides an example on how to implement the logic.
DataStream<String> sum = inputs
    .keyBy(input -> input)
    .process(new KeyedProcessFunction<String, String, String>() {
        // Record the timeout time of the most recently registered timer.
        private ValueState<Long> lastTimerState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            lastTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            if (lastTimerState.value() != null) {
                // Delete the previously registered timer so that it does not fire.
                ctx.timerService().deleteProcessingTimeTimer(lastTimerState.value());
            }
            // Register a new timer and record the timer in the state for subsequent clearing.
            long timeout = ctx.timerService().currentProcessingTime() + 60 * 1000;
            ctx.timerService().registerProcessingTimeTimer(timeout);
            lastTimerState.update(timeout);
            // Generate normal output data.
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // A call to this method indicates a heartbeat timeout. A heartbeat timeout message is sent.
            // You can also use SideOutput instead of the default output stream for data output.
            out.collect("Heartbeat timeout:" + ctx.getCurrentKey());
        }
    });
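To check the timeout logic without a running deployment, the same bookkeeping can be modeled in plain Java. The following sketch is a simplified model, not Flink code. HeartbeatModel is a hypothetical class, and time is passed in explicitly instead of being read from TimerService. The map of deadlines plays the role of lastTimerState together with the registered timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the heartbeat timeout logic. This is not Flink's implementation.
final class HeartbeatModel {
    private final long timeoutMillis;
    // key -> deadline of the only pending timer for this key
    private final Map<String, Long> deadlines = new TreeMap<>();

    HeartbeatModel(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Mirrors processElement: the old deadline is replaced by a new one.
    void heartbeat(String key, long now) {
        deadlines.put(key, now + timeoutMillis);
    }

    // Mirrors onTimer: reports the keys whose deadline has been reached.
    // A reported deadline is removed, so each alert is generated only once.
    List<String> advanceTo(long now) {
        List<String> alerts = new ArrayList<>();
        deadlines.entrySet().removeIf(entry -> {
            if (entry.getValue() <= now) {
                alerts.add("Heartbeat timeout:" + entry.getKey());
                return true;
            }
            return false;
        });
        return alerts;
    }
}
```

In this model, a heartbeat for key a at time 30,000 moves the deadline of that key from 60,000 to 90,000. A key b that sends no further heartbeat after time 0 is still reported at 60,000.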
Suggestions
In most cases, we recommend that you use windows if windows can meet your business requirements.
The processElement and onTimer methods of KeyedProcessFunction are never called at the same time. Therefore, you do not need to worry about synchronization issues. However, because the calls are serialized, time-consuming logic in the onTimer method blocks data processing.
Flink does not provide an API for querying the registration status of a timer. If you need to delete a timer later, the function that you use must record the time for which the timer is registered. The function can be KeyedProcessFunction, KeyedCoProcessFunction, or KeyedBroadcastProcessFunction.
Timers are saved in checkpoints. When a deployment is restored from a failover or is restarted from a savepoint, the timers are also restored.
Processing-time timers whose specified time has already passed when the deployment is restored are triggered immediately. Therefore, a large number of timers may be triggered for the deployment to process and send data within a short period of time after the deployment starts.
Event-time timers are triggered only after an event-time watermark that reaches their specified time is received. The watermark may not be updated until the deployment has been running for a while. Therefore, a large number of timers may be triggered for the deployment to process and send data when the watermark is updated.
Timers are related to keys and are stored in KeyedState in checkpoints. Therefore, timers can be used only in KeyedStream or in ConnectedStreams or BroadcastConnectedStream that has keys. If you want to use timers for a streaming deployment that does not have keys, you can use one of the following methods:
If the logic of timers is independent of a specific field value and a timer is independently used for each data record, you can use a UUID in the data as the key in the keyBy method.
Note: This field must exist in the input data. Do not generate a random value in the keyBy method to use as the key.
If a timer is shared for global aggregation, you can use a constant as the key in the keyBy method and set the parallelism to 1.
Precautions
Avoid a situation in which a large number of timers are triggered at the same time. For example, if all timers for millions of keys are specified to trigger on the hour, we recommend that you adjust the trigger time of the timers to several minutes before or after the hour or a longer time range.
Avoid repeatedly registering timers in the processElement and onTimer methods because this operation can cause the number of timers to dramatically increase.
In most cases, the overhead of timers is small. Therefore, you can register timers for a large number of keys. However, we recommend that you take note of the checkpoint time and memory usage. If the checkpoint time or memory usage exceeds a specified threshold after timers are used, you may need to optimize the logic of timers or use another method.
If you use processing-time timers on a bounded stream, the processing-time timers that have not reached their specified time are ignored when data processing ends. In this case, data may be lost.
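The first two precautions can be addressed when the timer timestamp is calculated. The following sketch shows two possible helpers, assuming non-negative timestamps in milliseconds. TimerTimes, staggered, and coalesced are illustrative names, not Flink APIs. The first helper spreads the trigger times of different keys over a time range by using a stable offset that is derived from the key. The second helper rounds the trigger time down to a coarser granularity so that repeated registrations for the same key fall on the same timestamp and are stored as one timer.

```java
// Illustrative helpers for choosing timer timestamps. These are not Flink APIs.
final class TimerTimes {
    // Spreads timers over [baseTime, baseTime + spreadMillis) by using an offset
    // that is stable for a given key.
    static long staggered(String key, long baseTime, long spreadMillis) {
        // floorMod keeps the offset non-negative even if hashCode() is negative.
        return baseTime + Math.floorMod((long) key.hashCode(), spreadMillis);
    }

    // Rounds a non-negative target time down to a multiple of granularityMillis.
    // Registrations that fall into the same interval share one timestamp.
    static long coalesced(long targetTime, long granularityMillis) {
        return (targetTime / granularityMillis) * granularityMillis;
    }
}
```

The result of either helper would be passed to registerProcessingTimeTimer or registerEventTimeTimer. Rounding down makes a timer fire up to one granularity interval earlier than the original target time, so choose a granularity that your business logic can tolerate.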