全部產品
Search
文件中心

Realtime Compute for Apache Flink:DataStream的Timer使用最佳實務

更新時間:Jul 16, 2025

本文為您介紹如何在DataStream上使用Timer,及Timer使用建議和注意事項。

Timer介紹

Timer是Flink提供的定時器機制。

通常,Flink作業是事件驅動計算的,但在一些情境下,Flink作業需要基於處理時間(ProcessingTime)或者事件時間(EventTime)驅動計算和發送資料,這時便需要使用Timer。運算元可以註冊一個Timer,當時間達到指定的處理時間,或事件時間浮水印(Watermark)達到指定的事件時間時,便會觸發指定的計算邏輯。Flink中的視窗便是基於Timer實現的。

多數情況下,這類需求可以使用SQL中的視窗滿足。但有時,Flink作業存在更加複雜且定製化的需求,這時可以考慮使用DataStream API,利用其中的Timer機制實現。

Timer使用方法

Flink作業開發人員可以在KeyedStream上使用KeyedProcessFunction,或者在ConnectedStream上使用KeyedCoProcessFunction,又或在BroadcastConnectedStream上使用KeyedBroadcastProcessFunction。通過這些Function中提供的TimerService來使用Timer。其中使用最多的是KeyedProcessFunction。我們以此為例來介紹下如何使用Timer。

KeyedProcessFunction與RichFlatMapFunction非常相近,同樣可以處理單條資料,輸出0到任意多條資料,但KeyedProcessFunction只能在KeyedStream上使用,並提供了額外的Timer支援。

重要

由於Timer會使用KeyedState進行儲存和恢複,因此只能在KeyedProcessFunction中使用Timer,無法在ProcessFunction中使用。

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {

    // 處理輸入資料。
	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    // 當達到Timer指定時間時的回調。
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    // 處理資料中使用的Context,也是Timer回調中使用的Context的基類。
    public abstract class Context {

        // 當前處理的資料或Timer的時間戳記。
        public abstract Long timestamp();

        // 擷取TimerService以進行Timer註冊或刪除操作。
        public abstract TimerService timerService();

        // 將資料作為Side Output輸出。
        public abstract <X> void output(OutputTag<X> outputTag, X value);

        // 擷取當前處理的資料的Key。
        public abstract K getCurrentKey();
    }

    // Timer回調中使用的Context。
    public abstract class OnTimerContext extends Context {
        // 擷取當前Timer的TimeDomain,即使用處理時間還是事件時間。
        public abstract TimeDomain timeDomain();

        // 擷取當前Timer的Key。
        public abstract K getCurrentKey();
    }
}

KeyedProcessFunction.Context提供了訪問TimerService的途徑,可以在處理資料或Timer時使用TimerService註冊新的Timer或刪除已有的Timer。註冊所使用的時間單位均為毫秒。

public interface TimerService {

    // 擷取當前的處理時間。
    long currentProcessingTime();

    // 擷取當前的事件時間浮水印。
    long currentWatermark();

    // 註冊指定處理時間的Timer。
    void registerProcessingTimeTimer(long time);

    // 註冊指定事件時間的Timer。
    void registerEventTimeTimer(long time);

    // 刪除指定處理時間的Timer。
    void deleteProcessingTimeTimer(long time);

    // 刪除指定事件時間的Timer。
    void deleteEventTimeTimer(long time);
}

在processElement中註冊Timer時,會使用當前處理的資料的Key,而在onTimer中註冊Timer時會繼承當前處理的Timer的Key。同一個Key在同一個時間點只會有一個Timer,因此也只會觸發一次計算。不同的Key則會分別觸發計算。註冊的單個Timer均為一次性觸發,如果需要實現周期性觸發的邏輯,則需要在onTimer中註冊下一個觸發時間點的Timer。

Timer使用樣本

如前面所說,Flink的視窗就是使用Timer實現的。首先我們看一下基於事件時間視窗,每分鐘對輸入數值求和並輸出的例子。在DataStream API中使用視窗的程式碼範例如下。

DataStream<Tuple2<String, Long>> sum = inputs
        .keyBy(input->input.f0)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .reduce(new SumReduceFunction());

我們可以嘗試直接使用KeyedProcessFunction和Timer來實作類別似的邏輯:

DataStream<Tuple2<String, Long>> sum = inputs
    .keyBy(input -> input.f0)
    .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
        // 記錄視窗內總和的State。
        private ValueState<Long> sumState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            if (sumState.value() == null) {
                // 當某個Key的資料第一次處理,或在Timer觸發後第一次處理時,根據當前資料的事件時間,計算所屬的時間視窗,註冊視窗結束時刻的Timer。
                ctx.timerService().registerEventTimeTimer(getWindowStartWithOffset(ctx.timestamp(), 0, 60 * 1000) + 60 * 1000);
                sumState.update(value.f1);
            } else {
                // 否則進行累加。
                sumState.update(sumState.value() + value.f1);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            // 輸出此期間的總和,並清除累積值。
            out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
            sumState.clear();
        }

        // 該方法自TimeWindow.java中複製而來,用於計算給定時間戳記所從屬的視窗的起點。
        private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
            final long remainder = (timestamp - offset) % windowSize;
            // handle both positive and negative cases
            if (remainder < 0) {
                return timestamp - (remainder + windowSize);
            } else {
                return timestamp - remainder;
            }
        }
    });

當一個Key首次有資料輸入時,Function會計算當前資料的事件時間屬於哪一個時間視窗,註冊這個時間視窗結束時刻觸發的Timer,並開始累加資料。事件時間浮水印達到指定時刻之後,Flink會調用onTimer,將累加值輸出出去,並清除累加狀態。此後這個Key再有新的資料輸入時,會重複這個過程。

以上這兩個實現的邏輯基本是相同的。可以發現如果Timer處理後,這個Key不再有資料輸入,後續也不會再輸出這個Key的資料。有時作業的邏輯已知輸入Key是有限個,希望有一個Key輸入一次後,無論後續是否還有資料,都以相同的事件時間周期輸出周期內的累加值,可以將OnTimer的實現修改為:

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    // 輸出此期間的總和。
    out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
    // 重設但不清除累積值。
    sumState.update(0L);
    // 註冊下一次輸出累積值的Timer。該timestamp就是視窗結束時刻,下一個視窗可以直接加60s。
    ctx.timerService().registerEventTimeTimer(timestamp + 60 * 1000);
}

如此便可以使得sumState.value()在賦值一次後永遠不為null,從而實現無論是否有資料,都會繼續定期輸出這個Key的累加值,無資料時會輸出0。

說明

這裡的輸出周期是基於事件時間浮水印的事件時間周期。

如果想要基於處理時間而非事件時間進行彙總,則可以替換processElement中註冊Timer和擷取時間的邏輯,改為:

@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    if (sumState.value() == null) {
        // 根據當前的處理時間,計算所屬的時間視窗,註冊視窗結束時間的Timer。
        ctx.timerService().registerProcessingTimeTimer(getWindowStartWithOffset(ctx.timerService().currentProcessingTime(), 0, 60 * 1000) + 60 * 1000);
        sumState.update(value.f1);
    } else {
        sumState.update(sumState.value() + value.f1);
    }
}

當處理時間達到指定時間之後,便會調用對應的onTimer邏輯。

基於以上類似的邏輯,修改State計算邏輯和輸出資料的邏輯,可以實現其他類似的計算需求。

另一個單純使用視窗不易實現而需要使用Timer實現的商務邏輯是心跳警告。當一個Key的輸入一次後,如果一分鐘內沒有再輸入新的資料,就發出一個警示訊息。方便起見這裡只使用Key作為輸入,實現的代碼如下。

DataStream<String> sum = inputs
    .keyBy(input->input)
    .process(new KeyedProcessFunction<String, String, String>() {
        // 記錄此前的逾時時間的State。
        private ValueState<Long> lastTimerState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            lastTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            if (lastTimerState.value() != null) {
                // 清除此前註冊的逾時Timer。
                ctx.timerService().deleteProcessingTimeTimer(lastTimerState.value());
            }
            // 註冊新的逾時Timer,並記錄在State中,用於後續清除。
            long timeout = ctx.timerService().currentProcessingTime() + 60 * 1000;
            ctx.timerService().registerProcessingTimeTimer(timeout);
            lastTimerState.update(timeout);
            // 輸出正常資料。
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // 進入此方法說明已逾時,發送一個心跳逾時警告的訊息。也可以考慮使用SideOutput而非預設輸出資料流進行輸出。
            out.collect("Heartbeat timeout:" + ctx.getCurrentKey());
        });

Timer使用建議

  • 大多數情況下視窗能夠滿足需求,建議優先使用視窗。

  • KeyedProcessFunction的processElement和onTimer方法不會被同時調用,因此不需要擔心同步問題。但這也意味著處理onTimer邏輯是會阻塞處理資料的。

  • Flink沒有提供查詢Timer註冊狀態的API,因此如果預計需要進行Timer刪除操作,Function需要自行記錄登入Timer的時間。

  • Timer會儲存在Checkpoint中,當作業從Failover中恢複,或從Savepoint重新啟動時,Timer也會被恢複。此時:

    • 已經到時間的處理時間Timer,會直接觸發處理。因此作業啟動後短時間內可能會觸發大量的Timer進行資料處理和發送。

    • 事件時間Timer則會在收到對應時間的Watermark後觸發處理。因此作業也有可能在啟動後一段時間後,即事件時間浮水印更新後觸發大量的Timer進行資料的處理和發送。

  • Timer與Key相關,在Checkpoint裡會儲存在KeyedState中,因此只能在KeyedStream,或者有Key的ConnectedStream或BroadcastConnectedStream上使用。無Key的流作業在需要使用Timer時,如果符合以下兩種情況可以按相應的方法使用:

    • 如果Timer的邏輯與特定欄位值無關,每條資料獨立使用一個Timer,可以使用資料內的一個唯一ID(UUID)作為Key進行keyby。

      重要

      該欄位需要存在於上遊資料中,不可以是keyby方法中產生隨機值。

    • 如果全域共用一個Timer,即全域進行彙總計算的情況,則可以使用一個常量作為Key進行keyby,並將並發設為1。

Timer使用注意事項

  • 請盡量避免大量Timer同時觸發的情況,例如數百萬個Key的Timer都在整點觸發。這種情況建議把觸發時間打散到前後數分鐘或更長的範圍內。

  • 請避免在processElement和onTimer中重複註冊Timer,因為這會導致Timer數量急劇膨脹。

  • 通常情況下Timer的開銷是很小的,大量的Key註冊Timer也沒有問題。但仍然建議關注Checkpoint時間和記憶體狀態。如果使用Timer後,Checkpoint時間或者記憶體使用量量增加很多,超過可容忍範圍,可能需要考慮最佳化邏輯,或使用其他方式實現。

  • 如果在有限流上使用處理時間Timer需要注意,當資料處理結束時,未到時間的處理時間Timer將被忽略,這意味著資料可能會丟失。