すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:カスタムトリガーの開発

最終更新日:Jul 09, 2025

Apache Flink では、ウィンドウトリガーは、ウィンドウ内のデータが計算と出力の準備ができたタイミングを決定します。 Flink は、一般的なユースケース向けにさまざまな組み込みトリガーを提供しています。 しかし、複雑なビジネスシナリオでは、カスタムトリガーを使用すると、ビジネスロジックを柔軟に実装できます。

概要

トリガーの機能

  1. ウィンドウに入るデータ要素を監視します。

  2. 時間やイベント数など、事前定義された条件に基づいて処理をトリガーするかどうかを決定します。

  3. 次の操作のいずれかを返します。

    • CONTINUE: データの収集を続けます。

    • FIRE: 計算をトリガーし、状態を保持します。

    • FIRE_AND_PURGE: 計算をトリガーし、状態をクリアします。

    • PURGE: 計算をトリガーせずに状態をクリアします。

組み込みトリガー

Flink は、時間ベースまたはカウントベースのウィンドウ操作向けに、次の組み込みトリガーを提供しています。

トリガータイプ

説明

EventTimeTrigger

ウォーターマークの時刻がウィンドウの終了時刻を超えると、ウィンドウをトリガーします。 イベントタイムウィンドウのデフォルトトリガーです。

ProcessingTimeTrigger

処理時間がウィンドウの終了時刻に達すると、ウィンドウをトリガーします。 処理時間ウィンドウのデフォルトトリガーです。

CountTrigger

ウィンドウ内のイベント数が指定されたしきい値に達すると、ウィンドウをトリガーします。

PurgingTrigger

トリガーされると、ウィンドウを自動的に消去します。 他のトリガーをラップします。

重要

ウィンドウに特定のトリガーを明示的に設定すると、デフォルトトリガーが置き換えられます。 たとえば、イベントタイムウィンドウに CountTrigger を設定すると、EventTimeTrigger は有効になりません。

実際のユースケースでは、多くの場合、以下を行う必要があります。

  • 5 つのイベントまたは 1 分後にウィンドウをトリガーするなど、複数のトリガー条件を使用します。

  • ユーザーログアウトや注文完了など、特定のイベントに基づいてウィンドウをトリガーします。

  • 早期起動、遅延起動、複数起動など、ウィンドウのライフサイクルを管理します。

  • 遅延データによってウィンドウが再度トリガーされるなど、デフォルトの動作の副作用を回避します。

これらの複雑な要件は、組み込みトリガーでは満たすことができないため、カスタムトリガーが必要になります。

カスタムトリガーの作成

  1. 抽象クラス Trigger<T, W> を継承します。

    • T: ウィンドウ内のデータのタイプ。

    • W: TimeWindow などのウィンドウタイプ、またはそのサブクラス。

  2. コアメソッドをオーバーライドします。

    public abstract class Trigger<T, W extends Window> implements Serializable {
        public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
        public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx);
        public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx);
        public void clear(W window, TriggerContext ctx);
        public boolean canMerge();
        public void onMerge(W window, OnMergeContext mergeContext);
    }

    メソッドには以下が含まれます。

    メソッド

    説明

    操作

    onElement()

    新しいイベントがウィンドウに入るたびに呼び出されます。

    イベント数のしきい値や特定のイベントの到着など、トリガー条件が満たされているかどうかを判断します。

    onProcessingTime()

    登録済みの処理時間タイマーが起動すると呼び出されます。

    処理時間ウィンドウ操作に使用されます。 このメソッドはあまり使用されません。

    onEventTime()

    登録済みのイベントタイムタイマーが起動すると呼び出されます。

    ウォーターマークが終了タイムスタンプに達すると、ウィンドウを閉じます。

    clear()

    ウィンドウをクリアします。

    メモリリークを防ぐために、ウィンドウの状態をクリーンアップします。

    canMerge()

    onMerge()

    (セッションウィンドウなどの)ウィンドウがマージされると呼び出されます。

    ウィンドウのマージに使用されるメソッドの場合、タイマーを正しく更新する必要があります。

  3. TriggerContext を使用して、状態とタイマーを管理します。

    1. 状態管理: ctx.getPartitionedState(StateDescriptor) を介して、カウンターなどのウィンドウ状態を取得します。

    2. タイマー管理: ctx.registerEventTimeTimer(timestamp) を介してタイマーを登録します。

    3. 状態のクリーンアップ: clear()state.clear() を使用して状態をクリアします。

    4. タイマーの削除: 多くの場合、タイマーを手動で削除する必要はありません。 ウィンドウが閉じると、Flink によって自動的にクリーンアップされます。

1 時間のタンブリングイベントタイムウィンドウでは、ウィンドウが終了するとウィンドウ計算がトリガーされます。 さらに、そのウィンドウ内でユーザーの 5 番目のイベントが到着すると、即時計算がトリガーされます。 この早期トリガーは、ウィンドウごとに最大 1 回起動します。

サンプルコード:

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // ウィンドウ内の各キーの要素数を記録します
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // 計算がすでにトリガーされているかどうかを記録します
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // 新しい要素がウィンドウに入ると onElement() を呼び出します
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        // 現在のウィンドウ内の現在のキーのカウント状態を取得します
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() == null ? false : flagState.value();

        // 要素ごとにカウントを増分します
        count += 1;
        countState.update(count); // 状態を更新します

        // カウントが 5 に達すると、ウィンドウ計算をすぐにトリガーします
        if (count >= 5 && !flag) {
            flagState.update(true); // この追加計算が 1 回だけトリガーされるように状態を更新します
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
        // このトリガーでは、処理時間タイマーをスキップします。
        return TriggerResult.CONTINUE;
    }

    // 登録済みのイベントタイムタイマーが起動すると onEventTime() を呼び出します(例: ウィンドウの終了時刻)
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; // トリガーが起動し、ウィンドウがクリアされます
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
        // ウィンドウの状態をクリーンアップします
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

トリガーの使用法: ストリーム処理中に、トリガーをウィンドウ操作で使用する必要があります。

DataStream<UserEvent> source = ...; // 既存の入力ストリーム

source.keyBy(keySelector)  // たとえば、.keyBy(value -> value.userId) を使用して、要素をユーザー別にグループ化します
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) // タンブリングウィンドウサイズを 60 秒に設定します
      .trigger(new CustomCountTrigger()) // カスタムトリガー: 5 つの要素またはウィンドウタイムアウト後にトリガーします
      .process(new ProcessWindowFunction<UserEvent, String, KeyedType, TimeWindow>() {

          @Override
          public void process(KeyedType key, Context context, Iterable<UserEvent> elements, Collector<String> out) {
              int count = 0;

              // ウィンドウ内のすべての要素を反復処理し、action == 1 の要素をカウントします
              for (UserEvent event : elements) {
                  if (event.action == 1) {
                      count++;
                  }
              }

              // 結果を出力します
              out.collect("キー: " + key + ", アクセス数: " + count);
          }
      })
      .print();

結果の検証: 同じユーザーからの 8 つのアクセスレコードが 1 分で受信された場合、出力は 2 つのレコードになります。

キー: 101, アクセス数: 5       // count >= 5 のため、早期計算がトリガーされますが、ウィンドウの状態はクリアされません。
キー: 101, アクセス数: 8      // ウィンドウが閉じると計算がトリガーされ、状態がクリアされます。

拡張機能とサマリー

高度なユースケースと実装

ユースケース 1

アラートのために、ウィンドウが終了する前に複数回計算をトリガーします。

  1. flagStateDesc を削除して、計算を複数回トリガーできるようにします。

  2. または、カウントマーカーを追加して、特定の回数のトリガー起動(アラートイベントなど)の後でのみウィンドウを終了します。

    ### flagStateDesc を削除した後、5 つのレコードが出力されます。
    キー: 101, アクセス数: 5       // count >= 5 のため、早期計算がトリガーされ、状態はクリアされません。
    キー: 101, アクセス数: 6       // count >= 5 のため、早期計算がトリガーされます。
    キー: 101, アクセス数: 7      // count >= 5 のため、早期計算がトリガーされます。
    キー: 101, アクセス数: 8      // count >= 5 のため、早期計算がトリガーされます。
    キー: 101, アクセス数: 8      // ウィンドウが終了すると計算がトリガーされ、状態がクリアされます。
  3. トリガーは、計算をトリガーするタイミングのみを決定します。 実際のオブジェクト情報は .process から取得されます。ここでは、数量と条件の状態に基づいて異なる収集結果を生成するように指定できます。

ユースケース 2

ウィンドウを閉じるには、ウォーターマークを進める必要があります。 長期間データが生成されない場合、タイムリーなウィンドウ計算をどのように保証できますか?

ソリューション

ウォーターマークに依存?

タイムリーなトリガーが保証されている?

順序が正しくないデータに適している?

適切なシナリオ

処理時間ウィンドウを使用する

いいえ

はい

いいえ

イベント時間処理は不要です。

withIdleness メソッドを使用する

はい

いいえ(ウォーターマーク間隔に依存)

はい

入力パーティションがアイドル状態であるなど、単純なシナリオに適しています。

カスタムウォーターマークジェネレーターを使用する

はい

はい(ウォーターマークは定期的に更新されます)

はい

これは標準的なアプローチです。

タイマーを登録する

はい(オプション)

はい(耐障害性)

いいえ(強制的なウィンドウクローズには適切なタイミングが必要です)

信頼性の向上が必要です。

外部システムからハートビートメッセージを送信する

いいえ

はい

はい

Kafka には追加のメンテナンスが必要ですが、タスクオーケストレーションには必要ありません。

  • ソリューション 1: 処理時間ウィンドウを使用する

    イベント時間セマンティクス(つまり、イベントが発生した時刻は問題ではない)が不要な場合は、処理時間ウィンドウを使用できます。

    .keyBy(keySelector)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) // 処理時間を使用します
    .process(new MyProcessWindowFunction())
  • ソリューション 2: withIdleness メソッドを使用する

    Flink の WatermarkStrategy は、withIdleness メソッドを提供して、指定された期間非アクティブになった後、データソースを自動的にアイドルとしてマークし、ウォーターマークの生成をブロックしないようにします。

    // アイドル状態のデータソースは、最小ウォーターマークの計算に関与しなくなり、アクティブなデータソースのウォーターマークの進行を妨げなくなります。
    WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));  //データソースまたはパーティションに 1 分間イベントがない場合、アイドル状態としてマークされることを示します
  • ソリューション 3: カスタムウォーターマークジェネレーターを使用する

    イベント時間セマンティクスを使用する必要がある場合は、新しいデータが到着しなくてもウォーターマークが前進し続けるカスタムウォーターマークジェネレーターを定義します。

    • onEvent() メソッドで最新のイベントの到着時刻を記録します。

    • onPeriodicEmit() メソッドで、現在の時刻とイベントが最後に受信された時刻の間隔を確認します。

    • 間隔が設定されたしきい値を超える場合は、データソースをアイドル状態と見なし、ウォーターマークの生成をスキップするか、特定のウォーターマークを直接生成します。

    public class IdleAwareWatermarkGenerator implements WatermarkGenerator<MyEvent> {
        private long lastEventTimestamp = Long.MIN_VALUE;
        private final long maxIdleTimeMs; // 最大アイドル時間
    
        public IdleAwareWatermarkGenerator(long maxIdleTimeMs) {
            this.maxIdleTimeMs = maxIdleTimeMs;
        }
    
        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            lastEventTimestamp = Math.max(lastEventTimestamp, eventTimestamp);
        }
    
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            long currentTime = System.currentTimeMillis();
            if (lastEventTimestamp == Long.MIN_VALUE || currentTime - lastEventTimestamp > maxIdleTimeMs) {
                // 長時間イベントが到着しない場合は、新しいウォーターマークを発行しません
                return;
            }
            output.emitWatermark(new Watermark(lastEventTimestamp));
        }
    }
    WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .forGenerator((ctx) -> new IdleAwareWatermarkGenerator(60_000)) // 最大アイドル時間を 60 秒に設定します
        .withTimestampAssigner((event, timestamp) -> event.getEventTime());
  • ソリューション 4: タイマーを登録する

    @Override
    public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp()); // イベントタイムタイマーを登録します
        ctx.registerProcessingTimeTimer(window.maxTimestamp() + 1000); // 耐障害性: イベントがなくてもタイマーがトリガーされるようにします
        return TriggerResult.CONTINUE;
    }
    
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE; // 強制的なトリガーとクリーンアップ
    }
  • ソリューション 5: 外部システムからハートビートメッセージを送信する

    Kafka または ワークフロー を使用して、定期的なハートビートメッセージをダウンストリームに送信し、ウィンドウのクローズをトリガーします。

サマリー

キーポイント

説明

ウィンドウのライフサイクルを理解する

FIRE はウィンドウの状態をクリアしませんが、FIRE_AND_PURGE はウィンドウを閉じてクリーンアップします。 これらを適切に使用して、トリガーを複数回または 1 回起動します。

状態とタイマーを適切に使用する

ValueState を使用してカウントとフラグを追跡し、clear() を使用してリソースを解放します。

メソッドをオーバーライドする

次のメソッドを実装します: onElementonEventTimeonProcessingTimeclear を実装して完全性を確保します。

ウィンドウのマージを有効にする(セッションウィンドウなど)

セッションウィンドウなどのマージ可能なウィンドウの場合、canMerge()onMerge() を実装してタイマーの一貫性を維持します。

繰り返しの起動を回避する

FIRE が呼び出された後もイベントが到着し続ける可能性がある場合は特に、トリガーが起動できる回数を制御します。

完全なコード

CustomCountTrigger

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // ウィンドウ内の各キーの要素数を記録します
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // 計算がすでにトリガーされているかどうかを記録します
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // 新しい要素がウィンドウに入ると onElement が呼び出されます
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        // 現在のウィンドウ内の現在のキーの要素の状態を取得します
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() == null ? false : flagState.value();

        // 新しい要素ごとにカウントを増分します
        count += 1;
        countState.update(count); // 状態を更新します

        // カウントが 5 に達すると、ウィンドウ計算をすぐにトリガーします
        if (count >= 5 && !flag) {
            flagState.update(true); // 追加計算が 1 回だけトリガーされるように状態を更新します
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
        // このトリガーでは、処理時間タイマーをスキップします
        return TriggerResult.CONTINUE;
    }

    // 登録済みのイベントタイムタイマーがトリガーされると onEventTime が呼び出されます(例: ウィンドウのクローズ時)
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; // トリガーが起動し、ウィンドウがクリアされます
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
        // ウィンドウの状態をクリーンアップします
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

KafkaTriggerTest

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;


public class KafkaTriggerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setTopics("trigger")
                .setGroupId("trigger")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        // データ例: 101,alie,1,2025-6-10 10:02:00
        DataStream<UserEvent> userEventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka ソース")
                .map(new MapFunction<String, UserEvent>() {
                    @Override
                    public UserEvent map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new UserEvent(
                                Integer.parseInt(fields[0]),
                                fields[1],
                                fields[2],
                                fields[3]
                        );
                    }
                });

        WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
                .<UserEvent>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(2))
                .withTimestampAssigner((event, timestamp) -> {
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    return java.time.LocalDateTime.parse(event.getEvent_time(), formatter).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                });

        DataStream<UserEvent> timestampedStream = userEventStream.assignTimestampsAndWatermarks(watermarkStrategy);

        timestampedStream
                .keyBy(UserEvent::getUser_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(60))) // タンブリングウィンドウサイズを 60 秒に設定します
                .trigger(new CustomCountTrigger()) // カスタムトリガー: 5 つのデータ要素の到着時またはタイムアウト時に起動します
                .process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {

                    @Override
                             public void process(Integer userId, ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>.Context context, Iterable<UserEvent> userEvents, Collector<String> collector) throws Exception {
                                 int count = 0;
                                 for (UserEvent event : userEvents) {
                                     if (event.getEvent_type().equals("1"))
                                         count++;
                                 }
                                 collector.collect("ユーザー ID: " + userId + " | カウント: " + count + " | ウィンドウ: " + context.window());
                             }
                         }).print();
        env.execute("Kafka パーティショナーデータストリーム");
    }
}

UserEvent

public class UserEvent {
      private  int  user_id;
      private  String username;
      private  String event_type;
      private  String event_time;

      public UserEvent(int user_id, String username, String event_type, String event_time) {
              this.user_id = user_id;
              this.username = username;
              this.event_type = event_type;
              this.event_time = event_time;
      }
      public String toString() {
              return "user_id:" + user_id + " username:" + username + " event_type:" + event_type + " event_time:" + event_time;
      }

    public int getUser_id() {
        return user_id;
    }

    public void setUser_id(int user_id) {
        this.user_id = user_id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getEvent_type() {
        return event_type;
    }

    public void setEvent_type(String event_type) {
        this.event_type = event_type;
    }

    public String getEvent_time() {
        return event_time;
    }

    public void setEvent_time(String event_time) {
        this.event_time = event_time;
    }
}