本文為您介紹如何在DataStream上使用Timer,及Timer使用建議和注意事項。
Timer介紹
Timer是Flink提供的定時器機制。
通常,Flink作業是事件驅動計算的,但在一些情境下,Flink作業需要基於處理時間(ProcessingTime)或者事件時間(EventTime)驅動計算和發送資料,這時便需要使用Timer。運算元可以註冊一個Timer,當時間達到指定的處理時間,或事件時間浮水印(Watermark)達到指定的事件時間時,便會觸發指定的計算邏輯。Flink中的視窗便是基於Timer實現的。
多數情況下,這類需求可以使用SQL中的視窗滿足。但有時,Flink作業存在更加複雜且定製化的需求,這時可以考慮使用DataStream API,利用其中的Timer機制實現。
Timer使用方法
Flink作業開發人員可以在KeyedStream上使用KeyedProcessFunction,或者在ConnectedStream上使用KeyedCoProcessFunction,又或在BroadcastConnectedStream上使用KeyedBroadcastProcessFunction。通過這些Function中提供的TimerService來使用Timer。其中使用最多的是KeyedProcessFunction。我們以此為例來介紹下如何使用Timer。
KeyedProcessFunction與RichFlatMapFunction非常相近,同樣可以處理單條資料,輸出0到任意多條資料,但KeyedProcessFunction只能在KeyedStream上使用,並提供了額外的Timer支援。
由於Timer會使用KeyedState進行儲存和恢複,因此只能在KeyedProcessFunction中使用Timer,無法在ProcessFunction中使用。
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
// 處理輸入資料。
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
// 當達到Timer指定時間時的回調。
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
// 處理資料中使用的Context,也是Timer回調中使用的Context的基類。
public abstract class Context {
// 當前處理的資料或Timer的時間戳記。
public abstract Long timestamp();
// 擷取TimerService以進行Timer註冊或刪除操作。
public abstract TimerService timerService();
// 將資料作為Side Output輸出。
public abstract <X> void output(OutputTag<X> outputTag, X value);
// 擷取當前處理的資料的Key。
public abstract K getCurrentKey();
}
// Timer回調中使用的Context。
public abstract class OnTimerContext extends Context {
// 擷取當前Timer的TimeDomain,即使用處理時間還是事件時間。
public abstract TimeDomain timeDomain();
// 擷取當前Timer的Key。
public abstract K getCurrentKey();
}
}KeyedProcessFunction.Context提供了訪問TimerService的途徑,可以在處理資料或Timer時使用TimerService註冊新的Timer或刪除已有的Timer。註冊所使用的時間單位均為毫秒。
public interface TimerService {
// 擷取當前的處理時間。
long currentProcessingTime();
// 擷取當前的事件時間浮水印。
long currentWatermark();
// 註冊指定處理時間的Timer。
void registerProcessingTimeTimer(long time);
// 註冊指定事件時間的Timer。
void registerEventTimeTimer(long time);
// 刪除指定處理時間的Timer。
void deleteProcessingTimeTimer(long time);
// 刪除指定事件時間的Timer。
void deleteEventTimeTimer(long time);
}在processElement中註冊Timer時,會使用當前處理的資料的Key,而在onTimer中註冊Timer時會繼承當前處理的Timer的Key。同一個Key在同一個時間點只會有一個Timer,因此也只會觸發一次計算。不同的Key則會分別觸發計算。註冊的單個Timer均為一次性觸發,如果需要實現周期性觸發的邏輯,則需要在onTimer中註冊下一個觸發時間點的Timer。
Timer使用樣本
如前面所說,Flink的視窗就是使用Timer實現的。首先我們看一下基於事件時間視窗,每分鐘對輸入數值求和並輸出的例子。在DataStream API中使用視窗的程式碼範例如下。
DataStream<Tuple2<String, Long>> sum = inputs
.keyBy(input->input.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new SumReduceFunction());我們可以嘗試直接使用KeyedProcessFunction和Timer來實作類別似的邏輯:
DataStream<Tuple2<String, Long>> sum = inputs
.keyBy(input -> input.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
// 記錄視窗內總和的State。
private ValueState<Long> sumState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
}
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
if (sumState.value() == null) {
// 當某個Key的資料第一次處理,或在Timer觸發後第一次處理時,根據當前資料的事件時間,計算所屬的時間視窗,註冊視窗結束時刻的Timer。
ctx.timerService().registerEventTimeTimer(getWindowStartWithOffset(ctx.timestamp(), 0, 60 * 1000) + 60 * 1000);
sumState.update(value.f1);
} else {
// 否則進行累加。
sumState.update(sumState.value() + value.f1);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// 輸出此期間的總和,並清除累積值。
out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
sumState.clear();
}
// 該方法自TimeWindow.java中複製而來,用於計算給定時間戳記所從屬的視窗的起點。
private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
final long remainder = (timestamp - offset) % windowSize;
// handle both positive and negative cases
if (remainder < 0) {
return timestamp - (remainder + windowSize);
} else {
return timestamp - remainder;
}
}
});當一個Key首次有資料輸入時,Function會計算當前資料的事件時間屬於哪一個時間視窗,註冊這個時間視窗結束時刻觸發的Timer,並開始累加資料。事件時間浮水印達到指定時刻之後,Flink會調用onTimer,將累加值輸出出去,並清除累加狀態。此後這個Key再有新的資料輸入時,會重複這個過程。
以上這兩個實現的邏輯基本是相同的。可以發現如果Timer處理後,這個Key不再有資料輸入,後續也不會再輸出這個Key的資料。有時作業的邏輯已知輸入Key是有限個,希望有一個Key輸入一次後,無論後續是否還有資料,都以相同的事件時間周期輸出周期內的累加值,可以將OnTimer的實現修改為:
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// 輸出此期間的總和。
out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
// 重設但不清除累積值。
sumState.update(0L);
// 註冊下一次輸出累積值的Timer。該timestamp就是視窗結束時刻,下一個視窗可以直接加60s。
ctx.timerService().registerEventTimeTimer(timestamp + 60 * 1000);
}如此便可以使得sumState.value()在賦值一次後永遠不為null,從而實現無論是否有資料,都會繼續定期輸出這個Key的累加值,無資料時會輸出0。
這裡的輸出周期是基於事件時間浮水印的事件時間周期。
如果想要基於處理時間而非事件時間進行彙總,則可以替換processElement中註冊Timer和擷取時間的邏輯,改為:
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
if (sumState.value() == null) {
// 根據當前的處理時間,計算所屬的時間視窗,註冊視窗結束時間的Timer。
ctx.timerService().registerProcessingTimeTimer(getWindowStartWithOffset(ctx.timerService().currentProcessingTime(), 0, 60 * 1000) + 60 * 1000);
sumState.update(value.f1);
} else {
sumState.update(sumState.value() + value.f1);
}
}當處理時間達到指定時間之後,便會調用對應的onTimer邏輯。
基於以上類似的邏輯,修改State計算邏輯和輸出資料的邏輯,可以實現其他類似的計算需求。
另一個單純使用視窗不易實現而需要使用Timer實現的商務邏輯是心跳警告。當一個Key的輸入一次後,如果一分鐘內沒有再輸入新的資料,就發出一個警示訊息。方便起見這裡只使用Key作為輸入,實現的代碼如下。
DataStream<String> sum = inputs
.keyBy(input->input)
.process(new KeyedProcessFunction<String, String, String>() {
// 記錄此前的逾時時間的State。
private ValueState<Long> lastTimerState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
lastTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (lastTimerState.value() != null) {
// 清除此前註冊的逾時Timer。
ctx.timerService().deleteProcessingTimeTimer(lastTimerState.value());
}
// 註冊新的逾時Timer,並記錄在State中,用於後續清除。
long timeout = ctx.timerService().currentProcessingTime() + 60 * 1000;
ctx.timerService().registerProcessingTimeTimer(timeout);
lastTimerState.update(timeout);
// 輸出正常資料。
out.collect(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 進入此方法說明已逾時,發送一個心跳逾時警告的訊息。也可以考慮使用SideOutput而非預設輸出資料流進行輸出。
out.collect("Heartbeat timeout:" + ctx.getCurrentKey());
});Timer使用建議
大多數情況下視窗能夠滿足需求,建議優先使用視窗。
KeyedProcessFunction的processElement和onTimer方法不會被同時調用,因此不需要擔心同步問題。但這也意味著處理onTimer邏輯是會阻塞處理資料的。
Flink沒有提供查詢Timer註冊狀態的API,因此如果預計需要進行Timer刪除操作,Function需要自行記錄登入Timer的時間。
Timer會儲存在Checkpoint中,當作業從Failover中恢複,或從Savepoint重新啟動時,Timer也會被恢複。此時:
已經到時間的處理時間Timer,會直接觸發處理。因此作業啟動後短時間內可能會觸發大量的Timer進行資料處理和發送。
事件時間Timer則會在收到對應時間的Watermark後觸發處理。因此作業也有可能在啟動後一段時間後,即事件時間浮水印更新後觸發大量的Timer進行資料的處理和發送。
Timer與Key相關,在Checkpoint裡會儲存在KeyedState中,因此只能在KeyedStream,或者有Key的ConnectedStream或BroadcastConnectedStream上使用。無Key的流作業在需要使用Timer時,如果符合以下兩種情況可以按相應的方法使用:
如果Timer的邏輯與特定欄位值無關,每條資料獨立使用一個Timer,可以使用資料內的一個唯一ID(UUID)作為Key進行keyby。
重要該欄位需要存在於上遊資料中,不可以是keyby方法中產生隨機值。
如果全域共用一個Timer,即全域進行彙總計算的情況,則可以使用一個常量作為Key進行keyby,並將並發設為1。
Timer使用注意事項
請盡量避免大量Timer同時觸發的情況,例如數百萬個Key的Timer都在整點觸發。這種情況建議把觸發時間打散到前後數分鐘或更長的範圍內。
請避免在processElement和onTimer中重複註冊Timer,因為這會導致Timer數量急劇膨脹。
通常情況下Timer的開銷是很小的,大量的Key註冊Timer也沒有問題。但仍然建議關注Checkpoint時間和記憶體狀態。如果使用Timer後,Checkpoint時間或者記憶體使用量量增加很多,超過可容忍範圍,可能需要考慮最佳化邏輯,或使用其他方式實現。
如果在有限流上使用處理時間Timer需要注意,當資料處理結束時,未到時間的處理時間Timer將被忽略,這意味著資料可能會丟失。