全部产品
Search
文档中心

Realtime Compute for Apache Flink:Praktik terbaik untuk menggunakan timer dalam DataStream

更新时间:Jun 19, 2025

Topik ini menjelaskan cara menggunakan timer dalam DataStream serta memberikan saran dan tindakan pencegahan saat menggunakan timer.

Apa itu timer?

Flink menyediakan mekanisme timer.

Dalam banyak kasus, penyebaran Flink digerakkan oleh perhitungan data berdasarkan peristiwa. Namun, dalam skenario tertentu, penyebaran Flink juga digerakkan oleh waktu pemrosesan (ProcessingTime) atau waktu peristiwa (EventTime). Dalam hal ini, penggunaan timer menjadi penting. Operator dapat mendaftarkan timer, yang memicu logika komputasi ketika waktu mencapai waktu pemrosesan yang ditentukan atau watermark waktu-peristiwa mencapai waktu peristiwa yang ditentukan. Windows di Flink diproses berdasarkan timer.

Dalam kebanyakan kasus, kebutuhan dalam skenario tersebut dapat dipenuhi dengan menggunakan windows dalam SQL. Namun, untuk kebutuhan yang lebih kompleks dan disesuaikan, Anda dapat menggunakan mekanisme timer yang didukung oleh API DataStream.

Bagaimana cara saya menggunakan timer?

Pengembang penyebaran Flink dapat menggunakan KeyedProcessFunction pada KeyedStream, KeyedCoProcessFunction pada ConnectedStream, atau KeyedBroadcastProcessFunction pada BroadcastConnectedStream. Layanan TimerService yang disediakan oleh fungsi-fungsi ini memungkinkan Anda menggunakan timer. KeyedProcessFunction adalah fungsi yang paling sering digunakan saat menggunakan timer. Contoh berikut menunjukkan cara menggunakan timer dalam KeyedProcessFunction.

Mirip dengan RichFlatMapFunction, KeyedProcessFunction dapat digunakan untuk memproses satu rekaman data tunggal dan menghasilkan nol atau sejumlah rekaman data. Namun, KeyedProcessFunction hanya dapat digunakan pada KeyedStream dan menyediakan timer.

null

Timer disimpan dan dipulihkan berdasarkan KeyedState. Oleh karena itu, Anda hanya dapat menggunakan timer dalam KeyedProcessFunction, bukan dalam ProcessFunction.

    public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    
        // Proses data masukan.
    	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    
        // Mulai panggilan balik pada waktu yang ditentukan oleh timer.
    	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    
        // Dapatkan konteks yang digunakan dalam data yang diproses. Konteks ini adalah kelas dasar dari konteks dalam panggilan balik timer.
        public abstract class Context {
    
            // Dapatkan timestamp dalam data yang diproses atau timer saat ini.
            public abstract Long timestamp();
    
            // Dapatkan TimerService untuk mendaftarkan timer baru atau menghapus timer saat ini.
            public abstract TimerService timerService();
    
            // Gunakan data sebagai side output.
            public abstract <X> void output(OutputTag<X> outputTag, X value);
    
            // Dapatkan kunci dari data yang diproses.
            public abstract K getCurrentKey();
        }
    
        // Dapatkan konteks dalam panggilan balik timer.
        public abstract class OnTimerContext extends Context {
            // Dapatkan TimeDomain dari timer saat ini. TimeDomain menentukan apakah timer adalah timer waktu-pemrosesan atau timer waktu-peristiwa.
            public abstract TimeDomain timeDomain();
    
            // Dapatkan kunci dari timer saat ini.
            public abstract K getCurrentKey();
        }
    }

    KeyedProcessFunction.Context memberikan akses ke TimerService. Saat menggunakan metode processElement atau onTimer, Anda dapat menggunakan TimerService untuk mendaftarkan timer baru atau menghapus timer yang ada. Unit timer yang didaftarkan adalah milidetik.

    public interface TimerService {
    
        // Dapatkan waktu pemrosesan saat ini.
        long currentProcessingTime();
    
        // Dapatkan watermark waktu-peristiwa saat ini.
        long currentWatermark();
    
        // Daftarkan timer waktu-pemrosesan.
        void registerProcessingTimeTimer(long time);
    
        // Daftarkan timer waktu-peristiwa.
        void registerEventTimeTimer(long time);
    
        // Hapus timer waktu-pemrosesan.
        void deleteProcessingTimeTimer(long time);
    
        // Hapus timer waktu-peristiwa.
        void deleteEventTimeTimer(long time);
    }

    Saat mendaftarkan timer dalam metode processElement, kunci data yang sedang diproses digunakan. Saat mendaftarkan timer dalam metode onTimer, kunci timer saat ini digunakan. Setiap kunci hanya memiliki satu timer pada waktu yang sama, sehingga komputasi hanya dipicu sekali. Kunci yang berbeda dapat secara terpisah memicu komputasi. Jika ingin menerapkan logika yang secara berkala memicu timer, Anda harus mendaftarkan timer yang dipicu pada titik waktu berikutnya dalam metode onTimer.

    Contoh

    Windows Flink digunakan berdasarkan timer. Contoh ini menunjukkan logika untuk menghitung jumlah nilai input dan menghasilkan data keluaran setiap menit dalam windows yang didasarkan pada waktu peristiwa. Kode sampel berikut menunjukkan cara menggunakan windows dalam API DataStream untuk mengimplementasikan logika tersebut.

    DataStream<Tuple2<String, Long>> sum = inputs
            .keyBy(input->input.f0)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .reduce(new SumReduceFunction());

    Kode sampel berikut menunjukkan cara menggunakan KeyedProcessFunction dan mekanisme timer untuk mengimplementasikan logika tersebut.

    DataStream<Tuple2<String, Long>> sum = inputs
        .keyBy(input -> input.f0)
        .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
            // Rekam status jumlah dalam window.
            private ValueState<Long> sumState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
            }
    
            @Override
            public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                if (sumState.value() == null) {
                    // Saat data untuk sebuah kunci diproses untuk pertama kalinya atau data diproses untuk pertama kalinya setelah timer dipicu, KeyedProcessFunction menghitung jendela waktu berdasarkan waktu peristiwa dari data saat ini dan mendaftarkan timer yang dipicu pada akhir waktu jendela.
                    ctx.timerService().registerEventTimeTimer(getWindowStartWithOffset(ctx.timestamp(), 0, 60 * 1000) + 60 * 1000);
                    sumState.update(value.f1);
                } else {
                    // Jika kondisi sebelumnya tidak terpenuhi, KeyedProcessFunction mengakumulasi nilai input.
                    sumState.update(sumState.value() + value.f1);
                }
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                // Hasilkan jumlah nilai input yang diperoleh dari waktu pendaftaran timer hingga waktu pemanggilan balik metode onTimer. Kemudian, hapus nilai yang terakumulasi.
                out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
                sumState.clear();
            }
    
            // Metode getWindowsStartWithOffset disalin dari TimeWindow.java dan digunakan untuk menghitung waktu mulai dari jendela tempat timestamp tertentu termasuk.
            private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
                final long remainder = (timestamp - offset) % windowSize;
                // tangani kasus positif dan negatif
                if (remainder < 0) {
                    return timestamp - (remainder + windowSize);
                } else {
                    return timestamp - remainder;
                }
            }
        });

    Saat data dimasukkan untuk kunci pertama kali, KeyedProcessFunction menghitung jendela waktu berdasarkan waktu peristiwa dari data saat ini, mendaftarkan timer yang dipicu pada akhir waktu jendela ini, dan mulai mengakumulasi data. Setelah watermark waktu-peristiwa mencapai waktu yang ditentukan, Flink memanggil metode onTimer untuk mengekspor nilai yang terakumulasi dan membersihkan status akumulasi. Proses ini diulang saat data baru dimasukkan untuk kunci ini.

    Logika kedua metode sebelumnya adalah sama. Setelah timer dipicu untuk memproses data dari kunci, tidak ada data baru yang dimasukkan untuk kunci dan tidak ada data keluaran yang dihasilkan untuk kunci. Jika sejumlah kunci input terbatas ada dalam penyebaran dan Anda ingin mendapatkan nilai terakumulasi pada interval waktu peristiwa yang sama setelah data dimasukkan untuk sebuah kunci sekali tanpa memperhatikan apakah data baru dimasukkan di masa depan, Anda dapat memodifikasi logika metode OnTimer. Kode sampel berikut menunjukkan cara memodifikasi logika metode tersebut.

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // Hasilkan nilai jumlah yang diperoleh dari waktu pendaftaran timer hingga waktu pemanggilan balik metode onTimer.
        out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
        // Atur ulang tetapi jangan hapus nilai yang terakumulasi.
        sumState.update(0L);
        // Daftarkan timer yang digunakan untuk menghasilkan nilai terakumulasi berikutnya. Timestamp menentukan waktu akhir jendela. Anda dapat menambahkan 60 detik ke jendela berikutnya.
        ctx.timerService().registerEventTimeTimer(timestamp + 60 * 1000);
    }

    Dengan cara ini, nilai sumState.value() tidak pernah bisa menjadi null setelah nilai ditetapkan untuk sumState.value(). Nilai terakumulasi dari kunci dihasilkan pada interval reguler tanpa memperhatikan apakah data dimasukkan untuk kunci. Jika tidak ada data yang dimasukkan untuk kunci, nilai keluaran adalah 0.

    null

    Interval di mana nilai terakumulasi dari kunci dihasilkan adalah interval waktu peristiwa yang ditentukan oleh watermark waktu-peristiwa.

    Jika Anda ingin mengumpulkan data berdasarkan waktu pemrosesan alih-alih waktu peristiwa, Anda dapat memodifikasi logika yang digunakan untuk mendaftarkan timer dan mendapatkan waktu dalam metode processElement. Kode sampel berikut menunjukkan cara memodifikasi logika metode tersebut.

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        if (sumState.value() == null) {
            // Hitung jendela waktu berdasarkan waktu pemrosesan saat ini, dan daftarkan timer yang dipicu pada akhir waktu jendela.
            ctx.timerService().registerProcessingTimeTimer(getWindowStartWithOffset(ctx.timerService().currentProcessingTime(), 0, 60 * 1000) + 60 * 1000);
            sumState.update(value.f1);
        } else {
            sumState.update(sumState.value() + value.f1);
        }
    }

    Saat waktu pemrosesan mencapai waktu yang ditentukan, metode onTimer dipanggil.

    Anda dapat memodifikasi logika komputasi status dan logika data keluaran berdasarkan logika sebelumnya untuk memenuhi persyaratan komputasi lain yang serupa.

    Mekanisme timer juga diperlukan untuk mendukung logika bisnis dari peringatan timeout denyut jantung. Jika Anda hanya menggunakan windows, persyaratan untuk peringatan timeout denyut jantung tidak dapat dipenuhi. Jika tidak ada data baru yang dimasukkan untuk sebuah kunci dalam 1 menit setelah data dimasukkan untuk kunci sekali, pesan peringatan dikirim. Untuk kemudahan Anda, hanya kunci yang digunakan untuk input data. Kode sampel berikut menunjukkan cara mengimplementasikan logika tersebut.

    DataStream<String> sum = inputs
        .keyBy(input->input)
        .process(new KeyedProcessFunction<String, String, String>() {
            // Rekam status periode timeout sebelumnya.
            private ValueState<Long> lastTimerState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                lastTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
            }
    
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                if (lastTimerState.value() != null) {
                    // Hapus timer yang sebelumnya didaftarkan yang kadaluwarsa.
                    ctx.timerService().deleteProcessingTimeTimer(lastTimerState.value());
                }
                // Daftarkan timer baru dan rekam timer dalam status untuk pembersihan selanjutnya.
                long timeout = ctx.timerService().currentProcessingTime() + 60 * 1000;
                ctx.timerService().registerProcessingTimeTimer(timeout);
                lastTimerState.update(timeout);
                // Hasilkan data keluaran normal.
                out.collect(value);
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                // Metode ini menunjukkan timeout denyut jantung. Pesan timeout denyut jantung dikirim. Anda juga dapat menggunakan SideOutput alih-alih aliran keluaran default untuk keluaran data.
                out.collect("Timeout denyut jantung:" + ctx.getCurrentKey());
            });

    Saran

    • Dalam kebanyakan kasus, kami sarankan Anda menggunakan windows jika windows dapat memenuhi persyaratan bisnis Anda.

    • Metode processElement dan onTimer dari KeyedProcessFunction tidak dapat dipanggil pada saat yang sama. Oleh karena itu, Anda tidak perlu khawatir tentang masalah sinkronisasi. Namun, logika metode onTimer dapat memblokir pemrosesan data.

    • Flink tidak menyediakan API untuk memeriksa status pendaftaran timer. Jika timer diharapkan dihapus, fungsi yang Anda gunakan harus mencatat waktu ketika timer didaftarkan. Fungsi tersebut dapat berupa KeyedProcessFunction, KeyedCoProcessFunction, atau KeyedBroadcastProcessFunction.

    • Timer disimpan dalam checkpoint. Saat penyebaran dipulihkan dari failover atau dimulai ulang dari savepoint, timer juga dipulihkan.

      • Timer waktu-pemrosesan yang mencapai waktu yang ditentukan dipicu. Oleh karena itu, sejumlah besar timer mungkin dipicu untuk penyebaran untuk memproses dan mengirim data dalam waktu singkat setelah penyebaran dimulai.

      • Timer yang menggunakan waktu peristiwa dipicu setelah watermark waktu-peristiwa diterima. Oleh karena itu, sejumlah besar timer mungkin dipicu untuk penyebaran untuk memproses dan mengirim data setelah watermark waktu-peristiwa diperbarui. Watermark waktu-peristiwa diperbarui setelah penyebaran telah dimulai untuk jangka waktu yang lama.

    • Timer terkait dengan kunci dan disimpan dalam KeyedState dalam checkpoint. Oleh karena itu, timer hanya dapat digunakan dalam KeyedStream atau dalam ConnectedStream atau BroadcastConnectedStream yang memiliki kunci. Jika Anda ingin menggunakan timer untuk penyebaran streaming yang tidak memiliki kunci, Anda dapat menggunakan salah satu dari metode berikut:

      • Jika logika timer independen dari nilai bidang tertentu dan timer digunakan secara mandiri untuk setiap rekaman data, Anda dapat menggunakan UUID dalam data sebagai kunci dalam metode keyBy.

        null

        Bidang ini harus ada dalam data masukan dan tidak dapat digunakan untuk menghasilkan nilai acak untuk metode keyBy.

      • Jika timer dibagikan untuk agregasi global, Anda dapat menggunakan konstanta sebagai kunci dalam metode keyBy dan atur paralelisme ke 1.

    Tindakan Pencegahan

    • Hindari situasi di mana sejumlah besar timer dipicu pada saat yang sama. Misalnya, jika semua timer untuk jutaan kunci ditentukan untuk dipicu tepat pada jam, kami sarankan Anda menyesuaikan waktu pemicuan timer beberapa menit sebelum atau sesudah jam atau rentang waktu yang lebih lama.

    • Hindari mendaftarkan timer berulang kali dalam metode processElement dan onTimer karena operasi ini dapat menyebabkan jumlah timer meningkat secara dramatis.

    • Dalam kebanyakan kasus, overhead timer kecil. Oleh karena itu, Anda dapat mendaftarkan timer untuk sejumlah besar kunci. Namun, kami sarankan Anda memperhatikan waktu checkpoint dan penggunaan memori. Jika waktu checkpoint atau penggunaan memori melebihi ambang batas tertentu setelah timer digunakan, Anda mungkin perlu mengoptimalkan logika timer atau menggunakan metode lain.

    • Jika Anda menggunakan timer waktu-pemrosesan pada aliran terbatas, timer waktu-pemrosesan yang tidak mencapai waktu yang ditentukan akan diabaikan saat pemrosesan data berakhir. Dalam hal ini, data mungkin hilang.