FlinkCEP(Complex Event Processing)用於動態處理複雜事件流,能夠即時檢測特定事件模式並觸發預警。在電商營銷中,FlinkCEP可用於即時監控使用者行為、交易資料等,識別異常或關鍵事件,及時發出預警。
背景資訊
隨著電商行業的快速發展,使用者行為資料和交易資料的規模呈指數級增長。傳統的批處理方式已難以滿足對異常行為、系統風險和使用者流失的及時識別與響應。相比之下,利用動態複雜事件處理(CEP)引擎對多階段使用者行為進行建模分析,能夠自動識別複雜的事件模式,並在風險發生的初期觸發預警,這是動態CEP在即時業務中的核心優勢所在。其具備以下三大關鍵特點:
即時性強:實現毫秒級響應,支援“事中預警”,而非事後分析,助力快速決策。
規則靈活可配置:支援動態更新規則策略,無需重啟服務即可快速適應業務變化。
複雜事件識別能力強:支援多事件序列、時間視窗、條件組合等進階邏輯匹配,精準捕捉複雜業務情境。
在電商行業,動態CEP的典型應用情境包括但不限於以下幾個方面:
情境 | 說明 |
交叉銷售與追加銷售機會 | 使用者在瀏覽商品時,常表現出跨品類興趣,例如先看手機,再查看耳機或充電寶。這種行為蘊含交叉銷售和追加銷售機會。通過精準推薦互補商品(如手機殼、耳機)或提供組合優惠(如“手機+耳機套餐立減”),平台不僅能提升附加商品購買率、提高客單價,還能最佳化使用者體驗,增強使用者粘性,從而推動業務增長。 |
高價值購物車挽回 | 使用者將高價值商品加入購物車後,可能因價格敏感或決策猶豫未完成購買,造成潛在銷售損失。通過即時識別購物車放棄行為並觸發幹預(如限時折扣、庫存預警或免運費優惠),平台可有效減少高價值商品的流失,提升訂單轉化率,挽回潛在收益,實現使用者價值與平台收益的雙贏。 |
高意向使用者識別 | 使用者短時間內多次瀏覽同一商品,表明其購買意向較高。通過識別該行為並觸發個人化營銷(如專屬優惠券或庫存提醒),平台可加速使用者決策,提高轉化率,同時最佳化使用者體驗,推動銷售增長。 |
價格敏感使用者營運 | 價格敏感使用者常反覆瀏覽某商品,僅在降價時加入購物車。通過分析該行為,平台可在價格變動時推播通知或定向優惠(如“您關注的商品已降價!”),提升轉化率,同時最佳化使用者營運效率。 |
流失風險預警 | 使用者頻繁瀏覽商品卻長期未下單,可能存在流失風險。通過識別此類行為並採取挽回措施(如發送專屬優惠券或推薦熱門商品),平台可有效降低流失率,延長使用者生命週期,同時提升使用者留存與平台收益。 |
方案架構
FlinkCEP是Apache Flink中用於處理複雜事件模式的庫。FlinkCEP(Complex Event Processing)通過定義複雜事件模式,即時監控事件流,並在事件流中識別出符合模式的事件序列,最終輸出匹配結果。其方案架構可以概括如下:

Event Stream
事件流是CEP處理的輸入源,通常是一個連續的資料流,包含一系列按時間順序排列的事件。每個事件可以包含多個屬性,用於後續的模式比對。
Pattern and Rule Definitions
使用者定義事件模式(Pattern)和規則(Rule),這些模式描述了使用者感興趣的事件序列或組合。模式可以包括事件的順序、時間約束、條件過濾等。例如,定義“A事件後跟隨B事件,且兩者時間間隔不超過10秒”的模式。
CEP Engine Analysis
CEP引擎接收事件流,並根據定義的模式和規則進行分析。引擎會持續監控事件流,嘗試將輸入事件與定義的模式進行匹配。匹配過程中,引擎會考慮事件的時間順序、屬性條件以及時間視窗等約束。
CEP Matching Outputs
當事件流中的事件序列與定義的模式比對成功時,CEP引擎會產生匹配結果(Output)。這些結果可以是匹配到的事件序列、觸發規則的動作,或者其他使用者定義的輸出形式。匹配結果可以用於後續的處理,如警示、決策或儲存。
前提條件
已開通Realtime ComputeFlink版,詳情請參見開通Realtime ComputeFlink版。
已開通雲訊息佇列Kafka,詳情請參見部署訊息佇列Kafka執行個體。
已開通RDS MySQL,詳情請參見建立RDS MySQL執行個體。
Realtime ComputeFlink版、雲資料庫RDS MySQL、雲訊息佇列Kafka需要在同一VPC下。如果不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見如何訪問跨VPC的其他服務?和如何訪問公網?。
通過RAM使用者或RAM角色等身份訪問時,需要具備操作許可權。
步驟一:準備工作
建立RDS MySQL執行個體並準備資料來源
建立RDS MySQL資料庫,詳情請參見建立資料庫。
為目標執行個體建立名稱為
ecommerce的資料庫。準備MySQL CDC資料來源。
在目標執行個體詳情頁面,單擊上方的登入資料庫。
在彈出的DMS頁面中,填寫建立的資料庫帳號名和密碼,然後單擊登入。
登入成功後,在左側雙擊
ecommerce資料庫,切換資料庫。在SQL Console地區編寫如下建表DDL以及插入的資料語句。
-- 建立規則表1 CREATE TABLE rds_demo1 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 建立規則表2 CREATE TABLE rds_demo2 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 建立規則表3 CREATE TABLE rds_demo3 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 建立規則表4 CREATE TABLE rds_demo4 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 建立規則表5 CREATE TABLE rds_demo5 ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); -- 建立源表 CREATE TABLE `click_stream1` ( id bigint not null primary key auto_increment, -- 自增主鍵 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream2` ( id bigint not null primary key auto_increment, -- 自增主鍵 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream3` ( id bigint not null primary key auto_increment, -- 自增主鍵 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream4` ( id bigint not null primary key auto_increment, -- 自增主鍵 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) ); CREATE TABLE `click_stream5` ( id bigint not null primary key auto_increment, -- 自增主鍵 eventTime timestamp, eventType varchar(50), productId varchar(50), categoryId varchar(50), categoryCode varchar(80), brand varchar(50), price decimal(10, 2), userId varchar(50), userSession varchar(50) );單擊執行後,再單擊直接執行。
建立雲訊息佇列Kafka Topic和Group資源
參考建立資源建立以下Kafka資源:
Group:clickstream.consumer。
Topic:click_stream1、click_stream2、click_stream3、click_stream4和click_stream5。
建立Topic時,分區數建議設定為1,否則在某些情境下可能導致樣本資料無法匹配到結果。

步驟二:MySQL即時同步Kafka
將使用者點擊流事件從MySQL同步到Kafka中,可以有效降低多個任務對MySQL資料庫造成的壓力。
建立MySQL Catalog,詳情請參見建立MySQL Catalog。
本樣本Catalog命名為
mysql-catalog,預設資料庫為ecommerce。建立kafak Catalog,詳情請參見管理Kafka JSON Catalog。
本樣本Catalog命名為
kafka-catalog。在頁面,建立SQL流作業,並將如下代碼拷貝到SQL編輯器。
CREATE TEMPORARY TABLE `clickstream1` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定義主鍵 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定義Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream1', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream2` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定義主鍵 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定義Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream2', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream3` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定義主鍵 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定義Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream3', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream4` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定義主鍵 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定義Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream4', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TEMPORARY TABLE `clickstream5` ( `key_id` BIGINT, `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING, `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING, `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING, -- 定義主鍵 PRIMARY KEY (`key_id`) NOT ENFORCED, ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --定義Watermark。 ) WITH ( 'connector'='upsert-kafka', 'topic' = 'click_stream5', 'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092', 'key.format' = 'json', 'value.format' = 'json', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY' ); BEGIN STATEMENT SET; INSERT INTO `clickstream1` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream1`; INSERT INTO `clickstream2` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream2`; INSERT INTO `clickstream3` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream3`; INSERT INTO `clickstream4` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream4`; INSERT INTO `clickstream5` SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, eventType, productId, categoryId, categoryCode, brand, price, `userId`, userSession FROM `mysql-catalog`.`ecommerce`.`click_stream5`; END; --寫入多個Sink時,必填。單擊右上方的部署,進行作業部署。
單擊左側導覽列的,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動。
步驟三:開發、部署與啟動CEP作業
本文部署了cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar作業,該作業從Kafka中消費使用者點擊流事件,經過處理產生預警資訊列印到Realtime Compute開發控制台。您可以根據實際業務架構調整代碼,選擇合適的下遊連接器以適配不同的資料輸出情境。更多支援的連接器詳情請參見支援的連接器。
1、代碼開發
本步驟僅為您展示核心代碼及其功能說明。
2、部署作業
在頁面,單擊,分別部署5個流作業。

參數配置說明:
參數 | 說明 | 樣本 |
部署模式 | 流處理 | 流模式 |
部署名稱 | 填寫對應的JAR作業名稱。 |
|
引擎版本 | 當前作業使用的Flink引擎版本。 本文代碼SDK使用JDK11,需要選擇帶有 | vvr-8.0.11-jdk11-flink-1.17 |
JAR URI | 手動單擊右側 | oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar |
Entry Point Class | 程式的入口類。 | com.alibaba.ververica.cep.demo.CepDemo |
Entry Point Main Arguments | 您可以在此處傳入參數,在主方法中調用該參數。 本文需配置如下參數:
|
|
部署詳情請參見部署JAR作業。
3、啟動作業
在作業營運頁面,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動。依次啟動名稱為EcommerceCEPRunner1、EcommerceCEPRunner2、EcommerceCEPRunner3、EcommerceCEPRunner4和EcommerceCEPRunner5共5個情境的作業。
啟動配置的具體詳情,請參見作業啟動。
表徵圖上傳,選擇








