Flink Complex Event Processing (CEP) is a feature that dynamically processes complex event streams to detect specific event patterns in real time and trigger alerts. In e-commerce marketing, Flink CEP can monitor user behavior and transaction data in real time to identify abnormal or critical events and send timely alerts.
Background information
The rapid growth of the e-commerce industry has led to an exponential increase in the volume of user behavior and transaction data. Traditional batch processing methods can no longer meet the demand for timely identification of and response to abnormal behavior, system threats, and user churn. In contrast, a dynamic complex event processing (CEP) engine can model and analyze multi-stage user behavior. It automatically identifies complex event patterns and triggers alerts in the early stages of a threat. This is the core advantage of dynamic CEP for real-time business operations. It has the following three key features:
High real-time performance: It provides millisecond-level responses. This enables in-event alerting, rather than post-event analysis, to help you make faster decisions.
Flexible and configurable rules: It supports dynamic updates to rule policies. This lets you quickly adapt to business changes without restarting the service.
Powerful complex event recognition: It supports advanced logical matching, such as multi-event sequences, time windows, and combined conditions. This allows for the accurate capture of complex business scenarios.
In the e-commerce industry, common scenarios for dynamic CEP include but are not limited to the following:
| Scenario | Description |
| --- | --- |
| Cross-selling and up-selling opportunities | When browsing products, users often show interest across different categories. For example, a user might view a mobile phone and then look at headphones or power banks. This behavior presents opportunities for cross-selling and up-selling. By accurately recommending complementary products, such as phone cases or headphones, or offering bundled deals, such as a "phone + headphones package discount", the platform can increase the purchase rate of additional items and raise the average order value. This also improves the user experience and enhances user loyalty, driving business growth. |
| High-value shopping cart recovery | A user might add a high-value item to their shopping cart but not complete the purchase due to price sensitivity or hesitation. This results in a potential loss of sales. By identifying abandoned shopping carts in real time and triggering interventions, such as limited-time discounts, low stock alerts, or free shipping offers, the platform can effectively reduce the loss of high-value items, increase order conversion rates, and recover potential revenue. This creates a win-win situation for both user value and platform revenue. |
| High-intent user identification | A user who browses the same product multiple times in a short period shows a high purchase intent. By identifying this behavior and triggering personalized marketing, such as exclusive coupons or stock reminders, the platform can speed up the user's decision-making process, increase conversion rates, and improve the user experience, which boosts sales. |
| Price-sensitive user operations | Price-sensitive users often browse a product repeatedly and only add it to their shopping cart when the price drops. By analyzing this behavior, the platform can send notifications or targeted offers when the price changes, such as "The product you are following is now on sale!". This increases conversion rates and improves the efficiency of user operations. |
| Churn risk alerts | A user who frequently browses products but does not place an order for a long time may be at risk of churning. By identifying this behavior and taking recovery measures, such as sending exclusive coupons or recommending popular products, the platform can effectively reduce the churn rate, extend the user lifecycle, and increase user retention and platform revenue. |
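Most of these scenarios reduce to a multi-event pattern evaluated per user. As a hedged sketch that is not taken from this topic's demo JAR, the high-intent scenario (the same product viewed several times in a short period) could be expressed with the open-source Flink CEP pattern API roughly as follows. The `ClickEvent` POJO and its public `eventType` field are assumptions used only for illustration.

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class HighIntentPatternSketch {

    // At least three "view" events within 30 minutes. The input stream is assumed to be
    // keyed by (userId, productId), so the repeated views belong to one user and one product.
    // ClickEvent is an assumed POJO with a public String eventType field.
    public static Pattern<ClickEvent, ClickEvent> build() {
        return Pattern.<ClickEvent>begin("repeatedView")
                .where(new SimpleCondition<ClickEvent>() {
                    @Override
                    public boolean filter(ClickEvent event) {
                        return "view".equals(event.eventType);
                    }
                })
                .timesOrMore(3)
                .within(Time.minutes(30));
    }
}
```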
Solution architecture
Flink CEP is an Apache Flink library for processing complex event patterns. With Flink CEP, you can define complex event patterns, monitor event streams in real time, and identify event sequences that match those patterns. The library then generates matching results. The solution architecture is as follows:

Event Stream
An event stream is the input source for CEP processing. It is typically a continuous data stream that contains a series of chronologically ordered events. Each event can have multiple properties that are used for pattern matching.
Pattern and Rule Definitions
You can define event patterns and rules that describe the event sequences or combinations you want to detect. Patterns can include the order of events, time constraints, and condition filters. For example, you can define a pattern where event A is followed by event B within 10 seconds.
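For instance, the "event A followed by event B within 10 seconds" pattern mentioned above could be written with the open-source Flink CEP pattern API roughly as shown below. This is an illustrative sketch rather than code from the demo JAR; a "view" event stands in for event A, a "cart" event stands in for event B, and `ClickEvent` is the same assumed POJO as in the earlier sketch.

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ViewThenCartPatternSketch {

    // Event A (a "view") followed by event B (a "cart") within 10 seconds.
    public static Pattern<ClickEvent, ClickEvent> build() {
        return Pattern.<ClickEvent>begin("A")
                .where(new SimpleCondition<ClickEvent>() {
                    @Override
                    public boolean filter(ClickEvent event) {
                        return "view".equals(event.eventType);
                    }
                })
                .followedBy("B") // relaxed contiguity: other events may occur between A and B
                .where(new SimpleCondition<ClickEvent>() {
                    @Override
                    public boolean filter(ClickEvent event) {
                        return "cart".equals(event.eventType);
                    }
                })
                .within(Time.seconds(10)); // B must follow A within 10 seconds
    }
}
```

Using `next("B")` instead of `followedBy("B")` would additionally require event B to immediately follow event A with no other events in between.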
CEP Engine Analysis
The CEP engine accepts the event stream and analyzes it based on the defined patterns and rules. The engine continuously monitors the event stream and attempts to match input events with the defined patterns. During the matching process, the engine considers constraints such as the time order of events, property conditions, and time windows.
CEP Matching Outputs
When an event sequence in the event stream successfully matches a defined pattern, the CEP engine generates an output. This output can be the matched event sequence, an action triggered by a rule, or another user-defined output format. The matching results can be used for subsequent processing, such as alerting, decision-making, or data storage.
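Continuing the sketches above, the following shows how a pattern is applied to a keyed event stream and how every complete match is turned into an alert record. This is a hedged illustration of the matching-output step, not the demo's actual output logic; `ClickEvent` (with public `userId` and `productId` fields) and `ViewThenCartPatternSketch` are the assumed types from the previous sketches.

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class MatchingOutputSketch {

    // Applies the "view followed by cart" pattern per user and emits one alert string per match.
    public static DataStream<String> alerts(DataStream<ClickEvent> events) {
        return CEP.pattern(events.keyBy(e -> e.userId), ViewThenCartPatternSketch.build())
                .inProcessingTime() // evaluate the 10-second constraint in processing time for this sketch
                .process(new PatternProcessFunction<ClickEvent, String>() {
                    @Override
                    public void processMatch(Map<String, List<ClickEvent>> match,
                                             Context ctx, Collector<String> out) {
                        ClickEvent view = match.get("A").get(0);
                        ClickEvent cart = match.get("B").get(0);
                        out.collect("ALERT: user " + view.userId + " viewed product " + view.productId
                                + " and then added product " + cart.productId + " to the cart");
                    }
                });
    }
}
```

The resulting alert stream can then be printed for debugging or written to a downstream sink such as Kafka or a database, as described above.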
Prerequisites
You have activated Realtime Compute for Apache Flink. For more information, see Activate Realtime Compute for Apache Flink.
You have activated Message Queue for Apache Kafka. For more information, see Deploy a Message Queue for Apache Kafka instance.
You have activated ApsaraDB RDS for MySQL. For more information, see Create an RDS for MySQL instance.
Ensure that Realtime Compute for Apache Flink, ApsaraDB RDS for MySQL, and Message Queue for Apache Kafka are in the same VPC. If they are not in the same VPC, you must establish a network connection between VPCs or access the services over the Internet. For more information, see How do I access other services across VPCs? and How do I access the Internet?.
You have a Resource Access Management (RAM) user or RAM role with the required permissions.
Step 1: Preparations
Create an ApsaraDB RDS for MySQL instance and prepare the data source
Create an ApsaraDB RDS for MySQL database. For more information, see Create a database.
For the destination instance, create a database named `ecommerce`.
Prepare the MySQL Change Data Capture (CDC) data source.
On the details page of the destination instance, click Log On To Database in the upper section of the page.
In the DMS logon dialog box, enter the username and password for the database account that you created, and then click Log On.
After you log on, double-click the `ecommerce` database on the left to switch to it.
In the SQL Console, enter the following Data Definition Language (DDL) statements to create tables and insert data.
```sql
-- Create rule table 1
CREATE TABLE rds_demo1 (`id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512));
-- Create rule table 2
CREATE TABLE rds_demo2 (`id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512));
-- Create rule table 3
CREATE TABLE rds_demo3 (`id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512));
-- Create rule table 4
CREATE TABLE rds_demo4 (`id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512));
-- Create rule table 5
CREATE TABLE rds_demo5 (`id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512));

-- Create the source tables (click_stream1 to click_stream5 share the same schema)
CREATE TABLE `click_stream1` (
  id bigint NOT NULL PRIMARY KEY AUTO_INCREMENT, -- Auto-increment primary key
  eventTime timestamp, eventType varchar(50), productId varchar(50),
  categoryId varchar(50), categoryCode varchar(80), brand varchar(50),
  price decimal(10, 2), userId varchar(50), userSession varchar(50)
);
CREATE TABLE `click_stream2` (
  id bigint NOT NULL PRIMARY KEY AUTO_INCREMENT, -- Auto-increment primary key
  eventTime timestamp, eventType varchar(50), productId varchar(50),
  categoryId varchar(50), categoryCode varchar(80), brand varchar(50),
  price decimal(10, 2), userId varchar(50), userSession varchar(50)
);
CREATE TABLE `click_stream3` (
  id bigint NOT NULL PRIMARY KEY AUTO_INCREMENT, -- Auto-increment primary key
  eventTime timestamp, eventType varchar(50), productId varchar(50),
  categoryId varchar(50), categoryCode varchar(80), brand varchar(50),
  price decimal(10, 2), userId varchar(50), userSession varchar(50)
);
CREATE TABLE `click_stream4` (
  id bigint NOT NULL PRIMARY KEY AUTO_INCREMENT, -- Auto-increment primary key
  eventTime timestamp, eventType varchar(50), productId varchar(50),
  categoryId varchar(50), categoryCode varchar(80), brand varchar(50),
  price decimal(10, 2), userId varchar(50), userSession varchar(50)
);
CREATE TABLE `click_stream5` (
  id bigint NOT NULL PRIMARY KEY AUTO_INCREMENT, -- Auto-increment primary key
  eventTime timestamp, eventType varchar(50), productId varchar(50),
  categoryId varchar(50), categoryCode varchar(80), brand varchar(50),
  price decimal(10, 2), userId varchar(50), userSession varchar(50)
);
```

Click Execute, and then click Execute Directly.
Create Kafka topic and group resources
Create the following Kafka resources. For more information, see Create resources.
Group: clickstream.consumer.
Topics: click_stream1, click_stream2, click_stream3, click_stream4, and click_stream5.
When you create the topics, set the number of partitions to 1. If you do not, the sample data may not match the results in some scenarios.

Step 2: Synchronize data from MySQL to Kafka in real time
Synchronizing user clickstream events from MySQL to Kafka reduces the load that multiple jobs place on the MySQL database.
Create a MySQL catalog. For more information, see Create a MySQL catalog.
In this example, the catalog is named `mysql-catalog`, and the default database is `ecommerce`.
Create a Kafka catalog. For more information, see Manage Kafka JSON catalogs.
In this example, the catalog is named `kafka-catalog`.
In the development console, create a SQL streaming job and copy the following code into the SQL editor.
```sql
CREATE TEMPORARY TABLE `clickstream1` (
  `key_id` BIGINT,
  `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING,
  `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING,
  `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING,
  PRIMARY KEY (`key_id`) NOT ENFORCED, -- Define the primary key.
  ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark.
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'click_stream1',
  'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TEMPORARY TABLE `clickstream2` (
  `key_id` BIGINT,
  `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING,
  `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING,
  `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING,
  PRIMARY KEY (`key_id`) NOT ENFORCED, -- Define the primary key.
  ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark.
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'click_stream2',
  'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TEMPORARY TABLE `clickstream3` (
  `key_id` BIGINT,
  `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING,
  `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING,
  `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING,
  PRIMARY KEY (`key_id`) NOT ENFORCED, -- Define the primary key.
  ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark.
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'click_stream3',
  'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TEMPORARY TABLE `clickstream4` (
  `key_id` BIGINT,
  `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING,
  `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING,
  `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING,
  PRIMARY KEY (`key_id`) NOT ENFORCED, -- Define the primary key.
  ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark.
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'click_stream4',
  'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TEMPORARY TABLE `clickstream5` (
  `key_id` BIGINT,
  `value_eventTime` BIGINT, `value_eventType` STRING, `value_productId` STRING,
  `value_categoryId` STRING, `value_categoryCode` STRING, `value_brand` STRING,
  `value_price` DECIMAL(10, 2), `value_userId` STRING, `value_userSession` STRING,
  PRIMARY KEY (`key_id`) NOT ENFORCED, -- Define the primary key.
  ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define a watermark.
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'click_stream5',
  'properties.bootstrap.servers' = 'alikafka-pre-cn-w******02-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-w******02-3-vpc.alikafka.aliyuncs.com:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY'
);

BEGIN STATEMENT SET;
INSERT INTO `clickstream1`
SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 AS eventTime, eventType, productId, categoryId,
       categoryCode, brand, price, `userId`, userSession
FROM `mysql-catalog`.`ecommerce`.`click_stream1`;
INSERT INTO `clickstream2`
SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 AS eventTime, eventType, productId, categoryId,
       categoryCode, brand, price, `userId`, userSession
FROM `mysql-catalog`.`ecommerce`.`click_stream2`;
INSERT INTO `clickstream3`
SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 AS eventTime, eventType, productId, categoryId,
       categoryCode, brand, price, `userId`, userSession
FROM `mysql-catalog`.`ecommerce`.`click_stream3`;
INSERT INTO `clickstream4`
SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 AS eventTime, eventType, productId, categoryId,
       categoryCode, brand, price, `userId`, userSession
FROM `mysql-catalog`.`ecommerce`.`click_stream4`;
INSERT INTO `clickstream5`
SELECT id, UNIX_TIMESTAMP(eventTime) * 1000 AS eventTime, eventType, productId, categoryId,
       categoryCode, brand, price, `userId`, userSession
FROM `mysql-catalog`.`ecommerce`.`click_stream5`;
END; -- Required when writing to multiple sinks.
```

In the upper-right corner, click Deploy to deploy the job.
In the left-side navigation pane, go to the Job O&M page. In the Actions column for the target job, click Start, select Stateless Start, and then click Start.
Step 3: Develop, deploy, and start the CEP job
This section describes how to deploy the cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar job. This job consumes user clickstream events from Kafka, processes them, and prints alert information to the Realtime Compute for Apache Flink development console. You can adjust the code based on your business architecture and select a suitable downstream connector for different data output scenarios. For more information about supported connectors, see Supported connectors.
1. Code development
This section outlines the core logic of the job.
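As a hedged, simplified sketch rather than the actual com.alibaba.ververica.cep.demo.CepDemo implementation, the core of such a clickstream CEP job can look like the following: it reads JSON events from one Kafka topic, keys the stream by user, applies a single hard-coded pattern, and prints alerts to the console. The real demo instead loads its patterns dynamically from the rds_demo* rule tables and covers all five topics. The broker address is a placeholder, and the `ClickEvent` POJO as well as the Jackson and flink-connector-kafka dependencies are assumptions of this sketch.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CepDemoSketch {

    /** Assumed POJO matching the JSON value fields written to the click_stream topics. */
    public static class ClickEvent {
        public long eventTime;
        public String eventType;
        public String productId;
        public String categoryId;
        public String categoryCode;
        public String brand;
        public BigDecimal price;
        public String userId;
        public String userSession;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source. The broker address is a placeholder; the topic and group are the ones created earlier.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("alikafka-xxx-vpc.alikafka.aliyuncs.com:9092")
                .setTopics("click_stream1")
                .setGroupId("clickstream.consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Deserialize the JSON values into ClickEvent records.
        ObjectMapper mapper = new ObjectMapper();
        DataStream<ClickEvent> events = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "click_stream1")
                .map(json -> mapper.readValue(json, ClickEvent.class));

        // Hard-coded example rule: a "view" followed by a "cart" event from the same user within 10 minutes.
        // The actual demo loads such patterns dynamically from the rds_demo* rule tables instead.
        Pattern<ClickEvent, ?> pattern = Pattern.<ClickEvent>begin("view")
                .where(new SimpleCondition<ClickEvent>() {
                    @Override
                    public boolean filter(ClickEvent event) {
                        return "view".equals(event.eventType);
                    }
                })
                .followedBy("cart")
                .where(new SimpleCondition<ClickEvent>() {
                    @Override
                    public boolean filter(ClickEvent event) {
                        return "cart".equals(event.eventType);
                    }
                })
                .within(Time.minutes(10));

        // Match per user and print one alert line per match to the console.
        CEP.pattern(events.keyBy(e -> e.userId), pattern)
                .inProcessingTime()
                .process(new PatternProcessFunction<ClickEvent, String>() {
                    @Override
                    public void processMatch(Map<String, List<ClickEvent>> match,
                                             Context ctx, Collector<String> out) {
                        ClickEvent first = match.get("view").get(0);
                        out.collect("ALERT: user " + first.userId + " viewed product "
                                + first.productId + " and then added an item to the cart");
                    }
                })
                .print();

        env.execute("cep-demo-sketch");
    }
}
```

In a production setup, the print() sink would be replaced with one of the supported connectors mentioned above, and the Kafka and rule-table connection settings would typically be passed in through the entry point main arguments.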
2. Deploy the job
In the development console, create and deploy the five streaming JAR jobs separately, one for each scenario.

The following table describes the parameters:
| Parameter | Description | Example |
| --- | --- | --- |
| Deployment Mode | Stream processing. | Streaming Mode |
| Deployment Name | Enter the name of the corresponding JAR job. | |
| Engine Version | The Flink engine version used by the current job. The SDK for the code in this topic uses JDK 11, so select a version that includes jdk11. | vvr-8.0.11-jdk11-flink-1.17 |
| JAR URI | Click the icon on the right to manually upload the JAR package. | oss://xxx/artifacts/namespaces/xxx/cep-demo-1.0-SNAPSHOT-jar-with-dependencies.jar |
| Entry Point Class | The entry point class of the program. | com.alibaba.ververica.cep.demo.CepDemo |
| Entry Point Main Arguments | You can pass parameters here and call them in the main method. Configure the parameters required for this topic. | |
For more information about deployment, see Deploy a JAR job.
3. Start the job
On the Job O&M page, start the five scenario jobs, EcommerceCEPRunner1, EcommerceCEPRunner2, EcommerceCEPRunner3, EcommerceCEPRunner4, and EcommerceCEPRunner5, in sequence. In the Actions column for each job, click Start, select Stateless Start, and then click Start.
For more information about start configurations, see Start a job.