Realtime Compute for Apache Flink: Develop custom triggers

Last Updated: Jul 08, 2025

In Apache Flink, a window trigger determines when data in a window is ready for computation and output. Flink provides built-in triggers for common use cases. In complex business scenarios, custom triggers let you implement your own firing logic.

Overview

What triggers do

  1. Monitor data elements entering a window.

  2. Determine whether to trigger the processing based on predefined conditions, such as time and event count.

  3. Return one of the following operations:

    • CONTINUE: Continue collecting data.

    • FIRE: Trigger computation and retain states.

    • FIRE_AND_PURGE: Trigger computation and clear states.

    • PURGE: Clear states without triggering computation.
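The four results can be read as combinations of two independent flags: whether the window function runs, and whether the window contents are dropped. The enum below is a simplified, dependency-free stand-in for Flink's TriggerResult, written only to make that two-flag view explicit. The name TriggerAction and the describe() helper are hypothetical and are not part of Flink.

```java
// Simplified model of the four trigger results; this is not the Flink class itself.
enum TriggerAction {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    private final boolean fire;   // run the window computation
    private final boolean purge;  // drop the window contents

    TriggerAction(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() {
        return fire;
    }

    boolean isPurge() {
        return purge;
    }

    // Human-readable summary of what the result means for the window.
    String describe() {
        if (fire && purge) {
            return "compute, then clear";
        } else if (fire) {
            return "compute, keep contents";
        } else if (purge) {
            return "clear without computing";
        }
        return "keep collecting";
    }
}
```

Seen this way, FIRE suits repeated early results over a growing window, while FIRE_AND_PURGE suits a final result after which the contents are no longer needed.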

Built-in triggers

Flink provides the following built-in triggers for time- or count-based window operations:

| Trigger type | Description |
| --- | --- |
| EventTimeTrigger | Triggers a window when the watermark's time exceeds the window's end time. It is the default trigger for event-time windows. |
| ProcessingTimeTrigger | Triggers a window when the processing time reaches the window's end time. It is the default trigger for processing-time windows. |
| CountTrigger | Triggers a window when the number of events in the window reaches the specified threshold. |
| PurgingTrigger | Automatically purges a window when triggered. It wraps other triggers. |

Important

If you explicitly set a specific trigger for a window, it will replace the default trigger. For example, if you set a CountTrigger for an event-time window, EventTimeTrigger will no longer take effect.

In real-world use cases, we often need to:

  • Use more than one trigger condition, such as triggering a window after 5 events or 1 minute.

  • Trigger a window based on specific events, such as user logout and order completion.

  • Manage a window's lifecycle, like early firing, delayed firing, and multiple firing.

  • Avoid side effects of the default behavior, such as a window being triggered again by late data.

These complex requirements cannot be satisfied by built-in triggers, making custom triggers necessary.

Create a custom trigger

  1. Extend the abstract class Trigger<T, W>.

    • T: the type of data in the window.

    • W: the window type, such as TimeWindow or its subclass.

  2. Override the core methods.

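    public abstract class Trigger<T, W extends Window> implements Serializable {
        // Called for every element that is added to the window
        public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
        // Called when a registered processing-time timer fires
        public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
        // Called when a registered event-time timer fires
        public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
        // Called when the window is removed; release the trigger's state here
        public abstract void clear(W window, TriggerContext ctx) throws Exception;
        // Returns false by default; override to return true for merging windows
        public boolean canMerge() { return false; }
        // Unsupported by default; override together with canMerge()
        public void onMerge(W window, OnMergeContext ctx) throws Exception {
            throw new UnsupportedOperationException("This trigger does not support merging.");
        }
    }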

    Methods include:

    | Method | Description | Operation |
    | --- | --- | --- |
    | onElement() | Called when each new event enters the window. | Determines whether a trigger condition, such as an event count threshold or the arrival of a specific event, is met. |
    | onProcessingTime() | Called when a registered processing-time timer fires. | Used for processing-time window operations. This method is not commonly used. |
    | onEventTime() | Called when a registered event-time timer fires. | Typically fires a window when the watermark reaches the window's end timestamp. |
    | clear() | Called when a window is removed. | Cleans up window states to prevent memory leaks. |
    | canMerge(), onMerge() | Called when windows (such as session windows) are merged. | If the trigger is used with merging windows, you must correctly update the timers. |

  3. Use TriggerContext to manage states and timers.

    1. State management: Obtain window states, such as counters, through ctx.getPartitionedState(StateDescriptor).

    2. Timer management: Register timers through ctx.registerEventTimeTimer(timestamp).

    3. State cleanup: Use state.clear() in clear() to clear states.

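    4. Timer deletion: In most cases, you do not need to delete timers manually, because Flink cleans them up when a window closes. If you register timers at other timestamps, you can delete them in clear() with ctx.deleteEventTimeTimer(timestamp) or ctx.deleteProcessingTimeTimer(timestamp).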
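The timestamp most often passed to ctx.registerEventTimeTimer() is window.maxTimestamp(), which for a TimeWindow is the window's exclusive end minus 1 millisecond. The following dependency-free sketch shows how that value relates to an event timestamp for a tumbling window. It assumes no window offset and non-negative timestamps; the class and method names are hypothetical and only mirror the arithmetic, not Flink's implementation.

```java
// Dependency-free sketch of tumbling-window bounds; names are illustrative, not Flink API.
final class TumblingWindowMath {
    private TumblingWindowMath() {
    }

    // Inclusive start of the window that contains timestampMs (no offset, timestampMs >= 0).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Exclusive end of the window that contains timestampMs.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    // Last timestamp that still belongs to the window: end - 1.
    static long maxTimestamp(long timestampMs, long sizeMs) {
        return windowEnd(timestampMs, sizeMs) - 1;
    }
}
```

For example, with a 60-second window, an event at 125,000 ms falls into the window [120,000, 180,000), so the timer for the window end would be registered at 179,999 ms.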

Example

In a 1-minute tumbling event-time window, the window computation is triggered when the window ends. Additionally, an immediate computation is triggered upon the arrival of a user's fifth event within that window. This early trigger fires at most once per window.

Sample code:

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // Record the number of elements of each key in a window
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Record whether computation has already been triggered
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // Call onElement() when a new element enters the window
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        // Get the count state of the current key in the current window
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() == null ? false : flagState.value();

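        // Register an event-time timer for the window end so that onEventTime() is called
        // when the watermark passes it (registering the same timestamp again has no extra effect)
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // Increment count for each element
        count += 1;
        countState.update(count); // Update state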
        
        // When the count reaches 5, trigger window computation immediately
        if (count >= 5 && !flag) {
            flagState.update(true); // Update state to ensure this additional computation is triggered only once
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
        // Skip the processing-time timer in this trigger.
        return TriggerResult.CONTINUE;
    }

    // onEventTime() is called when an event-time timer fires for this window (here, at the window end)
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE; // Fire the computation and clear the window
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
        // Clean up window states
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

Trigger usage: The trigger must be used with window operations during stream processing.

DataStream<UserEvent> source = ...; // Existing input stream

source.keyBy(keySelector)  // Such as .keyBy(value -> value.userId) to group elements by user
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) // Set the tumbling window size to 60 seconds
      .trigger(new CustomCountTrigger()) // Custom trigger: trigger after 5 elements or window timeout
      .process(new ProcessWindowFunction<UserEvent, String, KeyedType, TimeWindow>() {

          @Override
          public void process(KeyedType key, Context context, Iterable<UserEvent> elements, Collector<String> out) {
              int count = 0;

              // Iterate through all elements in the window, count those with action == 1
              for (UserEvent event : elements) {
                  if (event.action == 1) {
                      count++;
                  }
              }

              // Output result
              out.collect("Key: " + key + ", Access count: " + count);
          }
      })
      .print();

Result verification: If 8 access records from the same user are received in one minute, the output is 2 records:

Key: 101, Access count: 5       // count >= 5: early computation is triggered, but window states aren't cleaned.
Key: 101, Access count: 8       // The window closes: computation is triggered and states are cleared.
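The two output records above can be reproduced without a cluster. The following dependency-free simulation mirrors only the decision logic of CustomCountTrigger: plain fields stand in for the two ValueState objects, and the end of the loop stands in for the event-time timer at the window end. The class name and the run() helper are hypothetical, and the sketch assumes that every event has action == 1, so the access count equals the number of elements.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free simulation of the early-fire-once logic; this is not Flink code.
final class EarlyFireOnceSimulation {
    private int count = 0;          // stands in for the "count" ValueState
    private boolean fired = false;  // stands in for the "flag" ValueState

    // Mirrors onElement(): returns true when the window should fire early.
    boolean onElement() {
        count++;
        if (count >= 5 && !fired) {
            fired = true;
            return true;
        }
        return false;
    }

    // Feeds n events into one window, then closes it.
    // Returns the number of elements visible to the window function at each firing.
    static List<Integer> run(int n) {
        EarlyFireOnceSimulation trigger = new EarlyFireOnceSimulation();
        List<Integer> firings = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            if (trigger.onElement()) {
                firings.add(i); // FIRE: contents are kept, so later firings see them again
            }
        }
        firings.add(n); // window end: FIRE_AND_PURGE sees all n elements
        return firings;
    }
}
```

Calling EarlyFireOnceSimulation.run(8) yields the counts 5 and 8, matching the two records. With fewer than five events, only the final firing at the window end remains.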

Extensions and summary

Advanced use cases and implementations

Use case 1

Trigger computation more than once before a window ends for alerting.

  1. Remove flagStateDesc to enable triggering computation more than once.

  2. Alternatively, add a firing counter to end a window only after a certain number of trigger firings (such as for alert events).

    // After flagStateDesc is removed, 5 records are output.
    Key: 101, Access count: 5       // count >= 5, triggers early computation and doesn't clear the state.
    Key: 101, Access count: 6       // count >= 5, triggers early computation.
    Key: 101, Access count: 7       // count >= 5, triggers early computation.
    Key: 101, Access count: 8       // count >= 5, triggers early computation.
    Key: 101, Access count: 8       // Triggers computation when the window ends and clears the state.

  3. The trigger only decides when computation runs. The window contents are processed in .process(), where you can emit different results through collect() depending on the element count and other conditions.
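The firing counter mentioned in step 2 can be sketched as follows. This is a dependency-free model rather than a Flink Trigger: once the element count reaches the threshold, every element fires, and the firing that reaches the cap also purges. The class name, the Action enum, and the choice to reset both counters on purge are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free model of a trigger that caps the number of early firings; not Flink code.
final class CappedFiringSimulation {
    enum Action { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final int threshold;   // element count at which firing starts
    private final int maxFirings;  // number of firings after which the window is purged
    private int count = 0;         // would be a ValueState<Integer> in a real trigger
    private int firings = 0;       // would be a second ValueState<Integer>

    CappedFiringSimulation(int threshold, int maxFirings) {
        this.threshold = threshold;
        this.maxFirings = maxFirings;
    }

    // Decision taken for each incoming element.
    Action onElement() {
        count++;
        if (count < threshold) {
            return Action.CONTINUE;
        }
        firings++;
        if (firings >= maxFirings) {
            // In this sketch the trigger resets its own counters when it purges.
            count = 0;
            firings = 0;
            return Action.FIRE_AND_PURGE;
        }
        return Action.FIRE;
    }

    // Feeds the given number of events and returns the decision for each one.
    static List<Action> run(int events, int threshold, int maxFirings) {
        CappedFiringSimulation trigger = new CappedFiringSimulation(threshold, maxFirings);
        List<Action> actions = new ArrayList<>();
        for (int i = 0; i < events; i++) {
            actions.add(trigger.onElement());
        }
        return actions;
    }
}
```

With run(8, 5, 3), the first four elements continue, the fifth and sixth fire, the seventh fires and purges, and the eighth starts counting again from one.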

Use case 2

Window closure requires the advancement of watermarks. How can we ensure timely window computation if no data is generated for a long period of time?

| Solution | Dependent on watermark? | Timely triggering guaranteed? | Suitable for out-of-order data? | Suitable scenarios |
| --- | --- | --- | --- | --- |
| Use processing-time windows | No | Yes | No | Event-time processing is not required. |
| Use the withIdleness method | Yes | No (depends on the watermark interval) | Yes | Suitable for simple scenarios, such as when an input partition is idle. |
| Use a custom watermark generator | Yes | Yes (a watermark is periodically updated) | Yes | This is the standard approach. |
| Register a timer | Yes (optional) | Yes (fault-tolerant) | No (forced window closure requires appropriate timing) | Enhanced reliability is required. |
| Send heartbeat messages from external systems | No | Yes | Yes | Kafka requires additional maintenance, while task orchestration does not. |

  • Solution 1: Use processing-time windows

    If you don't need event time semantics (that is, the time at which events occur is not a concern), you can use processing-time windows:

    .keyBy(keySelector)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) // Use processing time
    .process(new MyProcessWindowFunction())
  • Solution 2: Use the withIdleness method

    Flink's WatermarkStrategy provides the withIdleness method to automatically mark a data source as idle after a specified period of inactivity, preventing it from blocking watermark generation.

    // An idle data source no longer participates in the computation of the minimum watermark, so it does not hold back the watermarks of active data sources.
    WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));  // If a data source or partition has no events for 1 minute, it is marked as idle
  • Solution 3: Use a custom watermark generator

    If you need to use event time semantics, define a custom watermark generator that handles periods in which no new data arrives:

    • Record the arrival time (wall-clock time) of the latest event in the onEvent() method, together with the largest event timestamp seen so far.

    • Check the interval between the current time and the arrival time of the latest event in the onPeriodicEmit() method.

    • If the interval exceeds the set threshold, consider the data source idle, and either skip generating a watermark or directly generate a specific watermark.

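    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    public class IdleAwareWatermarkGenerator implements WatermarkGenerator<MyEvent> {
        private long maxEventTimestamp = Long.MIN_VALUE; // Largest event timestamp seen so far (event time)
        private long lastArrivalTimeMs = Long.MIN_VALUE; // Wall-clock time at which the latest event arrived
        private final long maxIdleTimeMs; // Max idle time

        public IdleAwareWatermarkGenerator(long maxIdleTimeMs) {
            this.maxIdleTimeMs = maxIdleTimeMs;
        }

        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
            lastArrivalTimeMs = System.currentTimeMillis();
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            if (lastArrivalTimeMs == Long.MIN_VALUE) {
                return; // No event has arrived yet, so there is nothing to emit
            }
            // Compare wall-clock times only; event timestamps may lag far behind the current time
            if (System.currentTimeMillis() - lastArrivalTimeMs > maxIdleTimeMs) {
                // If no events arrive for a long time, don't emit new watermarks and mark the output as idle
                output.markIdle();
                return;
            }
            output.emitWatermark(new Watermark(maxEventTimestamp));
        }
    }

    WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .<MyEvent>forGenerator((ctx) -> new IdleAwareWatermarkGenerator(60_000)) // Set maximum idle time to 60 seconds
        .withTimestampAssigner((event, timestamp) -> event.getEventTime());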
  • Solution 4: Register a timer

    @Override
    public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
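        ctx.registerEventTimeTimer(window.maxTimestamp()); // Register an event-time timer for the window end
        // Fault tolerance: a processing-time timer fires even if no further events advance the watermark.
        // This assumes that event timestamps track wall-clock time; if they lag behind, the timer fires right away.
        ctx.registerProcessingTimeTimer(window.maxTimestamp() + 1000);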
        return TriggerResult.CONTINUE;
    }
    
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE; // Forced triggering and cleanup
    }
  • Solution 5: Send heartbeat messages from external systems

    Use Kafka or workflows to send regular heartbeat messages downstream to trigger window closure.
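Solutions 2 and 3 both rest on one rule: an operator's watermark is the minimum over the watermarks of its inputs that are not idle. The following dependency-free sketch models that rule so the effect of marking an input idle can be checked in isolation. The class and method names are hypothetical, and the model leaves out everything else that Flink does when it tracks input status.

```java
import java.util.OptionalLong;

// Dependency-free model of combining per-input watermarks; names are illustrative, not Flink API.
final class CombinedWatermark {
    // One input (for example, a Kafka partition): its latest watermark and whether it is idle.
    static final class Input {
        final long watermark;
        final boolean idle;

        Input(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    private CombinedWatermark() {
    }

    // Minimum watermark over the inputs that are not idle; empty when every input is idle.
    static OptionalLong combine(Input... inputs) {
        OptionalLong min = OptionalLong.empty();
        for (Input input : inputs) {
            if (input.idle) {
                continue; // idle inputs do not hold back the result
            }
            if (!min.isPresent() || input.watermark < min.getAsLong()) {
                min = OptionalLong.of(input.watermark);
            }
        }
        return min;
    }
}
```

If one input is stuck at 200 while another has reached 1,000, the combined value stays at 200 until the slow input is marked idle. When every input is idle, there is no value to advance at all, which is why processing-time windows, fallback timers, or heartbeat messages are needed if the whole stream goes quiet.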

Summary

| Key points | Description |
| --- | --- |
| Understand a window's lifecycle | FIRE does not clear window states while FIRE_AND_PURGE closes and cleans up a window. Use them properly to fire triggers multiple times or once. |
| Use state and timers appropriately | Use ValueState to track counts and flags and release resources with clear(). |
| Override methods | Implement the following methods: onElement, onEventTime, onProcessingTime, and clear to ensure completeness. |
| Enable window merging (such as session windows) | For mergeable windows like session windows, implement canMerge() and onMerge() to maintain timer consistency. |
| Avoid repeated firing | Control the number of times a trigger can fire, especially when events may continue to arrive after FIRE is called. |

Complete code

CustomCountTrigger

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // Record the number of elements of each key in the window
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Record whether computation has already been triggered
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // onElement is called when a new element enters the window
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        // Get the state of an element of the current key in the current window
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() == null ? false : flagState.value();

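        // Register an event-time timer for the window end so that onEventTime() is called
        // when the watermark passes it (registering the same timestamp again has no extra effect)
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // Increment count for each new element
        count += 1;
        countState.update(count); // Update the state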

        // If count reaches 5, trigger window computation immediately
        if (count >= 5 && !flag) {
            flagState.update(true); // Update state to ensure additional computation is triggered only once
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
        // Skip processing-time timers in this trigger
        return TriggerResult.CONTINUE;
    }

    // onEventTime is called when an event-time timer fires for this window (such as upon window closure)
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE; // Fire the computation and clear the window
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
        // Clean up window state
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

KafkaTriggerTest

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;


public class KafkaTriggerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setTopics("trigger")
                .setGroupId("trigger")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        // Example data: 101,alie,1,2025-06-10 10:02:00 (the date must match the pattern yyyy-MM-dd HH:mm:ss used below)
        DataStream<UserEvent> userEventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(new MapFunction<String, UserEvent>() {
                    @Override
                    public UserEvent map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new UserEvent(
                                Integer.parseInt(fields[0]),
                                fields[1],
                                fields[2],
                                fields[3]
                        );
                    }
                });

        WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
                .<UserEvent>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(2))
                .withTimestampAssigner((event, timestamp) -> {
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    return java.time.LocalDateTime.parse(event.getEvent_time(), formatter).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                });

        DataStream<UserEvent> timestampedStream = userEventStream.assignTimestampsAndWatermarks(watermarkStrategy);

        timestampedStream
                .keyBy(UserEvent::getUser_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(60))) // Set the tumbling window size to 60 seconds
                .trigger(new CustomCountTrigger()) // Custom trigger: fired upon the arrival of 5 data elements or timeout
                .process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {

                    @Override
                    public void process(Integer userId, ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>.Context context, Iterable<UserEvent> userEvents, Collector<String> collector) throws Exception {
                        int count = 0;
                        for (UserEvent event : userEvents) {
                            if (event.getEvent_type().equals("1")) {
                                count++;
                            }
                        }
                        collector.collect("User ID: " + userId + " | Count: " + count + " | Window: " + context.window());
                    }
                }).print();
        env.execute("Kafka Partitioner Data Stream");
    }
}

UserEvent

public class UserEvent {
    private int user_id;
    private String username;
    private String event_type;
    private String event_time;

    public UserEvent(int user_id, String username, String event_type, String event_time) {
        this.user_id = user_id;
        this.username = username;
        this.event_type = event_type;
        this.event_time = event_time;
    }

    @Override
    public String toString() {
        return "user_id:" + user_id + " username:" + username + " event_type:" + event_type + " event_time:" + event_time;
    }

    public int getUser_id() {
        return user_id;
    }

    public void setUser_id(int user_id) {
        this.user_id = user_id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getEvent_type() {
        return event_type;
    }

    public void setEvent_type(String event_type) {
        this.event_type = event_type;
    }

    public String getEvent_time() {
        return event_time;
    }

    public void setEvent_time(String event_time) {
        this.event_time = event_time;
    }
}
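The timestamp assigner in KafkaTriggerTest converts the event_time string to epoch milliseconds with the pattern yyyy-MM-dd HH:mm:ss. The following dependency-free sketch isolates that conversion so it can be checked on its own. It takes the time zone as a parameter instead of using ZoneId.systemDefault(), which is an assumption made here to keep results reproducible across machines; the class name EventTimeParser is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Isolated version of the event_time conversion; the class name is illustrative.
final class EventTimeParser {
    // Same pattern as in KafkaTriggerTest; month and day must have two digits.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private EventTimeParser() {
    }

    // Parses the text as a local date-time in the given zone and returns epoch milliseconds.
    static long toEpochMillis(String eventTime, ZoneId zone) {
        return LocalDateTime.parse(eventTime, FORMATTER)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }
}
```

With this pattern, a value such as 2025-6-10 10:02:00 is rejected with a DateTimeParseException because the month has only one digit, so input records need zero-padded dates. Reusing a single formatter instance also avoids creating a new one for every event.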