Realtime Compute for Apache Flink: Develop custom Flink triggers

Last Updated: Mar 25, 2026

A window trigger decides when to compute and emit results for a window. Flink's built-in triggers cover time- and count-based conditions, but complex business logic often requires a custom trigger — for example, firing early when a specific event arrives, or combining time and event-count conditions.

How it works

When events flow into a windowed stream, the trigger is called on each arrival. It returns one of four TriggerResult values that tell Flink what to do next:

| Result | Effect |
| --- | --- |
| CONTINUE | Do nothing. Keep collecting events. |
| FIRE | Compute the window now. Retain all state. |
| FIRE_AND_PURGE | Compute the window and clear its contents. |
| PURGE | Clear window contents without computing. |

Important

FIRE_AND_PURGE removes the events stored in the window, but it does not clear trigger state (such as ValueState counters). Clear trigger state explicitly in the clear() method.
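
One way to read the four results is as two independent flags, fire and purge (Flink's TriggerResult exposes them as isFire() and isPurge()). The dependency-free sketch below models that reading. TriggerResultModel and apply are illustrative names rather than Flink classes, and the list stands in for the buffered window contents.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative model only: not a Flink class. */
final class TriggerResultModel {

    /** Mirrors the four results as a pair of flags. */
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    /**
     * Applies a result to the buffered window contents.
     * Returns a snapshot of what was emitted, or null when nothing fires.
     */
    static List<String> apply(Result result, List<String> windowContents) {
        List<String> emitted = result.fire ? new ArrayList<>(windowContents) : null;
        if (result.purge) {
            windowContents.clear(); // Only the buffered events go; trigger state is kept elsewhere
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> contents = new ArrayList<>(Arrays.asList("e1", "e2"));
        System.out.println(apply(Result.FIRE, contents) + " kept=" + contents.size());
        System.out.println(apply(Result.FIRE_AND_PURGE, contents) + " kept=" + contents.size());
    }
}
```

The model deliberately has no notion of trigger state, which mirrors the point above: purging touches only the window contents.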

Built-in triggers

Flink ships several built-in triggers. The four most commonly used are:

| Trigger | Description |
| --- | --- |
| EventTimeTrigger | Fires when the watermark exceeds the window's end time. Default for event-time windows. |
| ProcessingTimeTrigger | Fires when processing time reaches the window's end time. Default for processing-time windows. |
| CountTrigger | Fires when the number of events in the window reaches the specified threshold. |
| PurgingTrigger | Wraps another trigger and automatically purges the window after it fires. |

Important

Calling .trigger() on a window replaces its default trigger entirely. For example, attaching a CountTrigger to a TumblingEventTimeWindows disables time-based firing — the window only fires by count. Write a custom trigger when you need both conditions to apply simultaneously.

Built-in triggers are not enough when you need to:

  • Combine multiple conditions (for example, fire after 5 events or after 1 minute, whichever comes first)

  • React to a specific event type, such as a user logout or order completion

  • Control the window lifecycle precisely — early firing, delayed firing, or multiple firings per window

  • Prevent late data from re-triggering a window that already fired
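
The first case in the list above (fire after 5 events or after 1 minute, whichever comes first) comes down to a small piece of decision logic. The sketch below models only that logic, without Flink dependencies. CountOrTimeoutPolicy is a hypothetical name; in a real trigger the counter would live in ValueState obtained through ctx.getPartitionedState(), and the deadline would be a timer registered through ctx.registerProcessingTimeTimer() or ctx.registerEventTimeTimer().

```java
/** Illustrative decision logic only: fire on the Nth element or at a deadline, whichever comes first. */
final class CountOrTimeoutPolicy {
    private final int maxCount;
    private final long timeoutMs;
    private int count = 0;       // Would be ValueState<Integer> in a real trigger
    private long deadline = -1L; // Timestamp of the pending timer, or -1 when none is armed

    CountOrTimeoutPolicy(int maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Models onElement(): returns true when the window should fire. */
    boolean onElement(long now) {
        if (deadline < 0) {
            deadline = now + timeoutMs; // The first element of a batch arms the timer
        }
        count++;
        if (count >= maxCount) {
            reset();
            return true;
        }
        return false;
    }

    /** Models a timer callback: returns true when the armed deadline has been reached. */
    boolean onTimer(long now) {
        if (deadline >= 0 && now >= deadline) {
            reset();
            return true;
        }
        return false;
    }

    private void reset() {
        count = 0;
        deadline = -1L;
    }

    public static void main(String[] args) {
        CountOrTimeoutPolicy policy = new CountOrTimeoutPolicy(5, 60_000L);
        for (long t = 0; t < 5_000L; t += 1_000L) {
            System.out.println("element at " + t + " ms, fire=" + policy.onElement(t));
        }
    }
}
```

Resetting both the counter and the deadline on every firing is a design choice of this sketch: it makes each batch independent, so a stale deadline can never fire a fresh batch early.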

Create a custom trigger

Extend the abstract class Trigger<T, W>:

  • T: the event type in the window

  • W: the window type, such as TimeWindow

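public abstract class Trigger<T, W extends Window> implements Serializable {
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // Optional: override both for mergeable windows such as session windows
    public boolean canMerge() { return false; }
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
}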

Methods:

| Method | Called when | Typical use |
| --- | --- | --- |
| onElement() | A new event enters the window | Check whether a trigger condition is met (count threshold, specific event type) |
| onEventTime() | A registered event-time timer fires | Close the window when the watermark hits the end timestamp |
| onProcessingTime() | A registered processing-time timer fires | Processing-time window operations (less common) |
| clear() | The window is cleared | Clean up ValueState to prevent memory leaks |
| canMerge() / onMerge() | Windows are merged (for example, session windows) | Update timers correctly during a merge |

Two things to note:

  1. onElement(), onEventTime(), and onProcessingTime() each return a TriggerResult. The return value determines whether Flink fires, purges, or continues for that specific invocation.

  2. All three methods can register timers — via ctx.registerEventTimeTimer(timestamp) or ctx.registerProcessingTimeTimer(timestamp) — to schedule future callbacks.

Manage state and timers with `TriggerContext`:

  • Read and write per-key, per-window state: ctx.getPartitionedState(StateDescriptor)

  • Register event-time timers: ctx.registerEventTimeTimer(timestamp)

  • Register processing-time timers: ctx.registerProcessingTimeTimer(timestamp)

  • Clean up state in clear(): call state.clear()

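  • Timer cleanup: timers that a trigger registers are not removed when the window contents are purged. Delete them in clear() with ctx.deleteEventTimeTimer(timestamp) or ctx.deleteProcessingTimeTimer(timestamp), as Flink's built-in triggers do.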

Example: early firing on the fifth event

This example uses a 1-minute tumbling event-time window. In addition to the normal end-of-window firing, it fires early when a user's fifth event arrives within the window. The early firing happens at most once per window.

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // Per-key, per-window event counter
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Flag to ensure early firing happens only once per window
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Schedule the normal end-of-window firing explicitly.
        // Registering the same timestamp again for later events has no further effect.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        Integer storedCount = countState.value();
        Boolean storedFlag = flagState.value();
        int count = storedCount == null ? 0 : storedCount;
        boolean flag = storedFlag != null && storedFlag;

        count += 1;
        countState.update(count);

        // Fire early on the fifth event, but only once per window
        if (count >= 5 && !flag) {
            flagState.update(true);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE; // Not used in this trigger
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Normal end-of-window firing; clears window contents. Other event-time timers are ignored.
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

Attach the trigger to a windowed stream:

DataStream<UserEvent> source = ...; // Existing input stream

source.keyBy(event -> event.getUser_id())
      .window(TumblingEventTimeWindows.of(Time.seconds(60)))
      .trigger(new CustomCountTrigger())
      .process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {

          @Override
          public void process(Integer key, Context context, Iterable<UserEvent> elements, Collector<String> out) {
              int count = 0;
              for (UserEvent event : elements) {
                  // Count only events whose event_type is "1" (UserEvent stores it as a String)
                  if ("1".equals(event.getEvent_type())) count++;
              }
              out.collect("Key: " + key + ", Access count: " + count);
          }
      })
      .print();

Verify the output: If 8 events with event_type 1 arrive for the same user in the same 1-minute window, you get two output records:

Key: 101, Access count: 5    // Early firing at the fifth event; window state is retained
Key: 101, Access count: 8    // End-of-window firing; window contents are cleared
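
"Within one minute" here means within the same window. Tumbling windows are aligned to the epoch rather than to a key's first event, so two events 10 seconds apart can still land in different windows. The sketch below shows the arithmetic, assuming a zero window offset. WindowMath is a hypothetical helper, not a Flink class; the last timestamp that belongs to a window is its end minus 1 ms, which is the value that window.maxTimestamp() returns.

```java
/** Illustrative arithmetic for tumbling windows with zero offset. */
final class WindowMath {

    /** Start of the aligned window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Last timestamp that still belongs to that window (end - 1). */
    static long maxTimestamp(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L; // 1-minute window
        System.out.println(windowStart(115_000L, size)); // Event at 00:01:55
        System.out.println(windowStart(125_000L, size)); // Event at 00:02:05 falls into the next window
    }
}
```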

Advanced use cases

Fire multiple times before the window closes

Remove flagStateDesc and the !flag check so that the trigger fires on every event once the count reaches 5. This is useful for streaming alerts where you want output on each new qualifying event.

With flagStateDesc removed, the same 8-event scenario produces 5 records:

Key: 101, Access count: 5    // count >= 5, early firing, state retained
Key: 101, Access count: 6    // count >= 5, early firing
Key: 101, Access count: 7    // count >= 5, early firing
Key: 101, Access count: 8    // count >= 5, early firing
Key: 101, Access count: 8    // End-of-window firing; window contents cleared

The trigger controls when to fire. The actual output content — which events to include, how to aggregate — is determined in the .process() function. Use state inside .process() to differentiate early results from final results.
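
The firing sequences for the 8-event scenario, with and without the once-per-window flag, can be reproduced with a small dependency-free simulation of the counting rule. This is a sketch under two assumptions: every event passes the filter in the .process() function, and all events fall into the same window. EarlyFiringSimulation and simulate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative replay of the example's counting rule; not Flink code. */
final class EarlyFiringSimulation {

    /**
     * Replays eventCount events and returns the count reported at each firing.
     * The last entry is always the end-of-window firing.
     */
    static List<Integer> simulate(int eventCount, int threshold, boolean fireOnce) {
        List<Integer> firings = new ArrayList<>();
        int count = 0;         // Models countState
        boolean fired = false; // Models flagState
        for (int i = 0; i < eventCount; i++) {
            count++;
            if (count >= threshold && !(fireOnce && fired)) {
                fired = true;
                firings.add(count); // FIRE: contents are retained, so later firings see more events
            }
        }
        firings.add(count); // End-of-window firing (FIRE_AND_PURGE)
        return firings;
    }

    public static void main(String[] args) {
        System.out.println(simulate(8, 5, true));
        System.out.println(simulate(8, 5, false));
    }
}
```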

Handle idle sources: ensure timely window closure

Event-time window closure depends on watermark advancement. If no events arrive for a long period, watermarks stop advancing and windows never close. The following approaches address this:

| Approach | Watermark dependency | Timely firing | Handles out-of-order data | When to use |
| --- | --- | --- | --- | --- |
| Processing-time windows | None | Yes | No | Event-time semantics not required |
| withIdleness() | Yes | Depends on watermark interval | Yes | Simple scenarios with idle partitions |
| Custom watermark generator | Yes | Yes (periodic update) | Yes | Standard approach for event-time semantics |
| Register a processing-time timer as fallback | Optional | Yes (fault-tolerant) | No (forces window closure) | Enhanced reliability required |
| Heartbeat messages from external systems | None | Yes | Yes | Kafka or workflow-orchestrated pipelines |

Approach 1: Use processing-time windows

If event-time semantics are not needed, switch to a processing-time window:

.keyBy(keySelector)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.process(new MyProcessWindowFunction())

Approach 2: Mark idle sources with `withIdleness()`

WatermarkStrategy.withIdleness() marks a source as idle after a period of inactivity. Idle sources are excluded from the global minimum watermark calculation, so they no longer hold back active sources.

WatermarkStrategy
    .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
    .withIdleness(Duration.ofMinutes(1)); // Mark source as idle after 1 minute with no events
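
To see why excluding an idle source helps, consider how per-input watermarks are combined into one: the result is the minimum over the inputs that are still active. The sketch below is a simplified model of that rule, not Flink's internal implementation. WatermarkCombiner is a hypothetical name, and the case in which every input is idle is intentionally left unmodeled.

```java
import java.util.OptionalLong;

/** Illustrative model only: not Flink's internal implementation. */
final class WatermarkCombiner {

    /**
     * Returns the minimum watermark over the inputs that are not idle.
     * Returns empty when every input is idle (that case is not modeled here).
     */
    static OptionalLong combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] watermarks = {1_000L, 5_000L};
        // Both inputs active: the slower input holds the combined watermark back
        System.out.println(combine(watermarks, new boolean[] {false, false}));
        // First input idle: only the active input counts
        System.out.println(combine(watermarks, new boolean[] {true, false}));
    }
}
```

The model also shows the limit of this approach: idleness only helps while at least one other input keeps producing watermarks.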

Approach 3: Use a custom watermark generator

Implement WatermarkGenerator to detect idleness yourself, so that a silent source is marked idle instead of holding back the watermark:

  • In onEvent(): record the latest event timestamp and the wall-clock time at which the event arrived.

  • In onPeriodicEmit(): if the wall-clock time since the last event exceeds the threshold, mark the output idle and skip watermark emission; otherwise, emit a watermark based on the latest timestamp.

public class IdleAwareWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private long lastEventTimestamp = Long.MIN_VALUE;   // Highest event timestamp seen so far
    private long lastEventWallClockMs = Long.MIN_VALUE; // Wall-clock arrival time of the latest event
    private final long maxIdleTimeMs;

    public IdleAwareWatermarkGenerator(long maxIdleTimeMs) {
        this.maxIdleTimeMs = maxIdleTimeMs;
    }

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        lastEventTimestamp = Math.max(lastEventTimestamp, eventTimestamp);
        lastEventWallClockMs = System.currentTimeMillis();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (lastEventTimestamp == Long.MIN_VALUE) {
            return; // No event seen yet; nothing to emit
        }
        // Compare wall-clock times only; event timestamps belong to a different time domain
        if (System.currentTimeMillis() - lastEventWallClockMs > maxIdleTimeMs) {
            output.markIdle(); // Source is idle; exclude it from downstream watermark calculation
            return;
        }
        output.emitWatermark(new Watermark(lastEventTimestamp));
    }
}

Use the generator in a watermark strategy:

WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
    .<MyEvent>forGenerator(ctx -> new IdleAwareWatermarkGenerator(60_000)) // 60-second idle threshold
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());

Approach 4: Register a processing-time timer as fallback

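Register both an event-time timer and a processing-time timer in onElement(). The processing-time timer fires regardless of watermark state, acting as a safety net. Note that window.maxTimestamp() is an event-time value, so using it as a processing-time deadline assumes that event timestamps track the wall clock. When you replay historical data, that deadline is already in the past and the timer fires immediately; in that case, derive the deadline from ctx.getCurrentProcessingTime() instead.

@Override
public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.registerEventTimeTimer(window.maxTimestamp());
    // Fallback: fires even without new events. Assumes event time tracks the wall clock.
    ctx.registerProcessingTimeTimer(window.maxTimestamp() + 1000);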
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.FIRE_AND_PURGE; // Force window closure
}

Approach 5: Send heartbeat messages from external systems

Have an external system, such as a Kafka producer or a scheduled workflow, periodically write synthetic heartbeat events to the source that Flink reads. These heartbeats advance the watermark and close idle windows without changing your trigger logic.

Key points

| Topic | Guidance |
| --- | --- |
| FIRE vs FIRE_AND_PURGE | FIRE retains window contents for multiple firings. FIRE_AND_PURGE clears them. Neither clears trigger state; do that in clear(). |
| State management | Use ValueState for per-key, per-window counters and flags. Always clean up in clear() to avoid memory leaks. |
| Required methods | Implement onElement(), onEventTime(), onProcessingTime(), and clear(). |
| Session window merging | For mergeable windows such as session windows, implement canMerge() and onMerge() to keep timer state consistent across merged windows. |
| Controlling firing frequency | Use a boolean flag (ValueState<Boolean>) to cap the number of early firings per window when events continue arriving after FIRE. |

Complete code

CustomCountTrigger

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // Per-key, per-window event counter
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Flag to ensure early firing happens only once per window
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Schedule the normal end-of-window firing explicitly.
        // Registering the same timestamp again for later events has no further effect.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        Integer storedCount = countState.value();
        Boolean storedFlag = flagState.value();
        int count = storedCount == null ? 0 : storedCount;
        boolean flag = storedFlag != null && storedFlag;

        count += 1;
        countState.update(count);

        // Fire early on the fifth event, but only once per window
        if (count >= 5 && !flag) {
            flagState.update(true);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE; // Not used in this trigger
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Normal end-of-window firing; clears window contents. Other event-time timers are ignored.
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

KafkaTriggerTest

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class KafkaTriggerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setTopics("trigger")
                .setGroupId("trigger")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        // Example record format: 101,alice,1,2025-06-10 10:02:00
        DataStream<UserEvent> userEventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(new MapFunction<String, UserEvent>() {
                    @Override
                    public UserEvent map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new UserEvent(
                                Integer.parseInt(fields[0]),
                                fields[1],
                                fields[2],
                                fields[3]
                        );
                    }
                });

        WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
                .<UserEvent>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(2))
                .withTimestampAssigner((event, timestamp) -> {
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    return java.time.LocalDateTime.parse(event.getEvent_time(), formatter)
                            .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                });

        DataStream<UserEvent> timestampedStream = userEventStream.assignTimestampsAndWatermarks(watermarkStrategy);

        timestampedStream
                .keyBy(UserEvent::getUser_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .trigger(new CustomCountTrigger())
                .process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer userId, Context context, Iterable<UserEvent> userEvents, Collector<String> collector) throws Exception {
                        int count = 0;
                        for (UserEvent event : userEvents) {
                            if (event.getEvent_type().equals("1")) count++;
                        }
                        collector.collect("User ID: " + userId + " | Count: " + count + " | Window: " + context.window());
                    }
                }).print();

        env.execute("Kafka trigger test");
    }
}
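
The timestamp assigner in KafkaTriggerTest converts event_time with ZoneId.systemDefault(), so the resulting epoch milliseconds, and therefore the window an event lands in, depend on the time zone of the JVM that runs the job. The sketch below isolates that conversion with an explicit zone so the effect can be checked on its own. EventTimeParser is a hypothetical helper, not part of the job above, and which zone your records use is something only you can determine.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Illustrative helper: parses the example's event_time format with an explicit zone. */
final class EventTimeParser {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Interprets the local date-time string in the given zone and returns epoch milliseconds. */
    static long toEpochMillis(String eventTime, ZoneId zone) {
        return LocalDateTime.parse(eventTime, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        String eventTime = "2025-06-10 10:02:00";
        long asUtc = toEpochMillis(eventTime, ZoneOffset.UTC);
        long asUtcPlus8 = toEpochMillis(eventTime, ZoneOffset.ofHours(8));
        // The same string maps to instants 8 hours apart
        System.out.println((asUtc - asUtcPlus8) / 3_600_000L + " hours apart");
    }
}
```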

UserEvent

public class UserEvent {
    private int user_id;
    private String username;
    private String event_type;
    private String event_time;

    // Flink treats a class as a POJO only if it has a public no-argument constructor
    public UserEvent() {}

    public UserEvent(int user_id, String username, String event_type, String event_time) {
        this.user_id = user_id;
        this.username = username;
        this.event_type = event_type;
        this.event_time = event_time;
    }

    @Override
    public String toString() {
        return "user_id:" + user_id + " username:" + username + " event_type:" + event_type + " event_time:" + event_time;
    }

    public int getUser_id() { return user_id; }
    public void setUser_id(int user_id) { this.user_id = user_id; }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }

    public String getEvent_type() { return event_type; }
    public void setEvent_type(String event_type) { this.event_type = event_type; }

    public String getEvent_time() { return event_time; }
    public void setEvent_time(String event_time) { this.event_time = event_time; }
}