Realtime Compute for Apache Flink supports dynamic complex event processing (CEP) in DataStream programs. Unlike static CEP, dynamic CEP lets you update detection rules at runtime without restarting the job, so fraud detection, anomaly alerting, and marketing triggers adapt to changing business logic in real time.
This tutorial walks through a complete example: a Flink job consumes user clickstream data from ApsaraMQ for Kafka, polls rules from ApsaraDB RDS for MySQL every few seconds, and writes matched events back to MySQL. You start with a simple rule (three consecutive product views without a purchase), then add a 15-minute time constraint to that rule—without stopping the job.
Use cases
Dynamic Flink CEP applies to scenarios where detection rules change frequently:
-
Real-time risk control: Flag users who complete 10 transfers totaling USD 10,000 within a five-minute window.
-
Real-time marketing: Identify users who add more than three items to a cart within 10 minutes without checking out, and trigger targeted promotions or fraud checks.
-
IoT anomaly detection: Alert when a shared bike leaves a designated area for more than 15 minutes, or when temperature sensor readings exceed a threshold across three consecutive time windows on an assembly line.
How it works
The data pipeline for this tutorial:
The Flink job:
-
Reads user behavior events from a Kafka topic.
-
Keys the stream by user ID and product ID so that each rule is evaluated per user-product pair.
-
Polls the MySQL rule table every few seconds via
JDBCPeriodicPatternProcessorDiscoverer. -
Applies the current rules using
CEP.dynamicPatterns(). When a rule update is detected, the new rule takes effect on subsequent events without a job restart. -
Writes matched events to the MySQL
match_resultstable.
Prerequisites
Before you begin, ensure that you have:
-
A Realtime Compute for Apache Flink workspace. For more information, see Create a workspace.
-
A RAM user or RAM role with the required permissions. For more information, see Permission management.
-
An ApsaraDB RDS for MySQL instance. For more information, see Create an ApsaraDB RDS for MySQL instance.
-
An ApsaraMQ for Kafka instance. For more information, see Overview of ApsaraMQ for Kafka.
Step 1: Prepare test data
Create a Kafka topic
-
Log on to the ApsaraMQ for Kafka console.
-
Create a topic named
demo_topicto store simulated user behavior events. For more information, see Step 1: Create a topic.
Create the MySQL tables
Use the Data Management (DMS) console to create the rule table and the results table.
-
Log on to the ApsaraDB RDS for MySQL instance with a privileged account. For more information, see Log on to the RDS instance in the DMS console.
-
In the SQL editor, run the following statements and click Execute(F8):
Table Purpose rds_demoStores CEP rules. Each row is one rule, with fields id(unique identifier),version,pattern(JSON-serialized pattern), andfunction(the class that processes matched events).match_resultsReceives matched events. Each row represents a user whose behavior matched a rule, providing your marketing or operations team with actionable data. CREATE DATABASE cep_demo_db; USE cep_demo_db; CREATE TABLE rds_demo ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) ); CREATE TABLE match_results ( rule_id INT, rule_version INT, user_id INT, user_name VARCHAR(255), production_id INT, PRIMARY KEY (rule_id, rule_version, user_id, production_id) );The two tables serve different purposes:
Step 2: Configure the IP address whitelist
To allow your Flink workspace to reach the MySQL instance, add the workspace CIDR block to the MySQL IP address whitelist.
-
Get the CIDR block of the vSwitch used by your Flink workspace.
-
Log on to the Realtime Compute for Apache Flink console.
-
Find the target workspace and choose More > Workspace Details in the Actions column.
-
In the Workspace Details dialog box, copy the CIDR block of the vSwitch.

-
-
Add the CIDR block to the MySQL IP address whitelist. For more information, see Configure IP address whitelist in the ApsaraDB RDS for MySQL documentation.

Step 3: Develop the dynamic CEP job
All code in this tutorial is available on GitHub. The sample code in this tutorial is on the timeOrMoreAndWindow branch. To run the full example without writing code, download ververica-cep-demo-master.zip.
Add the flink-cep dependency
Add the following dependency to your pom.xml. For dependency configuration details and conflict handling, see Configure Flink environment dependencies.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-cep</artifactId>
<version>1.17-vvr-8.0.8</version>
<scope>provided</scope>
</dependency>
Write the dynamic CEP program
The program has three main parts: a Kafka source, a keyed stream, and the CEP.dynamicPatterns() call.
Part 1: Create a Kafka source
Create a DataStreamSource that reads from your Kafka topic. For connector configuration details, see Kafka DataStream Connector.
Part 2: Key the stream
Before calling CEP.dynamicPatterns(), key the stream by user ID and product ID. This ensures each rule is evaluated independently per user-product pair—events from different users or products do not interfere with each other.
KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
source.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forGenerator(
ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
return Tuple2.of(value.getId(), value.getProductionId());
}
});
Part 3: Apply dynamic patterns
CEP.dynamicPatterns() is a method provided by Realtime Compute for Apache Flink that supports multiple rules and rule updates without job restarts:
public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
DataStream<T> input,
PatternProcessorDiscovererFactory<T> discovererFactory,
TimeBehaviour timeBehaviour,
TypeInformation<R> outTypeInfo)
| Parameter | Description |
|---|---|
DataStream<T> input |
The input event stream. |
PatternProcessorDiscovererFactory<T> discovererFactory |
Constructs the PatternProcessorDiscoverer, which fetches the latest rules and builds PatternProcessor instances. Each PatternProcessor bundles a pattern (how to match events) with a PatternProcessFunction (what to do when a match is found). |
TimeBehaviour timeBehaviour |
The time attribute for event processing. TimeBehaviour.ProcessingTime uses wall-clock time; TimeBehaviour.EventTime uses timestamps embedded in the events. |
TypeInformation<R> outTypeInfo |
The type information of the output stream. |
For background on DataStream, time semantics, and TypeInformation, see Flink DataStream API Programming Guide, Notions of Time: Event Time and Processing Time, and Class TypeInformation\<T\>.
For the discovererFactory parameter, use JDBCPeriodicPatternProcessorDiscoverer, which periodically polls a JDBC database (such as ApsaraDB RDS for MySQL) and applies any rule changes it finds. It is built on an abstract base class that schedules a timer to check for updates:
public abstract class PeriodicPatternProcessorDiscoverer<T>
implements PatternProcessorDiscoverer<T> {
...
@Override
public void discoverPatternProcessorUpdates(
PatternProcessorManager<T> patternProcessorManager) {
// Periodically discovers pattern processor updates.
timer.schedule(
new TimerTask() {
@Override
public void run() {
if (arePatternProcessorsUpdated()) {
List<PatternProcessor<T>> patternProcessors = null;
try {
patternProcessors = getLatestPatternProcessors();
} catch (Exception e) {
e.printStackTrace();
}
patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
}
}
},
0,
intervalMillis);
}
...
}
JDBCPeriodicPatternProcessorDiscoverer takes the following parameters:
| Parameter | Description |
|---|---|
jdbcUrl |
JDBC URL of the database. |
jdbcDriver |
Name of the database driver class. |
tableName |
Name of the rules table. |
initialPatternProcessors |
Initial list of PatternProcessor instances (can be null). |
intervalMillis |
Polling interval in milliseconds. |
The following example shows the complete main() method with these components connected:
// import ......
public class CepDemo {
public static void main(String[] args) throws Exception {
......
// DataStream source
DataStreamSource<Event> source =
env.fromSource(
kafkaSource,
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, ts) -> event.getEventTime()),
"Kafka Source");
env.setParallelism(1);
// Key by userId and productionId.
// Only events with the same key are evaluated together for pattern matching.
KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
source.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
return Tuple2.of(value.getId(), value.getProductionId());
}
});
SingleOutputStreamOperator<String> output =
CEP.dynamicPatterns(
keyedStream,
new JDBCPeriodicPatternProcessorDiscovererFactory<>(
params.get(JDBC_URL_ARG),
JDBC_DRIVE,
params.get(TABLE_NAME_ARG),
null,
Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ?
TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
TypeInformation.of(new TypeHint<String>() {}));
output.print();
// Compile and submit the job
env.execute("CEPDemo");
}
}
For more information about the PatternProcessor interface and the design specification for the dynamic CEP API, see FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP).
Upload the JAR and create a deployment
-
In the Realtime Compute for Apache Flink console, upload your JAR and create a JAR deployment. Configure the following parameters when creating the deployment: Arguments for the pre-built JAR:
To test without building your own JAR, download cep-demo.jar and create a deployment from it. Because the Kafka topic has no data yet and the rule table is empty, the job produces no output at this point.
Use environment variables rather than plaintext credentials in production. For more information, see Variable management.
Parameter Value Deployment Mode Select Stream Mode. Deployment Name Enter a name for the deployment. Engine Version Select a recommended or stable version. See Engine version and Lifecycle policies. JAR URL Upload your program JAR or the test cep-demo.jar.Entry Point Class com.alibaba.ververica.cep.demo.CepDemoEntry Point Main Arguments If you use the pre-built cep-demo.jar, paste and fill in the arguments below. Skip this field if you use your own JAR with upstream and downstream systems already configured.Placeholder Description YOUR_KAFKA_BROKERSAddresses of your Kafka brokers. YOUR_KAFKA_TOPICName of your Kafka topic. YOUR_KAFKA_TOPIC_GROUPYour Kafka consumer group. YOUR_DB_URL:port/DATABASE_NAMEJDBC URL of your MySQL instance. Use a standard account with a password that contains only letters and digits. YOUR_TABLE_NAMEName of the rules table (for example, rds_demo).--kafkaBrokers YOUR_KAFKA_BROKERS --inputTopic YOUR_KAFKA_TOPIC --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME\&password=YOUR_PASSWORD --tableName YOUR_TABLE_NAME --jdbcIntervalMs 3000 --usingEventTime falseReplace each placeholder:
-
On the Deployment tab of the Deployments page, click Edit in the Parameters section. In the Other Configuration field, add the following:
kubernetes.application-mode.classpath.include-user-jar: 'true' classloader.resolve-order: parent-firstThese settings ensure that
flink-cep(loaded byAppClassLoader) can access theaviatorclasses in your user JAR (loaded byUserCodeClassLoader). For more information, see Runtime parameter configuration. -
Go to O&M > Deployments, find the deployment, and click Start in the Actions column. For more information, see Start a job.
Step 4: Add a rule
With the job running, insert Rule 1 into the MySQL rule table. Rule 1 flags users who view a product page three or more times consecutively without making a purchase (three consecutive events with action = 0, followed by an event with action != 1).
-
Log on to the ApsaraDB RDS for MySQL console.
-
Run the following INSERT statement to add Rule 1:
EndConditionis defined in the application code and checks foraction != 1.INSERT INTO rds_demo ( `id`, `version`, `pattern`, `function` ) values( '1', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}', 'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction') ;The
patternfield holds a JSON-serialized pattern string. Realtime Compute for Apache Flink lets you define patterns in JSON format for readability. For more information, see Definitions of rules in the JSON format in dynamic Flink CEP. The equivalent pattern defined with the Pattern API:Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3) .followedBy("end") .where(new EndCondition());To convert a Pattern API definition to JSON, call
CepJsonUtils.convertPatternToJSONString():public void printTestPattern(Pattern<?, ?> pattern) throws JsonProcessingException { System.out.println(CepJsonUtils.convertPatternToJSONString(pattern)); }The expanded, human-readable form of the
patternJSON for Rule 1:{ "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": ["SINGLE"], "times": null, "untilCondition": null }, "condition": null, "nodes": [ { "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": ["SINGLE"], "times": null, "untilCondition": null }, "condition": { "className": "com.alibaba.ververica.cep.demo.condition.EndCondition", "type": "CLASS" }, "type": "ATOMIC" }, { "name": "start", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": ["LOOPING"], "times": { "from": 3, "to": 3, "windowTime": null }, "untilCondition": null }, "condition": { "expression": "action == 0", "type": "AVIATOR" }, "type": "ATOMIC" } ], "edges": [ { "source": "start", "target": "end", "type": "SKIP_TILL_NEXT" } ], "window": null, "afterMatchStrategy": { "type": "SKIP_PAST_LAST_EVENT", "patternName": null }, "type": "COMPOSITE", "version": 1 } -
Send four test messages to the
demo_topicKafka topic. In the ApsaraMQ for Kafka console, use the Start to Send and Consume Message panel on thedemo_topicpage to send the following messages:Field Description Values idUser ID Integer usernameUsername String actionUser action 0= view,1= purchaseproduct_idProduct ID Integer event_timeEvent timestamp in milliseconds Unix epoch in ms 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022778000 1,Ken,0,1,1662022779000 1,Ken,0,1,1662022780000Each message uses the following format:

-
Verify that the rule is loaded and the match is recorded.
-
In the JobManager logs, search for
JDBCPeriodicPatternProcessorDiscovererto confirm the rule was fetched.
-
On the Running Task Managers subtab under the Logs tab, open the log file with the
.outsuffix and search forA match for Pattern of (id, version): (1, 1).
-
-
In the DMS console, query the
match_resultstable:SELECT * FROM `match_results`;The result contains one row for user Ken (user ID
1), confirming the pattern was matched and the record was written.
Step 5: Update the rule
Effective marketing often requires a time constraint. Knowing that a user viewed a product three times within 30 minutes is more actionable than knowing they did so at any point. Rule 2 adds a 15-minute time window to the existing pattern.
Rule 2 matches three consecutive events with action = 0 occurring within a 15-minute window, followed by an event with action != 1. Because this rule uses event time, you must switch the job to event-time mode first.
-
Switch to event-time mode.
-
Go to O&M > Deployments, find the target deployment, and click Cancel in the Actions column.
-
Click the deployment name. On the Configuration tab, click Edit in the Basic section. In the Entry Point Main Arguments field, set
usingEventTimetotrue. Click Save. -
Start the deployment again.
-
-
Insert Rule 2. The only code change from Rule 1 is adding
Time.minutes(15)to.timesOrMore():Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3, Time.minutes(15)) .followedBy("end") .where(new EndCondition()); printTestPattern(pattern);Run the following SQL to delete Rule 1 and insert Rule 2:
-- Delete Rule 1. DELETE FROM `rds_demo` WHERE `id` = 1; -- Insert Rule 2: three consecutive view events within 15 minutes, followed by no purchase. -- Rule version is (id=1, version=2). INSERT INTO rds_demo (`id`,`version`,`pattern`,`function`) values('1',2,'{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":{"unit":"MINUTES","size":15}},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');The Flink job picks up the rule change within the next polling cycle (every
jdbcIntervalMsmilliseconds) without restarting. -
Send eight test messages to the Kafka topic:
2,Tom,0,1,1739584800000 #10:00 2,Tom,0,1,1739585400000 #10:10 2,Tom,0,1,1739585700000 #10:15 2,Tom,0,1,1739586000000 #10:20 3,Ali,0,1,1739586600000 #10:30 3,Ali,0,1,1739588400000 #11:00 3,Ali,0,1,1739589000000 #11:10 3,Ali,0,1,1739590200000 #11:30 -
Query the
match_resultstable:SELECT * FROM `match_results`;The result contains a row for Tom but not for Ali. Tom's four view events (10:00, 10:10, 10:15, 10:20) all fall within a 15-minute window, so they match Rule 2. Ali's view events span from 10:30 to 11:30—more than 15 minutes—so they do not match. With these insights, a marketing team can issue a coupon or trigger a notification to users who repeatedly view a product within a short time frame, maximizing conversion during limited-time sales.

What's next
-
Definitions of rules in the JSON format in dynamic Flink CEP: detailed reference for the JSON pattern schema.
-
FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP): design specification for the dynamic CEP API.
-
Kafka DataStream Connector: configure additional Kafka source options.
-
Variable management: store credentials securely for production deployments.