Dynamic Flink complex event processing (CEP) uses a JSON format to describe rules. This lets you store and update CEP rules without modifying or recompiling Java code.
This topic is for:
- Risk control platform developers familiar with dynamic Flink CEP who want to understand the JSON schema and decide whether to add further abstraction layers.
- Risk control strategy personnel who understand risk control logic but have no Java background and want to write or adjust CEP rules directly in JSON.
JSON format definition
A CEP rule is modeled as a directed graph. Each node in the graph represents an event pattern. Each edge defines the event selection strategy — the transition condition from one matched pattern to the next. Graphs can be nested: a graph node can be a child of a larger graph, enabling grouped patterns.
The following sections describe each component of the JSON schema.
Node definition
A node represents a single, complete pattern.
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | A unique name for the node. Node names must be unique across the graph. |
| type | enum(string) | Yes | ATOMIC for a node with no child pattern. COMPOSITE for a node that contains a child pattern. |
| quantifier | dict | Yes | Describes how to match the pattern. See Quantifier definition. |
| condition | dict | No | Filters events that the pattern applies to. See Condition definition. |
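As a minimal sketch, an ATOMIC node that sets all four fields might look like the following. The node name `start` and the class name are illustrative values borrowed from Example 1, not required names.

```json
{
  "name": "start",
  "type": "ATOMIC",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": ["SINGLE"],
    "times": null,
    "untilCondition": null
  },
  "condition": {
    "type": "CLASS",
    "className": "com.alibaba.ververica.cep.demo.condition.StartCondition"
  }
}
```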
Quantifier definition
A quantifier describes how many times events must match a pattern and the contiguity strategy within the pattern. For example, the pattern A* has a properties value of LOOPING and a consumingStrategy of SKIP_TILL_ANY.
| Field | Type | Required | Description |
|---|---|---|---|
| consumingStrategy | enum(string) | Yes | The event selection strategy within the pattern. Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY. See Contiguity definition. |
| times | dict | No | How many times the pattern must match. See the example below. |
| properties | array of enum(string) | Yes | Matching behavior flags. See Quantifier property values. |
| untilCondition | dict | No | A stop condition. Only valid after a pattern with a LOOPING quantifier. See Condition definition. |
Example `times` value:
```json
"times": {
  "from": 3,
  "to": 3,
  "windowTime": {
    "unit": "MINUTES",
    "size": 12
  }
}
```
The from and to fields are integers. The unit in windowTime accepts DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. Set windowTime to null to omit a per-match time constraint.
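To illustrate how these fields combine, the sketch below shows a looping quantifier that stops once a stop condition holds. The `times` block mirrors how Example 1 serializes `timesOrMore(3)`. The AVIATOR stop condition and its expression are assumptions chosen for illustration only.

```json
"quantifier": {
  "consumingStrategy": "SKIP_TILL_NEXT",
  "properties": ["LOOPING"],
  "times": {
    "from": 3,
    "to": 3,
    "windowTime": null
  },
  "untilCondition": {
    "type": "AVIATOR",
    "expression": "price > 10"
  }
}
```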
Condition definition
A condition filters events that meet specific criteria. For example, "browsing for more than 5 minutes" is a condition that filters customers by session duration.
| Field | Type | Required | Description |
|---|---|---|---|
| type | enum(string) | Yes | The condition type. Valid values: CLASS, AVIATOR, GROOVY. |
| Additional custom fields | — | No | Any additional serializable fields specific to the condition type. |
When to use each condition type
| Scenario | Recommended type |
|---|---|
| Business logic that requires full Java expressiveness or stateful evaluation across previous events | CLASS |
| Threshold comparisons that change frequently (for example, price > 10) without redeploying the job | AVIATOR |
| Multi-field logic or string operations that change frequently without redeploying the job | GROOVY |
Use AVIATOR or GROOVY when you need to update condition thresholds by changing a value in the database, with no code change or recompile required.
CLASS condition
A CLASS condition delegates to a Java class you provide.
| Field | Type | Required | Description |
|---|---|---|---|
| type | enum(string) | Yes | Fixed value: CLASS. |
| className | string | Yes | The fully qualified class name, such as com.alibaba.ververica.cep.demo.StartCondition. |
CLASS condition with custom parameters (CustomArgsCondition)
A standard CLASS condition only receives the class name — it cannot accept runtime parameters. CustomArgsCondition extends the CLASS condition with a string array (args) that the framework passes when constructing the condition instance. This lets you update condition parameters in the database without changing or recompiling the Java class.
| Field | Type | Required | Description |
|---|---|---|---|
| type | enum(string) | Yes | Fixed value: CLASS. |
| className | string | Yes | The fully qualified class name, such as com.alibaba.ververica.cep.demo.CustomMiddleCondition. |
| args | array of string | Yes | Parameters passed to the condition constructor at runtime. |
Aviator expression condition
Aviator is an expression evaluation engine that compiles expressions to bytecode at runtime. For more information, see aviatorscript.
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Fixed value: AVIATOR. |
| expression | string | Yes | An Aviator expression string, such as price > 10. Variables in the expression (for example, price) map to fields defined in the Java event class. Update this string in the database to change the threshold dynamically. The Flink CEP job then loads the new expression and creates a new AviatorCondition for subsequent events. |
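A condition of this type could be written as in the sketch below. The expression is the sample from the table and assumes that the Java event class defines a `price` field.

```json
"condition": {
  "type": "AVIATOR",
  "expression": "price > 10"
}
```

Raising the threshold then means changing only the number inside the `expression` string stored in the database.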
Groovy expression condition
Groovy is a dynamically typed language for the Java Virtual Machine (JVM). For more information about Groovy syntax, see Groovy syntax.
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | Fixed value: GROOVY. |
| expression | string | Yes | A Groovy expression string, such as price > 5.0 && name.contains("mid"). Variables map to fields in the Java event class. Update this string in the database to change the logic dynamically. The Flink CEP job then loads the new Groovy string and creates a new GroovyCondition for subsequent events. |
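As a sketch, the sample expression from the table could be embedded in a rule as follows, assuming the event class defines `price` and `name` fields.

```json
"condition": {
  "type": "GROOVY",
  "expression": "price > 5.0 && name.contains(\"mid\")"
}
```

Because the rule is stored as JSON, the double quotes inside the Groovy expression are escaped with backslashes.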
Edge definition
An edge connects two pattern nodes and defines the event selection strategy for that transition.
| Field | Type | Required | Description |
|---|---|---|---|
| source | string | Yes | The name of the source pattern node. |
| target | string | Yes | The name of the target pattern node. |
| type | enum(string) | Yes | The event selection strategy. Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, NOT_NEXT. See Contiguity definition. |
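For instance, an edge stating that the event immediately after `middle` must not match `end` could be sketched as follows. The node names are illustrative and must refer to nodes defined in the same graph.

```json
{
  "source": "middle",
  "target": "end",
  "type": "NOT_NEXT"
}
```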
GraphNode definition
A GraphNode represents a complete pattern sequence. It extends the basic node with graph structure fields (nodes and edges) and policy fields (window and afterMatchStrategy). Because GraphNode is treated as a subtype of Node, a GraphNode can be nested inside another GraphNode to create grouped patterns (GroupPattern).
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | A unique name for the graph. Graph names must be unique. |
| type | enum(string) | Yes | Fixed value: COMPOSITE. |
| version | int | Yes | The JSON format version. Default value: 1. |
| nodes | array of Node | Yes | The child patterns in this graph. Must be non-empty. |
| edges | array of Edge | Yes | The connections between child patterns. Can be empty. |
| window | dict | No | The time window constraint. See the description below. |
| afterMatchStrategy | dict | Yes | The skip strategy applied after a complete match. See After-match skip strategy definition. |
| quantifier | dict | Yes | Describes how to match the overall graph pattern. See Quantifier definition. |
Window field:
The window field constrains the time allowed for a complete match. The type controls what the time limit applies to:
- FIRST_AND_LAST: the maximum time between the first and last event in a complete match.
- PREVIOUS_AND_CURRENT: the maximum time between matches of any two adjacent child patterns.
Example:
```json
"window": {
  "type": "FIRST_AND_LAST",
  "time": {
    "unit": "DAYS",
    "size": 1
  }
}
```
The unit accepts DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. The size value is a long or integer.
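For comparison, a window that limits the gap between adjacent child patterns rather than the whole match might be sketched like this. The 30-second limit is an arbitrary illustrative value.

```json
"window": {
  "type": "PREVIOUS_AND_CURRENT",
  "time": {
    "unit": "SECONDS",
    "size": 30
  }
}
```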
After-match skip strategy definition
The after-match skip strategy controls which partial matches to discard after a complete match is found.
| Field | Type | Required | Description |
|---|---|---|---|
| type | enum(string) | Yes | The skip strategy. Valid values: NO_SKIP, SKIP_TO_NEXT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST. |
| patternName | string | No | The pattern name used by SKIP_TO_FIRST and SKIP_TO_LAST. |
The strategies have the following behaviors:
- NO_SKIP (default): Every successful match is emitted with no discarding.
- SKIP_TO_NEXT: Discards every partial match that started with the same event as the current match.
- SKIP_PAST_LAST_EVENT: Discards every partial match that started between the beginning and the end of the current match.
- SKIP_TO_FIRST: Discards every partial match that started between the beginning of the current match and the first occurrence of the event named by patternName.
- SKIP_TO_LAST: Discards every partial match that started between the beginning of the current match and the last occurrence of the event named by patternName.
For more information, see After match skip strategy.
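As a sketch, a strategy that skips to the first event matched by a named pattern could be written as below. The key name `afterMatchStrategy` follows the JSON in Example 1, and the pattern name `middle` is an illustrative assumption that must refer to a node in the same graph.

```json
"afterMatchStrategy": {
  "type": "SKIP_TO_FIRST",
  "patternName": "middle"
}
```

For NO_SKIP, SKIP_TO_NEXT, and SKIP_PAST_LAST_EVENT, `patternName` is not used and can be `null`, as in Example 1.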
Contiguity definition
Contiguity controls how strictly events must follow each other within a pattern or along an edge.
| Value | Meaning |
|---|---|
| STRICT | Strict contiguity. No unmatched events can appear between matched events. |
| SKIP_TILL_NEXT | Relaxed contiguity. Unmatched events between matched events are silently ignored. |
| SKIP_TILL_ANY | Non-deterministic relaxed contiguity. More permissive than SKIP_TILL_NEXT: it also allows additional matches that skip over events that would themselves match. |
| NOT_NEXT | The event immediately following the source must not match the target pattern. |
| NOT_FOLLOW | No event matching the target pattern can appear anywhere after the source. |
For more information, see FlinkCEP — Complex event processing for Flink.
Quantifier property values
Quantifier properties describe the matching cardinality and strategy for a pattern.
| Value | Meaning |
|---|---|
| SINGLE | The pattern must match exactly once. |
| LOOPING | The pattern can match multiple times in a loop, similar to * and + in regular expressions. |
| TIMES | The pattern must match a specified number of times, as set in the times field. |
| GREEDY | When matching, the longest possible sequence is preferred. |
| OPTIONAL | The pattern is optional and may not match at all. |
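As a hedged sketch of how property values combine with `times`, a pattern that must match two to four times and prefers the longest match could be described as follows. The bounds 2 and 4 are arbitrary, and the sketch assumes that TIMES and GREEDY can be listed together in `properties`.

```json
"quantifier": {
  "consumingStrategy": "SKIP_TILL_NEXT",
  "properties": ["TIMES", "GREEDY"],
  "times": {
    "from": 2,
    "to": 4,
    "windowTime": null
  },
  "untilCondition": null
}
```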
Example 1: Common pattern
This example uses dynamic Flink CEP to identify customers who should receive adjusted marketing offers during a real-time e-commerce promotion. The target customers, within a 10-minute window, must have:
- Collected a venue coupon (optional step).
- Added items to their cart three or more times.
- Not completed payment.
The three conditions are modeled as StartCondition, MiddleCondition, and EndCondition. The equivalent Java pattern is:
```java
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
        .where(new StartCondition())
        .optional()                  // collecting the coupon is optional
        .followedBy("middle")
        .where(new MiddleCondition())
        .timesOrMore(3)              // three or more add-to-cart events
        .notFollowedBy("end")
        .where(new EndCondition())   // no payment event follows
        .within(Time.minutes(10));   // all within a 10-minute window
```
The equivalent JSON rule is:
```json
{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}
```
Example 2: Condition with custom parameters
This example applies different marketing strategies to customers in different tiers during a real-time e-commerce promotion. For example, you send marketing text messages to tier A customers, send coupons to tier B customers, and take no marketing action for all other customers.
With a standard CLASS condition, tiers A and B are hardcoded in the class. To add tier C or otherwise adjust the strategy, you must rewrite and recompile the job code. A condition with custom parameters (CustomArgsCondition) avoids this: once the class defines how the strategy depends on the passed parameters, you only change the value of args in the database, with no code change or recompile.
The following sample code shows the condition initially defined in the pattern:
```json
"condition": {
  "args": [
    "A", "B"
  ],
  "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
  "type": "CLASS"
}
```
To add tier C to the strategy, update the args array in the database:
```json
"condition": {
  "args": [
    "A", "B", "C"
  ],
  "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
  "type": "CLASS"
}
```
For a complete working example, see Demo.
aviatorscript and Demo are third-party resources. These links may be slow to load or temporarily unavailable.