全部產品
Search
文件中心

Realtime Compute for Apache Flink:動態Flink CEP電商即時預警系統

更新時間:Sep 19, 2025

FlinkCEP(Complex Event Processing)用於動態處理複雜事件流,能夠即時檢測特定事件模式並觸發預警。在電商營銷中,FlinkCEP可用於即時監控使用者行為、交易資料等,識別異常或關鍵事件,及時發出預警。

背景資訊

隨著電商行業的快速發展,使用者行為資料和交易資料的規模呈指數級增長。傳統的批處理方式已難以滿足對異常行為、系統風險和使用者流失的及時識別與響應。相比之下,利用動態複雜事件處理(CEP)引擎對多階段使用者行為進行建模分析,能夠自動識別複雜的事件模式,並在風險發生的初期觸發預警,這是動態CEP在即時業務中的核心優勢所在。其具備以下三大關鍵特點:

  • 即時性強:實現毫秒級響應,支援“事中預警”,而非事後分析,助力快速決策。

  • 規則靈活可配置:支援動態更新規則策略,無需重啟服務即可快速適應業務變化。

  • 複雜事件識別能力強:支援多事件序列、時間視窗、條件組合等進階邏輯匹配,精準捕捉複雜業務情境。

在電商行業,動態CEP的典型應用情境包括但不限於以下幾個方面:

情境

說明

交叉銷售與追加銷售機會

使用者在瀏覽商品時,常表現出跨品類興趣,例如先看手機,再查看耳機或充電寶。這種行為蘊含交叉銷售和追加銷售機會。通過精準推薦互補商品(如手機殼、耳機)或提供組合優惠(如“手機+耳機套餐立減”),平台不僅能提升附加商品購買率、提高客單價,還能最佳化使用者體驗,增強使用者粘性,從而推動業務增長。

高價值購物車挽回

使用者將高價值商品加入購物車後,可能因價格敏感或決策猶豫未完成購買,造成潛在銷售損失。通過即時識別購物車放棄行為並觸發幹預(如限時折扣、庫存預警或免運費優惠),平台可有效減少高價值商品的流失,提升訂單轉化率,挽回潛在收益,實現使用者價值與平台收益的雙贏。

高意向使用者識別

使用者短時間內多次瀏覽同一商品,表明其購買意向較高。通過識別該行為並觸發個人化營銷(如專屬優惠券或庫存提醒),平台可加速使用者決策,提高轉化率,同時最佳化使用者體驗,推動銷售增長。

價格敏感使用者營運

價格敏感使用者常反覆瀏覽某商品,僅在降價時加入購物車。通過分析該行為,平台可在價格變動時推播通知或定向優惠(如“您關注的商品已降價!”),提升轉化率,同時最佳化使用者營運效率。

流失風險預警

使用者頻繁瀏覽商品卻長期未下單,可能存在流失風險。通過識別此類行為並採取挽回措施(如發送專屬優惠券或推薦熱門商品),平台可有效降低流失率,延長使用者生命週期,同時提升使用者留存與平台收益。

方案架構

FlinkCEP是Apache Flink中用於處理複雜事件模式的庫。FlinkCEP(Complex Event Processing)通過定義複雜事件模式,即時監控事件流,並在事件流中識別出符合模式的事件序列,最終輸出匹配結果。其方案架構可以概括如下:

2

  1. Event Stream

    事件流是CEP處理的輸入源,通常是一個連續的資料流,包含一系列按時間順序排列的事件。每個事件可以包含多個屬性,用於後續的模式比對。

  2. Pattern and Rule Definitions

    使用者定義事件模式(Pattern)和規則(Rule),這些模式描述了使用者感興趣的事件序列或組合。模式可以包括事件的順序、時間約束、條件過濾等。例如,定義“A事件後跟隨B事件,且兩者時間間隔不超過10秒”的模式。

  3. CEP Engine Analysis

    CEP引擎接收事件流,並根據定義的模式和規則進行分析。引擎會持續監控事件流,嘗試將輸入事件與定義的模式進行匹配。匹配過程中,引擎會考慮事件的時間順序、屬性條件以及時間視窗等約束。

  4. CEP Matching Outputs

    當事件流中的事件序列與定義的模式比對成功時,CEP引擎會產生匹配結果(Output)。這些結果可以是匹配到的事件序列、觸發規則的動作,或者其他使用者定義的輸出形式。匹配結果可以用於後續的處理,如警示、決策或儲存。

前提條件

步驟一:準備工作

建立RDS MySQL執行個體並準備資料來源

  1. 建立RDS MySQL資料庫,詳情請參見建立資料庫

    為目標執行個體建立名稱為ecommerce的資料庫。

  2. 準備MySQL CDC資料來源。

    1. 在目標執行個體詳情頁面,單擊上方的登入資料庫

    2. 在彈出的DMS頁面中,填寫建立的資料庫帳號名和密碼,然後單擊登入

    3. 登入成功後,在左側雙擊ecommerce資料庫,切換資料庫。

    4. 在SQL Console地區編寫如下建表DDL以及插入的資料語句。

      -- 建立規則表1
      CREATE TABLE rds_demo1 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- 建立規則表2
      CREATE TABLE rds_demo2 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- 建立規則表3
      CREATE TABLE rds_demo3 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- 建立規則表4
      CREATE TABLE rds_demo4 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- 建立規則表5
      CREATE TABLE rds_demo5 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- 建立源表
      CREATE TABLE `click_stream1` (
        id bigint not null primary key auto_increment,  -- 自增主鍵
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream2` (
        id bigint not null primary key auto_increment,  -- 自增主鍵
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream3` (
        id bigint not null primary key auto_increment,  -- 自增主鍵
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream4` (
        id bigint not null primary key auto_increment,  -- 自增主鍵
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      
      CREATE TABLE `click_stream5` (
        id bigint not null primary key auto_increment,  -- 自增主鍵
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
    5. 單擊執行後,再單擊直接執行

建立雲訊息佇列Kafka Topic和Group資源

參考建立資源建立以下Kafka資源:

  • Group:clickstream.consumer。

  • Topic:click_stream1、click_stream2、click_stream3、click_stream4和click_stream5。

    建立Topic時,分區數建議設定為1,否則在某些情境下可能導致樣本資料無法匹配到結果。

    image

步驟二:MySQL即時同步Kafka

將使用者點擊流事件從MySQL同步到Kafka中,可以有效降低多個任務對MySQL資料庫造成的壓力。

  1. 建立MySQL Catalog,詳情請參見建立MySQL Catalog

    本樣本Catalog命名為mysql-catalog,預設資料庫為ecommerce

  2. 建立kafak Catalog,詳情請參見管理Kafka JSON Catalog

    本樣本Catalog命名為kafka-catalog

  3. 資料開發 > ETL頁面,建立SQL流作業,並將如下代碼拷貝到SQL編輯器。

    CREATE TEMPORARY TABLE `clickstream1` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- 定義主鍵
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --定義Watermark。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream1',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream2` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- 定義主鍵
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --定義Watermark。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream2',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream3` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- 定義主鍵
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --定義Watermark。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream3',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream4` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- 定義主鍵
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --定義Watermark。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream4',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream5` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- 定義主鍵
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --定義Watermark。
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream5',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    BEGIN STATEMENT SET; 
    INSERT INTO `clickstream1`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream1`;
    
    
    INSERT INTO `clickstream2`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream2`;
    
    
    INSERT INTO `clickstream3`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream3`;
    
    
    INSERT INTO `clickstream4`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream4`;
    
    
    INSERT INTO `clickstream5`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream5`;
    END;      --寫入多個Sink時,必填。
  4. 單擊右上方的部署,進行作業部署。

  5. 單擊左側導覽列的營運中心 > 作業營運,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動

步驟三:開發、部署與啟動CEP作業

本文部署了cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar作業,該作業從Kafka中消費使用者點擊流事件,經過處理產生預警資訊列印到Realtime Compute開發控制台。您可以根據實際業務架構調整代碼,選擇合適的下遊連接器以適配不同的資料輸出情境。更多支援的連接器詳情請參見支援的連接器

1、代碼開發

本步驟僅為您展示核心代碼及其功能說明。

主類

public class CepDemo {

    public static void checkArg(String argName, MultipleParameterTool params) {
        if (!params.has(argName)) {
            throw new IllegalArgumentException(argName + " must be set!");
        }
    }

  //解析規則表
    private static Match_results parseOutput(String output) {
        String rule = "\\(id, version\\): \\((\\d+), (\\d+)\\).*?Event\\((\\d+), (\\w+), (\\d+), (\\d+)";
        Pattern pattern = Pattern.compile(rule);
        Matcher matcher = pattern.matcher(output);
        if (matcher.find()) {
            return new Match_results(Integer.parseInt(matcher.group(1)), Integer.parseInt(matcher.group(2)), Integer.parseInt(matcher.group(3)), matcher.group(4), Integer.parseInt(matcher.group(6)));
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // Process args
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        //kafka的Brokers連結
        checkArg(KAFKA_BROKERS_ARG, params);
        //輸入topic
        checkArg(INPUT_TOPIC_ARG, params);
        //group
        checkArg(INPUT_TOPIC_GROUP_ARG, params);

        //mysql的jdbcurl
        checkArg(JDBC_URL_ARG, params);
        //mysql的表名
        checkArg(TABLE_NAME_ARG, params);
        //輪詢資料庫的時間間隔
        checkArg(JDBC_INTERVAL_MILLIS_ARG, params);
        //是否使用事件時間處理(true/false)
        checkArg(USING_EVENT_TIME, params);

        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build Kafka source with new Source API based on FLIP-27   Kafka Source設定
        KafkaSource<Event> kafkaSource =
                KafkaSource.<Event>builder()
                        .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                        .setTopics(params.get(INPUT_TOPIC_ARG))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                        .setDeserializer(new EventDeSerializationSchema())
                        .build();



        // DataStream Source   Watermark策略
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");


        //根據UserId對流進行分組,以便後續的模式比對
        KeyedStream<Event, String> keyedStream =
                source.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                ).keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event value) throws Exception {
                        return value.getUserId(); // 只用 UserId 作為 key
                    }
                });

        // Dynamic CEP patterns   動態CEP模式比對,使用動態載入的模式處理器工廠類 JDBCPeriodicPatternProcessorDiscovererFactory 來擷取模式,並執行模式比對(讀取mysql資料庫中的規則表)
        SingleOutputStreamOperator<String> output =
                CEP.dynamicPatterns(
                        keyedStream,//來源資料流
                        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                params.get(JDBC_URL_ARG),
                                JDBC_DRIVE,
                                params.get(TABLE_NAME_ARG),
                                null,
                                Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                        Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ? TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
                        TypeInformation.of(new TypeHint<String>() {})
                );

        //輸出到用戶端
        output.print();
  
        env.execute("CEP Demo");
    }
}

情境一:檢測使用者在同一會話中5分鐘內瀏覽不同類別商品的行為

資料順序:模式開始於一個事件類型為view的事件,接下來是一個事件類型同樣為view的事件。

public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}

情境二:檢測使用者將高價值商品加入購物車後,10分鐘內未發生購買事件

資料順序:模式開始於一個事件類型為cart(加入購物車)且價格大於200的事件,接下來是一個事件類型為purchase(購買事件)。

public class CartAddCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart") && event.getPrice() > 200;
    }
}
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}

情境三:檢測使用者15分鐘內多次瀏覽同一商品的行為

資料順序:模式開始於一個事件類型為view的事件,接下來是一個事件類型同樣為view的事件,並且這個事件重複3次。

-- 條件類同測試1
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}

情境四:檢測使用者瀏覽商品後,僅在價格下降時加入購物車的行為

資料順序:模式開始於一個事件類型為view的事件,接下來是一個事件類型同樣為view的Event,並且該產品的價格低於初始產品的價格,最後是一個事件類型為cart的事件。

-- 條件類同測試1
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
public class InitialCondition extends IterativeCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event, Context<ClickEvent> ctx) throws Exception {
        ClickEvent initialView = ctx.getEventsForPattern("initial_view").iterator().next();
        return event.getEventType().equals("view") && event.getProductId().equals(initialView.getProductId()) && event.getPrice() < initialView.getPrice();
    }

}
public class CartCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart");
    }
}

情境五:檢測使用者一周內多次瀏覽商品但未下單的行為

資料順序:模式開始於一個事件類型為view的事件,在接下來的時間視窗內不能有purchase事件。

-- 條件類同測試1
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
-- 條件類同測試2
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}

2、部署作業

營運中心 > 作業營運頁面,單擊部署作業 > JAR作業,分別部署5個流作業。

image

參數配置說明:

參數

說明

樣本

部署模式

流處理

流模式

部署名稱

填寫對應的JAR作業名稱。

  • 情境一作業名稱:EcommerceCEPRunner1

  • 情境二作業名稱:EcommerceCEPRunner2

  • 情境三作業名稱:EcommerceCEPRunner3

  • 情境四作業名稱:EcommerceCEPRunner4

  • 情境五作業名稱:EcommerceCEPRunner5

引擎版本

當前作業使用的Flink引擎版本。

本文代碼SDK使用JDK11,需要選擇帶有jdk11的版本,VVR推薦使用最新版本。

vvr-8.0.11-jdk11-flink-1.17

JAR URI

手動單擊右側上傳表徵圖上傳,選擇cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

Entry Point Class

程式的入口類。

com.alibaba.ververica.cep.demo.CepDemo

Entry Point Main Arguments

您可以在此處傳入參數,在主方法中調用該參數。

本文需配置如下參數:

  • bootstrap.servers:Kafka叢集地址。

  • clickstream_topic:消費的點擊流Kafka Topic。

  • group:消費者組ID。

  • jdbcUrl:MySQL服務地址。

  • database:資料庫名稱。

  • user:使用者名稱。

  • password:使用者密碼。

  • tableName :MySQL中規則表名稱。

  • 情境一配置資訊:--kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream1 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo1 --jdbcIntervalMs 3000 --usingEventTime false

  • 情境二配置資訊:--kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream2 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo2 --jdbcIntervalMs 3000 --usingEventTime false

  • 情境三配置資訊:--kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream3 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo3 --jdbcIntervalMs 3000 --usingEventTime false

  • 情境四配置資訊:--kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream4 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo4 --jdbcIntervalMs 3000 --usingEventTime false

  • 情境五配置資訊:--kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream5 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo5 --jdbcIntervalMs 3000 --usingEventTime false

部署詳情請參見部署JAR作業

3、啟動作業

作業營運頁面,單擊目標作業操作列的啟動,選擇無狀態啟動後單擊啟動。依次啟動名稱為EcommerceCEPRunner1、EcommerceCEPRunner2、EcommerceCEPRunner3、EcommerceCEPRunner4和EcommerceCEPRunner5共5個情境的作業。

啟動配置的具體詳情,請參見作業啟動

步驟四:預警查詢

情境一:檢測使用者在同一會話中5分鐘內瀏覽不同類別商品的行為

  1. 在MySQL中插入規則資料。

    -- 交叉銷售與追加銷售機會情境測試資料
    INSERT INTO rds_demo1 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"second_view","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":5}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. 在MySQL中插入使用者行為測試資料。

    -- 交叉銷售與追加銷售機會情境測試資料
    INSERT INTO `click_stream1` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 00:01:00.0', 'view', 1005073, 2232732093077520756, 'construction.tools.light', 'samsung', 1130.02, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:03.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes', 'xiaomi', 29.95, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:07.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes.step_ins', 'intel', 167.20, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:08.0', 'view', 1005205, 2232732093077520756, 'appliances.personal.massager', 'samsung', 576.33, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0');
  3. 在Realtime Compute開發控制台日誌中查看資料結果。

    • 在JobManager日誌中,通過JDBCPeriodicPatternProcessorDiscoverer關鍵詞搜尋,查看最新的規則。

      image

    • 在TaskManager中查看Stdout記錄檔,通過A match for Pattern of (id, version): (1, 1)關鍵詞搜尋,查看日誌中列印的匹配。

      image

情境二:檢測使用者將高價值商品加入購物車後,10分鐘內未發生購買事件

  1. 在MySQL中插入規則資料。

    -- 高價值購物車挽回情境測試資料
    INSERT INTO rds_demo2 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"cart_add","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartAddCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"cart_add","target":"purchase","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":30}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. 在MySQL中插入使用者行為測試資料。

    -- 高價值購物車挽回情境測試資料
    INSERT INTO `click_stream2` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 01:01:01.0','cart',1002923,2053013555631882655,'electronics.smartphone','huawei',249.30,517014550,'b666b914-8abf-4ebe-b674-aa31a1d0f7ce'),
    ('2020-01-01 01:11:02.0','cart',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded'),
    ('2020-01-01 01:11:03.0','purchase',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded');
  3. 在Realtime Compute開發控制台日誌中查看資料結果。

    • 在JobManager日誌中,通過JDBCPeriodicPatternProcessorDiscoverer關鍵詞搜尋,查看最新的規則。

      image

    • 在TaskManager中查看Stdout記錄檔,通過A match for Pattern of (id, version): (1, 1)關鍵詞搜尋,查看日誌中列印的匹配。

      image

情境三:檢測使用者15分鐘內多次瀏覽同一商品的行為

  1. 在MySQL中插入規則資料。

    -- 高意向使用者識別情境測試資料
    INSERT INTO rds_demo3 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":null,"nodes":[{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"initial_view","target":"repeat_view","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":15}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. 在MySQL中插入使用者行為測試資料。

    -- 高意向使用者識別情境測試資料
    INSERT INTO `click_stream3` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 02:01:01.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:02.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:03.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:04.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039');
  3. 在Realtime Compute開發控制台日誌中查看資料結果。

    • 在JobManager日誌中,通過JDBCPeriodicPatternProcessorDiscoverer關鍵詞搜尋,查看最新的規則。

      image

    • 在TaskManager中查看Stdout記錄檔,通過A match for Pattern of (id, version): (1, 1)關鍵詞搜尋,查看日誌中列印的匹配。

      image

情境四:檢測使用者瀏覽商品後,僅在價格下降時加入購物車的行為

  1. 在MySQL中插入規則資料。

    -- 價格敏感使用者營運情境測試資料
    INSERT INTO rds_demo4 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"view_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.InitialCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"view_price_drop","target":"cart_after_price_drop","type":"STRICT"},{"source":"initial_view","target":"view_price_drop","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":10}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. 在MySQL中插入使用者行為測試資料。

    -- 價格敏感使用者營運情境測試資料
    INSERT INTO `click_stream4` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 03:01:01.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',38.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:02.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:03.0','cart',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a');
  3. 在Realtime Compute開發控制台日誌中查看資料結果。

    • 在JobManager日誌中,通過JDBCPeriodicPatternProcessorDiscoverer關鍵詞搜尋,查看最新的規則。

      image

    • 在TaskManager中查看Stdout記錄檔,通過A match for Pattern of (id, version): (1, 1)關鍵詞搜尋,查看日誌中列印的匹配。

      image

情境五:檢測使用者一周內多次瀏覽商品但未下單的行為

  1. 在MySQL中插入規則資料。

    -- 流失風險情境測試資料 
    INSERT INTO rds_demo5 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":10,"to":10,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"purchase","type":"NOT_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"DAYS","size":7}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. 在MySQL中插入使用者行為測試資料。

    -- 流失風險情境測試資料 
    INSERT INTO `click_stream5` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-12-10 00:01:01.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:02:02.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:03:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:04:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:05:04.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:06:05.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:07:06.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:07.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:09:09.0', 'cart', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09');
  3. 在Realtime Compute開發控制台日誌中查看資料結果。

    • 在JobManager日誌中,通過JDBCPeriodicPatternProcessorDiscoverer關鍵詞搜尋,查看最新的規則。

      image

    • 在TaskManager中查看Stdout記錄檔,通過A match for Pattern of (id, version): (1, 1)關鍵詞搜尋,查看日誌中列印的匹配。

      image

相關文檔