全部产品
Search
文档中心

Realtime Compute for Apache Flink:Mengembangkan Pemicu Khusus

更新时间:Jul 09, 2025

Dalam Apache Flink, pemicu jendela menentukan kapan data dalam jendela siap untuk dihitung dan dikeluarkan. Flink menyediakan berbagai pemicu bawaan untuk kasus penggunaan umum. Namun, dalam skenario bisnis yang kompleks, pemicu khusus memungkinkan Anda mengimplementasikan logika bisnis secara fleksibel.

Ikhtisar

Apa yang dilakukan pemicu

  1. Memantau elemen data yang masuk ke jendela.

  2. Menentukan apakah akan memicu pemrosesan berdasarkan kondisi tertentu, seperti waktu atau jumlah peristiwa.

  3. Mengembalikan salah satu dari operasi berikut:

    • CONTINUE: Lanjutkan mengumpulkan data.

    • FIRE: Pemicu komputasi dan pertahankan status.

    • FIRE_AND_PURGE: Pemicu komputasi dan bersihkan status.

    • PURGE: Bersihkan status tanpa memicu komputasi.

Pemicu bawaan

Flink menyediakan pemicu bawaan berikut untuk operasi jendela berbasis waktu atau hitungan:

Jenis pemicu

Deskripsi

EventTimeTrigger

Memicu jendela ketika waktu watermark melebihi waktu akhir jendela. Ini adalah pemicu default untuk jendela berbasis waktu peristiwa.

ProcessingTimeTrigger

Memicu jendela ketika waktu pemrosesan mencapai waktu akhir jendela. Ini adalah pemicu default untuk jendela berbasis waktu pemrosesan.

CountTrigger

Memicu jendela ketika jumlah peristiwa dalam jendela mencapai ambang batas yang ditentukan.

PurgingTrigger

Secara otomatis membersihkan jendela ketika dipicu. Ini membungkus pemicu lainnya.

Penting

Jika Anda secara eksplisit menetapkan pemicu tertentu untuk jendela, itu akan menggantikan pemicu default. Misalnya, jika Anda menetapkan CountTrigger untuk jendela berbasis waktu peristiwa, EventTimeTrigger tidak akan lagi berlaku.

Dalam kasus penggunaan dunia nyata, sering kali kita perlu:

  • Menggunakan lebih dari satu kondisi pemicu, seperti memicu jendela setelah 5 peristiwa atau 1 menit.

  • Memacu jendela berdasarkan peristiwa tertentu, seperti logout pengguna atau penyelesaian pesanan.

  • Mengelola siklus hidup jendela, seperti penembakan awal, penembakan tertunda, dan penembakan berulang.

  • Menghindari efek samping perilaku default, seperti jendela yang dipicu ulang oleh data terlambat.

Permintaan kompleks ini tidak dapat dipenuhi oleh pemicu bawaan, sehingga membuat pemicu khusus menjadi diperlukan.

Buat pemicu khusus

  1. Turunkan kelas abstrak Trigger<T, W>.

    • T: Tipe data dalam jendela.

    • W: Tipe jendela, seperti TimeWindow atau subkelasnya.

  2. Timpa metode inti.

    public abstract class Trigger<T, W extends Window> implements Serializable {
        public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
        public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx);
        public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx);
        public void clear(W window, TriggerContext ctx);
        public boolean canMerge();
        public void onMerge(W window, OnMergeContext mergeContext);
    }

    Metode yang harus ditimpa meliputi:

    Metode

    Deskripsi

    Operasi

    onElement()

    Dipanggil ketika setiap peristiwa baru masuk ke jendela.

    Menentukan apakah kondisi pemicu, seperti ambang batas hitungan peristiwa atau kedatangan peristiwa tertentu, terpenuhi.

    onProcessingTime()

    Dipanggil ketika timer waktu pemrosesan yang terdaftar aktif.

    Digunakan untuk operasi jendela berbasis waktu pemrosesan. Metode ini jarang digunakan.

    onEventTime()

    Dipanggil ketika timer waktu peristiwa yang terdaftar aktif.

    Menutup jendela ketika watermark mencapai timestamp akhir.

    clear()

    Membersihkan jendela.

    Membersihkan status jendela untuk mencegah kebocoran memori.

    canMerge()

    onMerge()

    Dipanggil ketika jendela (seperti jendela sesi) digabungkan.

    Jika metode digunakan untuk menggabungkan jendela, Anda harus memperbarui timer dengan benar.

  3. Gunakan TriggerContext untuk mengelola status dan timer.

    1. Manajemen status: Dapatkan status jendela, seperti penghitung, melalui ctx.getPartitionedState(StateDescriptor).

    2. Manajemen timer: Daftarkan timer melalui ctx.registerEventTimeTimer(timestamp).

    3. Pembersihan status: Gunakan state.clear() dalam clear() untuk membersihkan status.

    4. Penghapusan timer: Seringkali, Anda tidak perlu menghapus timer secara manual. Flink secara otomatis membersihkannya ketika jendela ditutup.

Contoh

Dalam jendela tumbling waktu-peristiwa selama 1 jam, komputasi jendela dipicu ketika jendela berakhir. Selain itu, komputasi langsung dipicu pada kedatangan peristiwa kelima pengguna dalam jendela tersebut. Pemicu awal ini hanya dipicu sekali per jendela.

Kode contoh:

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // Rekam jumlah elemen setiap kunci dalam jendela
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Rekam apakah komputasi sudah dipicu
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // Panggil onElement() ketika elemen baru masuk ke jendela
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        // Dapatkan status hitungan kunci saat ini dalam jendela saat ini
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() == null ? false : flagState.value();

        // Tambahkan hitungan untuk setiap elemen
        count += 1;
        countState.update(count); // Perbarui status
        
        // Ketika hitungan mencapai 5, picu komputasi jendela segera
        if (count >= 5 && !flag) {
            flagState.update(true); // Perbarui status untuk memastikan komputasi tambahan ini hanya dipicu sekali
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
        // Lewati timer waktu pemrosesan dalam pemicu ini.
        return TriggerResult.CONTINUE;
    }

    // Panggil onEventTime() ketika timer waktu peristiwa yang terdaftar aktif (misalnya, waktu akhir jendela)
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; // Pemicu aktif dan jendela dibersihkan
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
        // Bersihkan status jendela
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

Penggunaan Pemicu: Pemicu harus digunakan dengan operasi jendela selama pemrosesan aliran.

DataStream<UserEvent> source = ...; // Aliran input yang ada

source.keyBy(keySelector)  // Seperti .keyBy(value -> value.userId) untuk mengelompokkan elemen berdasarkan pengguna
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) // Tetapkan ukuran jendela tumbling menjadi 60 detik
      .trigger(new CustomCountTrigger()) // Pemicu khusus: picu setelah 5 elemen atau timeout jendela
      .process(new ProcessWindowFunction<UserEvent, String, KeyedType, TimeWindow>() {

          @Override
          public void process(KeyedType key, Context context, Iterable<UserEvent> elements, Collector<String> out) {
              int count = 0;

              // Iterasi semua elemen dalam jendela, hitung yang memiliki action == 1
              for (UserEvent event : elements) {
                  if (event.action == 1) {
                      count++;
                  }
              }

              // Keluarkan hasil
              out.collect("Key: " + key + ", Jumlah akses: " + count);
          }
      })
      .print();

Verifikasi Hasil: Jika 8 catatan akses dari pengguna yang sama diterima dalam satu menit, hasilnya adalah 2 catatan:

Key: 101, Jumlah akses: 5       // Karena count >= 5, komputasi awal dipicu, tetapi status jendela tidak dibersihkan.
Key: 101, Jumlah akses: 8      // Memicu komputasi ketika jendela ditutup dan membersihkan status.

Esktensi dan ringkasan

Kasus penggunaan lanjutan dan implementasi

Kasus penggunaan 1

Picu komputasi lebih dari sekali sebelum jendela berakhir untuk peringatan.

  1. Hapus flagStateDesc untuk memungkinkan komputasi dipicu lebih dari sekali.

  2. Atau, tambahkan penanda hitungan untuk mengakhiri jendela hanya setelah sejumlah pemotretan pemicu (seperti untuk peristiwa peringatan).

    ### Setelah menghapus flagStateDesc, 5 rekaman dikeluarkan.  
    Key: 101, Jumlah akses: 5       // count >= 5, memicu komputasi awal dan tidak membersihkan status.
    Key: 101, Jumlah akses: 6       // count >= 5, memicu komputasi awal.
    Key: 101, Jumlah akses: 7      // count >= 5, memicu komputasi awal.
    Key: 101, Jumlah akses: 8      // count >= 5, memicu komputasi awal.
    Key: 101, Jumlah akses: 8      // Memicu komputasi ketika jendela ditutup, dan membersihkan status.
  3. Pemicu hanya memutuskan kapan memicu komputasi. Informasi objek aktual diperoleh dari .process, di mana Anda dapat menentukan menghasilkan hasil koleksi yang berbeda berdasarkan jumlah dan kondisi status.

Kasus penggunaan 2

Penutupan jendela memerlukan kemajuan watermark. Bagaimana cara memastikan komputasi jendela tepat waktu jika tidak ada data yang dihasilkan untuk periode waktu yang lama?

Solusi

Bergantung pada watermark?

Jaminan pemicu tepat waktu?

Cocok untuk data tidak berurutan?

Skenario cocok

Gunakan jendela waktu pemrosesan

Tidak

Ya

Tidak

Pemrosesan waktu peristiwa tidak diperlukan.

Gunakan metode withIdleness

Ya

Tidak (tergantung pada interval watermark)

Ya

Cocok untuk skenario sederhana, seperti partisi input idle.

Gunakan generator watermark khusus

Ya

Ya (watermark diperbarui secara berkala)

Ya

Ini adalah pendekatan standar.

Daftarkan timer

Ya (opsional)

Ya (toleransi kesalahan)

Tidak (penutupan jendela paksa memerlukan waktu yang tepat)

Keandalan yang ditingkatkan diperlukan.

Kirim pesan denyut jantung dari sistem eksternal

Tidak

Ya

Ya

Kafka memerlukan pemeliharaan tambahan, sedangkan orkestrasi tugas tidak.

  • Solusi 1: Gunakan Jendela Waktu Pemrosesan

    Jika Anda tidak memerlukan semantik waktu peristiwa (yaitu, waktu terjadinya peristiwa bukanlah perhatian), Anda dapat menggunakan jendela waktu pemrosesan:

    .keyBy(keySelector)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) // Gunakan waktu pemrosesan
    .process(new MyProcessWindowFunction())
  • Solusi 2: Gunakan Metode withIdleness

    Strategi Watermark Flink WatermarkStrategy menyediakan metode withIdleness untuk secara otomatis menandai sumber data sebagai idle setelah periode inaktivitas tertentu, mencegahnya menghalangi pembuatan watermark.

    // Sumber data idle tidak lagi terlibat dalam komputasi watermark minimum, dan tidak akan menghambat kemajuan watermark dari sumber data aktif.
    WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));  // Menunjukkan bahwa jika sumber data atau partisi tidak memiliki peristiwa selama 1 menit, itu ditandai sebagai idle
  • Solusi 3: Gunakan Generator Watermark Khusus

    Jika Anda perlu menggunakan semantik waktu peristiwa, tentukan generator watermark khusus yang memastikan watermark terus maju meskipun tidak ada data baru yang tiba:

    • Rekam waktu kedatangan peristiwa terbaru dalam metode onEvent().

    • Periksa interval antara waktu saat ini dan waktu terakhir peristiwa diterima dalam metode onPeriodicEmit().

    • Jika interval melebihi ambang batas yang ditetapkan, anggap sumber data sebagai idle, lewati pembuatan watermark atau langsung buat watermark tertentu.

    public class IdleAwareWatermarkGenerator implements WatermarkGenerator<MyEvent> {
        private long lastEventTimestamp = Long.MIN_VALUE;
        private final long maxIdleTimeMs; // Waktu idle maksimum
    
        public IdleAwareWatermarkGenerator(long maxIdleTimeMs) {
            this.maxIdleTimeMs = maxIdleTimeMs;
        }
    
        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            lastEventTimestamp = Math.max(lastEventTimestamp, eventTimestamp);
        }
    
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            long currentTime = System.currentTimeMillis();
            if (lastEventTimestamp == Long.MIN_VALUE || currentTime - lastEventTimestamp > maxIdleTimeMs) {
                // Jika tidak ada peristiwa yang tiba untuk waktu yang lama, jangan keluarkan watermark baru
                return;
            }
            output.emitWatermark(new Watermark(lastEventTimestamp));
        }
    }
    WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .forGenerator((ctx) -> new IdleAwareWatermarkGenerator(60_000)) // Tetapkan waktu idle maksimum menjadi 60 detik
        .withTimestampAssigner((event, timestamp) -> event.getEventTime());
  • Solusi 4: Daftarkan Timer

    @Override
    public TriggerResult onElement(Event event, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp()); // Daftarkan timer waktu peristiwa
        ctx.registerProcessingTimeTimer(window.maxTimestamp() + 1000); // Toleransi kesalahan: pastikan timer dipicu meskipun tidak ada peristiwa
        return TriggerResult.CONTINUE;
    }
    
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE; // Pemicuan paksa dan pembersihan
    }
  • Solusi 5: Kirim Pesan Denyut Jantung dari Sistem Eksternal

    Gunakan Kafka atau alur kerja untuk mengirim pesan denyut jantung secara teratur ke hilir untuk memicu penutupan jendela.

Ringkasan

Poin Utama

Deskripsi

Pahami siklus hidup jendela

FIRE tidak membersihkan status jendela, sementara FIRE_AND_PURGE menutup dan membersihkan jendela. Gunakan mereka dengan benar untuk memicu pemicu beberapa kali atau sekali.

Gunakan status dan timer dengan tepat

Gunakan ValueState untuk melacak hitungan dan flag serta lepaskan sumber daya dengan clear().

Timpa metode

Implementasikan metode berikut: onElement, onEventTime, onProcessingTime, dan clear untuk memastikan kelengkapan.

Aktifkan penggabungan jendela (seperti jendela sesi)

Untuk jendela yang dapat digabungkan seperti jendela sesi, implementasikan canMerge() dan onMerge() untuk menjaga konsistensi timer.

Hindari pemotretan berulang

Kontrol jumlah kali pemicu dapat dipicu, terutama ketika peristiwa mungkin terus tiba setelah FIRE dipanggil.

Kode lengkap

CustomCountTrigger

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomCountTrigger extends Trigger<UserEvent, TimeWindow> {

    // Rekam jumlah elemen setiap kunci dalam jendela
    private final ValueStateDescriptor<Integer> countStateDesc =
            new ValueStateDescriptor<>("count", Integer.class);
    // Rekam apakah komputasi sudah dipicu
    private final ValueStateDescriptor<Boolean> flagStateDesc =
            new ValueStateDescriptor<>("flag", Boolean.class);

    // onElement dipanggil ketika elemen baru masuk ke jendela
    @Override
    public TriggerResult onElement(UserEvent event, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        // Dapatkan status elemen kunci saat ini dalam jendela saat ini
        ValueState<Integer> countState = ctx.getPartitionedState(countStateDesc);
        ValueState<Boolean> flagState = ctx.getPartitionedState(flagStateDesc);
        int count = countState.value() == null ? 0 : countState.value();
        boolean flag = flagState.value() == null ? false : flagState.value();

        // Tambahkan hitungan untuk setiap elemen baru
        count += 1;
        countState.update(count); // Perbarui status

        // Jika hitungan mencapai 5, picu komputasi jendela segera
        if (count >= 5 && !flag) {
            flagState.update(true); // Perbarui status untuk memastikan komputasi tambahan hanya dipicu sekali
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext triggerContext) throws Exception {
        // Lewati timer waktu pemrosesan dalam pemicu ini
        return TriggerResult.CONTINUE;
    }

    // onEventTime dipanggil ketika timer waktu peristiwa yang terdaftar aktif (seperti pada penutupan jendela)
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; // Pemicu aktif dan jendela dibersihkan
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
        // Bersihkan status jendela
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(flagStateDesc).clear();
    }
}

KafkaTriggerTest

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;


public class KafkaTriggerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setTopics("trigger")
                .setGroupId("trigger")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        // Contoh data: 101,alie,1,2025-6-10 10:02:00
        DataStream<UserEvent> userEventStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(new MapFunction<String, UserEvent>() {
                    @Override
                    public UserEvent map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new UserEvent(
                                Integer.parseInt(fields[0]),
                                fields[1],
                                fields[2],
                                fields[3]
                        );
                    }
                });

        WatermarkStrategy<UserEvent> watermarkStrategy = WatermarkStrategy
                .<UserEvent>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(2))
                .withTimestampAssigner((event, timestamp) -> {
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    return java.time.LocalDateTime.parse(event.getEvent_time(), formatter).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                });

        DataStream<UserEvent> timestampedStream = userEventStream.assignTimestampsAndWatermarks(watermarkStrategy);

        timestampedStream
                .keyBy(UserEvent::getUser_id)
                .window(TumblingEventTimeWindows.of(Time.seconds(60))) // Tetapkan ukuran jendela tumbling menjadi 60 detik
                .trigger(new CustomCountTrigger()) // Pemicu khusus: dipicu pada kedatangan 5 elemen data atau timeout
                .process(new ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>() {

                    @Override
                             public void process(Integer userId, ProcessWindowFunction<UserEvent, String, Integer, TimeWindow>.Context context, Iterable<UserEvent> userEvents, Collector<String> collector) throws Exception {
                                 int count = 0;
                                 for (UserEvent event : userEvents) {
                                     if (event.getEvent_type().equals("1"))
                                         count++;
                                 }
                                 collector.collect("ID Pengguna: " + userId + " | Hitungan: " + count + " | Jendela: " + context.window());
                             }
                         }).print();
        env.execute("Aliran Data Partisi Kafka");
    }
}

UserEvent

public class UserEvent {
      private  int  user_id;
      private  String username;
      private  String event_type;
      private  String event_time;

      public UserEvent(int user_id, String username, String event_type, String event_time) {
              this.user_id = user_id;
              this.username = username;
              this.event_type = event_type;
              this.event_time = event_time;
      }
      public String toString() {
              return "user_id:" + user_id + " username:" + username + " event_type:" + event_type + " event_time:" + event_time;
      }

    public int getUser_id() {
        return user_id;
    }

    public void setUser_id(int user_id) {
        this.user_id = user_id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getEvent_type() {
        return event_type;
    }

    public void setEvent_type(String event_type) {
        this.event_type = event_type;
    }

    public String getEvent_time() {
        return event_time;
    }

    public void setEvent_time(String event_time) {
        this.event_time = event_time;
    }
}