FlinkCEP is a Complex Event Processing (CEP) library that lets you detect pre-defined patterns in an event stream and trigger alerts. In e-commerce marketing, Flink CEP monitors user behaviors and transactions in real time, detecting unexpected or critical events and issuing timely alerts.
Background information
In the fast-growing e-commerce industry, data about user behaviors and transactions is growing exponentially. E-commerce platforms need to closely monitor user behaviors, transactions, and marketing effectiveness to promptly handle unexpected issues. Typical use cases:
Detecting unusual behaviors: For example, if a user adds many items to their cart but never completes a purchase, it may signal a brushing scam (fake orders placed to inflate sales figures) or a system failure.
Monitoring marketing events: During big sales events, real-time monitoring enables e-commerce platforms to quickly address issues like traffic surges that can overload the system or lead to stockouts.
Alerting on customer churn: If a user viewed a product page many times without making a purchase, this may indicate a pricing or user experience issue.
In these scenarios, a stream processing system is a better fit than batch processing, as it enables e-commerce platforms to perform analysis in real time and take prompt actions.
FlinkCEP Architecture
FlinkCEP is an Apache Flink library for processing complex events. The figure below demonstrates how it works.
Event Stream
An event stream consists of continuous, time-ordered events that flow through CEP. Each event has multiple properties, which CEP inspects during pattern matching.
Pattern and Rule Definitions
Patterns and rules define a sequence or combination of events that users want the system to detect. When defining a pattern, you can specify the sequence of events, temporal constraints, and conditions that must be met to accept the events. For example, you can define a pattern as: Event A is followed by Event B, the two occurring within 10 seconds.
CEP Engine Analysis
The CEP engine receives an event stream and continuously analyzes it to find the defined patterns.
CEP Matching Outputs
Upon finding a match, the CEP engine generates an output that includes the matched event sequence, actions to trigger as defined in rules, or other information as required by users. The output can then be used to issue alerts or support decision making, or it can be stored for future reference.
In summary, Flink CEP monitors event streams in real time, matches event streams against the pre-defined patterns, and generates outputs. It is ideal for real-time monitoring, anomaly detection, and risk control.
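To make the matching semantics concrete, here is a tiny, self-contained sketch in plain Java (no Flink dependency) of the example pattern above: Event A followed by Event B within 10 seconds. The Event record and matchAB method are illustrative helpers, not part of the FlinkCEP API; FlinkCEP itself would express this as Pattern.begin("A").followedBy("B").within(...).

```java
import java.util.ArrayList;
import java.util.List;

public class ToyCep {
    // A minimal event: a type tag plus a millisecond timestamp.
    record Event(String type, long ts) {}

    // Scan a time-ordered stream and report every "A followed by B within 10 seconds" match
    // as a pair of timestamps {A's ts, B's ts}.
    static List<long[]> matchAB(List<Event> stream) {
        List<long[]> matches = new ArrayList<>();
        List<Event> pendingAs = new ArrayList<>(); // open partial matches
        for (Event e : stream) {
            if (e.type().equals("A")) {
                pendingAs.add(e); // every A starts a new partial match
            } else if (e.type().equals("B")) {
                for (Event a : pendingAs) {
                    if (e.ts() - a.ts() <= 10_000) { // within 10 seconds
                        matches.add(new long[]{a.ts(), e.ts()});
                    }
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
                new Event("A", 0),
                new Event("B", 4_000),   // 4 s after the first A: a match
                new Event("A", 5_000),
                new Event("B", 20_000)); // more than 10 s after both As: no match
        System.out.println(matchAB(stream).size()); // prints 1
    }
}
```

A real CEP engine does considerably more than this toy loop: FlinkCEP manages the partial-match state in fault-tolerant keyed state and uses event-time watermarks to handle out-of-order events.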
Use cases in e-commerce
In e-commerce, user behavior data is crucial for optimizing marketing strategies and driving business growth. By analyzing user behaviors such as page views, add-to-carts, and checkouts, e-commerce platforms can identify potential opportunities and risks and intervene appropriately. Here are some typical use cases of Flink CEP. You'll learn how data-driven strategies can empower targeted marketing to enhance customer conversion and transaction amounts per customer, improve retention, and bring in greater revenue.
Cross-selling and upselling
If customers visit different but related products, such as phones, headphones, and power banks, in quick succession, this is an opportunity for cross-selling and upselling. By recommending complementary products (phone cases, earbuds, etc.) or providing bundle discounts (such as USD 10 off when buying a phone and earbuds together), e-commerce platforms can enhance cross-selling chances, increase order value, improve user experience, foster buyer loyalty, and ultimately drive business growth.
High-value abandoned cart recovery
After adding an expensive product to their cart, a buyer may not complete the purchase due to dissatisfaction with pricing or other factors, resulting in lost potential sales. By identifying cart abandonment and implementing timely interventions, such as offering limited-time discounts, sending low-stock alert messages, or providing free shipping, e-commerce platforms can effectively reduce high-value cart abandonment, recover potential sales, and increase buyer value and platform revenue.
High-intent buyer identification
A high-intent buyer repeatedly views a specific product, like a pair of shoes or an electronic device, within a short time. By detecting high-intent buyers and tailoring marketing strategies, e-commerce platforms can nudge them into making purchases and improve buyer conversion, user satisfaction and experience, driving sales growth.
Price-sensitive customer identification
Price-sensitive buyers may repeatedly view a product but not add it to their cart until it becomes cheaper. After identifying such buyers, e-commerce platforms can proactively send notifications of price drops or offer targeted discounts, thereby increasing the sales conversion. They can also optimize customer operations through personalized pricing and promotions.
Churn prevention
Customers are at risk of churning if they repeatedly view products but do not make any purchases for a relatively long period, such as a week. E-commerce platforms can identify this behavior and take preventive actions, like giving coupons or recommending popular items, to engage the customers longer and increase retention and revenue.
Develop Flink CEP programs
You can use ecommerce-cep-demos-0.1.0.jar for quick deployment. You can also download the source code here: ecommerce-cep-demos-main.zip.
See also ecommerce-cep-demos on GitHub.
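The pattern snippets below reference the ClickEvent, Alert, and AlertType types from the demo project. If you are reading the code without the source at hand, they behave roughly like the following plain Java types. This is an illustrative sketch inferred from the constructors and getters the snippets use, not the authoritative definitions; see the ecommerce-cep-demos source for those (the real ClickEvent also carries fields such as eventTime, categoryId, and brand, matching the click_stream table, and satisfies Flink's POJO rules with a no-arg constructor and setters).

```java
// Illustrative sketch of the demo's data types, inferred from their usage in the
// pattern code in this article. See the ecommerce-cep-demos source for the real classes.
public class ClickEvent {
    private String eventType;    // "view", "cart", or "purchase"
    private String productId;
    private String categoryCode;
    private double price;
    private String userId;
    private String userSession;

    public ClickEvent(String eventType, String productId, String categoryCode,
                      double price, String userId, String userSession) {
        this.eventType = eventType;
        this.productId = productId;
        this.categoryCode = categoryCode;
        this.price = price;
        this.userId = userId;
        this.userSession = userSession;
    }

    public String getEventType()    { return eventType; }
    public String getProductId()    { return productId; }
    public String getCategoryCode() { return categoryCode; }
    public double getPrice()        { return price; }
    public String getUserId()       { return userId; }
    public String getUserSession()  { return userSession; }
}

// Alert types used across the five scenarios in this article.
enum AlertType { CROSS_UPSELL, CART_ABANDONMENT, PURCHASE_COMPLETION, PRICE_SENSITIVITY, CHURN_RISK }

// An alert emitted when a pattern matches (or, for cart abandonment, times out).
class Alert {
    private final String userSession;
    private final String userId;
    private final AlertType type;
    private final String message;

    Alert(String userSession, String userId, AlertType type, String message) {
        this.userSession = userSession;
        this.userId = userId;
        this.type = type;
        this.message = message;
    }

    String getUserSession() { return userSession; }
    String getUserId()      { return userId; }
    AlertType getType()     { return type; }
    String getMessage()     { return message; }
}
```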
Cross-selling and upselling
Define the pattern
The customer views different products within five minutes in a session. For example, first_view -> second_view.
Pattern<ClickEvent, ?> crossSellPattern = Pattern.<ClickEvent>begin("first_view")
        // The pattern begins with a "view" ClickEvent.
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // It is followed immediately by another "view" ClickEvent.
        .next("second_view")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // The ClickEvents must occur within five minutes.
        .within(Time.minutes(5));
Define the output
After a match is found, send a cross-selling or upselling alert. After receiving the alert, the system will automatically recommend complementary products (phone cases, earbuds, power banks, etc.) or offer bundle discounts (such as USD 10 off when buying a phone and a pair of earbuds together).
SingleOutputStreamOperator<Alert> crossSellAlerts = crossSellPatternStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent firstView = pattern.get("first_view").get(0);
        ClickEvent secondView = pattern.get("second_view").get(0);
        // Check whether the products viewed by the customer are in different categories.
        if (!firstView.getCategoryCode().equals(secondView.getCategoryCode())) {
            var message = "Cross-sell opportunity detected for user " + firstView.getUserId() +
                    ": viewed products in categories " + firstView.getCategoryCode() + " and " + secondView.getCategoryCode();
            // Return an alert for cross-selling and upselling opportunities.
            return new Alert(secondView.getUserSession(), secondView.getUserId(), AlertType.CROSS_UPSELL, message);
        }
        // Return null if the product categories are the same, so this match can be filtered out downstream.
        return null;
    }
}).filter(Objects::nonNull).name("CrossSellAlertsPattern").uid("CrossSellAlertsPattern");
High-value abandoned cart recovery
Define the pattern
A customer adds an expensive product to the cart (cart_add) but does not complete the purchase (purchase) within 10 minutes.
Pattern<ClickEvent, ?> abandonmentPattern = Pattern.<ClickEvent>begin("cart_add")
        // The pattern begins with a "cart" ClickEvent with a price greater than 200.
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("cart") && event.getPrice() > 200;
            }
        })
        // It is followed by a "purchase" ClickEvent.
        .followedBy("purchase")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("purchase");
            }
        })
        // The ClickEvents must occur within ten minutes.
        .within(Time.minutes(10));
Define the output
If a purchase does not occur within a pre-defined timeframe, send a cart recovery alert. This triggers the system to send messages or emails about limited-time offers, free shipping, or stockout alerts, to recover abandoned carts.
SingleOutputStreamOperator<Alert> abandonmentAlerts = abandonmentPatternStream
        .select(new PatternTimeoutFunction<ClickEvent, Alert>() {
            // Process timeout events.
            @Override
            public Alert timeout(Map<String, List<ClickEvent>> pattern, long timeoutTimestamp) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);
                var message = "High-value cart abandonment detected for user '" + cartAdd.getUserId() + "' priced at " + cartAdd.getPrice() +
                        ". No purchase within 10 minutes.";
                // Return a cart abandonment alert.
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.CART_ABANDONMENT, message);
            }
        }, new PatternSelectFunction<ClickEvent, Alert>() {
            // Process a match.
            @Override
            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);
                ClickEvent purchase = pattern.get("purchase").get(0);
                var message = "Purchase completed for user " + purchase.getUserId() + " on product " + purchase.getProductId() +
                        " priced at " + purchase.getPrice();
                // Return a purchase completion alert.
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.PURCHASE_COMPLETION, message);
            }
        }).map(new MapFunction<Either<Alert, Alert>, Alert>() {
            @Override
            public Alert map(Either<Alert, Alert> alert) throws Exception {
                if (alert.isLeft()) {
                    // Timeout alerts arrive as the left value.
                    return alert.left();
                } else {
                    // Match alerts arrive as the right value.
                    return alert.right();
                }
            }
        }).name("AbandonmentAlertsPattern").uid("AbandonmentAlertsPattern");
High-intent buyer identification
Define the pattern
The customer repeatedly views a product within 15 minutes. For example, initial_view -> repeat_view.
Pattern<ClickEvent, ?> purchaseIntentPattern = Pattern.<ClickEvent>begin("initial_view")
        // The pattern begins with a "view" ClickEvent.
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // It is followed by three more "view" ClickEvents.
        .followedBy("repeat_view")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).times(3)
        // The ClickEvents must occur within 15 minutes.
        .within(Time.minutes(15));
Define the output
When a match is identified, send a high-intent buyer alert. The system can then flag the customer as a high-intent buyer and initiate targeted marketing actions, such as offering personalized coupons, providing limited-time discounts, or sending low-stock notifications (such as "Only three items left!").
SingleOutputStreamOperator<Alert> purchaseIntentAlerts = purchaseIntentStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent initialView = pattern.get("initial_view").get(0);
        var message = "High purchase intent detected for user " + initialView.getUserId() +
                " on product " + initialView.getProductId();
        // Return an alert for high-intent buyer identification.
        return new Alert(initialView.getUserSession(), initialView.getUserId(), AlertType.PRICE_SENSITIVITY, message);
    }
}).name("PurchaseIntentAlertsPattern").uid("PurchaseIntentAlertsPattern");
Price-sensitive customer identification
Define the pattern
The customer views a product but does not add it to their cart until the price drops. For example, initial_view -> view_price_drop -> cart_after_price_drop.
Pattern<ClickEvent, ?> priceSensitivityPattern = Pattern.<ClickEvent>begin("initial_view")
        // The pattern begins with a "view" ClickEvent.
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // It is immediately followed by another "view" of the same product at a lower price.
        .next("view_price_drop")
        .where(new IterativeCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event, Context<ClickEvent> ctx) throws Exception {
                ClickEvent initialView = ctx.getEventsForPattern("initial_view").iterator().next();
                return event.getEventType().equals("view")
                        && event.getProductId().equals(initialView.getProductId())
                        && event.getPrice() < initialView.getPrice();
            }
        })
        // Next is a "cart" ClickEvent.
        .next("cart_after_price_drop")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("cart");
            }
        })
        // The ClickEvents must occur within ten minutes.
        .within(Time.minutes(10));
Define the output
When a match is found, send a price-sensitive buyer identification alert. The system can flag the customer as price-sensitive and proactively send notifications about price drops or provide personalized offers.
SingleOutputStreamOperator<Alert> priceSensitivityAlerts = priceSensitivityStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent initialView = pattern.get("initial_view").get(0);
        var message = "Price-sensitive customer detected for user " + initialView.getUserId() +
                " on product " + initialView.getProductId() + " after a price drop.";
        // Return an alert for price-sensitive customer identification.
        return new Alert(initialView.getUserSession(), initialView.getUserId(), AlertType.PRICE_SENSITIVITY, message);
    }
}).name("PriceSensitivityAlertsPattern").uid("PriceSensitivityAlertsPattern");
Churn prevention
Define the pattern
The customer views a product repeatedly within one week but never proceeds to checkout. For example, first_view -> ... -> no_purchase.
Pattern<ClickEvent, ?> churnPredictionPattern = Pattern.<ClickEvent>begin("first_view")
        // The pattern begins with a "view" ClickEvent...
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // ...that occurs 10 or more times.
        .timesOrMore(10)
        // The views must not be immediately followed by a "purchase" event.
        .notNext("purchase")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("purchase");
            }
        })
        // The pattern must be completed within seven days.
        .within(Time.days(7));
Define the output
When a match is found, send a customer churn alert. The system flags the customer and triggers the implementation of churn reduction strategies, such as sending personalized coupons, recommending popular products, or offering free trials.
SingleOutputStreamOperator<Alert> churnPredictionAlerts = churnPredictionStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent firstView = pattern.get("first_view").get(0);
        var message = "Churn risk detected for user " + firstView.getUserId() +
                ": viewed multiple products over the week without making a purchase.";
        // Return a customer churn alert.
        return new Alert(firstView.getUserSession(), firstView.getUserId(), AlertType.CHURN_RISK, message);
    }
}).name("ChurnPredictionAlertsPattern").uid("ChurnPredictionAlertsPattern");
Prerequisites
Realtime Compute for Apache Flink is activated. For more information, see Activate Realtime Compute for Apache Flink.
An ApsaraMQ for Kafka instance is created. For more information, see Step 2: Purchase and deploy an instance.
An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.
The Realtime Compute for Apache Flink workspace, ApsaraDB RDS for MySQL instance, and ApsaraMQ for Kafka instance reside in the same virtual private cloud (VPC). If they do not reside in the same VPC, you must establish connections between the VPCs or enable Realtime Compute for Apache Flink to access other services over the Internet. For more information, see How does Realtime Compute for Apache Flink access a service across VPCs? and How does Realtime Compute for Apache Flink access the Internet?
The RAM user or RAM role has the required permissions to access relevant resources.
Step 1: Make preparations
Create an ApsaraDB RDS for MySQL instance and prepare a MySQL CDC data source
Create a database in the ApsaraDB RDS for MySQL instance. For more information, see Create a database.
In this example, a database named ecommerce is created.
Prepare a MySQL CDC data source:
In the upper-right corner of the instance's details page, click Log On to Database.
In the Log on to Database Instance dialog box, configure the Database Account and Database Password parameters and click Login.
After logging on, double-click the ecommerce database in the left-side navigation pane. In the SQL editor, copy the DDL statements to create the business table and insert data into it.
CREATE TABLE `click_stream` (
    id bigint NOT NULL PRIMARY KEY AUTO_INCREMENT, -- Auto-increment primary key.
    eventTime timestamp,
    eventType varchar(50),
    productId varchar(50),
    categoryId varchar(50),
    categoryCode varchar(80),
    brand varchar(50),
    price decimal(10, 2),
    userId varchar(50),
    userSession varchar(50)
);
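The DDL above only creates the table. To give the patterns events to match, you can seed it with a few sample rows such as the following. The values here are illustrative placeholders, not part of the demo dataset:

```sql
INSERT INTO `click_stream`
    (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession)
VALUES
    ('2024-06-01 10:00:00', 'view', 'P1001', 'C20', 'electronics.smartphone', 'acme', 899.00, 'U1', 'S1'),
    ('2024-06-01 10:01:30', 'view', 'P2002', 'C21', 'electronics.audio.headphone', 'acme', 129.00, 'U1', 'S1'),
    ('2024-06-01 10:03:00', 'cart', 'P1001', 'C20', 'electronics.smartphone', 'acme', 899.00, 'U1', 'S1');
```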
Click Execute(F8). In the panel that appears, click Execute.
Create a Kafka topic and a consumer group
Create the following Kafka resources:
Topics: clickstream and alerts.
Group: clickstream.consumer.
For more information, see Step 3: Create resources.
Step 2: Sync clickstreams from MySQL to Kafka
To minimize MySQL load caused by Flink's parallel subtasks, use Kafka as a buffer between the two services.
A MySQL catalog named mysql-catalog is created. Its default database is ecommerce.
In the Realtime Compute for Apache Flink development console, create an SQL streaming draft and copy the following snippet to the SQL editor:

CREATE TEMPORARY TABLE `clickstream` (
    `key_id` BIGINT,
    `value_eventTime` BIGINT,
    `value_eventType` STRING,
    `value_productId` STRING,
    `value_categoryId` STRING,
    `value_categoryCode` STRING,
    `value_brand` STRING,
    `value_price` DECIMAL(10, 2),
    `value_userId` STRING,
    `value_userSession` STRING,
    -- Specify the primary key.
    PRIMARY KEY (`key_id`) NOT ENFORCED,
    ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
    -- Define the watermark strategy.
    WATERMARK FOR ts AS ts - INTERVAL '2' SECOND
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'clickstream',
    'properties.bootstrap.servers' = '<The endpoint of your Kafka cluster>',
    'key.format' = 'json',
    'value.format' = 'json',
    'key.fields-prefix' = 'key_',
    'value.fields-prefix' = 'value_',
    'value.fields-include' = 'EXCEPT_KEY'
);

INSERT INTO `clickstream`
SELECT
    id,
    UNIX_TIMESTAMP(eventTime) * 1000 AS eventTime, -- Convert to milliseconds.
    eventType,
    productId,
    categoryId,
    categoryCode,
    brand,
    price,
    `userId`,
    userSession
FROM `mysql-catalog`.`ecommerce`.`click_stream`;
In the upper-right corner of the SQL editor, click Deploy.
In the left-side navigation pane, go to the Deployments page, find the target deployment, and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode, then click Start.
Step 3: Create and start a deployment
This example creates a JAR deployment from the ecommerce-cep-demos-0.1.0.jar file. The corresponding job consumes clickstreams from Kafka, generates alerts, and writes alerts to Kafka. Choose a downstream connector and modify business code as needed.
Create a JAR deployment.
In the left-side navigation pane of the development console, go to the Deployments page and create a new deployment in the upper-left corner. Configure the deployment:
Deployment Model: Choose Stream Mode.
Deployment Name: Enter the deployment name. Example: EcommerceCEPRunner.
Engine Version: Because the example CEP program is built with JDK 11, select the latest VVR version that explicitly includes jdk11 in its name. Example: vvr-8.0.11-jdk11-flink-1.17.
JAR URI: Select the ecommerce-cep-demos-0.1.0.jar file. Example: oss://xxx/artifacts/namespaces/xxx/ecommerce-cep-demos-0.1.0.jar.
Entry Point Class: Specify the entry point class of the application. Example: com.ververica.cep.EcommerceCEPRunner.
Entry Point Main Arguments: Arguments passed here can be used in the main method. Configure the following arguments:
bootstrap.servers: the endpoint of your Kafka cluster.
clickstream_topic: the Kafka topic that receives clickstreams.
group: the ID of the consumer group.
alerts_topic: the Kafka topic that receives alerts.
Example:
--bootstrap.servers <The endpoint of your Kafka cluster>
--clickstream_topic clickstream
--group clickstream.consumer
--alerts_topic alerts
For more information, see Create a JAR deployment.
Start the deployment.
Find the target deployment and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode, then click Start.
For more information, see Start a deployment.
Step 4: Query alerts
After alerts are sent to ApsaraMQ for Kafka, you can query and analyze the alert messages. You can also use Realtime Compute for Apache Flink to analyze and process the alert messages, automating the end-to-end, real-time pipeline.
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.
On the Instances page, click the name of the instance that you want to manage.
In the left-side navigation pane, click Message Query.
On the Message Query page, select Search by offset from the Search Method drop-down list.
Select the target topic from the Topic drop-down list. Select the partition that holds alert messages from the Partition drop-down list, and enter the partition's offset in the Offset field. Then, click Search.