This topic describes the JSON format that is used to define rules in dynamic Flink complex event processing (CEP).
Intended audience
Risk control platform developers: The platform developers who are familiar with dynamic Flink CEP can learn the format that is described in this topic and determine whether further encapsulation is required based on the platform requirements.
Risk control strategy personnel: The personnel who are familiar with only specific risk control strategies but do not have Java development experience can learn and use this format to write new rules based on CEP concepts and apply the rules in online risk control deployments.
JSON format definition
A pattern in an event sequence can be viewed as a graph. Each node in the graph represents a pattern for specific events. An edge between nodes represents an event selection strategy. This strategy defines the transition from one matched pattern to the next. Each graph can also be a child node of a larger graph, which allows for nested patterns. Based on this concept, Real-time Compute for Apache Flink provides JSON specifications to describe CEP rules. This approach simplifies the storage and modification of rules. The following sections describe the fields in these specifications.
Node definition
A node represents a complete pattern and contains the following properties.
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| name | The name of the pattern. | string | Yes | A unique string. Note: Node names must be unique. |
| type | The type of the node. | enum(string) | Yes | COMPOSITE for a node with a child pattern. ATOMIC for a node without a child pattern. |
| quantifier | Describes how to match the pattern, such as matching only once. | dict | Yes | For more information, see the Quantifier definition section. |
| condition | The condition. | dict | No | For more information, see the Condition definition section. |
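As an illustration, the following sketch shows what an ATOMIC node might look like when the fields in the preceding table are combined. The node name `middle` and the expression `price > 10` are placeholders chosen for this example. The nested `quantifier` and `condition` objects follow the formats described in the Quantifier definition and Condition definition sections.

```json
{
  "name": "middle",
  "type": "ATOMIC",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": ["SINGLE"],
    "times": null,
    "untilCondition": null
  },
  "condition": {
    "type": "AVIATOR",
    "expression": "price > 10"
  }
}
```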
Quantifier definition
A quantifier describes how to match events that meet the pattern. For example, for the pattern "A*", the `properties` field of the quantifier is LOOPING, and the event selection strategy within the pattern is SKIP_TILL_ANY.
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| consumingStrategy | The event selection strategy. | enum(string) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY. For more information about the values, see the Contiguity definition section. |
| times | The number of times the pattern must be matched. | dict | No | Example: `"times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } }`. The `from` and `to` fields are integers. The `unit` for `windowTime` can be DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. Note: `windowTime` can be set to null: `"windowTime": null`. |
| properties | The properties of the quantifier. | array of enum(string) | Yes | For more information about the values, see the Quantifier property meanings section. |
| untilCondition | The stop condition. Note: This can be used only after a pattern that has a LOOPING quantifier. | dict | No | For more information about the values, see the Condition definition section. |
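The following sketch combines the quantifier fields into one object. It assumes a pattern that must match exactly three times within a 12-minute window and that such a pattern carries the TIMES property. The concrete values are illustrative only.

```json
{
  "consumingStrategy": "SKIP_TILL_NEXT",
  "properties": ["TIMES"],
  "times": {
    "from": 3,
    "to": 3,
    "windowTime": {
      "unit": "MINUTES",
      "size": 12
    }
  },
  "untilCondition": null
}
```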
Condition definition
Conditions are used to filter events that meet specific requirements. For example, if you want to filter for customers who browsed for more than 5 minutes, "browsing for more than 5 minutes" is the condition.
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | enum(string) | Yes | Valid values for the condition type: CLASS: A user-defined condition. AVIATOR: A condition based on an AVIATOR expression. GROOVY: A condition based on a GROOVY expression. |
| ... | Other custom fields that can be serialized. | | No | ... |
The following types of Condition are supported:
Class type Condition
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | enum(string) | Yes | The value is fixed to CLASS. |
| className | The name of the class. | string | Yes | The full name of the class, such as com.alibaba.ververica.cep.demo.StartCondition. |
Condition with custom parameters
With a normal Class type condition, you can only pass the class name (`className`) and cannot pass parameters dynamically. To create more expressive conditions, dynamic CEP supports conditions with custom parameters (`CustomArgsCondition`). This lets you set the required parameters for `CustomArgsCondition` as a string array in the JSON format. You can then dynamically construct `CustomArgsCondition` instances. This feature lets you dynamically update condition parameters without changing and recompiling the Java code.
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | enum(string) | Yes | The value is fixed to CLASS. |
| className | The name of the class. | string | Yes | The full name of the class, such as com.alibaba.ververica.cep.demo.CustomMiddleCondition. |
| args | The custom parameters. | array of string | Yes | An array of strings. |
Condition based on an Aviator expression
Aviator is an expression evaluation engine that dynamically compiles expressions into bytecode. For more information, see aviatorscript. You can use a condition based on an Aviator expression in a job. This lets you dynamically change the condition's threshold without modifying, recompiling, and rerunning the Java code.
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | string | Yes | The value is fixed to AVIATOR. |
| expression | The expression string. | string | Yes | An expression string, such as price > 10. The price variable comes from a field defined in the Java code. You can change the value of this string in the database. For example, change it to price > 20. The Flink CEP job then dynamically loads price > 20 and creates a new AviatorCondition to process later events. |
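For illustration, a condition object built from the two fields above might look like the following sketch. It reuses the `price > 10` expression from the table and assumes that the event class exposes a `price` field.

```json
{
  "type": "AVIATOR",
  "expression": "price > 10"
}
```

To raise the threshold, only the `expression` value stored in the database would need to change, for example to `price > 20`.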
Condition based on a Groovy expression
Groovy is a dynamically typed language for the Java Virtual Machine (JVM). For more information, see the Groovy syntax documentation. Dynamic CEP supports Groovy expressions to define conditions. This lets you dynamically change the condition's threshold.
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | string | Yes | The value is fixed to GROOVY. |
| expression | The expression string. | string | Yes | An expression string, such as price > 5.0 && name.contains("mid"). Variables like price and name come from fields defined in the Java code. You can change the value of this string in the database. For example, change it to price > 20 && name.contains("end"). The Flink CEP job then dynamically loads the new Groovy string and creates a new GroovyCondition to process later events. |
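As a sketch, the Groovy expression from the table could be stored as the following condition object. Because the expression is a JSON string, the double quotation marks inside it must be escaped with backslashes. The `price` and `name` variables are assumed to be fields of the event class.

```json
{
  "type": "GROOVY",
  "expression": "price > 5.0 && name.contains(\"mid\")"
}
```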
Edge definition
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| source | The name of the source pattern. | string | Yes | None. |
| target | The name of the target pattern. | string | Yes | None. |
| type | The event selection strategy. | enum(string) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, NOT_NEXT. For more information about the values, see the Contiguity definition section. |
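The following sketch shows one possible edge. It assumes two patterns named `middle` and `end` (placeholder names) and states that the event immediately after a `middle` match must not match `end`.

```json
{
  "source": "middle",
  "target": "end",
  "type": "NOT_NEXT"
}
```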
GraphNode extends Node definition
A `GraphNode` represents a complete pattern sequence. Its `nodes` are individual patterns, and its `edges` define the transitions from one matched pattern to the next.
To support nested patterns (`GroupPattern`), a `GraphNode` is treated as a child class of `Node`. This means a `GraphNode` can function as a `Node` within a larger `GraphNode`. Compared with a basic `Node`, a `GraphNode` has two additional types of fields:
`nodes` and `edges` fields to describe the graph structure.
`window` and `afterMatchSkipStrategy` fields to describe the time window policy and the after-match skip strategy within the graph.
The following table describes the fields of a `GraphNode`.
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| name | The name of the composite pattern. | string | Yes | A unique string. Note: Graph names must be unique. |
| type | The type of the node. | enum(string) | Yes | The value is fixed to COMPOSITE. |
| version | The version of the JSON format that the graph uses. | int | Yes | The default value is 1. |
| nodes | The child patterns nested within the pattern. | array of Node | Yes | A non-empty array. |
| edges | The connections between nested child patterns. | array of Edge | Yes | An array that can be empty. |
| window | If `type` is `FIRST_AND_LAST`, this is the maximum time between the start and end of a complete match for the composite pattern. If `type` is `PREVIOUS_AND_CURRENT`, this is the maximum time between matches of two adjacent child patterns. | dict | No | Example: `"window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 } }`. The unit can be DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. The data type is Long or Integer. |
| afterMatchSkipStrategy | The skip strategy to use after all events in the graph are matched. | dict | Yes | For more information, see the After-match skip strategy (AfterMatchSkipStrategy) definition section. |
| quantifier | Describes how to match the pattern, such as matching only once. | dict | Yes | For more information, see the Quantifier definition section. |
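The table only shows a `FIRST_AND_LAST` window. As a complementary sketch, the following fragment limits the gap between two adjacent child patterns instead. The 30-second limit is an arbitrary value chosen for illustration.

```json
{
  "window": {
    "type": "PREVIOUS_AND_CURRENT",
    "time": {
      "unit": "SECONDS",
      "size": 30
    }
  }
}
```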
After-match skip strategy (AfterMatchSkipStrategy) definition
| Field | Description | Type | Required | Notes |
| --- | --- | --- | --- | --- |
| type | The type of the strategy. | enum(string) | Yes | Valid values: NO_SKIP (default): Every successful match is emitted. SKIP_TO_NEXT: Discards every partial match that started with the same event. SKIP_PAST_LAST_EVENT: Discards every partial match that started between the beginning and the end of this match. SKIP_TO_FIRST: Discards every partial match that started between the beginning of this match and the first occurrence of the event named PatternName. SKIP_TO_LAST: Discards every partial match that started between the beginning of this match and the last occurrence of the event named PatternName. For more information, see After Match Skip Strategy. |
| patternName | The name of the pattern to which the strategy applies. | string | No | A unique string. |
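For illustration, a strategy object that uses `patternName` might look like the following sketch. It assumes that the graph contains a pattern named `middle` (a placeholder name). For strategies that do not refer to a pattern, such as NO_SKIP, `patternName` can be null, as in Example 1.

```json
{
  "type": "SKIP_TO_LAST",
  "patternName": "middle"
}
```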
Contiguity definition
| Value | Meaning |
| --- | --- |
| STRICT | Strict contiguity. No unmatched events can appear between matched events. |
| SKIP_TILL_NEXT | Relaxed contiguity. Unmatched events can appear between matched events. The unmatched events are ignored. |
| SKIP_TILL_ANY | Non-deterministic relaxed contiguity. A more relaxed contiguity that lets you ignore additional matches for some matched events. |
| NOT_NEXT | The immediately following event cannot be a specific event. |
| NOT_FOLLOW | A specific event does not appear later. |
For more information, see FlinkCEP - Complex event processing for Flink.
Quantifier property meanings
| Value | Meaning |
| --- | --- |
| SINGLE | The pattern appears only once. |
| LOOPING | The pattern is a looping pattern. It can appear multiple times, similar to `*` and `+` in regular expressions. |
| TIMES | The pattern appears a specified number of times. |
| GREEDY | When matching this pattern, a greedy matching strategy is used to find the longest possible match. |
| OPTIONAL | The pattern is optional. |
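Properties can be combined in the `properties` array, as the `start` node in Example 1 does with SINGLE and OPTIONAL. The following sketch shows another combination. It assumes a pattern that should match between two and four times and prefer the longest match. The range values are illustrative, and the exact property set that a given Java pattern produces should be verified against your own deployment.

```json
{
  "consumingStrategy": "SKIP_TILL_NEXT",
  "properties": ["TIMES", "GREEDY"],
  "times": {
    "from": 2,
    "to": 4,
    "windowTime": null
  },
  "untilCondition": null
}
```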
Example 1: Use a common pattern
This example describes how to use dynamic Flink CEP to adjust marketing strategies for the customers that meet the following conditions in a 10-minute time window during a real-time e-commerce promotional event:
Obtained coupons for a venue.
Added items to their shopping carts three or more times.
Did not complete the payments.
In the following sample code, the condition for obtaining coupons for a venue is defined as StartCondition, the condition for adding items to the shopping cart is defined as MiddleCondition, and the condition related to payment completion is defined as EndCondition. The following pattern is abstracted: In a 10-minute time window, an event that meets StartCondition occurs once, an event that meets MiddleCondition occurs three or more times, and no event that meets EndCondition occurs. The event that meets StartCondition is optional. The following sample code shows the Java code that describes the pattern in this example.
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
        .where(new StartCondition())
        .optional()
        .followedBy("middle")
        .where(new MiddleCondition())
        .timesOrMore(3)
        .notFollowedBy("end")
        .where(new EndCondition())
        .within(Time.minutes(10));
The following sample code shows the JSON-formatted code that describes the pattern in this example.
{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}
Example 2: Use the condition that includes a custom parameter in a pattern
This example describes how to apply different marketing strategies to customers of different classes during a real-time e-commerce promotional event. For example, you can send marketing text messages to customers of Class A, send coupons to customers of Class B, and take no marketing action for other customers. You can meet these requirements with a normal Class type condition. However, each time the strategy changes, for example when you also want to send coupons to customers of Class C, you must modify the deployment code and then recompile and rerun the deployment. To avoid this, you can use a condition with custom parameters. After you define in the code how the strategy depends on the passed parameters, you only need to change the parameter values in the database. The parameter values are specified by the args field of the condition. For example, you can change ["A", "B"] to ["A", "B", "C"] to dynamically update the marketing strategy.
The following sample code shows the condition that is initially defined in the pattern.
"condition": {
  "args": [
    "A", "B"
  ],
  "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
  "type": "CLASS"
}
To extend the strategy to customers of Class C, change the value of args in the database. The following sample code shows the updated condition.
"condition": {
  "args": [
    "A", "B", "C"
  ],
  "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
  "type": "CLASS"
}
For more information about how to use the condition that includes a custom parameter in specific business scenarios, see Demo.
aviatorscript and Demo that are mentioned in this topic are from third-party websites. When you visit the websites, the websites may fail to open or the access may be delayed.