Flink Complex Event Processing (CEP) adalah fitur yang memproses aliran event kompleks secara dinamis untuk mendeteksi pola event tertentu secara real time dan memicu peringatan. Dalam pemasaran e-commerce, Flink CEP dapat memantau perilaku pengguna dan data transaksi secara real time guna mengidentifikasi event abnormal atau kritis serta mengirimkan peringatan tepat waktu.
Informasi latar belakang
Pertumbuhan pesat industri e-commerce telah menyebabkan peningkatan eksponensial pada volume data perilaku pengguna dan transaksi. Metode pemrosesan batch tradisional tidak lagi mampu memenuhi kebutuhan identifikasi dan respons cepat terhadap perilaku abnormal, ancaman sistem, dan churn pengguna. Sebaliknya, mesin Pemrosesan Peristiwa Kompleks (CEP) dinamis dapat memodelkan dan menganalisis perilaku pengguna multi-tahap, secara otomatis mengidentifikasi pola event kompleks, serta memicu peringatan pada tahap awal ancaman. Inilah keunggulan utama CEP dinamis dalam operasi bisnis real-time. Fitur ini memiliki tiga karakteristik utama berikut:
Kinerja real-time tinggi: Memberikan respons dalam hitungan milidetik, memungkinkan peringatan selama kejadian berlangsung—bukan analisis pasca-kejadian—sehingga membantu Anda membuat keputusan lebih cepat.
Aturan fleksibel dan dapat dikonfigurasi: Mendukung pembaruan dinamis kebijakan aturan, memungkinkan Anda beradaptasi cepat terhadap perubahan bisnis tanpa harus me-restart layanan.
Pengenalan event kompleks yang andal: Mendukung pencocokan logika lanjutan, seperti urutan multi-event, jendela waktu, dan kondisi gabungan, sehingga mampu menangkap secara akurat skenario bisnis yang kompleks.
Dalam industri e-commerce, skenario umum penggunaan CEP dinamis meliputi, namun tidak terbatas pada, hal-hal berikut:
Skenario | Deskripsi |
Peluang cross-selling dan up-selling | Saat menelusuri produk, pengguna sering kali menunjukkan minat lintas kategori berbeda. Misalnya, pengguna mungkin melihat ponsel lalu mencari headphone atau power bank. Perilaku ini membuka peluang cross-selling dan up-selling. Dengan merekomendasikan produk pelengkap secara akurat, seperti casing ponsel atau headphone, atau menawarkan paket bundling seperti "diskon paket ponsel + headphone", platform dapat meningkatkan tingkat pembelian item tambahan dan menaikkan nilai pesanan rata-rata. Hal ini juga meningkatkan pengalaman pengguna dan memperkuat loyalitas pengguna, mendorong pertumbuhan bisnis. |
Pemulihan keranjang belanja bernilai tinggi | Pengguna mungkin menambahkan item bernilai tinggi ke keranjang belanja mereka tetapi tidak menyelesaikan pembelian karena sensitivitas harga atau keraguan. Hal ini menyebabkan potensi kehilangan penjualan. Dengan mengidentifikasi keranjang belanja yang ditinggalkan secara real time dan memicu intervensi, seperti diskon berbatas waktu, peringatan stok rendah, atau penawaran gratis ongkos kirim, platform dapat secara efektif mengurangi kehilangan item bernilai tinggi, meningkatkan tingkat konversi pesanan, dan memulihkan pendapatan potensial. Ini menciptakan situasi win-win bagi nilai pengguna maupun pendapatan platform. |
Identifikasi pengguna berminat tinggi | Pengguna yang menelusuri produk yang sama berulang kali dalam periode singkat menunjukkan minat beli yang tinggi. Dengan mengidentifikasi perilaku ini dan memicu pemasaran personalisasi, seperti kupon eksklusif atau pengingat stok, platform dapat mempercepat proses pengambilan keputusan pengguna, meningkatkan tingkat konversi, dan memperbaiki pengalaman pengguna, yang pada gilirannya mendorong penjualan. |
Operasi pengguna sensitif harga | Pengguna sensitif harga sering kali menelusuri suatu produk berulang kali dan hanya menambahkannya ke keranjang belanja saat harganya turun. Dengan menganalisis perilaku ini, platform dapat mengirimkan notifikasi atau penawaran tertarget saat harga berubah, seperti "Produk yang Anda pantau sedang diskon!". Hal ini meningkatkan tingkat konversi dan meningkatkan efisiensi operasi pengguna. |
Peringatan risiko churn | Pengguna yang sering menelusuri produk tetapi tidak melakukan pemesanan dalam waktu lama berisiko churn. Dengan mengidentifikasi perilaku ini dan mengambil langkah pemulihan, seperti mengirimkan kupon eksklusif atau merekomendasikan produk populer, platform dapat secara efektif mengurangi tingkat churn, memperpanjang siklus hidup pengguna, serta meningkatkan retensi pengguna dan pendapatan platform. |
Arsitektur solusi
Flink CEP adalah library Apache Flink untuk memproses pola event kompleks. Dengan Flink CEP, Anda dapat mendefinisikan pola event kompleks, memantau aliran event secara real time, dan mengidentifikasi urutan event yang sesuai dengan pola tersebut. Library ini kemudian menghasilkan hasil pencocokan. Arsitektur solusi adalah sebagai berikut:

Aliran Event
Aliran event merupakan sumber input untuk pemrosesan CEP, biasanya berupa aliran data kontinu yang berisi rangkaian event tersusun secara kronologis. Setiap event dapat memiliki beberapa properti yang digunakan untuk pencocokan pola.
Definisi Pola dan Aturan
Anda dapat mendefinisikan pola event dan aturan yang menggambarkan urutan atau kombinasi event yang ingin dideteksi. Pola dapat mencakup urutan event, batasan waktu, dan filter kondisi. Misalnya, Anda dapat mendefinisikan pola di mana event A diikuti oleh event B dalam waktu 10 detik.
Analisis Mesin CEP
Mesin CEP menerima aliran event dan menganalisisnya berdasarkan pola dan aturan yang telah ditentukan. Mesin ini terus-menerus memantau aliran event dan mencoba mencocokkan event input dengan pola yang telah ditentukan, mempertimbangkan batasan seperti urutan waktu event, kondisi properti, dan jendela waktu selama proses pencocokan.
Keluaran Pencocokan CEP
Ketika urutan event dalam aliran event berhasil cocok dengan pola yang ditentukan, mesin CEP menghasilkan keluaran. Keluaran ini dapat berupa urutan event yang cocok, aksi yang dipicu oleh aturan, atau format keluaran lain yang ditentukan pengguna. Hasil pencocokan dapat digunakan untuk pemrosesan selanjutnya, seperti peringatan, pengambilan keputusan, atau penyimpanan data.
Prasyarat
Anda telah mengaktifkan Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Aktifkan Realtime Compute for Apache Flink.
Anda telah mengaktifkan Message Queue for Apache Kafka. Untuk informasi lebih lanjut, lihat Terapkan instance Message Queue for Apache Kafka.
Anda telah mengaktifkan RDS for MySQL. Untuk informasi lebih lanjut, lihat Buat instance RDS for MySQL.
Pastikan Realtime Compute for Apache Flink, ApsaraDB RDS for MySQL, dan Message Queue for Apache Kafka berada dalam VPC yang sama. Jika tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan antar-VPC atau mengakses layanan melalui Internet. Untuk informasi lebih lanjut, lihat Bagaimana cara mengakses layanan lain lintas VPC? dan Bagaimana cara mengakses Internet?.
Anda memiliki pengguna Resource Access Management (RAM) atau peran RAM dengan izin yang diperlukan.
Langkah 1: Persiapan
Buat instance ApsaraDB RDS for MySQL dan siapkan sumber data
Buat database ApsaraDB RDS for MySQL. Untuk informasi lebih lanjut, lihat Buat database.
Untuk instance tujuan, buat database bernama
ecommerce.Siapkan sumber data Change Data Capture (CDC) MySQL.
Pada halaman detail instance tujuan, klik Log On To Database di bagian atas halaman.
Pada dialog logon DMS, masukkan nama pengguna dan kata sandi untuk akun database yang telah Anda buat, lalu klik Log On.
Setelah berhasil login, klik ganda database
ecommercedi sebelah kiri untuk beralih ke database tersebut.Pada Konsol SQL, masukkan pernyataan Data Definition Language (DDL) berikut untuk membuat tabel dan memasukkan data.
-- Create rule table 1 CREATE TABLE rds_demo1 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- Create rule table 2 CREATE TABLE rds_demo2 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- Create rule table 3 CREATE TABLE rds_demo3 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- Create rule table 4 CREATE TABLE rds_demo4 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- Create rule table 5 CREATE TABLE rds_demo5 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- Create the source table CREATE TABLE `click_stream1` ( id bigint not null primary key auto_increment, -- Auto-increment primary key eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream2` ( id bigint not null primary key auto_increment, -- Auto-increment primary key eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream3` ( id bigint not null primary key auto_increment, -- Auto-increment primary key eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream4` ( id bigint not null primary key auto_increment, -- Auto-increment primary key eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream5` ( id bigint not null primary key auto_increment, -- Auto-increment primary key eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) );Klik Execute, lalu klik Execute Directly.
Buat sumber daya topik dan grup Kafka
Buat sumber daya Kafka berikut. Untuk informasi lebih lanjut, lihat Buat sumber daya.
Grup: clickstream.consumer.
Topik: click_stream1, click_stream2, click_stream3, click_stream4, dan click_stream5.
Saat membuat topik, atur jumlah partisi menjadi 1. Jika tidak, data sampel mungkin tidak sesuai dengan hasil pada beberapa skenario.

Langkah 2: Sinkronisasi data dari MySQL ke Kafka secara real time
Menyinkronkan event clickstream pengguna dari MySQL ke Kafka mengurangi beban yang ditempatkan oleh banyak pekerjaan pada database MySQL.
Buat katalog MySQL. Untuk informasi lebih lanjut, lihat Buat katalog MySQL.
Pada contoh ini, katalog diberi nama
mysql-catalog, dan database default adalahecommerce.Buat katalog Kafka. Untuk informasi lebih lanjut, lihat Kelola katalog Kafka JSON.
Pada contoh ini, katalog diberi nama
kafka-catalog.Pada halaman , buat pekerjaan stream SQL dan salin kode berikut ke editor SQL.
CREATE TEMPORARY TABLE `clickstream1` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- Define the primary key. PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark. ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream1', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream2` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- Define the primary key. PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark. ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream2', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream3` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- Define the primary key. PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark. ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream3', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream4` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- Define the primary key. PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark. ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream4', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream5` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- Define the primary key. PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark. ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream5', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); BEGIN STATEMENT SET; INSERT INTO `clickstream1` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream1`; INSERT INTO `clickstream2` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream2`; INSERT INTO `clickstream3` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream3`; INSERT INTO `clickstream4` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream4`; INSERT INTO `clickstream5` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream5`; END; -- Required when writing to multiple sinks.Di pojok kanan atas, klik Deploy untuk menerapkan pekerjaan.
Di panel navigasi sebelah kiri, pilih . Di kolom Actions untuk pekerjaan target, klik Start. Pilih Stateless Start lalu klik Start.
Langkah 3: Mengembangkan, menerapkan, dan menjalankan pekerjaan CEP
Bagian ini menjelaskan cara menerapkan pekerjaan cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar. Pekerjaan ini mengonsumsi event clickstream pengguna dari Kafka, memprosesnya, dan mencetak informasi peringatan ke Konsol pengembangan Realtime Compute for Apache Flink. Anda dapat menyesuaikan kode berdasarkan arsitektur bisnis Anda dan memilih konektor downstream yang sesuai untuk berbagai skenario keluaran data. Untuk informasi lebih lanjut tentang konektor yang didukung, lihat Konektor yang didukung.
1. Pengembangan kode
Bagian ini hanya menampilkan kode inti dan menjelaskan fungsinya.
2. Terapkan pekerjaan
Pada halaman , klik untuk menerapkan lima pekerjaan stream secara terpisah.

Tabel berikut menjelaskan parameter-parameter tersebut:
Parameter | Deskripsi | Contoh |
Deployment Mode | Pemrosesan stream | Streaming Mode |
Deployment Name | Masukkan nama pekerjaan JAR yang sesuai. |
|
Engine Version | Versi mesin Flink yang digunakan oleh pekerjaan saat ini. SDK untuk kode dalam topik ini menggunakan JDK 11. Pilih versi yang menyertakan | vvr-8.0.11-jdk11-flink-1.17 |
JAR URI | Klik ikon | oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar |
Entry Point Class | Kelas titik masuk program. | com.alibaba.ververica.cep.demo.CepDemo |
Entry Point Main Arguments | Anda dapat meneruskan parameter di sini dan memanggilnya dalam metode utama. Konfigurasikan parameter berikut untuk topik ini:
|
|
Untuk informasi lebih lanjut tentang penerapan, lihat Terapkan pekerjaan JAR.
3. Jalankan pekerjaan
Pada halaman Job O&M, di kolom Actions untuk pekerjaan target, klik Start. Pilih Stateless Start lalu klik Start. Jalankan lima pekerjaan untuk skenario tersebut, yaitu EcommerceCEPRunner1, EcommerceCEPRunner2, EcommerceCEPRunner3, EcommerceCEPRunner4, dan EcommerceCEPRunner5, secara berurutan.
Untuk informasi lebih lanjut tentang konfigurasi start, lihat Jalankan pekerjaan.
di sebelah kanan untuk mengunggah file 








