
Realtime Compute for Apache Flink:Build a real-time early warning system for e-commerce marketing with Flink CEP

Last Updated: Apr 07, 2025

Flink CEP is a Complex Event Processing (CEP) library that lets you detect pre-defined patterns in an event stream and trigger alerts. In e-commerce marketing, Flink CEP monitors user behaviors and transactions in real time, detects unexpected or critical events, and issues timely alerts.

Background information

In the booming e-commerce industry, data about user behaviors and transactions is growing exponentially. E-commerce platforms need to closely monitor user behaviors, transactions, and marketing effectiveness so that they can promptly handle unexpected issues. Typical use cases include:

  • Detecting unusual behaviors: For example, if a user added many items to their cart but did not make any purchases, it may signal a brushing scam or a system failure.

  • Monitoring marketing events: During big sales events, real-time monitoring enables e-commerce platforms to quickly address issues like traffic surges that can overload the system or lead to stockouts.

  • Alerting on customer churn: If a user viewed a product page many times without making a purchase, this may indicate a pricing or user experience issue.

In these scenarios, a stream processing system is a better fit than batch processing, as it enables e-commerce platforms to perform analysis in real time and take prompt actions.

Flink CEP architecture

Flink CEP is an Apache Flink library for processing complex events. It works as follows:


  1. Event stream: An event stream consists of continuous, time-ordered events that flow into the CEP engine. Each event carries multiple properties, which the engine evaluates during pattern matching.

  2. Pattern and rule definitions: Patterns and rules define a sequence or combination of events that users want the system to detect. When defining a pattern, you can specify the sequence of events, temporal constraints, and conditions that must be met to accept the events. For example, you can define a pattern as: Event A is followed by Event B, the two occurring within 10 seconds. A minimal sketch of this example appears after this overview.

  3. CEP engine analysis: The CEP engine receives the event stream and continuously analyzes it to find the defined patterns.

  4. CEP matching outputs: Upon finding a match, the CEP engine generates an output that includes the matched event sequence, the actions to trigger as defined in the rules, or other information required by users. The output can then be used to issue alerts, support decision making, or be stored for future reference.

In summary, Flink CEP monitors event streams in real time, matches event streams against the pre-defined patterns, and generates outputs. It is ideal for real-time monitoring, anomaly detection, and risk control.
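
The Pattern API expresses such rules directly in code. The following is a minimal sketch of the example mentioned above (Event A followed by Event B within 10 seconds); the Event class and its getType() accessor are placeholders for illustration only.

// A minimal sketch: Event A followed by Event B within 10 seconds.
// Event and getType() are hypothetical; replace them with your own event class.
Pattern<Event, ?> aThenB = Pattern.<Event>begin("A")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return "A".equals(event.getType());
            }
        })
        .followedBy("B")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return "B".equals(event.getType());
            }
        })
        // Both events must occur within 10 seconds.
        .within(Time.seconds(10));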

Use cases in e-commerce

In e-commerce, user behavior data is crucial for optimizing marketing strategies and driving business growth. By analyzing user behaviors such as page views, add-to-carts, and checkouts, e-commerce platforms can identify potential opportunities and risks and intervene appropriately. The following are typical use cases of Flink CEP. They show how data-driven strategies can power targeted marketing to improve customer conversion and per-customer transaction amounts, increase retention, and bring in greater revenue.

Cross-selling and upselling

If customers visit different but related products, such as phones, headphones, and power banks, in quick succession, this is an opportunity for cross-selling and upselling. By recommending complementary products (phone cases, earbuds, etc.) or providing bundle discounts (such as USD 10 off when buying a phone and earbuds together), e-commerce platforms can enhance cross-selling chances, increase order value, improve user experience, foster buyer loyalty, and ultimately drive business growth.

High-value abandoned cart recovery

After adding an expensive product to their cart, a buyer may not complete the purchase due to dissatisfaction with pricing or other factors, resulting in lost sales. By identifying cart abandonment and intervening in time, such as offering limited-time discounts, sending low-stock alert messages, or providing free shipping, e-commerce platforms can effectively reduce high-value cart abandonment, recover potentially lost sales, and increase buyer value and platform revenue.

High-intent buyer identification

A high-intent buyer repeatedly views a specific product, like a pair of shoes or an electronic device, within a short time. By detecting high-intent buyers and tailoring marketing strategies, e-commerce platforms can nudge them into making purchases and improve buyer conversion, user satisfaction and experience, driving sales growth.

Price-sensitive customer identification

Price-sensitive buyers may repeatedly view a product but not add it to their cart until it becomes cheaper. After identifying such buyers, e-commerce platforms can proactively send notifications of price drops or offer targeted discounts, thereby increasing the sales conversion. They can also optimize customer operations through personalized pricing and promotions.

Churn prevention

Customers are at risk of churning if they repeatedly view products but do not make any purchases for a relatively long period, such as a week. E-commerce platforms can identify this behavior and take preventive actions, like giving coupons or recommending popular items, to engage the customers longer and increase retention and revenue.

Develop Flink CEP programs

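The Java snippets in this section come from the ecommerce-cep-demos example project. They operate on a ClickEvent POJO whose fields mirror the click_stream table defined in Step 1, and they emit Alert objects constructed as Alert(userSession, userId, alertType, message). The exact class definitions ship with the example JAR; the sketch below only illustrates the assumed shape of ClickEvent.

// Assumed shape of the ClickEvent POJO used by the snippets below. Field names follow the
// click_stream table; the actual class is provided by the example project.
public class ClickEvent {
    private long eventTime;       // Event timestamp in milliseconds.
    private String eventType;     // "view", "cart", or "purchase".
    private String productId;
    private String categoryId;
    private String categoryCode;
    private String brand;
    private double price;
    private String userId;
    private String userSession;   // Session identifier used to correlate a user's events.

    // Getters used by the patterns and select functions below.
    public long getEventTime() { return eventTime; }
    public String getEventType() { return eventType; }
    public String getProductId() { return productId; }
    public String getCategoryCode() { return categoryCode; }
    public double getPrice() { return price; }
    public String getUserId() { return userId; }
    public String getUserSession() { return userSession; }
}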

Cross-selling and upselling

Define the pattern

The customer views different products within five minutes in a session. For example, first_view -> second_view.

Pattern<ClickEvent, ?> crossSellPattern = Pattern.<ClickEvent>begin("first_view")
         // The pattern begins with a "view" ClickEvent.
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // It is followed immediately by another "view" ClickEvent.
        .next("second_view")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // The ClickEvents must occur within five minutes.
        .within(Time.minutes(5));
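
The select() call in the next snippet operates on a PatternStream named crossSellPatternStream, which is not shown in the original snippets. The following is a minimal sketch of how it could be created, assuming a DataStream<ClickEvent> named clickStream with event-time timestamps and watermarks already assigned. The other pattern streams used later (abandonmentPatternStream, purchaseIntentStream, priceSensitivityStream, and churnPredictionStream) can be created the same way from their respective patterns.

// A minimal sketch: apply the pattern to the click stream, keyed by user session so that
// matching happens independently per session.
PatternStream<ClickEvent> crossSellPatternStream = CEP.pattern(
        clickStream.keyBy(ClickEvent::getUserSession),
        crossSellPattern);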

Define the output

After a match is found, send a cross-selling or upselling alert. After receiving the alert, the system will automatically recommend complementary products (phone cases, earbuds, power banks, etc.) or offer bundle discounts (such as USD 10 off when buying a phone and a pair of earbuds together).

SingleOutputStreamOperator<Alert> crossSellAlerts = crossSellPatternStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent firstView = pattern.get("first_view").get(0);
        ClickEvent secondView = pattern.get("second_view").get(0);

        // Check whether the products viewed by the customer are in different categories.
        if (!firstView.getCategoryCode().equals(secondView.getCategoryCode())) {
            var message = "Cross-sell opportunity detected for user " + firstView.getUserId() +
                    ": viewed products in categories " + firstView.getCategoryCode() + " and " + secondView.getCategoryCode();
            // Return an alert for cross-selling and upselling opportunities.
            return new Alert(secondView.getUserSession(), secondView.getUserId(), AlertType.CROSS_UPSELL, message);
        }
        return null; // Returns null if the product categories are the same, so that this match can be easily filtered out.
    }
}).filter(Objects::nonNull).name("CrossSellAlertsPattern").uid("CrossSellAlertsPattern");

High-value abandoned cart recovery

Define the pattern

A customer adds an expensive product to the cart (cart_add) but does not complete the purchase (purchase) within 10 minutes.

Pattern<ClickEvent, ?> abandonmentPattern = Pattern.<ClickEvent>begin("cart_add")
        // The pattern begins with a "cart" ClickEvent with a price greater than 200.
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("cart") && event.getPrice() > 200;
            }
        })
        // It is followed by a "purchase" ClickEvent.
        .followedBy("purchase")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("purchase");
            }
        })
        // The ClickEvents must occur within ten minutes.
        .within(Time.minutes(10));

Define the output

If a purchase does not occur within the pre-defined timeframe, send a cart recovery alert. This triggers the system to send messages or emails about limited-time offers, free shipping, or low-stock alerts, to recover abandoned carts.

SingleOutputStreamOperator<Alert> abandonmentAlerts = abandonmentPatternStream
        .select(new PatternTimeoutFunction<ClickEvent, Alert>() {
            @Override
            // Process timeout events.
            public Alert timeout(Map<String, List<ClickEvent>> pattern, long timeoutTimestamp) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);

                var message = "High-value cart abandonment detected for user '" + cartAdd.getUserId() + "' priced at " + cartAdd.getPrice() +
                        ". No purchase within 10 minutes.";
                // Return a cart abandonment alert.
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.CART_ABANDONMENT, message);
            }
        }, new PatternSelectFunction<ClickEvent, Alert>() {
            @Override
             // Process a match.
            public Alert select(Map<String, List<ClickEvent>> pattern) {
                ClickEvent cartAdd = pattern.get("cart_add").get(0);
                ClickEvent purchase = pattern.get("purchase").get(0);

                var message = "Purchase completed for user " + purchase.getUserId() + " on product " + purchase.getProductId() +
                        " priced at " + purchase.getPrice();
                // Return a match alert.
                return new Alert(cartAdd.getUserSession(), cartAdd.getUserId(), AlertType.PURCHASE_COMPLETION, message);
            }
        }).map(new MapFunction<Either<Alert, Alert>, Alert>() {
            @Override
            public Alert map(Either<Alert, Alert> alert) throws Exception {
                if (alert.isLeft()) {
                    // If there is a timeout alert, return the left value.
                    return alert.left();
                } else {
                    // If there is a match alert, return the right value.
                    return alert.right();
                }
            }
        }).name("AbandonmentAlertsPattern").uid("AbandonmentAlertsPattern");

High-intent buyer identification

Define the pattern

The customer repeatedly views a product within 15 minutes. For example, initial_view -> repeat_view.

Pattern<ClickEvent, ?> purchaseIntentPattern = Pattern.<ClickEvent>begin("initial_view")
        // The pattern begins with a "view" ClickEvent.
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // It is followed by three "view" ClickEvents.
        .followedBy("repeat_view")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        }).times(3)
        // The ClickEvents must occur within 15 minutes.
        .within(Time.minutes(15));

Define the output

When a match is identified, send a high-intent buyer alert. The system can then flag the customer as a high-intent buyer and initiate targeted marketing actions, such as offering personalized coupons, providing limited-time discounts, or sending low-stock notifications (such as "Only three items left!").

SingleOutputStreamOperator<Alert> purchaseIntentAlerts = purchaseIntentStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent initialView = pattern.get("initial_view").get(0);
        var message = "High purchase intent detected for user " + initialView.getUserId() +
                " on product " + initialView.getProductId();
        // Return an alert for high-intent buyer identification. PURCHASE_INTENT is an assumed
        // constant; use the alert type defined for this scenario in your AlertType enum.
        return new Alert(initialView.getUserSession(), initialView.getUserId(), AlertType.PURCHASE_INTENT, message);
    }
}).name("PurchaseIntentAlertsPattern").uid("PurchaseIntentAlertsPattern");

Price-sensitive customer identification

Define the pattern

The customer views a product but does not add it to their cart until the price drops. For example, initial_view -> view_price_drop -> cart_after_price_drop.

Pattern<ClickEvent, ?> priceSensitivityPattern = Pattern.<ClickEvent>begin("initial_view")
        // The pattern begins with a "view" ClickEvent.
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // It is immediately followed by another "view" ClickEvent of the same product at a lower price.
        .next("view_price_drop")
        .where(new IterativeCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event, Context<ClickEvent> ctx) throws Exception {
                ClickEvent initialView = ctx.getEventsForPattern("initial_view").iterator().next();
                return event.getEventType().equals("view") && event.getProductId().equals(initialView.getProductId()) && event.getPrice() < initialView.getPrice();
            }
        })
        // Next is a "cart" ClickEvent.
        .next("cart_after_price_drop")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("cart");
            }
        })
        // The ClickEvents must occur within ten minutes.
        .within(Time.minutes(10));

Define the output

When a match is found, send a price-sensitive buyer identification alert. The system can flag the customer as price-sensitive and proactively send notifications about price drops or provide personalized offers.

SingleOutputStreamOperator<Alert> priceSensitivityAlerts = priceSensitivityStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent initialView = pattern.get("initial_view").get(0);
        var message = "Price-sensitive customer detected for user " + initialView.getUserId() +
                " on product " + initialView.getProductId() + " after a price drop.";
        // Return an alert for price-sensitive customer identification.
        return new Alert(initialView.getUserSession(), initialView.getUserId(), AlertType.PRICE_SENSITIVITY, message);
    }
}).name("PriceSensitivityAlertsPattern").uid("PriceSensitivityAlertsPattern");

Churn prevention

Define the pattern

The customer views a product repeatedly within one week but never makes a purchase (for example, first_view -> ... -> no purchase).

Pattern<ClickEvent, ?> churnPredictionPattern = Pattern.<ClickEvent>begin("first_view")
        // The pattern begins with a "view" ClickEvent.
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("view");
            }
        })
        // The customer views products at least 10 times.
        .timesOrMore(10)
        // The views must not be immediately followed by a "purchase" event.
        .notNext("purchase")
        .where(new SimpleCondition<ClickEvent>() {
            @Override
            public boolean filter(ClickEvent event) {
                return event.getEventType().equals("purchase");
            }
        })
        // The pattern must be completed within seven days.
        .within(Time.days(7));

Define the output

When a match is found, send a customer churn alert. The system flags the customer and triggers the implementation of churn reduction strategies, such as sending personalized coupons, recommending popular products, or offering free trials.

SingleOutputStreamOperator<Alert> churnPredictionAlerts = churnPredictionStream.select(new PatternSelectFunction<ClickEvent, Alert>() {
    @Override
    public Alert select(Map<String, List<ClickEvent>> pattern) {
        ClickEvent firstView = pattern.get("first_view").get(0);
        var message = "Churn risk detected for user " + firstView.getUserId() +
                ": viewed multiple products over the week without making a purchase.";
        // Return a customer churn alert.
        return new Alert(firstView.getUserSession(), firstView.getUserId(), AlertType.CHURN_RISK, message);
    }
}).name("ChurnPredictionAlertsPattern").uid("ChurnPredictionAlertsPattern");

Prerequisites

The following steps use a Realtime Compute for Apache Flink workspace, an ApsaraDB RDS for MySQL instance, and an ApsaraMQ for Kafka instance. Make sure these resources are available before you start.

Step 1: Make preparations

Create an ApsaraDB RDS for MySQL instance and prepare a MySQL CDC data source

  1. Create a database in the ApsaraDB RDS for MySQL instance. For more information, see Create a database.

    In this example, a database named ecommerce is created.

  2. Prepare a MySQL CDC data source.

    1. In the upper-right corner of the instance's details page, click Log On to Database.

    2. In the Log on to Database Instance dialog box, configure the Database Account and Database Password parameters and click Login.

    3. After logging on, double-click the ecommerce database in the left-side navigation pane.

    4. In the SQL editor, copy the following statements, which create the business table and insert test data.

      CREATE TABLE `click_stream` (
        id bigint not null primary key auto_increment, -- Auto-increment primary key.
        eventTime timestamp,     -- Time at which the event occurred.
        eventType varchar(50),   -- Event type: view, cart, or purchase.
        productId varchar(50),
        categoryId varchar(50),
        categoryCode varchar(80),
        brand varchar(50),
        price decimal(10, 2),    -- Product price.
        userId varchar(50),
        userSession varchar(50)  -- Session identifier used to correlate a user's events.
      );

      Insert data

      -- Test data for cross-selling and upselling scenarios.
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-01-01 00:01:00.0', 'view', 1005073, 2232732093077520756, 'construction.tools.light', 'samsung', 1130.02, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
      ('2020-01-01 00:01:03.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes', 'xiaomi', 29.95, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
      ('2020-01-01 00:01:07.0', 'view', 1005205, 2232732093077520756, 'apparel.shoes.step_ins', 'intel', 167.20, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0'),
      ('2020-01-01 00:01:08.0', 'view', 1005205, 2232732093077520756, 'appliances.personal.massager', 'samsung', 576.33, 519698804, '69b5d72f-fd6e-4fed-aa23-1286b2ca89a0');
      
      -- Test data for high-value abandoned cart recovery scenarios.
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-01-01 01:01:01.0','cart',1002923,2053013555631882655,'electronics.smartphone','huawei',249.30,517014550,'b666b914-8abf-4ebe-b674-aa31a1d0f7ce'),
      ('2020-01-01 01:11:02.0','cart',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded'),
      ('2020-01-01 01:11:03.0','purchase',1004227,2232732093077520756,'construction.tools.light','apple',892.94,590404850,'057050ba-adca-4c7f-99f3-e87d8f9bbded');
      
      -- Test data for high-intent user identification scenarios.
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-01-01 02:01:01.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
      ('2020-01-01 02:01:02.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
      ('2020-01-01 02:01:03.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039'),
      ('2020-01-01 02:01:04.0','view',21406322,2232732082063278200,'electronics.clocks','casio',81.03,578858757,'bdf051a8-1594-4630-b93d-2ba62b92d039');
      
      -- Test data for price-sensitive buyer identification.
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-01-01 03:01:01.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',38.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
      ('2020-01-01 03:01:02.0','view',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a'),
      ('2020-01-01 03:01:03.0','cart',15200496,2053013553484398879,'appliances.kitchen.toster','uragan',30.87,516616354,'f9233034-08e7-46fb-a1ae-175de0d0de7a');
      
      -- Test data for customer churn prevention. 
      INSERT INTO `click_stream` 
      (eventTime, eventType, productId, categoryId, categoryCode, brand, price, userId, userSession) 
      VALUES
      ('2020-12-10 00:01:01.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:02:02.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:03:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:04:03.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:05:04.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:06:05.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:07:06.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:08:07.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:08:08.0', 'view', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09'),
      ('2020-12-10 00:09:09.0', 'cart', 28722181, 2053013555321504139, 'appliances.kitchen.meat_grinder', 'respect', 51.22, 563209914, '657edcb2-767d-458c-9598-714c1e474e09');
  3. Click Execute (F8). In the panel that appears, click Execute.

Create a Kafka topic and a consumer group

Create the following Kafka resources:

  • Topics: clickstream and alerts.

  • Group: clickstream.consumer.

For more information, see Step 3: Create resources.

Step 2: Sync clickstreams from MySQL to Kafka

To minimize MySQL load caused by Flink's parallel subtasks, use Kafka as a buffer between the two services.

  1. Create a MySQL catalog.

    In this example, the catalog is named mysql-catalog and its default database is ecommerce.

  2. In the left-side navigation pane of the Realtime Compute for Apache Flink development console, choose Development > ETL. On the page that appears, create an SQL streaming draft and copy the following snippet to the SQL editor:

    CREATE TEMPORARY TABLE `clickstream` (
      `key_id` BIGINT,
      `value_eventTime` BIGINT,  
      `value_eventType` STRING,
      `value_productId` STRING,
      `value_categoryId` STRING,
      `value_categoryCode` STRING,
      `value_brand` STRING,
      `value_price` DECIMAL(10, 2),
      `value_userId` STRING,
      `value_userSession` STRING,
      -- Specify the primary key.
      PRIMARY KEY (`key_id`) NOT ENFORCED,
      ts AS TO_TIMESTAMP_LTZ(value_eventTime, 3),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define the watermark strategy.
    ) WITH (
      'connector'='upsert-kafka',
      'topic' = 'clickstream',
      'properties.bootstrap.servers' = '<The endpoint of your Kafka cluster>',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    INSERT INTO `clickstream`
    SELECT
      id,
      UNIX_TIMESTAMP(eventTime) * 1000 as eventTime, -- Convert to milliseconds.
      eventType,
      productId,
      categoryId,
      categoryCode,
      brand,
      price,
      `userId`,
      userSession
    FROM `mysql-catalog`.`ecommerce`.`click_stream`;
  3. In the upper-right corner of the SQL editor, click Deploy.

  4. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find the target deployment, and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode, then click Start.

Step 3: Create and start a deployment

This example creates a JAR deployment from the ecommerce-cep-demos-0.1.0.jar file. The corresponding job consumes clickstreams from Kafka, generates alerts, and writes alerts to Kafka. Choose a downstream connector and modify business code as needed.
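
The demo job's internal structure is not shown in this topic. Before you create the deployment, the following minimal sketch illustrates how such a job could consume the clickstream topic produced in Step 2 and turn it into the DataStream<ClickEvent> used by the patterns above; the variable names, the parseClickEvent helper, and the watermark settings are assumptions for illustration, not the demo's actual implementation.

// A minimal sketch of the consuming side, assuming env is the StreamExecutionEnvironment and
// the main arguments (bootstrap.servers, clickstream_topic, group) have been parsed into the
// variables below. parseClickEvent is a hypothetical helper that maps a JSON record to a ClickEvent.
KafkaSource<String> clickstreamSource = KafkaSource.<String>builder()
        .setBootstrapServers(bootstrapServers)
        .setTopics(clickstreamTopic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<ClickEvent> clickStream = env
        .fromSource(clickstreamSource, WatermarkStrategy.noWatermarks(), "ClickstreamSource")
        .map(record -> parseClickEvent(record))
        // Assign event-time timestamps and watermarks so that within() constraints use event time.
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((event, timestamp) -> event.getEventTime()));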

  1. Create a JAR deployment.

    In the left-side navigation pane of the development console, choose O&M > Deployments. On the Deployments page, choose Create Deployment > JAR Deployment in the upper-left corner.


    Configure the deployment parameters:

    • Deployment Mode: Select Stream Mode.

    • Deployment Name: Enter a name for the deployment, for example, EcommerceCEPRunner.

    • Engine Version: Because the example CEP program is built with JDK 11, select the latest VVR version that explicitly includes jdk11 in its name, for example, vvr-8.0.11-jdk11-flink-1.17.

    • JAR URI: Click the upload icon and select the ecommerce-cep-demos-0.1.0.jar file, for example, oss://xxx/artifacts/namespaces/xxx/ecommerce-cep-demos-0.1.0.jar.

    • Entry Point Class: Specify the entry point class of the application, com.ververica.cep.EcommerceCEPRunner.

    • Entry Point Main Arguments: Pass the following arguments, which are read in the main method:

      --bootstrap.servers <The endpoint of your Kafka cluster>

      --clickstream_topic clickstream

      --group clickstream.consumer

      --alerts_topic alerts

      Here, bootstrap.servers is the endpoint of your Kafka cluster, clickstream_topic is the Kafka topic that receives clickstreams, group is the ID of the consumer group, and alerts_topic is the Kafka topic that receives alerts.

    For more information, see Create a JAR deployment.

  2. Start the deployment.

    Find the target deployment and click Start in the Actions column. In the Start Job panel that appears, click Initial Mode, then click Start.

    For more information, see Start a deployment.

Step 4: Query alerts

After alerts are sent to ApsaraMQ for Kafka, you can query and analyze the alert messages. You can also use Realtime Compute for Apache Flink to analyze and process the alert messages, automating the end-to-end, real-time pipeline.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Message Query.

  5. On the Message Query page, select Search by offset from the Search Method drop-down list.

  6. Select the target topic from the Topic drop-down list. Select the partition that holds alert messages from the Partition drop-down list, and enter the partition's offset in the Offset field. Then, click Search.

