All Products
Search
Document Center

Realtime Compute for Apache Flink:Dynamic Flink CEP

Last Updated:Jun 21, 2026

Realtime Compute for Apache Flink supports Flink CEP tasks with dynamically updated rules in DataStream jobs. This article uses a real-time marketing scenario to demonstrate how to build a Flink CEP job that dynamically loads rules to process data from an upstream Kafka topic.

Use cases

Flink CEP is well-suited for a wide range of applications, thanks to its distributed architecture, millisecond-level latency, and powerful rule expression capabilities. Three typical scenarios are:

  • Real-time risk control: Flink CEP can be used to identify at-risk users. For example, it can analyze customer behavior logs to flag users who make more than 10 transfers totaling over 10,000 within 5 minutes.

  • Real-time marketing: Flink CEP can optimize marketing strategies. For example, by analyzing user behavior logs during an e-commerce promotion, you can identify users who add more than three items to their cart within 10 minutes but do not complete the purchase, enabling targeted marketing adjustments. Flink CEP is also effective in anti-fraud scenarios for real-time marketing.

  • Internet of Things (IoT): Flink CEP can detect abnormal states and send alerts. For example, it can issue a risk alert if a shared bike leaves a designated area and does not return within 15 minutes. It can also be combined with IoT sensors to detect production line anomalies. For instance, if a temperature sensor continuously reports temperatures exceeding a set threshold over three consecutive time periods, an alert is triggered.

Example walkthrough

This article demonstrates how to use dynamic CEP to address these scenarios. In this example, customer behavior logs are stored in ApsaraMQ for Kafka. The Flink CEP job consumes this data while polling a rule table in an ApsaraDB RDS for MySQL database. It fetches the latest rules added by a policy administrator and uses them to match events. When a match occurs, the job sends an alert or writes the relevant information to another data store. The following figure shows the overall data pipeline.

This walkthrough first starts the Flink CEP job and then inserts Rule 1, which matches a sequence where three consecutive events with an action of 0 are followed by an event where the action is not 1. This signifies a user viewing a product three times without making a purchase.

Prerequisites

Procedure

This article describes how to write and dynamically update a Flink CEP job that monitors and records users whose behavior logs match specific rules.

Step 1: Prepare test data

Prepare the upstream Kafka topic

  1. Log on to the ApsaraMQ for Kafka console.

  2. Create a topic named demo_topic to store simulated user behavior logs.

    For more information, see Step 1: Create a topic.

Prepare the RDS database

In the Data Management (DMS) console, prepare the test data for ApsaraDB RDS for MySQL.

  1. Log on to the ApsaraDB RDS for MySQL instance with a privileged account.

    For more information, see Log on to an ApsaraDB RDS for MySQL instance by using DMS.

  2. Create the rds_demo rule table to store the rules for the Flink CEP job. Create the match_results table to store the data that matches the rules.

    In the active SQLConsole window, enter the following commands and click Execute.

    CREATE DATABASE cep_demo_db;
    USE cep_demo_db;
    CREATE TABLE rds_demo (
      `id` VARCHAR(64),
      `version` INT,
      `pattern` VARCHAR(4096),
      `function` VARCHAR(512)
    );
    CREATE TABLE match_results (
        rule_id INT,
        rule_version INT,
        user_id INT,
        user_name VARCHAR(255),
        production_id INT,
        PRIMARY KEY (rule_id,rule_version,user_id,production_id)
    );

    Each row in the rds_demo rule table represents a single rule. It includes an id and version to distinguish between different rules and their versions, a pattern field that describes the CEP API's pattern object, and a function field that describes how to process the event sequence that matches the pattern.

    Each row in the match_results table represents a match where a user's behavior for a specific product conforms to a certain rule. This record can be used to formulate corresponding sales strategies, such as sending coupons for related products.

Step 2: Configure IP whitelist

To allow the Flink job to access the ApsaraDB RDS for MySQL instance, add the CIDR block of the Realtime Compute for Apache Flink workspace to the instance's IP address whitelist.

  1. Obtain the VPC CIDR block of the Realtime Compute for Apache Flink workspace.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. In the Actions column of the target workspace, choose More > Workspace details.

    3. In the Workspace details dialog box, view the CIDR block of the fully managed Flink vSwitch.

  2. Add the fully managed Flink CIDR block to the IP address whitelist of your ApsaraDB RDS for MySQL instance.

    For more information, see Configure an IP address whitelist. In the Modify whitelist dialog box, enter the Flink CIDR block in the IP addresses in whitelist text box. If you have multiple CIDR blocks, separate them with commas. Then, click OK.

Step 3: Develop and start CEP job

Note

All code for this article is available in our GitHub repository. For demonstration purposes, the sample code in this article is slightly modified on the timeOrMoreAndWindow branch. You can download the complete ververica-cep-demo-master.zip file for reference.

  1. Add flink-cep as a project dependency in the job's Maven POM file.

    For more information about handling other Flink-related JAR packages and resolving conflicts, see Configure Flink environment dependencies.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-cep</artifactId>
        <version>1.17-vvr-8.0.8</version>
        <scope>provided</scope>
    </dependency>
  2. Develop the job code.

    1. Build a Kafka Source.

      For details on writing the code, see Kafka DataStream Connector.

    2. Build the CEP.dynamicPatterns() API.

      To support dynamic rule changes and multi-rule matching for CEP, Realtime Compute for Apache Flink defines the CEP.dynamicPatterns() API. The API is defined as follows.

      public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
               DataStream<T> input,
               PatternProcessorDiscovererFactory<T> discovererFactory,
               TimeBehaviour timeBehaviour,
               TypeInformation<R> outTypeInfo)

      The following table describes the parameters for this API. You can update the parameter values based on your actual use case.

      Parameter

      Description

      DataStream<T> input

      The input event stream.

      PatternProcessorDiscovererFactory<T> discovererFactory

      A factory that creates a PatternProcessorDiscoverer. This discoverer fetches the latest rules and builds the corresponding PatternProcessor instances.

      TimeBehaviour timeBehaviour

      Describes how the Flink CEP job handles time attributes of events. Valid values:

      • TimeBehaviour.ProcessingTime: Processes events based on processing time.

      • TimeBehaviour.EventTime: Processes events based on event time.

      TypeInformation<R> outTypeInfo

      Describes the type information of the output stream.

      For more information about common Flink concepts such as DataStream, TimeBehaviour, and TypeInformation, see DataStream API, Event Time and Processing Time, and TypeInformation.

      The PatternProcessor interface is a key component. A PatternProcessor contains a specific Pattern that describes how to match events, and a PatternProcessFunction that describes how to handle a match, such as sending an alert. It also includes an id and version to identify the PatternProcessor. For more background, see the proposal.

      The patternProcessorDiscovererFactory creates a discoverer to fetch the latest PatternProcessor. The sample code includes an abstract class that shows how to periodically poll an external store for new PatternProcessor instances.

      public abstract class PeriodicPatternProcessorDiscoverer<T>
              implements PatternProcessorDiscoverer<T> {
          ...
          @Override
          public void discoverPatternProcessorUpdates(
                  PatternProcessorManager<T> patternProcessorManager) {
              // Periodically discovers the pattern processor updates.
              timer.schedule(
                      new TimerTask() {
                          @Override
                          public void run() {
                              if (arePatternProcessorsUpdated()) {
                                  List<PatternProcessor<T>> patternProcessors = null;
                                  try {
                                      patternProcessors = getLatestPatternProcessors();
                                  } catch (Exception e) {
                                      e.printStackTrace();
                                  }
                                  patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
                              }
                          }
                      },
                      0,
                      intervalMillis);
          }
          ...
      }

      Realtime Compute for Apache Flink provides an implementation of JDBCPeriodicPatternProcessorDiscoverer to fetch the latest rules from a database that supports the JDBC protocol, such as ApsaraDB RDS for MySQL or Hologres. When using it, you need to specify the following parameters.

      Parameter

      Description

      jdbcUrl

      The JDBC URL of the database.

      jdbcDriver

      The class name of the database driver.

      tableName

      The name of the database table.

      initialPatternProcessors

      The default PatternProcessor to use when the rule table in the database is empty.

      intervalMillis

      The polling interval for the database, in milliseconds.

      In your code, you can use it as follows. The job will print the matched rules to the Flink TaskManager output.

      // import ......
      public class CepDemo {
          public static void main(String[] args) throws Exception {
              ......
              // DataStream Source
              DataStreamSource<Event> source =
                      env.fromSource(
                              kafkaSource,
                              WatermarkStrategy.<Event>forMonotonousTimestamps()
                                      .withTimestampAssigner((event, ts) -> event.getEventTime()),
                              "Kafka Source");
              env.setParallelism(1);
              // Key by userId and productionId.
              // Note: Only events with the same key will be processed to see if there is a match.
              KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
                      source.assignTimestampsAndWatermarks(
                              WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                      ).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() {
                          @Override
                          public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
                              return Tuple2.of(value.getId(), value.getProductionId());
                          }
                      });
              SingleOutputStreamOperator<String> output =
                      CEP.dynamicPatterns(
                              keyedStream,
                              new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                      params.get(JDBC_URL_ARG),
                                      JDBC_DRIVE,
                                      params.get(TABLE_NAME_ARG),
                                      null,
                                      Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                              Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ? TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
                              TypeInformation.of(new TypeHint<String>() {}));
              output.print();
              // Compile and submit the job
              env.execute("CEPDemo");
          }
      }
      Note

      For demonstration purposes, the demo code keys the input data stream by id and product_id before connecting it to CEP.dynamicPatterns(). This means that only events with the same id and product_id are considered for rule matching. Events with different keys will not be matched against each other.

  3. In the Realtime Compute for Apache Flink console, upload the JAR file and deploy the JAR job. For more information, see Deploy a job.

    To help you get started quickly, you can download the test cep-demo.jar file. The following table describes the parameters to configure during deployment.

    Note

    Because the upstream Kafka source is empty and the database rule table contains no data, the job will not produce any output after starting.

    Parameter

    Description

    Deployment mode

    Select Stream Mode.

    Deployment name

    Enter a name for the JAR job.

    Engine version

    For more information about engine versions, see Engine versions and Lifecycle policies. We recommend using a recommended or stable version. The version tags are described as follows:

    • Recommended version: The latest minor version of the current major version.

    • Stable version: The latest minor version of a major version that is still within its product support period and has historical defects fixed.

    • Normal version: Other minor versions that are still within their product support period.

    • EOS version: A version that has exceeded its product support period.

    JAR URL

    Upload your packaged JAR file, or upload the test JAR file we provide.

    Entry Point Class

    Enter com.alibaba.ververica.cep.demo.CepDemo.

    Entry Point Main Arguments

    If you are using your own developed job and have already configured the upstream and downstream storage information, you can leave this field blank. However, if you are using the provided test JAR, you must configure this parameter. The code is as follows.

    --kafkaBrokers YOUR_KAFKA_BROKERS 
    --inputTopic YOUR_KAFKA_TOPIC 
    --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP 
    --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD
    --tableName YOUR_TABLE_NAME  
    --jdbcIntervalMs 3000
    --usingEventTime false

    The following table describes the parameters.

    • kafkaBrokers: The Kafka broker address.

    • inputTopic: The Kafka topic name.

    • inputTopicGroup: The Kafka consumer group.

    • jdbcUrl: The JDBC URL of the database.

      Note

      The username and password in the JDBC URL for this example must be for a standard account, and the password can only contain letters and numbers. In a real-world scenario, you can use different authentication methods in your job as needed.

    • tableName: The name of the target table.

    • jdbcIntervalMs: The polling interval for the database.

    • usingEventTime: Specifies whether to use event time for processing (true/false).

    Note
    • You must replace the placeholder values with your actual upstream and downstream storage information.

    • Avoid using plaintext passwords in a production environment. We recommend using the variable management feature. For more information, see Variable management.

  4. On the Deployment details page, in the Other configuration section, add the following job runtime parameters.

    In practical applications, the flink-cep JAR is loaded by the system class loader, while aviator-related classes are typically packaged in a user JAR and loaded by the user class loader. By using the two configurations below, you can ensure that the system class loader can access classes in the user JAR when it attempts to load a class, thereby avoiding class loading failures.

    kubernetes.application-mode.classpath.include-user-jar: 'true' 
    classloader.resolve-order: parent-first

    For more information about configuring runtime parameters, see Runtime parameter configuration.

  5. On the O&M > Deployments page, find the target deployment and click Start in the Actions column.

    For more information about configuring job startup parameters, see Start a job.

Step 4: Insert a rule

With the Flink CEP job running, insert Rule 1: after three consecutive events with an action of 0, the next event's action is not 1. This means a user views a product three times without making a purchase.

  1. Log on to the ApsaraDB RDS for MySQL console.

  2. Insert the dynamic update rule.

    Concatenate the JSON string with the id, version, and function class name, then insert it into RDS.

    INSERT INTO rds_demo (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;

    To improve usability and the readability of the pattern field in the database, Realtime Compute for Apache Flink defines a JSON-based rule format. For more information, see JSON format for rules in dynamic CEP. The pattern field in the preceding SQL statement contains a serialized JSON string. This string represents a pattern that matches the following sequence: after three consecutive events with an action of 0, the next event's action is not 1.

    Note

    In the EndCondition code, the defined condition is action != 1.

    • The corresponding CEP API description is as follows.

      Pattern<Event, Event> pattern =
          Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
              .where(new StartCondition("action == 0"))
              .timesOrMore(3)
              .followedBy("end")
              .where(new EndCondition());
    • You can convert it to the corresponding JSON string by using the method in CepJsonUtils.

      public void printTestPattern(Pattern<?, ?> pattern) throws JsonProcessingException {
          System.out.println(CepJsonUtils.convertPatternToJSONString(pattern));
      }
    • The corresponding JSON string is as follows.

      Example dynamic CEP rule JSON string

      {
        "name": "end",
        "quantifier": {
          "consumingStrategy": "SKIP_TILL_NEXT",
          "properties": [
            "SINGLE"
          ],
          "times": null,
          "untilCondition": null
        },
        "condition": null,
        "nodes": [
          {
            "name": "end",
            "quantifier": {
              "consumingStrategy": "SKIP_TILL_NEXT",
              "properties": [
                "SINGLE"
              ],
              "times": null,
              "untilCondition": null
            },
            "condition": {
              "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
              "type": "CLASS"
            },
            "type": "ATOMIC"
          },
          {
            "name": "start",
            "quantifier": {
              "consumingStrategy": "SKIP_TILL_NEXT",
              "properties": [
                "LOOPING"
              ],
              "times": {
                "from": 3,
                "to": 3,
                "windowTime": null
              },
              "untilCondition": null
            },
            "condition": {
              "expression": "action == 0",
              "type": "AVIATOR"
            },
            "type": "ATOMIC"
          }
        ],
        "edges": [
          {
            "source": "start",
            "target": "end",
            "type": "SKIP_TILL_NEXT"
          }
        ],
        "window": null,
        "afterMatchStrategy": {
          "type": "SKIP_PAST_LAST_EVENT",
          "patternName": null
        },
        "type": "COMPOSITE",
        "version": 1
      }
  3. Send messages to the demo_topic topic by using a Kafka client.

    In this demo, you can also use the Start to send and consume message page provided by ApsaraMQ for Kafka to send test messages.

    1,Ken,0,1,1662022777000
    1,Ken,0,1,1662022778000
    1,Ken,0,1,1662022779000
    1,Ken,0,1,1662022780000

    Select the Console sending method, enter 1 in the Message key field, paste the test data into the Message content field, set Send to specified partition to No, and send the message. The page displays a Message sent successfully notification.

    The following table describes the fields in demo_topic.

    Parameter

    Description

    id

    The user ID.

    username

    The username.

    action

    The user action. Valid values:

    • 0: view operation

    • 1: purchase action

    product_id

    The product ID.

    event_time

    The time of the behavior event.

  4. View the latest rule in the JobManager logs and the match in the TaskManager logs.

    • In the JobManager logs, search for JDBCPeriodicPatternProcessorDiscoverer to view the latest rule.

      To find the logs, navigate to Logs > JobManager, and then click the Logs tab. Enter the keyword in the search box to locate the relevant log entry. The log message PatternProcessors have been updated confirms that the rule was updated successfully.

    • In the TaskManager log file ending in .out, search for A match for Pattern of (id, version): (1, 1) to view the resulting match.

      On the deployment details page, click the Logs tab, select Running task managers, and open the Logs subtab for the corresponding TaskManager. Search for the keyword in the flink.out file to locate the matched event sequence.

  5. Query the match_results table by running SELECT * FROM `match_results`; to see the results that match the rule.

    The query returns one record with the fields rule_id, rule_version, user_id, user_name, and production_id, with values 1, 1, 1, Ken, and 1, respectively.

Step 5: Update matching rule

Marketing strategies often have time constraints. This step updates the rule to require the three action = 0 events to occur within a 15-minute interval.

  1. Set the usingEventTime parameter to true.

    1. On the O&M > Deployments page, find the target deployment and click Cancel in the Actions column.

    2. In Deployment Details > Entry Point Main Arguments , click Edit, set the usingEventTime parameter to true, and then click Save.

    3. Start the job again.

  2. Insert the new rule.

    The corresponding CEP API description is as follows.

    Pattern<Event, Event> pattern =
            Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                    .where(new StartCondition("action == 0"))
                    .timesOrMore(3,Time.minutes(15))
                    .followedBy("end")
                    .where(new EndCondition());
    printTestPattern(pattern);

    Insert the new rule into the rds_demo table.

    # To avoid rule conflicts for this demo, delete the previous rule first.
    DELETE FROM `rds_demo` WHERE `id` = 1;
    # Insert the new rule: three consecutive `action = 0` events within 15 minutes, followed by an event where the action is not 1. The rule version is (1, 2).
    INSERT INTO rds_demo (`id`,`version`,`pattern`,`function`) values('1',2,'{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":{"unit":"MINUTES","size":15}},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
  3. On the Kafka console, send eight messages to trigger a match.

    The following are eight sample messages.

    2,Tom,0,1,1739584800000   #10:00
    2,Tom,0,1,1739585400000   #10:10
    2,Tom,0,1,1739585700000   #10:15
    2,Tom,0,1,1739586000000   #10:20
    3,Ali,0,1,1739586600000   #10:30
    3,Ali,0,1,1739588400000   #11:00
    3,Ali,0,1,1739589000000   #11:10
    3,Ali,0,1,1739590200000   #11:30
  4. Query the match_results table by running SELECT * FROM `match_results`; to see the results that match the rule.

    The query returns two records, with columns for rule_id, rule_version, user_id, user_name, and production_id. The first record is for Ken (rule_version=1), and the second is for Tom (rule_version=2), both associated with production_id=1.

    The results show that only Tom's behavior matches the new rule because Ali's actions occurred over more than 15 minutes. For limited-time promotions, this allows you to send coupons to users who repeatedly visit a product within a specific timeframe, encouraging a purchase.