
Realtime Compute for Apache Flink: Quick start with dynamic Flink CEP

Last Updated: Mar 26, 2026

Realtime Compute for Apache Flink supports dynamic complex event processing (CEP) in DataStream programs. Unlike static CEP, dynamic CEP lets you update detection rules at runtime without restarting the job, so fraud detection, anomaly alerting, and marketing triggers adapt to changing business logic in real time.

This tutorial walks through a complete example: a Flink job consumes user clickstream data from ApsaraMQ for Kafka, polls rules from ApsaraDB RDS for MySQL every few seconds, and writes matched events back to MySQL. You start with a simple rule (three consecutive product views without a purchase), then replace it with a version that adds a 15-minute time constraint. The job picks up the new rule from the rule table while it runs. The only restart in this tutorial is a one-time switch from processing time to event time.

Use cases

Dynamic Flink CEP applies to scenarios where detection rules change frequently:

  • Real-time risk control: Flag users who complete 10 transfers totaling USD 10,000 within a five-minute window.

  • Real-time marketing: Identify users who add more than three items to a cart within 10 minutes without checking out, and trigger targeted promotions or fraud checks.

  • IoT anomaly detection: Alert when a shared bike leaves a designated area for more than 15 minutes, or when temperature sensor readings exceed a threshold across three consecutive time windows on an assembly line.

How it works

[Figure: Data pipeline for this tutorial]

The Flink job:

  1. Reads user behavior events from a Kafka topic.

  2. Keys the stream by user ID and product ID so that each rule is evaluated per user-product pair.

  3. Polls the MySQL rule table every few seconds via JDBCPeriodicPatternProcessorDiscoverer.

  4. Applies the current rules using CEP.dynamicPatterns(). When a rule update is detected, the new rule takes effect on subsequent events without a job restart.

  5. Writes matched events to the MySQL match_results table.

Prerequisites

Before you begin, ensure that you have:

Step 1: Prepare test data

Create a Kafka topic

  1. Log on to the ApsaraMQ for Kafka console.

  2. Create a topic named demo_topic to store simulated user behavior events. For more information, see Step 1: Create a topic.

Create the MySQL tables

Use the Data Management (DMS) console to create the rule table and the results table.

  1. Log on to the ApsaraDB RDS for MySQL instance with a privileged account. For more information, see Log on to the RDS instance in the DMS console.

  2. In the SQL editor, run the following statements and click Execute (F8):

    CREATE DATABASE cep_demo_db;
    USE cep_demo_db;
    
    CREATE TABLE rds_demo (
      `id` VARCHAR(64),
      `version` INT,
      `pattern` VARCHAR(4096),
      `function` VARCHAR(512)
    );
    
    CREATE TABLE match_results (
      rule_id INT,
      rule_version INT,
      user_id INT,
      user_name VARCHAR(255),
      production_id INT,
      PRIMARY KEY (rule_id, rule_version, user_id, production_id)
    );

    The two tables serve different purposes:

    Table          Purpose
    rds_demo       Stores CEP rules. Each row is one rule, with fields id (unique identifier), version, pattern (JSON-serialized pattern), and function (the class that processes matched events).
    match_results  Receives matched events. Each row represents a user whose behavior matched a rule, providing your marketing or operations team with actionable data.

Step 2: Configure the IP address whitelist

To allow your Flink workspace to reach the MySQL instance, add the workspace CIDR block to the MySQL IP address whitelist.

  1. Get the CIDR block of the vSwitch used by your Flink workspace.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the target workspace and choose More > Workspace Details in the Actions column.

    3. In the Workspace Details dialog box, copy the CIDR block of the vSwitch.

  2. Add the CIDR block to the MySQL IP address whitelist. For more information, see Configure IP address whitelist in the ApsaraDB RDS for MySQL documentation.

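A whitelist entry is a CIDR block, so every Flink worker address inside that block is allowed to connect. If you want to sanity-check that a given address is covered by the block you copied, the following plain-Java sketch tests IPv4 membership. The CidrCheck class and the example addresses are hypothetical; use the CIDR block shown in Workspace Details.

```java
/** Hypothetical helper: checks whether an IPv4 address falls inside a CIDR block such as "192.168.0.0/24". */
final class CidrCheck {
    static boolean contains(String cidr, String address) {
        String[] parts = cidr.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected a.b.c.d/prefix but got: " + cidr);
        }
        int prefix = Integer.parseInt(parts[1]);
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("Prefix length out of range: " + prefix);
        }
        // Network mask with `prefix` leading one bits. /0 is special-cased because
        // Java masks int shift distances to 5 bits, so -1 << 32 would still be -1.
        int mask = prefix == 0 ? 0 : -1 << (32 - prefix);
        return (toInt(parts[0]) & mask) == (toInt(address) & mask);
    }

    /** Converts dotted-quad notation to a 32-bit integer. */
    private static int toInt(String dotted) {
        String[] octets = dotted.split("\\.");
        if (octets.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 address: " + dotted);
        }
        int value = 0;
        for (String octet : octets) {
            int o = Integer.parseInt(octet);
            if (o < 0 || o > 255) {
                throw new IllegalArgumentException("Octet out of range in: " + dotted);
            }
            value = (value << 8) | o;
        }
        return value;
    }
}
```

For example, contains("192.168.0.0/24", "192.168.0.17") is true, while contains("192.168.0.0/24", "192.168.1.17") is false because the third octet differs.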

Step 3: Develop the dynamic CEP job

All code in this tutorial is available on GitHub, on the timeOrMoreAndWindow branch. To run the full example without writing code, download ververica-cep-demo-master.zip.

Add the flink-cep dependency

Add the following dependency to your pom.xml. For dependency configuration details and conflict handling, see Configure Flink environment dependencies.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-cep</artifactId>
    <version>1.17-vvr-8.0.8</version>
    <scope>provided</scope>
</dependency>

Write the dynamic CEP program

The program has three main parts: a Kafka source, a keyed stream, and the CEP.dynamicPatterns() call.

Part 1: Create a Kafka source

Create a DataStreamSource that reads from your Kafka topic. For connector configuration details, see Kafka DataStream Connector.
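The test messages in Steps 4 and 5 are plain comma-separated strings in the order id,username,action,product_id,event_time, so the deserializer on your Kafka source must turn each string into an Event. The following sketch shows only that parsing logic in plain Java. The Event record and its fields are assumptions modeled on the getters this tutorial calls (getId(), getProductionId(), getEventTime()); they are not the demo's actual class. In a real job you would call logic like this from the DeserializationSchema passed to the KafkaSource builder (for example through setValueOnlyDeserializer).

```java
/** Hypothetical stand-in for the demo's Event class. */
record Event(int id, String username, int action, int productionId, long eventTime) {

    /** Parses "id,username,action,product_id,event_time", for example "1,Ken,0,1,1662022777000". */
    static Event parse(String line) {
        String[] f = line.trim().split(",");
        if (f.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields but got " + f.length + ": " + line);
        }
        return new Event(
                Integer.parseInt(f[0].trim()),   // id: user ID
                f[1].trim(),                     // username
                Integer.parseInt(f[2].trim()),   // action: 0 = view, 1 = purchase
                Integer.parseInt(f[3].trim()),   // product_id
                Long.parseLong(f[4].trim()));    // event_time: Unix epoch in ms
    }

    // Getter names matching those used by the KeySelector and timestamp assigner in this tutorial.
    int getId() { return id; }
    int getProductionId() { return productionId; }
    long getEventTime() { return eventTime; }
}
```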

Part 2: Key the stream

Before calling CEP.dynamicPatterns(), key the stream by user ID and product ID. This ensures each rule is evaluated independently per user-product pair—events from different users or products do not interfere with each other.

KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
    source.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forGenerator(
            ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
    ).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
            return Tuple2.of(value.getId(), value.getProductionId());
        }
    });
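To see what keying by (user ID, product ID) means in practice, the following plain-Java sketch groups events by the same composite key the KeySelector returns. It is only an illustration: Flink distributes keys across parallel subtasks and keeps separate CEP state per key, whereas this sketch just builds a map. The ViewEvent, UserProductKey, and KeyingDemo names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical minimal event: user ID, product ID, and action (0 = view, 1 = purchase). */
record ViewEvent(int userId, int productId, int action) {}

/** Composite key, analogous to Tuple2.of(value.getId(), value.getProductionId()). */
record UserProductKey(int userId, int productId) {}

final class KeyingDemo {
    /** Groups events per key, preserving arrival order inside each group. */
    static Map<UserProductKey, List<ViewEvent>> groupByKey(List<ViewEvent> events) {
        return events.stream().collect(Collectors.groupingBy(
                e -> new UserProductKey(e.userId(), e.productId()),
                LinkedHashMap::new,
                Collectors.toList()));
    }
}
```

With this grouping, views by Tom (user 2) and Ali (user 3) of product 1 land in different groups, so a rule never combines one user's events with another's.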

Part 3: Apply dynamic patterns

CEP.dynamicPatterns() is a method provided by Realtime Compute for Apache Flink that supports multiple rules and rule updates without job restarts:

public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
         DataStream<T> input,
         PatternProcessorDiscovererFactory<T> discovererFactory,
         TimeBehaviour timeBehaviour,
         TypeInformation<R> outTypeInfo)
Parameter Description
DataStream<T> input The input event stream.
PatternProcessorDiscovererFactory<T> discovererFactory Constructs the PatternProcessorDiscoverer, which fetches the latest rules and builds PatternProcessor instances. Each PatternProcessor bundles a pattern (how to match events) with a PatternProcessFunction (what to do when a match is found).
TimeBehaviour timeBehaviour The time attribute for event processing. TimeBehaviour.ProcessingTime uses wall-clock time; TimeBehaviour.EventTime uses timestamps embedded in the events.
TypeInformation<R> outTypeInfo The type information of the output stream.

For background on DataStream, time semantics, and TypeInformation, see Flink DataStream API Programming Guide, Notions of Time: Event Time and Processing Time, and Class TypeInformation<T>.
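The timeBehaviour argument decides which timestamp a time window is measured against. The following sketch is a simplified illustration, not Flink's implementation: it measures the span of a group of events using either the timestamp carried in each event (event time) or the moment each event arrived (processing time). The Behaviour enum is a local stand-in for TimeBehaviour, and the arrival times in the usage note are hypothetical.

```java
import java.util.List;

/** Illustrates how the choice of time behaviour changes what a time window sees. */
final class TimeBehaviourDemo {
    /** Local stand-in for TimeBehaviour.EventTime and TimeBehaviour.ProcessingTime. */
    enum Behaviour { EVENT_TIME, PROCESSING_TIME }

    /** An event's embedded timestamp plus the wall-clock time at which the job received it. */
    record Timed(long eventTimeMillis, long arrivalMillis) {}

    /** Time between the earliest and latest event under the chosen behaviour (0 for no events). */
    static long spanMillis(List<Timed> events, Behaviour behaviour) {
        if (events.isEmpty()) {
            return 0L;
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (Timed t : events) {
            long ts = behaviour == Behaviour.EVENT_TIME ? t.eventTimeMillis() : t.arrivalMillis();
            min = Math.min(min, ts);
            max = Math.max(max, ts);
        }
        return max - min;
    }
}
```

For example, if Ali's four messages from Step 5 (event times 10:30 to 11:30) are sent two seconds apart, spanMillis returns 3,600,000 ms under EVENT_TIME but only 6,000 ms under PROCESSING_TIME. A 15-minute window measured in processing time would therefore likely treat all four views as close together, which is why Step 5 switches the job to event time.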

For the discovererFactory parameter, use JDBCPeriodicPatternProcessorDiscoverer, which periodically polls a JDBC database (such as ApsaraDB RDS for MySQL) and applies any rule changes it finds. It is built on an abstract base class that schedules a timer to check for updates:

public abstract class PeriodicPatternProcessorDiscoverer<T>
        implements PatternProcessorDiscoverer<T> {

    ...
    @Override
    public void discoverPatternProcessorUpdates(
            PatternProcessorManager<T> patternProcessorManager) {
        // Periodically discovers pattern processor updates.
        timer.schedule(
                new TimerTask() {
                    @Override
                    public void run() {
                        if (arePatternProcessorsUpdated()) {
                            List<PatternProcessor<T>> patternProcessors = null;
                            try {
                                patternProcessors = getLatestPatternProcessors();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
                        }
                    }
                },
                0,
                intervalMillis);
    }

    ...
}
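The excerpt above boils down to a loop: on a fixed delay, check whether the rules changed and, if so, hand the latest set to the manager. The following plain-Java sketch reproduces that loop with a ScheduledExecutorService (scheduleWithFixedDelay behaves like the fixed-delay timer.schedule(task, 0, intervalMillis) call). The Rule record, the RuleStore interface, and detecting changes by comparing full rule snapshots are assumptions for illustration, not the library's classes; the real JDBCPeriodicPatternProcessorDiscoverer may detect changes differently. Unlike the excerpt, this sketch keeps the current rules when a poll fails instead of publishing null.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical rule row: the id, version, and pattern columns of the rules table. */
record Rule(String id, int version, String patternJson) {}

/** Hypothetical source of rules, for example a JDBC query against the rules table. */
interface RuleStore {
    List<Rule> loadAll();
}

/** Polls a RuleStore on a fixed delay and publishes the rules whenever they change. */
final class PollingDiscoverer implements AutoCloseable {
    private final RuleStore store;
    private final Consumer<List<Rule>> onUpdate; // stands in for onPatternProcessorsUpdated
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private Set<Rule> lastSeen = Set.of();        // snapshot from the previous successful poll

    PollingDiscoverer(RuleStore store, Consumer<List<Rule>> onUpdate) {
        this.store = store;
        this.onUpdate = onUpdate;
    }

    /** Polls immediately, then again intervalMillis after each poll finishes. */
    void start(long intervalMillis) {
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Runs one polling cycle and returns true if changed rules were published. */
    synchronized boolean pollOnce() {
        List<Rule> rules;
        try {
            rules = store.loadAll();
        } catch (RuntimeException e) {
            // Keep the current rules when the store cannot be read; the next cycle retries.
            return false;
        }
        Set<Rule> current = Set.copyOf(rules);
        if (current.equals(lastSeen)) {
            return false; // nothing changed since the last poll
        }
        lastSeen = current;
        onUpdate.accept(List.copyOf(rules));
        return true;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Because a change is only noticed on the next cycle, a rule update takes effect within roughly one polling interval, which matches the behavior described in Step 5.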

JDBCPeriodicPatternProcessorDiscoverer takes the following parameters:

Parameter Description
jdbcUrl JDBC URL of the database.
jdbcDriver Name of the database driver class.
tableName Name of the rules table.
initialPatternProcessors Initial list of PatternProcessor instances (can be null).
intervalMillis Polling interval in milliseconds.
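For intuition about what one polling cycle reads, the following plain JDBC sketch loads every row of the rules table created in Step 1. The query, the RuleRow record, and the RuleTableReader class are assumptions for illustration; the discoverer's own SQL is not shown in this tutorial. Because JDBC cannot bind a table name as a parameter, the sketch validates tableName before building the query.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical reader for the rules table (id, version, pattern, function). */
final class RuleTableReader {
    record RuleRow(String id, int version, String pattern, String function) {}

    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z0-9_]+");

    /** Builds the query; the table name is validated because it cannot be a bind parameter. */
    static String selectSql(String tableName) {
        if (!IDENTIFIER.matcher(tableName).matches()) {
            throw new IllegalArgumentException("Unexpected table name: " + tableName);
        }
        return "SELECT `id`, `version`, `pattern`, `function` FROM `" + tableName + "`";
    }

    /** Reads all rule rows through an open connection, for example DriverManager.getConnection(jdbcUrl). */
    static List<RuleRow> readAll(Connection connection, String tableName) throws SQLException {
        List<RuleRow> rows = new ArrayList<>();
        try (PreparedStatement ps = connection.prepareStatement(selectSql(tableName));
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                rows.add(new RuleRow(rs.getString("id"), rs.getInt("version"),
                        rs.getString("pattern"), rs.getString("function")));
            }
        }
        return rows;
    }
}
```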

The following example shows the complete main() method with these components connected:

// import ......
public class CepDemo {

    public static void main(String[] args) throws Exception {

        ......
        // DataStream source
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");

        env.setParallelism(1);
        // Key by userId and productionId.
        // Only events with the same key are evaluated together for pattern matching.
        KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
                source.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                ).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
                        return Tuple2.of(value.getId(), value.getProductionId());
                    }
                });

        SingleOutputStreamOperator<String> output =
                CEP.dynamicPatterns(
                        keyedStream,
                        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                params.get(JDBC_URL_ARG),
                                JDBC_DRIVE,
                                params.get(TABLE_NAME_ARG),
                                null,
                                Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                        Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ?
                            TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
                        TypeInformation.of(new TypeHint<String>() {}));

        output.print();
        // Compile and submit the job
        env.execute("CEPDemo");
    }
}
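The main() method reads its settings through a params object (params.get(JDBC_URL_ARG) and so on) whose construction is elided above; in Flink programs such an object commonly comes from ParameterTool.fromArgs(args). The following plain-Java sketch shows the equivalent logic: it parses --key value pairs like those listed under Entry Point Main Arguments and derives the polling interval and time behaviour. It assumes the constants hold the argument names jdbcIntervalMs and usingEventTime; the CepArgs class is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical argument handling for the CEP demo. */
final class CepArgs {
    /** Parses arguments of the form "--key value" into a map. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (!arg.startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but found: " + arg);
            }
            String key = arg.substring(2);
            // A key with no following value (or followed by another --key) maps to an empty string.
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
            params.put(key, hasValue ? args[++i] : "");
        }
        return params;
    }

    /** Polling interval for the rule table, from --jdbcIntervalMs. */
    static long intervalMillis(Map<String, String> params) {
        String value = params.get("jdbcIntervalMs");
        if (value == null) {
            throw new IllegalArgumentException("Missing --jdbcIntervalMs");
        }
        return Long.parseLong(value);
    }

    /** True selects event time; a missing or non-"true" value selects processing time. */
    static boolean usingEventTime(Map<String, String> params) {
        return Boolean.parseBoolean(params.get("usingEventTime"));
    }
}
```

Boolean.parseBoolean(null) returns false, so omitting --usingEventTime falls back to processing time, the same outcome as the ternary in main().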

For more information about the PatternProcessor interface and the design specification for the dynamic CEP API, see FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP).

Upload the JAR and create a deployment

  1. In the Realtime Compute for Apache Flink console, upload your JAR and create a JAR deployment with the following parameters:

    Parameter                   Value
    Deployment Mode             Select Stream Mode.
    Deployment Name             Enter a name for the deployment.
    Engine Version              Select a recommended or stable version. See Engine version and Lifecycle policies.
    JAR URL                     Upload your program JAR or the test cep-demo.jar.
    Entry Point Class           com.alibaba.ververica.cep.demo.CepDemo
    Entry Point Main Arguments  If you use the pre-built cep-demo.jar, paste the arguments below and fill them in. Skip this field if you use your own JAR with upstream and downstream systems already configured.

    Arguments for the pre-built JAR:

    --kafkaBrokers YOUR_KAFKA_BROKERS
    --inputTopic YOUR_KAFKA_TOPIC
    --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP
    --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME\&password=YOUR_PASSWORD
    --tableName YOUR_TABLE_NAME
    --jdbcIntervalMs 3000
    --usingEventTime false

    Replace each placeholder:

    Placeholder                     Description
    YOUR_KAFKA_BROKERS              Addresses of your Kafka brokers.
    YOUR_KAFKA_TOPIC                Name of your Kafka topic.
    YOUR_KAFKA_TOPIC_GROUP          Your Kafka consumer group.
    YOUR_DB_URL:port/DATABASE_NAME  Endpoint and port of your MySQL instance, and the database name (cep_demo_db in this tutorial).
    YOUR_USERNAME, YOUR_PASSWORD    Credentials of the MySQL account. Use a standard account with a password that contains only letters and digits.
    YOUR_TABLE_NAME                 Name of the rules table (for example, rds_demo).

    To test without building your own JAR, download cep-demo.jar and create a deployment from it. Because the Kafka topic has no data yet and the rule table is empty, the job produces no output at this point.

    Use environment variables rather than plaintext credentials in production. For more information, see Variable management.

  2. On the Deployment tab of the Deployments page, click Edit in the Parameters section. In the Other Configuration field, add the following:

    kubernetes.application-mode.classpath.include-user-jar: 'true'
    classloader.resolve-order: parent-first

    These settings ensure that flink-cep (loaded by AppClassLoader) can access the aviator classes in your user JAR (loaded by UserCodeClassLoader). For more information, see Runtime parameter configuration.

  3. Go to O&M > Deployments, find the deployment, and click Start in the Actions column. For more information, see Start a job.
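The --jdbcUrl argument carries the user name and password in the URL's query string. A likely reason for the letters-and-digits rule is that characters such as &, =, ? or # would be read as URL syntax rather than as part of the password. The following sketch assembles the URL from its parts and rejects passwords outside that rule. The JdbcUrls class and the example host are hypothetical.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that assembles the value passed to --jdbcUrl. */
final class JdbcUrls {
    // Letters and digits only, as this tutorial recommends for the MySQL account password.
    private static final Pattern LETTERS_AND_DIGITS = Pattern.compile("[A-Za-z0-9]+");

    /** Returns jdbc:mysql://host:port/database?user=...&password=... */
    static String mysqlUrl(String host, int port, String database, String user, String password) {
        if (!LETTERS_AND_DIGITS.matcher(password).matches()) {
            // Characters such as &, =, ? or # would be interpreted as URL delimiters.
            throw new IllegalArgumentException("Password should contain only letters and digits");
        }
        return "jdbc:mysql://" + host + ":" + port + "/" + database
                + "?user=" + user + "&password=" + password;
    }
}
```

When you paste the resulting URL into Entry Point Main Arguments, escape the & as \& in the same way as the argument list in step 1.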

Step 4: Add a rule

With the job running, insert Rule 1 into the MySQL rule table. Rule 1 flags users who view a product page three or more times consecutively without making a purchase (three consecutive events with action = 0, followed by an event with action != 1).

  1. Log on to the ApsaraDB RDS for MySQL console.

  2. Run the following INSERT statement to add Rule 1:

    EndCondition is defined in the application code and checks for action != 1.
    INSERT INTO rds_demo (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;

    The pattern field holds a JSON-serialized pattern string. Realtime Compute for Apache Flink lets you define patterns in JSON format for readability. For more information, see Definitions of rules in the JSON format in dynamic Flink CEP. The equivalent pattern defined with the Pattern API:

    Pattern<Event, Event> pattern =
        Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
            .where(new StartCondition("action == 0"))
            .timesOrMore(3)
            .followedBy("end")
            .where(new EndCondition());

    To convert a Pattern API definition to JSON, call CepJsonUtils.convertPatternToJSONString():

    public void printTestPattern(Pattern<?, ?> pattern) throws JsonProcessingException {
        System.out.println(CepJsonUtils.convertPatternToJSONString(pattern));
    }

    The expanded, human-readable form of the pattern JSON for Rule 1:

    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": ["SINGLE"],
        "times": null,
        "untilCondition": null
      },
      "condition": null,
      "nodes": [
        {
          "name": "end",
          "quantifier": {
            "consumingStrategy": "SKIP_TILL_NEXT",
            "properties": ["SINGLE"],
            "times": null,
            "untilCondition": null
          },
          "condition": {
            "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
            "type": "CLASS"
          },
          "type": "ATOMIC"
        },
        {
          "name": "start",
          "quantifier": {
            "consumingStrategy": "SKIP_TILL_NEXT",
            "properties": ["LOOPING"],
            "times": {
              "from": 3,
              "to": 3,
              "windowTime": null
            },
            "untilCondition": null
          },
          "condition": {
            "expression": "action == 0",
            "type": "AVIATOR"
          },
          "type": "ATOMIC"
        }
      ],
      "edges": [
        {
          "source": "start",
          "target": "end",
          "type": "SKIP_TILL_NEXT"
        }
      ],
      "window": null,
      "afterMatchStrategy": {
        "type": "SKIP_PAST_LAST_EVENT",
        "patternName": null
      },
      "type": "COMPOSITE",
      "version": 1
    }
  3. Send four test messages to the demo_topic Kafka topic. In the ApsaraMQ for Kafka console, use the Start to Send and Consume Message panel on the demo_topic page.

    Each message is a comma-separated line with the following fields, in this order:

    Field       Description                      Values
    id          User ID                          Integer
    username    Username                         String
    action      User action                      0 = view, 1 = purchase
    product_id  Product ID                       Integer
    event_time  Event timestamp in milliseconds  Unix epoch in ms

    Send the following messages:

    1,Ken,0,1,1662022777000
    1,Ken,0,1,1662022778000
    1,Ken,0,1,1662022779000
    1,Ken,0,1,1662022780000


  4. Verify that the rule is loaded and the match is recorded.

    • In the JobManager logs, search for JDBCPeriodicPatternProcessorDiscoverer to confirm that the rule was fetched.

    • On the Running Task Managers subtab under the Logs tab, open the log file with the .out suffix and search for "A match for Pattern of (id, version): (1, 1)".

  5. In the DMS console, query the match_results table:

    SELECT * FROM `match_results`;

    The result contains one row for user Ken (user ID 1), confirming the pattern was matched and the record was written.

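To reason about why Ken's four messages produce exactly one match, the following plain-Java sketch models Rule 1 for a single user-product key: three or more view events (action == 0) followed by a later event whose action is not 1, with matching resuming after the last event of a match (SKIP_PAST_LAST_EVENT). It is a simplified model written for this explanation, not the NFA that Flink CEP uses; the Rule1Model class and Click record are hypothetical.

```java
import java.util.List;

/** Simplified single-key model of Rule 1; not the NFA used by Flink CEP. */
final class Rule1Model {
    /** Hypothetical event for one user-product key: action 0 = view, 1 = purchase. */
    record Click(int action) {}

    /** Counts matches of "at least minViews views, followed by a later event with action != 1". */
    static int countMatches(List<Click> events, int minViews) {
        int matches = 0;
        int views = 0; // views collected since the last match
        for (Click c : events) {
            if (views >= minViews && c.action() != 1) {
                // EndCondition holds: complete the match, then skip past it (SKIP_PAST_LAST_EVENT).
                matches++;
                views = 0;
            } else if (c.action() == 0) {
                // StartCondition "action == 0": extend the run of views.
                views++;
            }
            // Any other event (for example a purchase) is skipped, mirroring relaxed contiguity.
        }
        return matches;
    }
}
```

Three views alone are not enough: the first three messages satisfy the start condition, and the fourth message is what satisfies the end condition.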

Step 5: Update the rule

Effective marketing often requires a time constraint. Knowing that a user viewed a product three times within 15 minutes is more actionable than knowing they did so at some point. Rule 2 adds a 15-minute time window to the existing pattern.

Rule 2 matches three consecutive events with action = 0 occurring within a 15-minute window, followed by an event with action != 1. Because this rule uses event time, you must switch the job to event-time mode first.

  1. Switch to event-time mode.

    1. Go to O&M > Deployments, find the target deployment, and click Cancel in the Actions column.

    2. Click the deployment name. On the Configuration tab, click Edit in the Basic section. In the Entry Point Main Arguments field, set usingEventTime to true. Click Save.

    3. Start the deployment again.

  2. Insert Rule 2. The only code change from Rule 1 is adding Time.minutes(15) to .timesOrMore():

    Pattern<Event, Event> pattern =
            Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                    .where(new StartCondition("action == 0"))
                    .timesOrMore(3, Time.minutes(15))
                    .followedBy("end")
                    .where(new EndCondition());
    printTestPattern(pattern);

    Run the following SQL to delete Rule 1 and insert Rule 2:

    -- Delete Rule 1.
    DELETE FROM `rds_demo` WHERE `id` = '1';
    
    -- Insert Rule 2: three consecutive view events within 15 minutes, followed by a non-purchase event.
    -- Rule version is (id=1, version=2).
    INSERT INTO rds_demo (`id`,`version`,`pattern`,`function`) values('1',2,'{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":{"unit":"MINUTES","size":15}},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');

    The Flink job picks up the rule change within the next polling cycle (every jdbcIntervalMs milliseconds) without restarting.

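  3. Send the following eight test messages to the Kafka topic. The # annotations show each event's time in UTC+8 and are not part of the message; send only the comma-separated values: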

    2,Tom,0,1,1739584800000   #10:00
    2,Tom,0,1,1739585400000   #10:10
    2,Tom,0,1,1739585700000   #10:15
    2,Tom,0,1,1739586000000   #10:20
    3,Ali,0,1,1739586600000   #10:30
    3,Ali,0,1,1739588400000   #11:00
    3,Ali,0,1,1739589000000   #11:10
    3,Ali,0,1,1739590200000   #11:30
  4. Query the match_results table:

    SELECT * FROM `match_results`;

    The result contains a row for Tom but not for Ali. Tom's first three view events (10:00, 10:10, and 10:15) fall within a 15-minute window, and the 10:20 event satisfies the end condition (action != 1), so Tom matches Rule 2. Ali's view events are spread from 10:30 to 11:30, and no three of them fall within 15 minutes of one another, so Ali does not match. With these insights, a marketing team can issue a coupon or trigger a notification for users who repeatedly view a product within a short time frame, maximizing conversion during limited-time sales.

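Rule 2 adds the 15-minute constraint to the view events. The following plain-Java sketch extends the same simplified single-key model: it looks for three view events whose first and last timestamps are at most 15 minutes apart, followed by a later event whose action is not 1. Treating the boundary as inclusive (10:00 to 10:15 counts as within 15 minutes) is an assumption made so the model reproduces the result described above; the engine's exact boundary handling is not documented here. The Rule2Model class and Click record are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified single-key model of Rule 2; not the NFA used by Flink CEP. */
final class Rule2Model {
    /** Hypothetical event for one user-product key: action 0 = view, 1 = purchase. */
    record Click(int action, long eventTimeMillis) {}

    /**
     * Counts matches of "minViews views within windowMillis, followed by an event with action != 1".
     * Expects events sorted by event time, the order in which event-time CEP evaluates them.
     */
    static int countMatches(List<Click> events, int minViews, long windowMillis) {
        int matches = 0;
        List<Long> views = new ArrayList<>(); // view timestamps since the last match
        for (Click c : events) {
            if (c.action() != 1 && hasViewsWithin(views, minViews, windowMillis)) {
                matches++;
                views.clear(); // SKIP_PAST_LAST_EVENT: start over after the end event
            } else if (c.action() == 0) {
                views.add(c.eventTimeMillis());
            }
        }
        return matches;
    }

    /** True if some minViews consecutive (sorted) views span at most windowMillis; inclusive by assumption. */
    private static boolean hasViewsWithin(List<Long> views, int minViews, long windowMillis) {
        for (int i = 0; i + minViews - 1 < views.size(); i++) {
            if (views.get(i + minViews - 1) - views.get(i) <= windowMillis) {
                return true;
            }
        }
        return false;
    }
}
```

With Tom's and Ali's timestamps from step 3 and a 15-minute window, countMatches returns 1 for Tom and 0 for Ali. Without a window (for example windowMillis = Long.MAX_VALUE), Ali's four views would also match, which is the Rule 1 behavior.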

What's next