全部产品
Search
文档中心

Realtime Compute for Apache Flink:Memulai dengan Flink CEP Dinamis

更新时间:Oct 29, 2025

Realtime Compute for Apache Flink mendukung pemrosesan peristiwa kompleks (CEP) dinamis Flink dalam program DataStream untuk memperbarui aturan secara dinamis. Topik ini menjelaskan cara mengembangkan pekerjaan Flink yang memuat aturan terbaru secara dinamis untuk memproses pesan Kafka upstream.

Casus penggunaan Flink CEP Dinamis

Flink CEP memanfaatkan kemampuan pemrosesan data latensi sub-milidetik terdistribusi dari Apache Flink untuk mendeteksi pola data yang rumit dalam aliran data. Ini memiliki berbagai aplikasi dunia nyata, termasuk:

  • Pengendalian risiko real-time: Flink CEP dapat menganalisis log perilaku pengguna untuk menandai pelanggan yang tidak biasa, seperti ketika mereka mentransfer total USD 10.000 dalam 10 transaksi dalam jangka waktu lima menit.

  • Pemasaran real-time: Dengan mengonsumsi log perilaku pengguna, Flink CEP dapat memberikan wawasan untuk mengoptimalkan strategi pemasaran, seperti mengidentifikasi pengguna yang menambahkan lebih dari tiga item ke keranjang mereka dalam 10 menit tetapi tidak melakukan pembayaran. Flink CEP juga berguna untuk deteksi penipuan dalam pemasaran real-time.

  • IoT: Flink CEP memungkinkan deteksi anomali dan peringatan. Sebagai contoh, ia dapat menandai sepeda bersama yang telah keluar dari area tertentu selama lebih dari 15 menit. Contoh lain dari Flink CEP adalah mengidentifikasi anomali lini produksi dalam produksi industri berdasarkan data sensor IoT. Sebagai contoh, ketika data suhu dari sensor secara konsisten melebihi ambang batas selama tiga jendela waktu berturut-turut, peringatan akan dipicu.

Ikhtisar

Topik ini memberikan panduan praktis untuk mengimplementasikan Flink CEP Dinamis. Sebagai contoh, kami akan menunjukkan bagaimana Flink mengonsumsi data aliran klik pengguna dari Kafka, mengambil aturan secara dinamis dari MySQL, dan mendeteksi peristiwa yang cocok. Setelah menemukan kecocokan pola, pekerjaan Flink dapat memicu peringatan atau menulis hasil ke penyimpanan data. Grafik pipa data adalah sebagai berikut:

Pada awalnya, kami akan memulai pekerjaan Flink dan menyisipkan Aturan 1: Tiga peristiwa dengan action = 0, diikuti oleh peristiwa dengan action != 1. Aturan ini menandai pengguna yang melihat halaman produk tiga kali berturut-turut tanpa melakukan pembayaran. Kemudian, Aturan 1 diperbarui menjadi Aturan 2, yang memperkenalkan kendala ketepatan waktu: Tiga peristiwa berturut-turut dengan action = 0 terjadi dalam jendela waktu 15 menit. Aturan yang diperbarui mengidentifikasi pengguna yang berulang kali melihat halaman produk dalam 30 menit tanpa membelinya.

Prasyarat

Prosedur

Berikut ini menjelaskan cara mengembangkan program Flink yang mencari pengguna yang tindakannya sesuai dengan aturan yang telah ditentukan dan cara memperbarui aturan secara dinamis. Prosedurnya adalah sebagai berikut:

Langkah 1: Siapkan data uji

Buat topik Kafka upstream

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Buat topik bernama demo_topic untuk menyimpan log perilaku pengguna simulasi.

    Untuk informasi lebih lanjut, lihat Langkah 1: Buat topik.

Siapkan database ApsaraDB RDS for MySQL

Dalam Konsol Data Management (DMS), siapkan data uji dalam database ApsaraDB RDS for MySQL.

  1. Masuk ke instans ApsaraDB RDS for MySQL menggunakan akun istimewa.

    Untuk informasi lebih lanjut, lihat Masuk ke instans RDS di Konsol DMS.

  2. Buat tabel bernama rds_demo untuk menyimpan aturan dan tabel bernama match_results untuk menerima kecocokan yang ditemukan.

    Salin pernyataan berikut ke editor SQL dan klik Execute(F8):

    CREATE DATABASE cep_demo_db;
    USE cep_demo_db;
    
    CREATE TABLE rds_demo (
      `id` VARCHAR(64),
      `version` INT,
      `pattern` VARCHAR(4096),
      `function` VARCHAR(512)
    );
    
    CREATE TABLE match_results (
        rule_id INT,
        rule_version INT,
        user_id INT,
        user_name VARCHAR(255),
        production_id INT,
        PRIMARY KEY (rule_id,rule_version,user_id,production_id)
    );

    Setiap baris tabel rds_demo mewakili sebuah aturan, terdiri dari empat bidang: id (pengenal unik untuk aturan), versi, pattern, dan function (mendefinisikan cara memproses urutan peristiwa yang cocok).

    Setiap baris tabel match_results mewakili kecocokan yang ditemukan, menunjukkan perilaku pengguna yang sesuai dengan pola tertentu. Catatan ini dapat memberdayakan tim pemasaran untuk membuat keputusan yang tepat, seperti memberikan kupon kepada pengguna.

Langkah 2: Konfigurasikan daftar putih alamat IP

Untuk memungkinkan ruang kerja Flink Anda mengakses instans ApsaraDB RDS for MySQL, Anda harus menambahkan Blok CIDR dari ruang kerja Flink ke daftar putih alamat IP dari instans ApsaraDB RDS for MySQL.

  1. Peroleh Blok CIDR dari vSwitch yang digunakan oleh ruang kerja Flink.

    1. Masuk ke Konsol Realtime Compute for Apache Flink.

    2. Temukan ruang kerja target dan pilih More > Workspace Details di kolom Actions.

    3. Dalam kotak dialog Workspace Details, salin Blok CIDR dari vSwitch.

      网段信息

  2. Tambahkan Blok CIDR ke daftar putih alamat IP dari instans ApsaraDB RDS for MySQL.

    Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih alamat IP dalam dokumentasi ApsaraDB RDS for MySQL. RDS白名单

Langkah 3: Kembangkan pekerjaan CEP dinamis

Catatan

Semua file kode dalam topik ini dapat diunduh dari Repositori GitHub. Untuk tujuan demonstrasi, kode sampel dalam topik ini telah dimodifikasi pada cabang timeOrMoreAndWindow. Untuk kode lengkapnya, unduh paket ververica-cep-demo-master.zip.

  1. Tambahkan dependensi flink-cep ke file pom.xml proyek Anda.

    Untuk informasi tentang konfigurasi dependensi dan penanganan konflik, lihat Konfigurasikan dependensi lingkungan untuk Apache Flink.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-cep</artifactId>
        <version>1.17-vvr-8.0.8</version>
        <scope>provided</scope>
    </dependency>
  2. Tulis kode program CEP dinamis.

    1. Buat sumber Kafka.

      Untuk informasi lebih lanjut, lihat Konektor DataStream Kafka.

    2. Definisikan transformasi menggunakan metode CEP.dynamicPatterns().

      Untuk memungkinkan modifikasi aturan dinamis dan mendukung beberapa aturan, Alibaba Cloud Realtime Compute for Apache Flink mendefinisikan metode CEP.dynamicPatterns():

      public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
               DataStream<T> input,
               PatternProcessorDiscovererFactory<T> discovererFactory,
               TimeBehaviour timeBehaviour,
               TypeInformation<R> outTypeInfo)

      Tabel berikut menjelaskan parameter yang diperlukan. Ganti nilai placeholder dengan nilai konfigurasi aktual Anda.

      Parameter

      Deskripsi

      DataStream<T> input

      Aliran peristiwa masukan.

      PatternProcessorDiscovererFactory<T> discovererFactory

      Objek pabrik yang membangun PatternProcessorDiscoverer. PatternProcessorDiscoverer mengambil aturan terbaru untuk membangun antarmuka PatternProcessor.

      TimeBehaviour timeBehaviour

      Atribut waktu yang mendefinisikan bagaimana pekerjaan Flink CEP memproses peristiwa. Nilai valid:

      • TimeBehaviour.ProcessingTime: peristiwa diproses berdasarkan waktu pemrosesan.

      • TimeBehaviour.EventTime: peristiwa diproses berdasarkan waktu peristiwa.

      TypeInformation<R> outTypeInfo

      Informasi tipe dari aliran keluaran.

      Untuk informasi tentang konsep seperti DataStream, TimeBehavior, dan TypeInformation, lihat Panduan Pemrograman API DataStream Flink, Konsep Waktu: Event Time dan Processing Time, dan Class TypeInformation<T>.

      Antarmuka PatternProcessor berisi Pola yang mendefinisikan cara mencocokkan peristiwa dan PatternProcessFunction yang menentukan cara memproses peristiwa yang cocok, seperti mengirim peringatan. Selain itu, ia mencakup properti identifikasi, seperti id dan versi. Untuk informasi lebih lanjut, lihat FLIP-200: Dukungan Beberapa Aturan dan Perubahan Aturan Dinamis (Flink CEP).

      PatternProcessorDiscovererFactory membangun PatternProcessorDiscoverer yang mengambil PatternProcessor terbaru. Realtime Compute for Apache Flink menawarkan kelas abstrak yang secara berkala memindai penyimpanan eksternal. Kode berikut membangun timer untuk secara berkala memantau penyimpanan eksternal untuk mengambil PatternProcessor terbaru:

      public abstract class PeriodicPatternProcessorDiscoverer<T>
              implements PatternProcessorDiscoverer<T> {
      
          ...
          @Override
          public void discoverPatternProcessorUpdates(
                  PatternProcessorManager<T> patternProcessorManager) {
              // Secara berkala menemukan pembaruan prosesor pola.
              timer.schedule(
                      new TimerTask() {
                          @Override
                          public void run() {
                              if (arePatternProcessorsUpdated()) {
                                  List<PatternProcessor<T>> patternProcessors = null;
                                  try {
                                      patternProcessors = getLatestPatternProcessors();
                                  } catch (Exception e) {
                                      e.printStackTrace();
                                  }
                                  patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
                              }
                          }
                      },
                      0,
                      intervalMillis);
          }
      
          ...
      }

      Realtime Compute for Apache Flink juga menawarkan implementasi dari JDBCPeriodicPatternProcessorDiscoverer untuk mengambil aturan terbaru dari database JDBC, seperti ApsaraDB RDS atau Hologres. Tabel berikut menjelaskan parameter yang diperlukan:

      Parameter

      Deskripsi

      jdbcUrl

      URL JDBC dari database.

      jdbcDriver

      Nama kelas driver database.

      tableName

      Nama tabel dalam database.

      initialPatternProcessors

      Daftar PatternProcessors awal.

      intervalMillis

      Interval di mana database dipantau.

      Kode sampel berikut memberikan contoh penggunaan JDBCPeriodicPatternProcessorDiscoverer. Peristiwa yang cocok dicetak ke log TaskManager.

      // import ......
      public class CepDemo {
      
          public static void main(String[] args) throws Exception {
      
              ......
              // DataStream Source
              DataStreamSource<Event> source =
                      env.fromSource(
                              kafkaSource,
                              WatermarkStrategy.<Event>forMonotonousTimestamps()
                                      .withTimestampAssigner((event, ts) -> event.getEventTime()),
                              "Kafka Source");
      
              env.setParallelism(1);
              // keyBy userId and productionId
              // Catatan, hanya peristiwa dengan kunci yang sama yang akan diproses untuk melihat apakah ada kecocokan
              KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
                      source.assignTimestampsAndWatermarks(
                              WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                      ).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() {
                          @Override
                          public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
                              return Tuple2.of(value.getId(), value.getProductionId());
                          }
                      });
      
              SingleOutputStreamOperator<String> output =
                      CEP.dynamicPatterns(
                              keyedStream,
                              new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                      params.get(JDBC_URL_ARG),
                                      JDBC_DRIVE,
                                      params.get(TABLE_NAME_ARG),
                                      null,
                                      Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                              Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ?  TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
                              TypeInformation.of(new TypeHint<String>() {}));
      
              output.print();
              // Kompilasi dan kirimkan pekerjaan
              env.execute("CEPDemo");
          }
      }
      Catatan

      Dalam kode demo, aliran data masukan di-key berdasarkan userId dan productionId sebelum CEP.dynamicPatterns() dipanggil. Dengan cara ini, Flink hanya mencari pola dalam peristiwa dengan userId dan productionId yang sama.

  3. Dalam Konsol Realtime Compute for Apache Flink, unggah JAR program dan buat Penyebaran JAR.

    Tabel berikut menjelaskan parameter yang perlu Anda konfigurasikan saat membuat penyebaran:

    Catatan

    Untuk memudahkan pengujian, unduh cep-demo.jar dan buat Penyebaran JAR darinya. Karena tidak ada data yang disimpan dalam sumber Kafka upstream dan tabel aturan kosong, tidak ada output yang dikembalikan setelah Anda memulai penyebaran.

    Parameter

    Deskripsi

    Deployment Mode

    Pilih Mode Aliran.

    Deployment Name

    Masukkan nama Penyebaran JAR.

    Engine Version

    Untuk informasi lebih lanjut tentang versi mesin, lihat Versi Mesin dan Kebijakan Siklus Hidup. Kami merekomendasikan Anda menggunakan versi yang direkomendasikan atau versi stabil. Versi mesin diklasifikasikan ke dalam jenis berikut:

    • Versi Direkomendasikan: versi minor terbaru dari versi utama terbaru.

    • Versi Stabil: versi minor terbaru dari versi utama yang masih dalam periode layanan produk. Cacat pada versi sebelumnya diperbaiki dalam versi ini.

    • Versi Normal: versi minor lainnya yang masih dalam periode layanan produk.

    • Versi Tidak Direkomendasikan: versi yang melebihi periode layanan produk.

    JAR URL

    Unggah JAR program Anda atau test cep-demo.jar.

    Entry Point Class

    Atur nilai menjadi com.alibaba.ververica.cep.demo.CepDemo.

    Entry Point Main Arguments

    Jika Anda menggunakan JAR program Anda dengan sistem upstream dan downstream yang dikonfigurasikan, lewati bidang ini. Jika Anda menggunakan test cep-demo.jar yang kami sediakan, salin kode berikut ke bidang tersebut:

    --kafkaBrokers YOUR_KAFKA_BROKERS 
    --inputTopic YOUR_KAFKA_TOPIC 
    --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP 
    --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD
    --tableName YOUR_TABLE_NAME  
    --jdbcIntervalMs 3000
    --usingEventTime false

    Ganti nilai placeholder dalam kode di atas dengan yang berikut:

    • kafkaBrokers: alamat broker Kafka Anda.

    • inputTopic: nama topik Kafka Anda.

    • inputTopicGroup: kelompok konsumen Kafka Anda.

    • jdbcUrl: URL JDBC database Anda.

      Catatan

      Untuk terhubung ke database melalui URL JDBC, gunakan akun standar dengan kata sandi yang hanya terdiri dari huruf dan angka. Anda dapat menggunakan metode autentikasi untuk penyebaran Anda berdasarkan kebutuhan bisnis Anda.

    • tableName: nama tabel tujuan.

    • jdbcIntervalMs: interval di mana database dipantau.

    • usingEventTime: menentukan apakah akan menggunakan waktu peristiwa. Nilai valid: true dan false.

    Catatan
    • Ganti nilai placeholder dengan nilai konfigurasi aktual Anda.

    • Gunakan variabel daripada kredensial teks biasa dalam produksi. Untuk informasi lebih lanjut, lihat Kelola variabel.

  4. Pada tab Deployment halaman Deployments, klik Edit di bagian Parameters. Kemudian, tambahkan parameter berikut di bidang Other Configuration.

    Dependensi flink-cep bergantung pada AppClassLoader sementara kelas aviator dalam JAR pengguna bergantung pada UserCodeClassLoader. Untuk mencegah kegagalan pemuatan dan memastikan AppClassLoader dapat mengakses kelas dalam JAR pengguna, tambahkan konfigurasi berikut.

    kubernetes.application-mode.classpath.include-user-jar: 'true' 
    classloader.resolve-order: parent-first

    Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter di bagian Parameters, lihat Parameter.

  5. Navigasikan ke O&M > Deployments. Temukan penyebaran target, dan klik Start di kolom Actions.

    Untuk informasi lebih lanjut, lihat Mulai penyebaran.

Langkah 4: Tambahkan aturan

Setelah Penyebaran JAR dimulai, tambahkan Aturan 1: Tiga peristiwa berturut-turut dengan action = 0, diikuti oleh peristiwa dengan action != 1. Urutan peristiwa ini menunjukkan bahwa pengguna melihat halaman produk tiga kali tetapi tidak melakukan pembayaran.

  1. Masuk ke Konsol ApsaraDB RDS for MySQL.

  2. Tambahkan aturan.

    Gabungkan string JSON yang mendefinisikan pola dengan bidang id, version, dan function, lalu eksekusi pernyataan INSERT INTO untuk menyisipkan data ke dalam tabel aturan di database ApsaraDB RDS for MySQL.

    INSERT INTO rds_demo (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;

    Untuk meningkatkan keterbacaan bidang pola dalam database demi kenyamanan Anda, Realtime Compute for Apache Flink memungkinkan Anda mendefinisikan pola dalam format JSON. Untuk informasi lebih lanjut, lihat Definisi aturan dalam format JSON dalam Flink CEP Dinamis. Nilai bidang pola dalam pernyataan SQL di atas adalah contoh string pola serialisasi berformat JSON yang didukung oleh Realtime Compute for Apache Flink. Pola ini cocok dengan urutan peristiwa di mana tiga peristiwa berturut-turut memiliki action = 0, langsung diikuti oleh peristiwa dengan action != 1.

    Catatan

    EndCondition didefinisikan dalam kode untuk memeriksa "action != 1".

    • Gunakan API Pola untuk mendefinisikan pola:

      Pattern<Event, Event> pattern =
          Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
              .where(new StartCondition("action == 0"))
              .timesOrMore(3)
              .followedBy("end")
              .where(new EndCondition());
    • Dapat dikonversi ke string JSON menggunakan metode convertPatternToJSONString dalam CepJsonUtils.

      public void printTestPattern(Pattern<?, ?> pattern) throws JsonProcessingException {
          System.out.println(CepJsonUtils.convertPatternToJSONString(pattern));
      }
    • Definisikan pola dalam JSON:

      Pola berformat JSON

      {
        "name": "end",
        "quantifier": {
          "consumingStrategy": "SKIP_TILL_NEXT",
          "properties": [
            "SINGLE"
          ],
          "times": null,
          "untilCondition": null
        },
        "condition": null,
        "nodes": [
          {
            "name": "end",
            "quantifier": {
              "consumingStrategy": "SKIP_TILL_NEXT",
              "properties": [
                "SINGLE"
              ],
              "times": null,
              "untilCondition": null
            },
            "condition": {
              "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
              "type": "CLASS"
            },
            "type": "ATOMIC"
          },
          {
            "name": "start",
            "quantifier": {
              "consumingStrategy": "SKIP_TILL_NEXT",
              "properties": [
                "LOOPING"
              ],
              "times": {
                "from": 3,
                "to": 3,
                "windowTime": null
              },
              "untilCondition": null
            },
            "condition": {
              "expression": "action == 0",
              "type": "AVIATOR"
            },
            "type": "ATOMIC"
          }
        ],
        "edges": [
          {
            "source": "start",
            "target": "end",
            "type": "SKIP_TILL_NEXT"
          }
        ],
        "window": null,
        "afterMatchStrategy": {
          "type": "SKIP_PAST_LAST_EVENT",
          "patternName": null
        },
        "type": "COMPOSITE",
        "version": 1
      }
  3. Gunakan klien Kafka untuk mengirim pesan ke topik demo_topic.

    Dalam demo ini, Anda juga dapat mengirim pesan uji di panel Start to Send and Consume Message dari topik demo_topic di Konsol ApsaraMQ for Kafka.

    1,Ken,0,1,1662022777000
    1,Ken,0,1,1662022778000
    1,Ken,0,1,1662022779000
    1,Ken,0,1,1662022780000

    发消息

    Tabel berikut menjelaskan bidang pesan dalam topik demo_topic.

    Bidang

    Deskripsi

    id

    ID pengguna.

    username

    Nama pengguna.

    action

    Tindakan pengguna. Nilai valid:

    • 0: operasi melihat.

    • 1: operasi pembelian.

    product_id

    ID produk.

    event_time

    Waktu peristiwa ketika tindakan dilakukan.

  4. Lihat aturan terbaru yang dicetak ke log JobManager dan kecocokan yang ditemukan yang di-output ke log TaskManager.

    • Dalam log JobManager, gunakan JDBCPeriodicPatternProcessorDiscoverer sebagai kata kunci untuk mencari aturan terbaru.

      image

    • Pada subtab Running Task Managers di bawah tab Logs, pilih file log dengan akhiran .out. Cari A match for Pattern of (id, version): (1, 1) dan lihat entri log.

      image

  5. Dalam tabel match_results, eksekusi SELECT * FROM `match_results` ; untuk menanyakan kecocokan yang ditemukan.

    image

Langkah 5: Perbarui aturan

Strategi pemasaran yang disesuaikan biasanya mempertimbangkan ketepatan waktu. Oleh karena itu, Aturan 2 dimasukkan: Tiga peristiwa berturut-turut dengan action = 0 terjadi dalam jendela waktu 15 menit.

  1. Atur usingEventTime menjadi true.

    1. Pergi ke O&M > Deployments. Temukan penyebaran target, dan klik Cancel di kolom Actions.

    2. Klik nama penyebaran. Di halaman detail penyebaran, pilih tab Configuration. Klik Edit di sudut kanan atas bagian Basic. Di bidang Entry Point Main Arguments, atur usingEventTime menjadi true. Klik Save.

    3. Start penyebaran lagi.

  2. Sisipkan aturan baru.

    Gunakan API Pola untuk mendefinisikan pola dalam Java:

    Pattern<Event, Event> pattern =
            Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                    .where(new StartCondition("action == 0"))
                    .timesOrMore(3,Time.minutes(15))
                    .followedBy("end")
                    .where(new EndCondition());
    printTestPattern(pattern);

    Sisipkan aturan baru ke dalam tabel rds_demo.

    # Untuk tujuan demonstrasi, Aturan 1 dihapus.
    DELETE FROM `rds_demo` WHERE `id` = 1;
    
    # Sisipkan Aturan 2: Tiga peristiwa berturut-turut dengan action = 0 terjadi dalam jendela 15 menit, diikuti oleh peristiwa dengan action != 1. Versi aturan adalah (1,2) 
    INSERT INTO rds_demo (`id`,`version`,`pattern`,`function`) values('1',2,'{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":{"unit":"MINUTES","size":15}},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
  3. Kirim delapan pesan sederhana di Konsol ApsaraMQ for Kafka.

    Contoh:

    2,Tom,0,1,1739584800000   #10:00
    2,Tom,0,1,1739585400000   #10:10
    2,Tom,0,1,1739585700000   #10:15
    2,Tom,0,1,1739586000000   #10:20
    3,Ali,0,1,1739586600000   #10:30
    3,Ali,0,1,1739588400000   #11:00
    3,Ali,0,1,1739589000000   #11:10
    3,Ali,0,1,1739590200000   #11:30
  4. Dalam tabel match_results, eksekusi SELECT * FROM `match_results` ; untuk menanyakan kecocokan yang ditemukan.

    image

    Hasilnya menunjukkan perilaku Tom sesuai dengan pola yang telah ditentukan sementara Ali tidak. Ini karena klik Ali tidak memenuhi kendala waktu 15 menit. Dengan wawasan ini, tim pemasaran dapat menerapkan intervensi yang ditargetkan selama penjualan waktu terbatas. Sebagai contoh, mereka dapat mengeluarkan kupon kepada pengguna yang berulang kali melihat halaman produk dalam jangka waktu tertentu untuk mendorong pembelian.