Realtime Compute for Apache Flink: Dynamic Flink CEP e-commerce real-time alerting system

Last Updated: Sep 19, 2025

Flink Complex Event Processing (CEP) is a feature that dynamically processes complex event streams to detect specific event patterns in real time and trigger alerts. In e-commerce marketing, Flink CEP can monitor user behavior and transaction data in real time to identify abnormal or critical events and send timely alerts.

Background information

The rapid growth of the e-commerce industry has led to an exponential increase in the volume of user behavior and transaction data. Traditional batch processing methods can no longer meet the demand for timely identification of and response to abnormal behavior, system threats, and user churn. In contrast, a dynamic complex event processing (CEP) engine can model and analyze multi-stage user behavior. It automatically identifies complex event patterns and triggers alerts in the early stages of a threat. This is the core advantage of dynamic CEP for real-time business operations. It has the following three key features:

  • High real-time performance: It responds with millisecond-level latency, which enables alerting while an event sequence is still unfolding rather than after-the-fact analysis, so you can make decisions faster.

  • Flexible and configurable rules: It supports dynamic updates to rule policies. This lets you quickly adapt to business changes without restarting the service.

  • Powerful complex event recognition: It supports advanced logical matching, such as multi-event sequences, time windows, and combined conditions. This allows for the accurate capture of complex business scenarios.

In the e-commerce industry, common scenarios for dynamic CEP include but are not limited to the following:

  • Cross-selling and up-selling opportunities: When browsing products, users often show interest across different categories. For example, a user might view a mobile phone and then look at headphones or power banks. This behavior presents opportunities for cross-selling and up-selling. By accurately recommending complementary products, such as phone cases or headphones, or offering bundled deals, such as a "phone + headphones package discount", the platform can increase the purchase rate of additional items and raise the average order value. This also improves the user experience and enhances user loyalty, driving business growth.

  • High-value shopping cart recovery: A user might add a high-value item to their shopping cart but not complete the purchase due to price sensitivity or hesitation. This results in a potential loss of sales. By identifying abandoned shopping carts in real time and triggering interventions, such as limited-time discounts, low stock alerts, or free shipping offers, the platform can effectively reduce the loss of high-value items, increase order conversion rates, and recover potential revenue. This creates a win-win situation for both user value and platform revenue.

  • High-intent user identification: A user who browses the same product multiple times in a short period shows a high purchase intent. By identifying this behavior and triggering personalized marketing, such as exclusive coupons or stock reminders, the platform can speed up the user's decision-making process, increase conversion rates, and improve the user experience, which boosts sales.

  • Price-sensitive user operations: Price-sensitive users often browse a product repeatedly and only add it to their shopping cart when the price drops. By analyzing this behavior, the platform can send notifications or targeted offers when the price changes, such as "The product you are following is now on sale!". This increases conversion rates and improves the efficiency of user operations.

  • Churn risk alerts: A user who frequently browses products but does not place an order for a long time may be at risk of churning. By identifying this behavior and taking recovery measures, such as sending exclusive coupons or recommending popular products, the platform can effectively reduce the churn rate, extend the user lifecycle, and increase user retention and platform revenue.

Solution architecture

Flink CEP is an Apache Flink library for processing complex event patterns. With Flink CEP, you can define complex event patterns, monitor event streams in real time, and identify event sequences that match those patterns. The library then generates matching results. The solution architecture is as follows:

  1. Event Stream

    An event stream is the input source for CEP processing. It is typically a continuous data stream that contains a series of chronologically ordered events. Each event can have multiple properties that are used for pattern matching.

  2. Pattern and Rule Definitions

    You can define event patterns and rules that describe the event sequences or combinations you want to detect. Patterns can include the order of events, time constraints, and condition filters. For example, you can define a pattern where event A is followed by event B within 10 seconds, as shown in the sketch after this list.

  3. CEP Engine Analysis

    The CEP engine accepts the event stream and analyzes it based on the defined patterns and rules. The engine continuously monitors the event stream and attempts to match input events with the defined patterns. During the matching process, the engine considers constraints such as the time order of events, property conditions, and time windows.

  4. CEP Matching Outputs

    When an event sequence in the event stream successfully matches a defined pattern, the CEP engine generates an output. This output can be the matched event sequence, an action triggered by a rule, or another user-defined output format. The matching results can be used for subsequent processing, such as alerting, decision-making, or data storage.
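
The example pattern from step 2 can also be expressed with the open-source Flink CEP Pattern API. The following is a minimal sketch, assuming a POJO named Event with a getEventType() accessor and a keyed input stream named keyedStream; the rest of this topic instead supplies patterns dynamically as JSON rules stored in a MySQL table, so the deployed job does not compile a static pattern like this.

// A minimal sketch of a static Flink CEP pattern: event A followed by event B within 10 seconds.
// Pattern, SimpleCondition, CEP, and PatternProcessFunction come from the flink-cep dependency.
Pattern<Event, Event> pattern =
        Pattern.<Event>begin("A")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        return event.getEventType().equals("A");
                    }
                })
                .followedBy("B")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        return event.getEventType().equals("B");
                    }
                })
                .within(Time.seconds(10));

// Apply the pattern to the keyed stream and emit a description of every match.
DataStream<String> alerts =
        CEP.pattern(keyedStream, pattern)
                .process(new PatternProcessFunction<Event, String>() {
                    @Override
                    public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception {
                        out.collect("Matched: " + match);
                    }
                });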

Prerequisites

Step 1: Preparations

Create an ApsaraDB RDS for MySQL instance and prepare the data source

  1. Create an ApsaraDB RDS for MySQL database. For more information, see Create a database.

    For the destination instance, create a database named ecommerce.

  2. Prepare the MySQL Change Data Capture (CDC) data source.

    1. On the details page of the destination instance, click Log On To Database in the upper section of the page.

    2. In the DMS logon dialog box, enter the username and password for the database account that you created, and then click Log On.

    3. After you log on, double-click the ecommerce database on the left to switch to it.

    4. In the SQL Console, enter the following Data Definition Language (DDL) statements to create tables and insert data.

      -- Create rule table 1
      CREATE TABLE rds_demo1 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- Create rule table 2
      CREATE TABLE rds_demo2 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- Create rule table 3
      CREATE TABLE rds_demo3 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- Create rule table 4
      CREATE TABLE rds_demo4 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      -- Create rule table 5
      CREATE TABLE rds_demo5 (
        `id` VARCHAR(64),
        `version` INT,
        `pattern` VARCHAR(4096),
        `function` VARCHAR(512)
      );
      
      
      -- Create the source table
      CREATE TABLE `click_stream1` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream2` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream3` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      CREATE TABLE `click_stream4` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
      
      
      CREATE TABLE `click_stream5` (
        id bigint not null primary key auto_increment,  -- Auto-increment primary key
        eventTime timestamp,  
        eventType varchar(50),
        productId varchar(50), 
        categoryId varchar(50),  
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),
        userId varchar(50), 
        userSession varchar(50)
      );
    5. Click Execute, and then click Execute Directly.

Create Kafka topic and group resources

Create the following Kafka resources. For more information, see Create resources.

  • Group: clickstream.consumer.

  • Topics: click_stream1, click_stream2, click_stream3, click_stream4, and click_stream5.

    When you create the topics, set the number of partitions to 1. If you do not, the sample data may not match the results in some scenarios.

Step 2: Synchronize data from MySQL to Kafka in real time

Synchronizing user clickstream events from MySQL to Kafka reduces the load that multiple jobs place on the MySQL database.

  1. Create a MySQL catalog. For more information, see Create a MySQL catalog.

    In this example, the catalog is named mysql-catalog, and the default database is ecommerce.

  2. Create a Kafka catalog. For more information, see Manage Kafka JSON catalogs.

    In this example, the catalog is named kafka-catalog.

  3. On the Data Development > ETL page, create a SQL stream job and copy the following code into the SQL editor.

    CREATE TEMPORARY TABLE `clickstream1` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream1',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream2` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream2',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream3` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream3',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    
    CREATE TEMPORARY TABLE `clickstream4` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream4',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    
    CREATE TEMPORARY TABLE `clickstream5` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Define the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  -- Define a watermark.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'click_stream5',
      'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    BEGIN STATEMENT SET; 
    INSERT INTO `clickstream1`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream1`;
    
    
    INSERT INTO `clickstream2`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream2`;
    
    
    INSERT INTO `clickstream3`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream3`;
    
    
    INSERT INTO `clickstream4`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream4`;
    
    
    INSERT INTO `clickstream5`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime,
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream5`;
    END;      -- Required when writing to multiple sinks.
  4. In the upper-right corner, click Deploy to deploy the job.

  5. In the navigation pane on the left, choose Operation Center > Job O&M. In the Actions column for the target job, click Start. Select Stateless Start and then click Start.

Step 3: Develop, deploy, and start the CEP job

This section describes how to deploy the cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar job. This job consumes user clickstream events from Kafka, processes them, and prints alert information to the Realtime Compute for Apache Flink development console. You can adjust the code based on your business architecture and select a suitable downstream connector for different data output scenarios. For more information about supported connectors, see Supported connectors.

1. Code development

This section shows only the core code and describes its functions.

Main class

public class CepDemo {

    public static void checkArg(String argName, MultipleParameterTool params) {
        if (!params.has(argName)) {
            throw new IllegalArgumentException(argName + " must be set!");
        }
    }

    // Parse a match output string into a Match_results object.
    private static Match_results parseOutput(String output) {
        String rule = "\\(id, version\\): \\((\\d+), (\\d+)\\).*?Event\\((\\d+), (\\w+), (\\d+), (\\d+)";
        Pattern pattern = Pattern.compile(rule);
        Matcher matcher = pattern.matcher(output);
        if (matcher.find()) {
            return new Match_results(Integer.parseInt(matcher.group(1)), Integer.parseInt(matcher.group(2)), Integer.parseInt(matcher.group(3)), matcher.group(4), Integer.parseInt(matcher.group(6)));
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // Process args
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // Kafka broker endpoints.
        checkArg(KAFKA_BROKERS_ARG, params);
        // Input topic.
        checkArg(INPUT_TOPIC_ARG, params);
        // group
        checkArg(INPUT_TOPIC_GROUP_ARG, params);

        // MySQL JDBC URL.
        checkArg(JDBC_URL_ARG, params);
        // MySQL table name.
        checkArg(TABLE_NAME_ARG, params);
        // Polling interval for the database.
        checkArg(JDBC_INTERVAL_MILLIS_ARG, params);
        // Specifies whether to use event time processing (true/false).
        checkArg(USING_EVENT_TIME, params);

        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the Kafka source by using the new Source API that is based on FLIP-27.
        KafkaSource<Event> kafkaSource =
                KafkaSource.<Event>builder()
                        .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                        .setTopics(params.get(INPUT_TOPIC_ARG))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                        .setDeserializer(new EventDeSerializationSchema())
                        .build();



        // Create the DataStream source from the Kafka source and assign a watermark strategy.
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");


        // Group the stream by UserId for subsequent pattern matching.
        KeyedStream<Event, String> keyedStream =
                source.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                ).keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event value) throws Exception {
                        return value.getUserId(); // Use only UserId as the key.
                    }
                });

        // Dynamic CEP patterns. Use the dynamically loaded pattern processor factory class JDBCPeriodicPatternProcessorDiscovererFactory to obtain patterns and perform pattern matching (read the rule table in the MySQL database).
        SingleOutputStreamOperator<String> output =
                CEP.dynamicPatterns(
                        keyedStream, // The source data stream.
                        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                params.get(JDBC_URL_ARG),
                                JDBC_DRIVE,
                                params.get(TABLE_NAME_ARG),
                                null,
                                Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                        Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ? TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
                        TypeInformation.of(new TypeHint<String>() {})
                );

        // Print the output to the client.
        output.print();
  
        env.execute("CEP Demo");
    }
}

Scenario 1: Detect when a user browses products from different categories in the same session within 5 minutes

Data order: The pattern starts with a `view` event, followed by another `view` event.

public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
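
The pattern itself is supplied to the job at run time as the JSON rule that you insert into rds_demo1 in Step 4. As a hedged reading aid, that rule corresponds roughly to the following static Pattern API definition (the variable name crossSellPattern is illustrative):

// Rough static-API equivalent of the Scenario 1 rule: a view event strictly followed
// (STRICT edge) by another view event, within a 5-minute window.
Pattern<ClickEvent, ClickEvent> crossSellPattern =
        Pattern.<ClickEvent>begin("first_view")
                .where(new FirstViewCondition())
                .next("second_view")
                .where(new FirstViewCondition())
                .within(Time.minutes(5));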

Scenario 2: Detect when a user adds a high-value product to the shopping cart but does not make a purchase within 10 minutes

Data order: The pattern starts with a `cart` event (add to cart) where the price is greater than 200, followed by a `purchase` event.

public class CartAddCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart") && event.getPrice() > 200;
    }
}
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}
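
As in Scenario 1, the pattern is delivered as a JSON rule, which you insert into rds_demo2 in Step 4. Roughly, it corresponds to the following static Pattern API sketch; the 30-minute window is taken from that JSON rule, and the variable name is illustrative:

// Rough static-API equivalent of the Scenario 2 rule: a high-value cart event followed
// (SKIP_TILL_NEXT edge) by a purchase event within the rule's 30-minute window.
Pattern<ClickEvent, ClickEvent> cartRecoveryPattern =
        Pattern.<ClickEvent>begin("cart_add")
                .where(new CartAddCondition())
                .followedBy("purchase")
                .where(new PurchaseCondition())
                .within(Time.minutes(30));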

Scenario 3: Detect when a user browses the same product multiple times within 15 minutes

Data order: The pattern starts with a `view` event, followed by another `view` event that is repeated three times.

// The condition class is the same as in Scenario 1.
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
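
The JSON rule inserted into rds_demo3 in Step 4 corresponds roughly to the following static Pattern API sketch (the variable name is illustrative):

// Rough static-API equivalent of the Scenario 3 rule: an initial view followed by the
// same view condition matching three more times, within a 15-minute window.
Pattern<ClickEvent, ClickEvent> highIntentPattern =
        Pattern.<ClickEvent>begin("initial_view")
                .where(new FirstViewCondition())
                .followedBy("repeat_view")
                .where(new FirstViewCondition())
                .times(3)
                .within(Time.minutes(15));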

Scenario 4: Detect when a user browses a product and only adds it to the shopping cart after the price drops

Data order: The pattern starts with a `view` event, followed by another `view` event where the product price is lower than the initial product price, and finally a `cart` event.

// The condition class is the same as in Scenario 1.
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
public class InitialCondition extends IterativeCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event, Context<ClickEvent> ctx) throws Exception {
        ClickEvent initialView = ctx.getEventsForPattern("initial_view").iterator().next();
        return event.getEventType().equals("view") && event.getProductId().equals(initialView.getProductId()) && event.getPrice() < initialView.getPrice();
    }

}
public class CartCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("cart");
    }
}
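
The JSON rule inserted into rds_demo4 in Step 4 corresponds roughly to the following static Pattern API sketch (the variable name is illustrative). Note that InitialCondition looks up the event matched as "initial_view", so the first stage must keep that name:

// Rough static-API equivalent of the Scenario 4 rule: an initial view, immediately followed
// (STRICT edges) by a view of the same product at a lower price, then a cart event, within 10 minutes.
Pattern<ClickEvent, ClickEvent> priceDropPattern =
        Pattern.<ClickEvent>begin("initial_view")
                .where(new FirstViewCondition())
                .next("view_price_drop")
                .where(new InitialCondition())   // Compares against the "initial_view" match.
                .next("cart_after_price_drop")
                .where(new CartCondition())
                .within(Time.minutes(10));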

Scenario 5: Detect when a user browses a product multiple times within a week but does not place an order

Data order: The pattern starts with a `view` event, and no `purchase` event can occur within the subsequent time window.

// The condition class is the same as in Scenario 1.
public class FirstViewCondition extends SimpleCondition<ClickEvent> {
    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("view");
    }
}
// The condition class is the same as in Scenario 2.
public class PurchaseCondition extends SimpleCondition<ClickEvent> {

    @Override
    public boolean filter(ClickEvent event) {
        return event.getEventType().equals("purchase");
    }
}
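
The JSON rule inserted into rds_demo5 in Step 4 corresponds roughly to the following static Pattern API sketch (the variable name is illustrative):

// Rough static-API equivalent of the Scenario 5 rule: ten or more view events with no
// purchase event immediately after them (NOT_NEXT edge), within a 7-day window.
Pattern<ClickEvent, ClickEvent> churnRiskPattern =
        Pattern.<ClickEvent>begin("first_view")
                .where(new FirstViewCondition())
                .timesOrMore(10)
                .notNext("purchase")
                .where(new PurchaseCondition())
                .within(Time.days(7));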

2. Deploy the job

On the Operation Center > Job O&M page, click Deploy Job > JAR Job to deploy the five stream jobs separately.

Configure the following deployment parameters:

  • Deployment Mode: Stream processing. Example: Streaming Mode.

  • Deployment Name: The name of the corresponding JAR job.

    • Scenario 1 job name: EcommerceCEPRunner1

    • Scenario 2 job name: EcommerceCEPRunner2

    • Scenario 3 job name: EcommerceCEPRunner3

    • Scenario 4 job name: EcommerceCEPRunner4

    • Scenario 5 job name: EcommerceCEPRunner5

  • Engine Version: The Flink engine version used by the job. The code in this topic is compiled with JDK 11, so select an engine version that includes jdk11. We recommend that you use the latest Ververica Runtime (VVR) version. Example: vvr-8.0.11-jdk11-flink-1.17.

  • JAR URI: Click the upload icon on the right to manually upload the cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar file. Example: oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar.

  • Entry Point Class: The entry point class of the program. Example: com.alibaba.ververica.cep.demo.CepDemo.

  • Entry Point Main Arguments: The arguments that are passed to the main method. Configure the following arguments for this topic:

    • --kafkaBrokers: The Kafka cluster endpoint.

    • --inputTopic: The Kafka topic from which the clickstream is consumed.

    • --inputTopicGroup: The consumer group ID.

    • --jdbcUrl: The MySQL JDBC URL. The database name, username, and password are included in the URL.

    • --tableName: The name of the rule table in MySQL.

    • --jdbcIntervalMs: The interval, in milliseconds, at which the job polls the rule table for rule changes.

    • --usingEventTime: Specifies whether to use event time (true) or processing time (false).

    The argument configurations for the five scenarios are as follows:

    • Scenario 1 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream1 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo1 --jdbcIntervalMs 3000 --usingEventTime false

    • Scenario 2 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream2 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo2 --jdbcIntervalMs 3000 --usingEventTime false

    • Scenario 3 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream3 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo3 --jdbcIntervalMs 3000 --usingEventTime false

    • Scenario 4 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream4 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo4 --jdbcIntervalMs 3000 --usingEventTime false

    • Scenario 5 configuration: --kafkaBrokers alikafka-pre-cn-******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-******02-3-vpc.alikafka.aliyuncs.com:9092 --inputTopic click_stream5 --inputTopicGroup clickstream.consumer --jdbcUrl jdbc:mysql://rm-2********7xr86.mysql.rds.aliyuncs.com:3306/ecommerce?user=flink_rds&password=flink123NN@1 --tableName rds_demo5 --jdbcIntervalMs 3000 --usingEventTime false

For more information about deployment, see Deploy a JAR job.

3. Start the job

On the Job O&M page, in the Actions column for the target job, click Start. Select Stateless Start and then click Start. Start the five jobs for the scenarios, named EcommerceCEPRunner1, EcommerceCEPRunner2, EcommerceCEPRunner3, EcommerceCEPRunner4, and EcommerceCEPRunner5, in sequence.

For more information about start configurations, see Start a job.

Step 4: Query alerts

Scenario 1: Detect when a user browses products from different categories in the same session within 5 minutes

  1. Insert the rule data into MySQL.

    -- Test data for the cross-selling and up-selling opportunities scenario.
    INSERT INTO rds_demo1 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"second_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"second_view","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":5}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user behavior test data into MySQL.

    -- Test data for the cross-selling and up-selling opportunities scenario.
    INSERT INTO `click_stream1` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 00:01:00.0', 'view', 1005073, 2232732093077520756, 'construction.tools.light', 'samsung', 1130.02, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:03.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes', 'xiaomi', 29.95, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:07.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes.step_ins', 'intel', 167.20, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
    ('2020-01-01 00:01:08.0', 'view', 1005205, 2232732093077520756, 'appliances.personal.massager', 'samsung', 576.33, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0');
  3. View the results in the logs of the Realtime Compute for Apache Flink development console.

    • In the JobManager log, search for the keyword `JDBCPeriodicPatternProcessorDiscoverer` to view the latest rule.

    • In the TaskManager, view the Stdout log file. Search for the keyword A match for Pattern of (id, version): (1, 1) to view the printed matches in the log.

Scenario 2: Detect when a user adds a high-value product to the shopping cart but does not make a purchase within 10 minutes

  1. Insert the rule data into MySQL.

    -- Test data for the high-value shopping cart recovery scenario.
    INSERT INTO rds_demo2 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"cart_add","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartAddCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"cart_add","target":"purchase","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":30}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user behavior test data into MySQL.

    -- Test data for the high-value shopping cart recovery scenario.
    INSERT INTO `click_stream2` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 01:01:01.0','cart',1002923,2053013555631882655,'electronics.smartphone','huawei',249.30,517014550,'b666b914-8abf-4ebe-b674-aa31a1d0f7ce'),
    ('2020-01-01 01:11:02.0','cart',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded'),
    ('2020-01-01 01:11:03.0','purchase',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded');
  3. View the results in the logs of the Realtime Compute for Apache Flink development console.

    • In the JobManager log, search for the keyword `JDBCPeriodicPatternProcessorDiscoverer` to view the latest rule.

    • In the TaskManager, view the Stdout log file. Search for the keyword A match for Pattern of (id, version): (1, 1) to view the printed matches in the log.

Scenario 3: Detect when a user browses the same product multiple times within 15 minutes

  1. Insert the rule data into MySQL.

    -- Test data for the high-intent user identification scenario.
    INSERT INTO rds_demo3 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":null,"nodes":[{"name":"repeat_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["TIMES"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"initial_view","target":"repeat_view","type":"SKIP_TILL_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":15}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user behavior test data into MySQL.

    -- Test data for the high-intent user identification scenario.
    INSERT INTO `click_stream3` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 02:01:01.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:02.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:03.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
    ('2020-01-01 02:01:04.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039');
  3. View the results in the logs of the Realtime Compute for Apache Flink development console.

    • In the JobManager log, search for the keyword `JDBCPeriodicPatternProcessorDiscoverer` to view the latest rule.

    • In the TaskManager, view the Stdout log file. Search for the keyword A match for Pattern of (id, version): (1, 1) to view the printed matches in the log.

Scenario 4: Detect when a user browses a product and only adds it to the shopping cart after the price drops

  1. Insert the rule data into MySQL.

    -- Test data for the price-sensitive user operations scenario.
    INSERT INTO rds_demo4 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"cart_after_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.CartCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"view_price_drop","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.InitialCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"initial_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"view_price_drop","target":"cart_after_price_drop","type":"STRICT"},{"source":"initial_view","target":"view_price_drop","type":"STRICT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"MINUTES","size":10}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user behavior test data into MySQL.

    -- Test data for the price-sensitive user operations scenario.
    INSERT INTO `click_stream4` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-01-01 03:01:01.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',38.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:02.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
    ('2020-01-01 03:01:03.0','cart',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a');
  3. View the results in the logs of the Realtime Compute for Apache Flink development console.

    • In the JobManager log, search for the keyword `JDBCPeriodicPatternProcessorDiscoverer` to view the latest rule.

    • In the TaskManager, view the Stdout log file. Search for the keyword A match for Pattern of (id, version): (1, 1) to view the printed matches in the log.

Scenario 5: Detect when a user browses a product multiple times within a week but does not place an order

  1. Insert the rule data into MySQL.

    -- Test data for the churn risk scenario. 
    INSERT INTO rds_demo5 (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"purchase","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.PurchaseCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"first_view","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":10,"to":10,"windowTime":null},"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.FirstViewCondition","type":"CLASS"},"type":"ATOMIC"}],"edges":[{"source":"first_view","target":"purchase","type":"NOT_NEXT"}],"window":{"type":"FIRST_AND_LAST","time":{"unit":"DAYS","size":7}},"afterMatchStrategy":{"type":"NO_SKIP","patternName":null},"type":"COMPOSITE","version":1}
    ',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;
  2. Insert the user behavior test data into MySQL.

    -- Test data for the churn risk scenario. 
    INSERT INTO `click_stream5` 
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
    VALUES
    ('2020-12-10 00:01:01.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:02:02.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:03:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:04:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:05:04.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:06:05.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:07:06.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:07.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
    ('2020-12-10 00:09:09.0', 'cart', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09');
  3. View the results in the logs of the Realtime Compute for Apache Flink development console.

    • In the JobManager log, search for the keyword `JDBCPeriodicPatternProcessorDiscoverer` to view the latest rule.

    • In the TaskManager, view the Stdout log file. Search for the keyword A match for Pattern of (id, version): (1, 1) to view the printed matches in the log.

References