このトピックでは、DataStream でタイマーを使用する方法について説明します。また、タイマーを使用する際の提案と注意事項についても説明します。
タイマーの概要
Flink はタイマーメカニズムを提供します。
ほとんどの場合、Flink デプロイメントはイベントに基づいてデータ処理を行うように駆動されます。特定のシナリオでは、Flink デプロイメントは処理時間 (ProcessingTime) またはイベント時間 (EventTime) に基づいてデータを計算および送信するように駆動されます。この場合、タイマーが必要です。演算子はタイマーを登録できます。時間が指定された処理時間に達するか、イベントタイム Watermark が指定されたイベント時間に達すると、指定された計算ロジックがトリガーされます。Flink のウィンドウはタイマーに基づいて処理されます。
ほとんどの場合、SQL ウィンドウを使用してこれらの要件を満たすことができます。ただし、より複雑でカスタムなシナリオでは、DataStream API のタイマーメカニズムを使用できます。
タイマーの使用方法
Flink デプロイメントの開発者は、KeyedStream で KeyedProcessFunction、ConnectedStream で KeyedCoProcessFunction、または BroadcastConnectedStream で KeyedBroadcastProcessFunction を使用できます。これらの関数によって提供される TimerService サービスにより、タイマーを使用できます。KeyedProcessFunction は、タイマーを使用する際に最も一般的に使用される関数です。次の例では、KeyedProcessFunction でタイマーを使用する方法について説明します。
RichFlatMapFunction と同様に、KeyedProcessFunction は単一のデータレコードを処理し、ゼロまたは任意の数のデータレコードを生成するために使用できます。ただし、KeyedProcessFunction は KeyedStream でのみ使用でき、タイマーを提供します。
タイマーは KeyedState に基づいて保存および復元されます。そのため、タイマーは ProcessFunction ではなく、KeyedProcessFunction でのみ使用できます。
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
// 入力データを処理します。
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
// タイマーがトリガーされたときのコールバック。
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
// データ処理のコンテキスト。タイマーコールバックのコンテキストの基底クラスでもあります。
public abstract class Context {
// 現在のデータレコードまたはタイマーのタイムスタンプ。
public abstract Long timestamp();
// タイマーを登録または削除するための TimerService を取得します。
public abstract TimerService timerService();
// サイド出力としてデータを出力します。
public abstract <X> void output(OutputTag<X> outputTag, X value);
// 現在のデータレコードのキーを取得します。
public abstract K getCurrentKey();
}
// タイマーコールバックで使用されるコンテキスト。
public abstract class OnTimerContext extends Context {
// 現在のタイマーの TimeDomain を取得します。これは処理時間またはイベント時間を示します。
public abstract TimeDomain timeDomain();
// 現在のタイマーのキーを取得します。
public abstract K getCurrentKey();
}
}
KeyedProcessFunction の `Context` は `TimerService` へのアクセスを提供します。データ処理時またはタイマーコールバック処理時に、`TimerService` を使用して新しいタイマーを登録したり、既存のタイマーを削除したりできます。登録の時間単位はミリ秒です。
public interface TimerService {
// 現在の処理時間を取得します。
long currentProcessingTime();
// 現在のイベントタイム Watermark を取得します。
long currentWatermark();
// 処理時間タイマーを登録します。
void registerProcessingTimeTimer(long time);
// イベント時間タイマーを登録します。
void registerEventTimeTimer(long time);
// 処理時間タイマーを削除します。
void deleteProcessingTimeTimer(long time);
// イベント時間タイマーを削除します。
void deleteEventTimeTimer(long time);
}
processElement メソッドでタイマーを登録すると、現在処理されているデータのキーが使用されます。onTimer メソッドでタイマーを登録すると、現在のタイマーのキーが使用されます。同じキーには同時に 1 つのタイマーしかありません。したがって、計算は 1 回だけトリガーされます。異なるキーは個別に計算をトリガーできます。登録された各タイマーは 1 回だけトリガーされます。タイマーを定期的にトリガーするロジックを適用したい場合は、onTimer メソッドで次の時点にトリガーされるタイマーを登録する必要があります。
タイマーの使用例
Flink のウィンドウはタイマーに基づいて使用されます。この例では、イベント時間に基づくウィンドウで、入力値の合計を計算し、1 分ごとにデータを出力するロジックを示します。次のサンプルコードは、DataStream API でウィンドウを使用してこのロジックを実装する方法の例を示しています。
DataStream<Tuple2<String, Long>> sum = inputs
.keyBy(input->input.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new SumReduceFunction());
KeyedProcessFunction とタイマーを直接使用して、同様のロジックを実装できます。
DataStream<Tuple2<String, Long>> sum = inputs
.keyBy(input -> input.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
// ウィンドウの合計を格納する状態。
private ValueState<Long> sumState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
}
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
if (sumState.value() == null) {
// キーのデータが初めて処理されるとき、またはタイマーがトリガーされた後に初めて処理されるときに、現在のデータのイベント時間に基づいてタイムウィンドウを計算します。次に、そのウィンドウの終了時刻にタイマーを登録します。
ctx.timerService().registerEventTimeTimer(getWindowStartWithOffset(ctx.timestamp(), 0, 60 * 1000) + 60 * 1000);
sumState.update(value.f1);
} else {
// それ以外の場合は、合計に追加します。
sumState.update(sumState.value() + value.f1);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// この期間の合計を出力し、累積値をクリアします。
out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
sumState.clear();
}
// このメソッドは TimeWindow.java からコピーされています。指定されたタイムスタンプが属するウィンドウの開始を計算します。
private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
final long remainder = (timestamp - offset) % windowSize;
// handle both positive and negative cases
if (remainder < 0) {
return timestamp - (remainder + windowSize);
} else {
return timestamp - remainder;
}
}
});
キーのデータが初めて入力されると、KeyedProcessFunction は現在のデータのイベント時間に基づいてタイムウィンドウを計算し、このウィンドウの終了時刻にトリガーされるタイマーを登録し、データの累積を開始します。イベントタイム Watermark が指定された時間に達すると、Flink は onTimer メソッドを呼び出して累積値を出力し、累積状態をクリアします。このプロセスは、このキーに新しいデータが入力されるたびに繰り返されます。
上記の 2 つのメソッドのロジックは同じです。タイマーがトリガーされてキーのデータが処理された後、そのキーに新しいデータが入力されず、そのキーに対して出力データが生成されない場合があります。デプロイメントに限られた数の入力キーが存在し、将来新しいデータが入力されるかどうかにかかわらず、キーにデータが一度入力された後、同じイベント時間間隔で累積値を取得したい場合は、onTimer メソッドのロジックを変更できます。次のサンプルコードは、メソッドのロジックを変更する方法の例を示しています。
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// この期間の合計を出力します。
out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
// 累積値をリセットしますが、クリアしません。
sumState.update(0L);
// 次の出力のタイマーを登録します。タイムスタンプはウィンドウの終了時刻なので、次のウィンドウのために 60 秒を追加できます。
ctx.timerService().registerEventTimeTimer(timestamp + 60 * 1000);
}
これにより、初期割り当て後に sumState.value() が null になることはありません。新しいデータが受信されない場合でも、ジョブはこのキーの合計を定期的に出力します。データが利用できない場合、ジョブは 0 を出力します。
出力エポックは、イベントタイム Watermark によって決定されるイベント時間エポックです。
イベント時間の代わりに処理時間に基づいて集約を実行するには、タイマー登録と時間取得のための processElement のロジックを次のように置き換えることができます。
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
if (sumState.value() == null) {
// 現在の処理時間に基づいてタイムウィンドウを計算し、ウィンドウの終了時刻にタイマーを登録します。
ctx.timerService().registerProcessingTimeTimer(getWindowStartWithOffset(ctx.timerService().currentProcessingTime(), 0, 60 * 1000) + 60 * 1000);
sumState.update(value.f1);
} else {
sumState.update(sumState.value() + value.f1);
}
}
処理時間が指定された時間に達すると、onTimer メソッドが呼び出されます。
上記のロジックに基づいて、状態の計算ロジックと出力データのロジックを変更して、他の同様の計算要件を満たすことができます。
タイマーメカニズムは、ハートビートタイムアウトアラートのビジネスロジックをサポートするためにも必要です。ウィンドウのみを使用する場合、ハートビートタイムアウトアラートの要件は満たされません。キーにデータが一度入力された後、1 分以内に新しいデータが入力されない場合、アラートメッセージが送信されます。便宜上、データ入力にはキーのみが使用されます。次のサンプルコードは、このロジックを実装する方法の例を示しています。
DataStream<String> sum = inputs
.keyBy(input->input)
.process(new KeyedProcessFunction<String, String, String>() {
// 以前のタイムアウト時間を格納する状態。
private ValueState<Long> lastTimerState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
lastTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (lastTimerState.value() != null) {
// 以前に登録されたタイムアウトタイマーを削除します。
ctx.timerService().deleteProcessingTimeTimer(lastTimerState.value());
}
// 新しいタイムアウトタイマーを登録し、後で削除するために状態に記録します。
long timeout = ctx.timerService().currentProcessingTime() + 60 * 1000;
ctx.timerService().registerProcessingTimeTimer(timeout);
lastTimerState.update(timeout);
// 通常のデータを出力します。
out.collect(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// このメソッドに入ったということは、タイムアウトが発生したことを意味します。ハートビートタイムアウト警告メッセージを送信します。デフォルトの出力ストリームの代わりにサイド出力を使用することも検討できます。
out.collect("Heartbeat timeout:" + ctx.getCurrentKey());
});
タイマーの使用に関する提案
-
ほとんどの場合、ビジネス要件を満たせるのであれば、ウィンドウを使用することを推奨します。
-
KeyedProcessFunction の processElement メソッドと onTimer メソッドは同時に呼び出すことはできません。そのため、同期の問題を心配する必要はありません。ただし、onTimer メソッドのロジックがデータ処理をブロックする可能性があります。
-
Flink はタイマーの登録ステータスをクエリする API を提供していません。そのため、タイマーを削除するには、関数がその登録時間を記録する必要があります。
-
タイマーはチェックポイントに保存されます。デプロイメントがフェールオーバーから復元されるか、セーブポイントから再起動されると、タイマーも復元されます。
-
指定された時間に達した処理時間タイマーはトリガーされます。そのため、デプロイメントの開始後、短期間に多くのタイマーがトリガーされ、デプロイメントがデータを処理および送信する可能性があります。
-
イベント時間を使用するタイマーは、イベントタイム Watermark が受信された後にトリガーされます。そのため、イベントタイム Watermark が更新された後、多くのタイマーがトリガーされ、デプロイメントがデータを処理および送信する可能性があります。イベントタイム Watermark は、デプロイメントが長期間実行された後に更新されます。
-
-
タイマーはキーに関連しており、チェックポイントの KeyedState に保存されます。そのため、タイマーは KeyedStream、またはキーを持つ ConnectedStream や BroadcastConnectedStream でのみ使用できます。キーを持たないストリーミングデプロイメントでタイマーを使用したい場合は、次のいずれかの方法を使用できます。
-
タイマーのロジックが特定のフィールド値に依存せず、各データレコードに対してタイマーが個別に使用される場合、データ内の UUID を keyBy メソッドのキーとして使用できます。
重要このフィールドは入力データに存在する必要があります。`keyBy` メソッドで生成されるランダムな値であってはなりません。
-
グローバル集約のためにタイマーが共有される場合、keyBy メソッドで定数をキーとして使用し、並列処理を 1 に設定できます。
-
タイマーの使用に関する注意事項
-
多くのタイマーが同時にトリガーされる状況を避けてください。たとえば、数百万のキーに対するすべてのタイマーが毎正時にトリガーされるように指定されている場合、タイマーのトリガー時間を正時の数分前または数分後、あるいはより長い時間範囲に調整することを推奨します。
-
processElement メソッドと onTimer メソッドでタイマーを繰り返し登録することは避けてください。この操作により、タイマーの数が劇的に増加する可能性があります。
-
ほとんどの場合、タイマーのオーバーヘッドは小さいです。そのため、多くのキーに対してタイマーを登録できます。ただし、チェックポイント時間とメモリ使用量に注意することを推奨します。タイマーの使用後にチェックポイント時間またはメモリ使用量が指定されたしきい値を超えた場合、タイマーのロジックを最適化するか、別の方法を使用する必要があるかもしれません。
-
有界ストリームで処理時間タイマーを使用する場合、データ処理が終了すると、指定された時間に達しない処理時間タイマーは無視されます。この場合、データが失われる可能性があります。