Realtime Compute for Apache Flink mendukung pemrosesan peristiwa kompleks (CEP) dinamis Flink dalam program DataStream untuk memperbarui aturan secara dinamis. Topik ini menjelaskan cara mengembangkan pekerjaan Flink yang memuat aturan terbaru secara dinamis untuk memproses pesan Kafka upstream.
Casus penggunaan Flink CEP Dinamis
Flink CEP memanfaatkan kemampuan pemrosesan data latensi sub-milidetik terdistribusi dari Apache Flink untuk mendeteksi pola data yang rumit dalam aliran data. Ini memiliki berbagai aplikasi dunia nyata, termasuk:
Pengendalian risiko real-time: Flink CEP dapat menganalisis log perilaku pengguna untuk menandai pelanggan yang tidak biasa, seperti ketika mereka mentransfer total USD 10.000 dalam 10 transaksi dalam jangka waktu lima menit.
Pemasaran real-time: Dengan mengonsumsi log perilaku pengguna, Flink CEP dapat memberikan wawasan untuk mengoptimalkan strategi pemasaran, seperti mengidentifikasi pengguna yang menambahkan lebih dari tiga item ke keranjang mereka dalam 10 menit tetapi tidak melakukan pembayaran. Flink CEP juga berguna untuk deteksi penipuan dalam pemasaran real-time.
IoT: Flink CEP memungkinkan deteksi anomali dan peringatan. Sebagai contoh, ia dapat menandai sepeda bersama yang telah keluar dari area tertentu selama lebih dari 15 menit. Contoh lain dari Flink CEP adalah mengidentifikasi anomali lini produksi dalam produksi industri berdasarkan data sensor IoT. Sebagai contoh, ketika data suhu dari sensor secara konsisten melebihi ambang batas selama tiga jendela waktu berturut-turut, peringatan akan dipicu.
Ikhtisar
Topik ini memberikan panduan praktis untuk mengimplementasikan Flink CEP Dinamis. Sebagai contoh, kami akan menunjukkan bagaimana Flink mengonsumsi data aliran klik pengguna dari Kafka, mengambil aturan secara dinamis dari MySQL, dan mendeteksi peristiwa yang cocok. Setelah menemukan kecocokan pola, pekerjaan Flink dapat memicu peringatan atau menulis hasil ke penyimpanan data. Grafik pipa data adalah sebagai berikut:
Pada awalnya, kami akan memulai pekerjaan Flink dan menyisipkan Aturan 1: Tiga peristiwa dengan action = 0, diikuti oleh peristiwa dengan action != 1. Aturan ini menandai pengguna yang melihat halaman produk tiga kali berturut-turut tanpa melakukan pembayaran. Kemudian, Aturan 1 diperbarui menjadi Aturan 2, yang memperkenalkan kendala ketepatan waktu: Tiga peristiwa berturut-turut dengan action = 0 terjadi dalam jendela waktu 15 menit. Aturan yang diperbarui mengidentifikasi pengguna yang berulang kali melihat halaman produk dalam 30 menit tanpa membelinya.
Prasyarat
Ruang kerja Realtime Compute for Apache Flink telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.
Pengguna RAM atau Peran RAM memiliki izin yang diperlukan untuk mengakses Konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Manajemen Izin.
Penyimpanan hulu dan hilir
Instans ApsaraDB RDS for MySQL dibuat. Untuk informasi lebih lanjut, lihat Buat instans RDS for MySQL.
Instans ApsaraMQ for Kafka dibuat. Untuk informasi lebih lanjut, lihat Ikhtisar ApsaraMQ for Kafka.
Prosedur
Berikut ini menjelaskan cara mengembangkan program Flink yang mencari pengguna yang tindakannya sesuai dengan aturan yang telah ditentukan dan cara memperbarui aturan secara dinamis. Prosedurnya adalah sebagai berikut:
Langkah 1: Siapkan data uji
Buat topik Kafka upstream
Masuk ke Konsol ApsaraMQ for Kafka.
Buat topik bernama demo_topic untuk menyimpan log perilaku pengguna simulasi.
Untuk informasi lebih lanjut, lihat Langkah 1: Buat topik.
Siapkan database ApsaraDB RDS for MySQL
Dalam Konsol Data Management (DMS), siapkan data uji dalam database ApsaraDB RDS for MySQL.
Masuk ke instans ApsaraDB RDS for MySQL menggunakan akun istimewa.
Untuk informasi lebih lanjut, lihat Masuk ke instans RDS di Konsol DMS.
Buat tabel bernama rds_demo untuk menyimpan aturan dan tabel bernama match_results untuk menerima kecocokan yang ditemukan.
Salin pernyataan berikut ke editor SQL dan klik Execute(F8):
CREATE DATABASE cep_demo_db; USE cep_demo_db; CREATE TABLE rds_demo ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); CREATE TABLE match_results ( rule_id INT, rule_version INT, user_id INT, user_name VARCHAR(255), production_id INT, PRIMARY KEY (rule_id,rule_version,user_id,production_id) );Setiap baris tabel rds_demo mewakili sebuah aturan, terdiri dari empat bidang: id (pengenal unik untuk aturan), versi, pattern, dan function (mendefinisikan cara memproses urutan peristiwa yang cocok).
Setiap baris tabel match_results mewakili kecocokan yang ditemukan, menunjukkan perilaku pengguna yang sesuai dengan pola tertentu. Catatan ini dapat memberdayakan tim pemasaran untuk membuat keputusan yang tepat, seperti memberikan kupon kepada pengguna.
Langkah 2: Konfigurasikan daftar putih alamat IP
Untuk memungkinkan ruang kerja Flink Anda mengakses instans ApsaraDB RDS for MySQL, Anda harus menambahkan Blok CIDR dari ruang kerja Flink ke daftar putih alamat IP dari instans ApsaraDB RDS for MySQL.
Peroleh Blok CIDR dari vSwitch yang digunakan oleh ruang kerja Flink.
Masuk ke Konsol Realtime Compute for Apache Flink.
Temukan ruang kerja target dan pilih di kolom Actions.
Dalam kotak dialog Workspace Details, salin Blok CIDR dari vSwitch.

Tambahkan Blok CIDR ke daftar putih alamat IP dari instans ApsaraDB RDS for MySQL.
Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih alamat IP dalam dokumentasi ApsaraDB RDS for MySQL.

Langkah 3: Kembangkan pekerjaan CEP dinamis
Semua file kode dalam topik ini dapat diunduh dari Repositori GitHub. Untuk tujuan demonstrasi, kode sampel dalam topik ini telah dimodifikasi pada cabang timeOrMoreAndWindow. Untuk kode lengkapnya, unduh paket ververica-cep-demo-master.zip.
Tambahkan dependensi flink-cep ke file pom.xml proyek Anda.
Untuk informasi tentang konfigurasi dependensi dan penanganan konflik, lihat Konfigurasikan dependensi lingkungan untuk Apache Flink.
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-cep</artifactId> <version>1.17-vvr-8.0.8</version> <scope>provided</scope> </dependency>Tulis kode program CEP dinamis.
Buat sumber Kafka.
Untuk informasi lebih lanjut, lihat Konektor DataStream Kafka.
Definisikan transformasi menggunakan metode CEP.dynamicPatterns().
Untuk memungkinkan modifikasi aturan dinamis dan mendukung beberapa aturan, Alibaba Cloud Realtime Compute for Apache Flink mendefinisikan metode CEP.dynamicPatterns():
public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns( DataStream<T> input, PatternProcessorDiscovererFactory<T> discovererFactory, TimeBehaviour timeBehaviour, TypeInformation<R> outTypeInfo)Tabel berikut menjelaskan parameter yang diperlukan. Ganti nilai placeholder dengan nilai konfigurasi aktual Anda.
Parameter
Deskripsi
DataStream<T> input
Aliran peristiwa masukan.
PatternProcessorDiscovererFactory<T> discovererFactory
Objek pabrik yang membangun PatternProcessorDiscoverer. PatternProcessorDiscoverer mengambil aturan terbaru untuk membangun antarmuka PatternProcessor.
TimeBehaviour timeBehaviour
Atribut waktu yang mendefinisikan bagaimana pekerjaan Flink CEP memproses peristiwa. Nilai valid:
TimeBehaviour.ProcessingTime: peristiwa diproses berdasarkan waktu pemrosesan.
TimeBehaviour.EventTime: peristiwa diproses berdasarkan waktu peristiwa.
TypeInformation<R> outTypeInfo
Informasi tipe dari aliran keluaran.
Untuk informasi tentang konsep seperti DataStream, TimeBehavior, dan TypeInformation, lihat Panduan Pemrograman API DataStream Flink, Konsep Waktu: Event Time dan Processing Time, dan Class TypeInformation<T>.
Antarmuka PatternProcessor berisi Pola yang mendefinisikan cara mencocokkan peristiwa dan PatternProcessFunction yang menentukan cara memproses peristiwa yang cocok, seperti mengirim peringatan. Selain itu, ia mencakup properti identifikasi, seperti id dan versi. Untuk informasi lebih lanjut, lihat FLIP-200: Dukungan Beberapa Aturan dan Perubahan Aturan Dinamis (Flink CEP).
PatternProcessorDiscovererFactory membangun PatternProcessorDiscoverer yang mengambil PatternProcessor terbaru. Realtime Compute for Apache Flink menawarkan kelas abstrak yang secara berkala memindai penyimpanan eksternal. Kode berikut membangun timer untuk secara berkala memantau penyimpanan eksternal untuk mengambil PatternProcessor terbaru:
public abstract class PeriodicPatternProcessorDiscoverer<T> implements PatternProcessorDiscoverer<T> { ... @Override public void discoverPatternProcessorUpdates( PatternProcessorManager<T> patternProcessorManager) { // Secara berkala menemukan pembaruan prosesor pola. timer.schedule( new TimerTask() { @Override public void run() { if (arePatternProcessorsUpdated()) { List<PatternProcessor<T>> patternProcessors = null; try { patternProcessors = getLatestPatternProcessors(); } catch (Exception e) { e.printStackTrace(); } patternProcessorManager.onPatternProcessorsUpdated(patternProcessors); } } }, 0, intervalMillis); } ... }Realtime Compute for Apache Flink juga menawarkan implementasi dari JDBCPeriodicPatternProcessorDiscoverer untuk mengambil aturan terbaru dari database JDBC, seperti ApsaraDB RDS atau Hologres. Tabel berikut menjelaskan parameter yang diperlukan:
Parameter
Deskripsi
jdbcUrl
URL JDBC dari database.
jdbcDriver
Nama kelas driver database.
tableName
Nama tabel dalam database.
initialPatternProcessors
Daftar PatternProcessors awal.
intervalMillis
Interval di mana database dipantau.
Kode sampel berikut memberikan contoh penggunaan JDBCPeriodicPatternProcessorDiscoverer. Peristiwa yang cocok dicetak ke log TaskManager.
// import ...... public class CepDemo { public static void main(String[] args) throws Exception { ...... // DataStream Source DataStreamSource<Event> source = env.fromSource( kafkaSource, WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner((event, ts) -> event.getEventTime()), "Kafka Source"); env.setParallelism(1); // keyBy userId and productionId // Catatan, hanya peristiwa dengan kunci yang sama yang akan diproses untuk melihat apakah ada kecocokan KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream = source.assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5))) ).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> getKey(Event value) throws Exception { return Tuple2.of(value.getId(), value.getProductionId()); } }); SingleOutputStreamOperator<String> output = CEP.dynamicPatterns( keyedStream, new JDBCPeriodicPatternProcessorDiscovererFactory<>( params.get(JDBC_URL_ARG), JDBC_DRIVE, params.get(TABLE_NAME_ARG), null, Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))), Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ? TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime, TypeInformation.of(new TypeHint<String>() {})); output.print(); // Kompilasi dan kirimkan pekerjaan env.execute("CEPDemo"); } }CatatanDalam kode demo, aliran data masukan di-key berdasarkan userId dan productionId sebelum
CEP.dynamicPatterns()dipanggil. Dengan cara ini, Flink hanya mencari pola dalam peristiwa dengan userId dan productionId yang sama.
Dalam Konsol Realtime Compute for Apache Flink, unggah JAR program dan buat Penyebaran JAR.
Tabel berikut menjelaskan parameter yang perlu Anda konfigurasikan saat membuat penyebaran:
CatatanUntuk memudahkan pengujian, unduh cep-demo.jar dan buat Penyebaran JAR darinya. Karena tidak ada data yang disimpan dalam sumber Kafka upstream dan tabel aturan kosong, tidak ada output yang dikembalikan setelah Anda memulai penyebaran.
Parameter
Deskripsi
Deployment Mode
Pilih Mode Aliran.
Deployment Name
Masukkan nama Penyebaran JAR.
Engine Version
Untuk informasi lebih lanjut tentang versi mesin, lihat Versi Mesin dan Kebijakan Siklus Hidup. Kami merekomendasikan Anda menggunakan versi yang direkomendasikan atau versi stabil. Versi mesin diklasifikasikan ke dalam jenis berikut:
Versi Direkomendasikan: versi minor terbaru dari versi utama terbaru.
Versi Stabil: versi minor terbaru dari versi utama yang masih dalam periode layanan produk. Cacat pada versi sebelumnya diperbaiki dalam versi ini.
Versi Normal: versi minor lainnya yang masih dalam periode layanan produk.
Versi Tidak Direkomendasikan: versi yang melebihi periode layanan produk.
JAR URL
Unggah JAR program Anda atau test cep-demo.jar.
Entry Point Class
Atur nilai menjadi
com.alibaba.ververica.cep.demo.CepDemo.Entry Point Main Arguments
Jika Anda menggunakan JAR program Anda dengan sistem upstream dan downstream yang dikonfigurasikan, lewati bidang ini. Jika Anda menggunakan test cep-demo.jar yang kami sediakan, salin kode berikut ke bidang tersebut:
--kafkaBrokers YOUR_KAFKA_BROKERS --inputTopic YOUR_KAFKA_TOPIC --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD --tableName YOUR_TABLE_NAME --jdbcIntervalMs 3000 --usingEventTime falseGanti nilai placeholder dalam kode di atas dengan yang berikut:
kafkaBrokers: alamat broker Kafka Anda.
inputTopic: nama topik Kafka Anda.
inputTopicGroup: kelompok konsumen Kafka Anda.
jdbcUrl: URL JDBC database Anda.
CatatanUntuk terhubung ke database melalui URL JDBC, gunakan akun standar dengan kata sandi yang hanya terdiri dari huruf dan angka. Anda dapat menggunakan metode autentikasi untuk penyebaran Anda berdasarkan kebutuhan bisnis Anda.
tableName: nama tabel tujuan.
jdbcIntervalMs: interval di mana database dipantau.
usingEventTime: menentukan apakah akan menggunakan waktu peristiwa. Nilai valid: true dan false.
CatatanGanti nilai placeholder dengan nilai konfigurasi aktual Anda.
Gunakan variabel daripada kredensial teks biasa dalam produksi. Untuk informasi lebih lanjut, lihat Kelola variabel.
Pada tab Deployment halaman Deployments, klik Edit di bagian Parameters. Kemudian, tambahkan parameter berikut di bidang Other Configuration.
Dependensi
flink-cepbergantung pada AppClassLoader sementara kelasaviatordalam JAR pengguna bergantung pada UserCodeClassLoader. Untuk mencegah kegagalan pemuatan dan memastikan AppClassLoader dapat mengakses kelas dalam JAR pengguna, tambahkan konfigurasi berikut.kubernetes.application-mode.classpath.include-user-jar: 'true' classloader.resolve-order: parent-firstUntuk informasi lebih lanjut tentang cara mengonfigurasi parameter di bagian Parameters, lihat Parameter.
Navigasikan ke . Temukan penyebaran target, dan klik Start di kolom Actions.
Untuk informasi lebih lanjut, lihat Mulai penyebaran.
Langkah 4: Tambahkan aturan
Setelah Penyebaran JAR dimulai, tambahkan Aturan 1: Tiga peristiwa berturut-turut dengan action = 0, diikuti oleh peristiwa dengan action != 1. Urutan peristiwa ini menunjukkan bahwa pengguna melihat halaman produk tiga kali tetapi tidak melakukan pembayaran.
Masuk ke Konsol ApsaraDB RDS for MySQL.
Tambahkan aturan.
Gabungkan string JSON yang mendefinisikan pola dengan bidang id, version, dan function, lalu eksekusi pernyataan INSERT INTO untuk menyisipkan data ke dalam tabel aturan di database ApsaraDB RDS for MySQL.
INSERT INTO rds_demo ( `id`, `version`, `pattern`, `function` ) values( '1', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}', 'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction') ;Untuk meningkatkan keterbacaan bidang pola dalam database demi kenyamanan Anda, Realtime Compute for Apache Flink memungkinkan Anda mendefinisikan pola dalam format JSON. Untuk informasi lebih lanjut, lihat Definisi aturan dalam format JSON dalam Flink CEP Dinamis. Nilai bidang pola dalam pernyataan SQL di atas adalah contoh string pola serialisasi berformat JSON yang didukung oleh Realtime Compute for Apache Flink. Pola ini cocok dengan urutan peristiwa di mana tiga peristiwa berturut-turut memiliki action = 0, langsung diikuti oleh peristiwa dengan action != 1.
CatatanEndCondition didefinisikan dalam kode untuk memeriksa "action != 1".
Gunakan API Pola untuk mendefinisikan pola:
Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3) .followedBy("end") .where(new EndCondition());Dapat dikonversi ke string JSON menggunakan metode convertPatternToJSONString dalam CepJsonUtils.
public void printTestPattern(Pattern<?, ?> pattern) throws JsonProcessingException { System.out.println(CepJsonUtils.convertPatternToJSONString(pattern)); }Definisikan pola dalam JSON:
Gunakan klien Kafka untuk mengirim pesan ke topik demo_topic.
Dalam demo ini, Anda juga dapat mengirim pesan uji di panel Start to Send and Consume Message dari topik demo_topic di Konsol ApsaraMQ for Kafka.
1,Ken,0,1,1662022777000 1,Ken,0,1,1662022778000 1,Ken,0,1,1662022779000 1,Ken,0,1,1662022780000
Tabel berikut menjelaskan bidang pesan dalam topik demo_topic.
Bidang
Deskripsi
id
ID pengguna.
username
Nama pengguna.
action
Tindakan pengguna. Nilai valid:
0: operasi melihat.
1: operasi pembelian.
product_id
ID produk.
event_time
Waktu peristiwa ketika tindakan dilakukan.
Lihat aturan terbaru yang dicetak ke log JobManager dan kecocokan yang ditemukan yang di-output ke log TaskManager.
Dalam log JobManager, gunakan JDBCPeriodicPatternProcessorDiscoverer sebagai kata kunci untuk mencari aturan terbaru.

Pada subtab Running Task Managers di bawah tab Logs, pilih file log dengan akhiran .out. Cari
A match for Pattern of (id, version): (1, 1)dan lihat entri log.
Dalam tabel match_results, eksekusi
SELECT * FROM `match_results` ;untuk menanyakan kecocokan yang ditemukan.
Langkah 5: Perbarui aturan
Strategi pemasaran yang disesuaikan biasanya mempertimbangkan ketepatan waktu. Oleh karena itu, Aturan 2 dimasukkan: Tiga peristiwa berturut-turut dengan action = 0 terjadi dalam jendela waktu 15 menit.
Atur
usingEventTimemenjaditrue.Pergi ke . Temukan penyebaran target, dan klik Cancel di kolom Actions.
Klik nama penyebaran. Di halaman detail penyebaran, pilih tab Configuration. Klik Edit di sudut kanan atas bagian Basic. Di bidang Entry Point Main Arguments, atur
usingEventTimemenjaditrue. Klik Save.Start penyebaran lagi.
Sisipkan aturan baru.
Gunakan API Pola untuk mendefinisikan pola dalam Java:
Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3,Time.minutes(15)) .followedBy("end") .where(new EndCondition()); printTestPattern(pattern);Sisipkan aturan baru ke dalam tabel rds_demo.
# Untuk tujuan demonstrasi, Aturan 1 dihapus. DELETE FROM `rds_demo` WHERE `id` = 1; # Sisipkan Aturan 2: Tiga peristiwa berturut-turut dengan action = 0 terjadi dalam jendela 15 menit, diikuti oleh peristiwa dengan action != 1. Versi aturan adalah (1,2) INSERT INTO rds_demo (`id`,`version`,`pattern`,`function`) values('1',2,'{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":{"unit":"MINUTES","size":15}},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');Kirim delapan pesan sederhana di Konsol ApsaraMQ for Kafka.
Contoh:
2,Tom,0,1,1739584800000 #10:00 2,Tom,0,1,1739585400000 #10:10 2,Tom,0,1,1739585700000 #10:15 2,Tom,0,1,1739586000000 #10:20 3,Ali,0,1,1739586600000 #10:30 3,Ali,0,1,1739588400000 #11:00 3,Ali,0,1,1739589000000 #11:10 3,Ali,0,1,1739590200000 #11:30Dalam tabel match_results, eksekusi
SELECT * FROM `match_results` ;untuk menanyakan kecocokan yang ditemukan.
Hasilnya menunjukkan perilaku Tom sesuai dengan pola yang telah ditentukan sementara Ali tidak. Ini karena klik Ali tidak memenuhi kendala waktu 15 menit. Dengan wawasan ini, tim pemasaran dapat menerapkan intervensi yang ditargetkan selama penjualan waktu terbatas. Sebagai contoh, mereka dapat mengeluarkan kupon kepada pengguna yang berulang kali melihat halaman produk dalam jangka waktu tertentu untuk mendorong pembelian.