このトピックでは、DataStream でのタイマーの使用方法について説明します。また、タイマーを使用する際の推奨事項と注意事項についても説明します。
タイマーとは
Flink はタイマーメカニズムを提供します。
ほとんどの場合、Flink デプロイはイベントに基づいてデータを計算するように駆動されます。特定のシナリオでは、Flink デプロイは処理時間 (ProcessingTime) またはイベント時間 (EventTime) に基づいてデータの計算と送信を行うように駆動されます。この場合、タイマーが必要です。オペレーターはタイマーを登録できます。時間が指定された処理時間に達するか、イベントタイムウォーターマークが指定されたイベント時間に達すると、指定された計算ロジックがトリガーされます。Flink のウィンドウは、タイマーに基づいて処理されます。
ほとんどの場合、前述のシナリオの要件は SQL のウィンドウを使用することで満たすことができます。ただし、特定のシナリオでは、Flink デプロイはより複雑でカスタマイズされた要件を満たす必要があります。この場合、DataStream API でサポートされているタイマーメカニズムを使用できます。
タイマーの使用方法
Flink デプロイの開発者は、KeyedStream で KeyedProcessFunction、ConnectedStream で KeyedCoProcessFunction、または BroadcastConnectedStream で KeyedBroadcastProcessFunction を使用できます。これらの関数によって提供される TimerService サービスを使用すると、タイマーを使用できます。KeyedProcessFunction は、タイマーを使用する場合に最も一般的に使用される関数です。次の例では、KeyedProcessFunction でタイマーを使用する方法について説明します。
RichFlatMapFunction と同様に、KeyedProcessFunction を使用して単一のデータレコードを処理し、ゼロまたは任意の数のデータレコードを生成できます。ただし、KeyedProcessFunction は KeyedStream でのみ使用でき、タイマーを提供します。
タイマーは KeyedState に基づいて保存および復元されます。したがって、タイマーは ProcessFunction ではなく、KeyedProcessFunction でのみ使用できます。
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
// 入力データを処理します。
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
// タイマーで指定された時刻にコールバックを開始します。
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
// 処理されたデータで使用されるコンテキストを取得します。このコンテキストは、タイマーのコールバックにおけるコンテキストの基底クラスです。
public abstract class Context {
// 処理されたデータまたは現在のタイマーのタイムスタンプを取得します。
public abstract Long timestamp();
// 新しいタイマーを登録するか、現在のタイマーを削除するために TimerService を取得します。
public abstract TimerService timerService();
// データをサイド出力として使用します。
public abstract <X> void output(OutputTag<X> outputTag, X value);
// 処理されたデータのキーを取得します。
public abstract K getCurrentKey();
}
// タイマーのコールバックでコンテキストを取得します。
public abstract class OnTimerContext extends Context {
// 現在のタイマーの TimeDomain を取得します。TimeDomain は、タイマーが処理時間タイマーかイベント時間タイマーかを指定します。
public abstract TimeDomain timeDomain();
// 現在のタイマーのキーを取得します。
public abstract K getCurrentKey();
}
}KeyedProcessFunction.Context は TimerService へのアクセスを提供します。processElement メソッドまたは onTimer メソッドを使用する場合、TimerService を使用して新しいタイマーを登録したり、既存のタイマーを削除したりできます。登録済みタイマーの単位はミリ秒です。
public interface TimerService {
// 現在の処理時間を取得します。
long currentProcessingTime();
// 現在のイベントタイムウォーターマークを取得します。
long currentWatermark();
// 処理時間タイマーを登録します。
void registerProcessingTimeTimer(long time);
// イベント時間タイマーを登録します。
void registerEventTimeTimer(long time);
// 処理時間タイマーを削除します。
void deleteProcessingTimeTimer(long time);
// イベント時間タイマーを削除します。
void deleteEventTimeTimer(long time);
}processElement メソッドでタイマーを登録する場合、現在処理されているデータのキーが使用されます。onTimer メソッドでタイマーを登録する場合、現在のタイマーのキーが使用されます。同じキーには、同時に 1 つのタイマーしかありません。したがって、計算は 1 回だけトリガーされます。異なるキーは個別に計算をトリガーできます。登録されている各タイマーは 1 回だけトリガーされます。タイマーを定期的にトリガーするロジックを適用する場合は、onTimer メソッドで次の時点でトリガーされるタイマーを登録する必要があります。
例
Flink のウィンドウはタイマーに基づいて使用されます。この例では、入力値の合計を計算し、イベント時間に基づくウィンドウで 1 分ごとに出力データを生成するロジックを示しています。次のサンプルコードは、DataStream API でウィンドウを使用してロジックを実装する方法の例を示しています。
DataStream<Tuple2<String, Long>> sum = inputs
.keyBy(input->input.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new SumReduceFunction());次のサンプルコードは、KeyedProcessFunction とタイマーメカニズムを使用してロジックを実装する方法の例を示しています。
DataStream<Tuple2<String, Long>> sum = inputs
.keyBy(input -> input.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
// ウィンドウ内の合計の状態を記録します。
private ValueState<Long> sumState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
}
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
if (sumState.value() == null) {
// キーのデータが初めて処理されたとき、またはタイマーがトリガーされた後にデータが初めて処理されたとき、KeyedProcessFunction は現在のデータのイベント時間に基づいて時間ウィンドウを計算し、ウィンドウの終了時にトリガーされるタイマーを登録します。
ctx.timerService().registerEventTimeTimer(getWindowStartWithOffset(ctx.timestamp(), 0, 60 * 1000) + 60 * 1000);
sumState.update(value.f1);
} else {
// 前述の条件が満たされない場合、KeyedProcessFunction は入力値を累積します。
sumState.update(sumState.value() + value.f1);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// タイマーが登録された時刻から onTimer メソッドがコールバックされた時刻までに取得された入力値の合計を生成します。次に、累積値を削除します。
out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
sumState.clear();
}
// getWindowsStartWithOffset メソッドは TimeWindow.java からコピーされ、指定されたタイムスタンプが属するウィンドウの開始時刻を計算するために使用されます。
private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
final long remainder = (timestamp - offset) % windowSize;
// 正と負の両方のケースを処理します
if (remainder < 0) {
return timestamp - (remainder + windowSize);
} else {
return timestamp - remainder;
}
}
});キーのデータが初めて入力されると、KeyedProcessFunction は現在のデータのイベント時間に基づいて時間ウィンドウを計算し、このウィンドウの終了時にトリガーされるタイマーを登録し、データの累積を開始します。イベントタイムウォーターマークが指定された時間に達すると、Flink は onTimer メソッドを呼び出して累積値をエクスポートし、累積状態をクリアします。このプロセスは、このキーに新しいデータが入力されると繰り返されます。
前述の 2 つのメソッドのロジックは同じです。タイマーがトリガーされてキーのデータを処理した後、キーに新しいデータが入力されず、キーの出力データは生成されません。デプロイに存在する入力キーの数が限られており、将来新しいデータが入力されるかどうかに関係なく、キーにデータが 1 回入力された後、同じイベント時間間隔で累積値を取得する場合は、OnTimer メソッドのロジックを変更できます。次のサンプルコードは、メソッドのロジックを変更する方法の例を示しています。
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// タイマーが登録された時刻から onTimer メソッドがコールバックされた時刻までに取得された合計値を生成します。
out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
// 累積値をリセットしますが、削除しません。
sumState.update(0L);
// 次の累積値を生成するために使用されるタイマーを登録します。timestamp はウィンドウの終了時刻を指定します。次のウィンドウに 60 秒を追加できます。
ctx.timerService().registerEventTimeTimer(timestamp + 60 * 1000);
}このようにして、sumState.value () に値が割り当てられた後、sumState.value() の値が null になることはありません。キーの累積値は、キーにデータが入力されるかどうかに関係なく、一定の間隔で生成されます。キーにデータが入力されない場合、出力値は 0 になります。
キーの累積値が生成される間隔は、イベントタイムウォーターマークで指定されたイベント時間間隔です。
イベント時間ではなく処理時間に基づいてデータを集計する場合は、タイマーを登録し、processElement メソッドで時間取得するために使用されるロジックを変更できます。次のサンプルコードは、メソッドのロジックを変更する方法の例を示しています。
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
if (sumState.value() == null) {
// 現在の処理時間に基づいて時間ウィンドウを計算し、ウィンドウの終了時にトリガーされるタイマーを登録します。
ctx.timerService().registerProcessingTimeTimer(getWindowStartWithOffset(ctx.timerService().currentProcessingTime(), 0, 60 * 1000) + 60 * 1000);
sumState.update(value.f1);
} else {
sumState.update(sumState.value() + value.f1);
}
}処理時間が指定された時間に達すると、onTimer メソッドが呼び出されます。
前述のロジックに基づいて状態の計算ロジックと出力データのロジックを変更して、他の同様の計算要件を満たすことができます。
ハートビートタイムアウトアラートのビジネスロジックをサポートするためにも、タイマーメカニズムが必要です。ウィンドウのみを使用する場合、ハートビートタイムアウトアラートの要件を満たすことができません。キーにデータが 1 回入力された後、1 分以内にキーに新しいデータが入力されない場合、アラートメッセージが送信されます。便宜上、データ入力にはキーのみが使用されます。次のサンプルコードは、ロジックを実装する方法の例を示しています。
DataStream<String> sum = inputs
.keyBy(input->input)
.process(new KeyedProcessFunction<String, String, String>() {
// 前回のタイムアウト期間の状態を記録します。
private ValueState<Long> lastTimerState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
lastTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (lastTimerState.value() != null) {
// 以前に登録された期限切れのタイマーを削除します。
ctx.timerService().deleteProcessingTimeTimer(lastTimerState.value());
}
// 新しいタイマーを登録し、後でクリアするために状態にタイマーを記録します。
long timeout = ctx.timerService().currentProcessingTime() + 60 * 1000;
ctx.timerService().registerProcessingTimeTimer(timeout);
lastTimerState.update(timeout);
// 通常の出力データを生成します。
out.collect(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// このメソッドはハートビートタイムアウトを示します。ハートビートタイムアウトメッセージが送信されます。データ出力には、デフォルトの出力ストリームの代わりに SideOutput を使用することもできます。
out.collect("Heartbeat timeout:" + ctx.getCurrentKey());
});推奨事項
ほとんどの場合、ウィンドウでビジネス要件を満たせる場合は、ウィンドウを使用することをお勧めします。
KeyedProcessFunction の processElement メソッドと onTimer メソッドは同時に呼び出すことができません。したがって、同期の問題について心配する必要はありません。ただし、onTimer メソッドのロジックはデータ処理をブロックする可能性があります。
Flink は、タイマーの登録状態を照会するための API を提供していません。タイマーを削除する予定の場合は、使用する関数がタイマーが登録された時刻を記録する必要があります。関数は、KeyedProcessFunction、KeyedCoProcessFunction、または KeyedBroadcastProcessFunction のいずれかです。
タイマーはチェックポイントに保存されます。デプロイがフェイルオーバーから復元された場合、またはセーブポイントから再起動された場合、タイマーも復元されます。
指定された時間に達した処理時間タイマーがトリガーされます。したがって、デプロイの起動後短期間でデプロイがデータを処理して送信するために、多数のタイマーがトリガーされる可能性があります。
イベント時間を使用するタイマーは、イベントタイムウォーターマークを受信した後にトリガーされます。したがって、イベントタイムウォーターマークが更新された後、デプロイがデータを処理して送信するために、多数のタイマーがトリガーされる可能性があります。イベントタイムウォーターマークは、デプロイが長期間起動された後に更新されます。
タイマーはキーに関連付けられており、チェックポイントの KeyedState に格納されます。したがって、タイマーは KeyedStream、またはキーを持つ ConnectedStream または BroadcastConnectedStream でのみ使用できます。キーを持たないストリーミングデプロイでタイマーを使用する場合は、次のいずれかの方法を使用できます。
タイマーのロジックが特定のフィールド値に依存せず、タイマーが各データレコードに個別に使用される場合は、データ内の UUID を keyBy メソッドのキーとして使用できます。
説明このフィールドは入力データに存在する必要があり、keyBy メソッドのランダム値を生成するために使用することはできません。
タイマーがグローバル集計で共有される場合は、keyBy メソッドで定数をキーとして使用し、並列度を 1 に設定できます。
注意事項
多数のタイマーが同時にトリガーされる状況は避けてください。たとえば、数百万のキーのすべてのタイマーが時間通りにトリガーされるように指定されている場合は、タイマーのトリガー時間を時間の前後数分、またはより長い時間範囲に調整することをお勧めします。
processElement メソッドと onTimer メソッドでタイマーを繰り返し登録することは避けてください。この操作により、タイマーの数が劇的に増加する可能性があります。
ほとんどの場合、タイマーのオーバーヘッドは小さいです。したがって、多数のキーにタイマーを登録できます。ただし、チェックポイント時間とメモリ使用量に注意することをお勧めします。タイマーを使用した後、チェックポイント時間またはメモリ使用量が指定されたしきい値を超えた場合は、タイマーのロジックを最適化するか、別の方法を使用する必要がある場合があります。
境界付きストリームで処理時間タイマーを使用する場合、データ処理が終了すると、指定された時間に達しない処理時間タイマーは無視されます。この場合、データが失われる可能性があります。