Dalam Apache Flink, pemicu jendela menentukan kapan data dalam jendela siap untuk dihitung dan dikeluarkan. Flink menyediakan berbagai pemicu bawaan untuk kasus penggunaan umum. Namun, dalam skenario bisnis yang kompleks, pemicu khusus memungkinkan Anda mengimplementasikan logika bisnis secara fleksibel.
Ikhtisar
Apa yang dilakukan pemicu
Memantau elemen data yang masuk ke jendela.
Menentukan apakah akan memicu pemrosesan berdasarkan kondisi tertentu, seperti waktu atau jumlah peristiwa.
Mengembalikan salah satu dari operasi berikut:
CONTINUE: Lanjutkan mengumpulkan data.FIRE: Pemicu komputasi dan pertahankan status.FIRE_AND_PURGE: Pemicu komputasi dan bersihkan status.PURGE: Bersihkan status tanpa memicu komputasi.
Pemicu bawaan
Flink menyediakan pemicu bawaan berikut untuk operasi jendela berbasis waktu atau hitungan:
Jenis pemicu | Deskripsi |
EventTimeTrigger | Memicu jendela ketika waktu watermark melebihi waktu akhir jendela. Ini adalah pemicu default untuk jendela berbasis waktu peristiwa. |
ProcessingTimeTrigger | Memicu jendela ketika waktu pemrosesan mencapai waktu akhir jendela. Ini adalah pemicu default untuk jendela berbasis waktu pemrosesan. |
CountTrigger | Memicu jendela ketika jumlah peristiwa dalam jendela mencapai ambang batas yang ditentukan. |
PurgingTrigger | Secara otomatis membersihkan jendela ketika dipicu. Ini membungkus pemicu lainnya. |
Jika Anda secara eksplisit menetapkan pemicu tertentu untuk jendela, itu akan menggantikan pemicu default. Misalnya, jika Anda menetapkan CountTrigger untuk jendela berbasis waktu peristiwa, EventTimeTrigger tidak akan lagi berlaku.
Dalam kasus penggunaan dunia nyata, sering kali kita perlu:
Menggunakan lebih dari satu kondisi pemicu, seperti memicu jendela setelah 5 peristiwa atau 1 menit.
Memacu jendela berdasarkan peristiwa tertentu, seperti logout pengguna atau penyelesaian pesanan.
Mengelola siklus hidup jendela, seperti penembakan awal, penembakan tertunda, dan penembakan berulang.
Menghindari efek samping perilaku default, seperti jendela yang dipicu ulang oleh data terlambat.
Permintaan kompleks ini tidak dapat dipenuhi oleh pemicu bawaan, sehingga membuat pemicu khusus menjadi diperlukan.
Buat pemicu khusus
Turunkan kelas abstrak
Trigger<T, W>.T: Tipe data dalam jendela.
W: Tipe jendela, seperti
TimeWindowatau subkelasnya.
Timpa metode inti.
public abstract class Trigger<T, W extends Window> implements Serializable { public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx); public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx); public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx); public void clear(W window, TriggerContext ctx); public boolean canMerge(); public void onMerge(W window, OnMergeContext mergeContext); }Metode yang harus ditimpa meliputi:
Metode
Deskripsi
Operasi
onElement()
Dipanggil ketika setiap peristiwa baru masuk ke jendela.
Menentukan apakah kondisi pemicu, seperti ambang batas hitungan peristiwa atau kedatangan peristiwa tertentu, terpenuhi.
onProcessingTime()
Dipanggil ketika timer waktu pemrosesan yang terdaftar aktif.
Digunakan untuk operasi jendela berbasis waktu pemrosesan. Metode ini jarang digunakan.
onEventTime()
Dipanggil ketika timer waktu peristiwa yang terdaftar aktif.
Menutup jendela ketika watermark mencapai timestamp akhir.
clear()
Membersihkan jendela.
Membersihkan status jendela untuk mencegah kebocoran memori.
canMerge()
onMerge()
Dipanggil ketika jendela (seperti jendela sesi) digabungkan.
Jika metode digunakan untuk menggabungkan jendela, Anda harus memperbarui timer dengan benar.
Gunakan
TriggerContextuntuk mengelola status dan timer.Manajemen status: Dapatkan status jendela, seperti penghitung, melalui
ctx.getPartitionedState(StateDescriptor).Manajemen timer: Daftarkan timer melalui
ctx.registerEventTimeTimer(timestamp).Pembersihan status: Gunakan
state.clear()dalamclear()untuk membersihkan status.Penghapusan timer: Seringkali, Anda tidak perlu menghapus timer secara manual. Flink secara otomatis membersihkannya ketika jendela ditutup.
Contoh
Dalam jendela tumbling waktu-peristiwa selama 1 jam, komputasi jendela dipicu ketika jendela berakhir. Selain itu, komputasi langsung dipicu pada kedatangan peristiwa kelima pengguna dalam jendela tersebut. Pemicu awal ini hanya dipicu sekali per jendela.
Kode contoh:
public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {
// Rekam jumlah elemen setiap kunci dalam jendela
private final ValueStateDescriptor<Integer> countStateDesc =
new ValueStateDescriptor<>("count", Integer.class);
// Rekam apakah komputasi sudah dipicu
private final ValueStateDescriptor<Boolean> flagStateDesc =
new ValueStateDescriptor<>("flag", Boolean.class);
// Panggil onElement() ketika elemen baru masuk ke jendela
@Override
public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
// Dapatkan status hitungan kunci saat ini dalam jendela saat ini
ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
int count = countState.value() == null ? 0 : countState.value();
boolean flag = flagState.value() == null ? false : flagState.value();
// Tambahkan hitungan untuk setiap elemen
count += 1;
countState.update(count); // Perbarui status
// Ketika hitungan mencapai 5, picu komputasi jendela segera
if (count >= 5 && !flag) {
flagState.update(true); // Perbarui status untuk memastikan komputasi tambahan ini hanya dipicu sekali
return TriggerResult.FIRE;
} else {
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
// Lewati timer waktu pemrosesan dalam pemicu ini.
return TriggerResult.CONTINUE;
}
// Panggil onEventTime() ketika timer waktu peristiwa yang terdaftar aktif (misalnya, waktu akhir jendela)
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE; // Pemicu aktif dan jendela dibersihkan
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
// Bersihkan status jendela
ctx.getPartitionedState(countStateDesc).clear();
ctx.getPartitionedState(flagStateDesc).clear();
}
}Penggunaan Pemicu: Pemicu harus digunakan dengan operasi jendela selama pemrosesan aliran.
DataStream<UserEvent> source = ...; // Aliran input yang ada
source.keyBy(keySelector) // Seperti .keyBy(value -> value.userId) untuk mengelompokkan elemen berdasarkan pengguna
.window(TumblingEventTimeWindows.of(Time.seconds(60))) // Tetapkan ukuran jendela tumbling menjadi 60 detik
.trigger(new CustomCountTrigger()) // Pemicu khusus: picu setelah 5 elemen atau timeout jendela
.process(new ProcessWindowFunction<UserEvent, String, KeyedType, TimeWindow>() {
@Override
public void process(KeyedType key, Context context, Iterable<UserEvent> elements, Collector<String> out) {
int count = 0;
// Iterasi semua elemen dalam jendela, hitung yang memiliki action == 1
for (UserEvent event : elements) {
if (event.action == 1) {
count++;
}
}
// Keluarkan hasil
out.collect("Key: " + key + ", Jumlah akses: " + count);
}
})
.print();Verifikasi Hasil: Jika 8 catatan akses dari pengguna yang sama diterima dalam satu menit, hasilnya adalah 2 catatan:
Key: 101, Jumlah akses: 5 // Karena count >= 5, komputasi awal dipicu, tetapi status jendela tidak dibersihkan.
Key: 101, Jumlah akses: 8 // Memicu komputasi ketika jendela ditutup dan membersihkan status.Esktensi dan ringkasan
Kasus penggunaan lanjutan dan implementasi
Kasus penggunaan 1
Picu komputasi lebih dari sekali sebelum jendela berakhir untuk peringatan.
Hapus
flagStateDescuntuk memungkinkan komputasi dipicu lebih dari sekali.Atau, tambahkan penanda hitungan untuk mengakhiri jendela hanya setelah sejumlah pemotretan pemicu (seperti untuk peristiwa peringatan).
### Setelah menghapus flagStateDesc, 5 rekaman dikeluarkan. Key: 101, Jumlah akses: 5 // count >= 5, memicu komputasi awal dan tidak membersihkan status. Key: 101, Jumlah akses: 6 // count >= 5, memicu komputasi awal. Key: 101, Jumlah akses: 7 // count >= 5, memicu komputasi awal. Key: 101, Jumlah akses: 8 // count >= 5, memicu komputasi awal. Key: 101, Jumlah akses: 8 // Memicu komputasi ketika jendela ditutup, dan membersihkan status.Pemicu hanya memutuskan kapan memicu komputasi. Informasi objek aktual diperoleh dari
.process, di mana Anda dapat menentukan menghasilkan hasil koleksi yang berbeda berdasarkan jumlah dan kondisi status.
Kasus penggunaan 2
Penutupan jendela memerlukan kemajuan watermark. Bagaimana cara memastikan komputasi jendela tepat waktu jika tidak ada data yang dihasilkan untuk periode waktu yang lama?
Solusi | Bergantung pada watermark? | Jaminan pemicu tepat waktu? | Cocok untuk data tidak berurutan? | Skenario cocok |
Gunakan jendela waktu pemrosesan | Tidak | Ya | Tidak | Pemrosesan waktu peristiwa tidak diperlukan. |
Gunakan metode | Ya | Tidak (tergantung pada interval watermark) | Ya | Cocok untuk skenario sederhana, seperti partisi input idle. |
Gunakan generator watermark khusus | Ya | Ya (watermark diperbarui secara berkala) | Ya | Ini adalah pendekatan standar. |
Daftarkan timer | Ya (opsional) | Ya (toleransi kesalahan) | Tidak (penutupan jendela paksa memerlukan waktu yang tepat) | Keandalan yang ditingkatkan diperlukan. |
Kirim pesan denyut jantung dari sistem eksternal | Tidak | Ya | Ya | Kafka memerlukan pemeliharaan tambahan, sedangkan orkestrasi tugas tidak. |
Solusi 1: Gunakan Jendela Waktu Pemrosesan
Jika Anda tidak memerlukan semantik waktu peristiwa (yaitu, waktu terjadinya peristiwa bukanlah perhatian), Anda dapat menggunakan jendela waktu pemrosesan:
.keyBy(keySelector) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) // Gunakan waktu pemrosesan .process(new MyProcessWindowFunction())Solusi 2: Gunakan Metode
withIdlenessStrategi Watermark Flink
WatermarkStrategymenyediakan metodewithIdlenessuntuk secara otomatis menandai sumber data sebagai idle setelah periode inaktivitas tertentu, mencegahnya menghalangi pembuatan watermark.// Sumber data idle tidak lagi terlibat dalam komputasi watermark minimum, dan tidak akan menghambat kemajuan watermark dari sumber data aktif. WatermarkStrategy .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)) .withIdleness(Duration.ofMinutes(1)); // Menunjukkan bahwa jika sumber data atau partisi tidak memiliki peristiwa selama 1 menit, itu ditandai sebagai idleSolusi 3: Gunakan Generator Watermark Khusus
Jika Anda perlu menggunakan semantik waktu peristiwa, tentukan generator watermark khusus yang memastikan watermark terus maju meskipun tidak ada data baru yang tiba:
Rekam waktu kedatangan peristiwa terbaru dalam metode
onEvent().Periksa interval antara waktu saat ini dan waktu terakhir peristiwa diterima dalam metode
onPeriodicEmit().Jika interval melebihi ambang batas yang ditetapkan, anggap sumber data sebagai idle, lewati pembuatan watermark atau langsung buat watermark tertentu.
public class IdleAwareWatermarkGenerator implements WatermarkGenerator<MyEvent> { private long lastEventTimestamp = Long.MIN_VALUE; private final long maxIdleTimeMs; // Waktu idle maksimum public IdleAwareWatermarkGenerator(long maxIdleTimeMs) { this.maxIdleTimeMs = maxIdleTimeMs; } @Override public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) { lastEventTimestamp = Math.max(lastEventTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { long currentTime = System.currentTimeMillis(); if (lastEventTimestamp == Long.MIN_VALUE || currentTime - lastEventTimestamp > maxIdleTimeMs) { // Jika tidak ada peristiwa yang tiba untuk waktu yang lama, jangan keluarkan watermark baru return; } output.emitWatermark(new Watermark(lastEventTimestamp)); } }WatermarkStrategy<MyEvent> strategy = WatermarkStrategy .forGenerator((ctx) -> new IdleAwareWatermarkGenerator(60_000)) // Tetapkan waktu idle maksimum menjadi 60 detik .withTimestampAssigner((event, timestamp) -> event.getEventTime());Solusi 4: Daftarkan Timer
@Override public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { ctx.registerEventTimeTimer(window.maxTimestamp()); // Daftarkan timer waktu peristiwa ctx.registerProcessingTimeTimer(window.maxTimestamp() + 1000); // Toleransi kesalahan: pastikan timer dipicu meskipun tidak ada peristiwa return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.FIRE_AND_PURGE; // Pemicuan paksa dan pembersihan }Solusi 5: Kirim Pesan Denyut Jantung dari Sistem Eksternal
Gunakan Kafka atau alur kerja untuk mengirim pesan denyut jantung secara teratur ke hilir untuk memicu penutupan jendela.
Ringkasan
Poin Utama | Deskripsi |
Pahami siklus hidup jendela |
|
Gunakan status dan timer dengan tepat | Gunakan |
Timpa metode | Implementasikan metode berikut: |
Aktifkan penggabungan jendela (seperti jendela sesi) | Untuk jendela yang dapat digabungkan seperti jendela sesi, implementasikan |
Hindari pemotretan berulang | Kontrol jumlah kali pemicu dapat dipicu, terutama ketika peristiwa mungkin terus tiba setelah |
Kode lengkap
CustomCountTrigger
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {
// Rekam jumlah elemen setiap kunci dalam jendela
private final ValueStateDescriptor<Integer> countStateDesc =
new ValueStateDescriptor<>("count", Integer.class);
// Rekam apakah komputasi sudah dipicu
private final ValueStateDescriptor<Boolean> flagStateDesc =
new ValueStateDescriptor<>("flag", Boolean.class);
// onElement dipanggil ketika elemen baru masuk ke jendela
@Override
public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
// Dapatkan status elemen kunci saat ini dalam jendela saat ini
ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
int count = countState.value() == null ? 0 : countState.value();
boolean flag = flagState.value() == null ? false : flagState.value();
// Tambahkan hitungan untuk setiap elemen baru
count += 1;
countState.update(count); // Perbarui status
// Jika hitungan mencapai 5, picu komputasi jendela segera
if (count >= 5 && !flag) {
flagState.update(true); // Perbarui status untuk memastikan komputasi tambahan hanya dipicu sekali
return TriggerResult.FIRE;
} else {
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
// Lewati timer waktu pemrosesan dalam pemicu ini
return TriggerResult.CONTINUE;
}
// onEventTime dipanggil ketika timer waktu peristiwa yang terdaftar aktif (seperti pada penutupan jendela)
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE; // Pemicu aktif dan jendela dibersihkan
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
// Bersihkan status jendela
ctx.getPartitionedState(countStateDesc).clear();
ctx.getPartitionedState(flagStateDesc).clear();
}
}KafkaTriggerTest
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public class KafkaTriggerTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("<BootstrapServers>")
.setTopics("trigger")
.setGroupId("trigger")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// Contoh data: 101,alie,1,2025-6-10 10:02:00
DataStream<UserEvent> userEventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
.map(new MapFunction<String, UserEvent>() {
@Override
public UserEvent map(String value) throws Exception {
String[] fields = value.split(",");
return new UserEvent(
Integer.parseInt(fields[0]),
fields[1],
fields[2],
fields[3]
);
}
});
WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
.<UserEvent>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(2))
.withTimestampAssigner((event, timestamp) -> {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
return java.time.LocalDateTime.parse(event.getEvent_time(), formatter).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
});
DataStream<UserEvent> timestampedStream = userEventStream.assignTimestampsAndWatermarks(watermarkStrategy);
timestampedStream
.keyBy(UserEvent::getUser_id)
.window(TumblingEventTimeWindows.of(Time.seconds(60))) // Tetapkan ukuran jendela tumbling menjadi 60 detik
.trigger(new CustomCountTrigger()) // Pemicu khusus: dipicu pada kedatangan 5 elemen data atau timeout
.process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {
@Override
public void process(Integer userId, ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>.Context context, Iterable<UserEvent> userEvents, Collector<String> collector) throws Exception {
int count = 0;
for (UserEvent event : userEvents) {
if (event.getEvent_type().equals("1"))
count++;
}
collector.collect("ID Pengguna: " + userId + " | Hitungan: " + count + " | Jendela: " + context.window());
}
}).print();
env.execute("Aliran Data Partisi Kafka");
}
}UserEvent
public class UserEvent {
private int user_id;
private String username;
private String event_type;
private String event_time;
public UserEvent(int user_id, String username, String event_type, String event_time) {
this.user_id = user_id;
this.username = username;
this.event_type = event_type;
this.event_time = event_time;
}
public String toString() {
return "user_id:" + user_id + " username:" + username + " event_type:" + event_type + " event_time:" + event_time;
}
public int getUser_id() {
return user_id;
}
public void setUser_id(int user_id) {
this.user_id = user_id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getEvent_type() {
return event_type;
}
public void setEvent_type(String event_type) {
this.event_type = event_type;
}
public String getEvent_time() {
return event_time;
}
public void setEvent_time(String event_time) {
this.event_time = event_time;
}
}