Realtime Compute for Apache Flink extends standard Apache Flink CEP SQL with support for timeout match output, loose contiguity (followedBy), and contiguity policies between events. For background on standard CEP SQL syntax, see Pattern Recognition.
Limits
- Engine versions vvr-6.0.2-flink-1.15 and later support the extended CEP SQL syntax.
- Engine versions vvr-6.0.5-flink-1.15 and later support group patterns and the NO SKIP syntax.
Output timeout matches
By default, when you use a WITHIN clause, partial matches that exceed the time limit are silently discarded. To capture those unfinished sequences, add ONE ROW PER MATCH SHOW TIMEOUT MATCHES to your query.
Consider the following input events:
+----+------+------------------+
| id | type | rowtime |
+----+------+------------------+
| 1 | A | 2022-09-19 12:00 |
| 2 | B | 2022-09-19 12:01 |
| 3 | A | 2022-09-19 12:02 |
| 4 | B | 2022-09-19 12:05 |
+----+------+------------------+
The following query matches the pattern A B within a two-minute window:
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
A.id AS aid,
B.id AS bid,
A.rowtime AS atime,
B.rowtime AS btime
PATTERN (A B) WITHIN INTERVAL '2' MINUTES
DEFINE
A AS type = 'A',
B AS type = 'B'
) AS T
Without the WITHIN clause, two matches are found: {id=1, id=2} and {id=3, id=4}. With the WITHIN clause, the second match fails because id=4 arrives three minutes after id=3, which exceeds the two-minute limit. Only the first match is returned:
+-----+-----+------------------+------------------+
| aid | bid | atime | btime |
+-----+-----+------------------+------------------+
| 1 | 2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
+-----+-----+------------------+------------------+
To include the timed-out sequence in the output, add ONE ROW PER MATCH SHOW TIMEOUT MATCHES:
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
A.id AS aid,
B.id AS bid,
A.rowtime AS atime,
B.rowtime AS btime
ONE ROW PER MATCH SHOW TIMEOUT MATCHES
PATTERN (A B) WITHIN INTERVAL '2' MINUTES
DEFINE
A AS type = 'A',
B AS type = 'B'
) AS T
The output now includes the unmatched sequence:
+-----+--------+------------------+------------------+
| aid | bid | atime | btime |
+-----+--------+------------------+------------------+
| 1 | 2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
| 3 | <NULL> | 2022-09-19 12:02 | <NULL> |
+-----+--------+------------------+------------------+
Event B with id=4 arrives after the WITHIN time limit has expired, so it is not included in the timeout row. The values of bid and btime are therefore NULL.
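The timeout behavior can be illustrated with a small Java simulation of the example above. This is a toy model written for illustration, not Flink code. The class TimeoutMatchSketch and its method run are hypothetical names. The pattern is hard-coded as strict A B, and times are minutes after 12:00. The model assumes that a partial match expires if the next event arrives at or after the start time plus the window, and it treats end of input as expiry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of PATTERN (A B) WITHIN a window; not how Flink evaluates MATCH_RECOGNIZE.
final class TimeoutMatchSketch {
    // Input rows from the example table: id, type, minutes after 12:00.
    static final int[] IDS = {1, 2, 3, 4};
    static final char[] TYPES = {'A', 'B', 'A', 'B'};
    static final int[] MINUTES = {0, 1, 2, 5};

    /** Returns "aid,bid" rows; a timed-out partial match is reported with bid = null. */
    static List<String> run(int windowMinutes, boolean showTimeoutMatches) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < IDS.length; i++) {
            if (TYPES[i] != 'A') continue;               // every A event starts a partial match
            int deadline = MINUTES[i] + windowMinutes;   // assumption: must complete strictly before this
            boolean nextInTime = i + 1 < IDS.length && MINUTES[i + 1] < deadline;
            if (nextInTime) {
                if (TYPES[i + 1] == 'B') {
                    rows.add(IDS[i] + "," + IDS[i + 1]); // completed match
                }
                // Otherwise strict contiguity is broken and the partial match is dropped silently.
            } else if (showTimeoutMatches) {
                rows.add(IDS[i] + ",null");              // expired partial match: B columns are NULL
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(run(2, false)); // prints [1,2]
        System.out.println(run(2, true));  // prints [1,2, 3,null]
    }
}
```

With a very large window, such as run(1000, false), the model returns both 1,2 and 3,4. This corresponds to the query without a WITHIN clause.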
Contiguity between events
The open-source Flink CEP Java API defines five contiguity policies. By default, Apache Flink CEP SQL uses strict contiguity: matched events must appear consecutively with no intervening events. Realtime Compute for Apache Flink extends CEP SQL to support all five policies, with SQL expressions fully equivalent to the Java API.
The following table shows how each Java API method maps to SQL syntax, using the input event sequence a1, b1, a2, a3, b2, b3.
All examples in this table use the default AFTER MATCH SKIP TO NEXT ROW policy. For details, see After Match Strategy.
| Java API | SQL | Policy | Match sequence |
|---|---|---|---|
| A.next(B) | (A B) | Strict contiguity: B must immediately follow A, with no events in between. | {a1 b1} {a3 b2} |
| A.followedBy(B) | (A {- C*? -} B) | Loose contiguity: non-matching events between A and B are ignored. C is any event not defined in the DEFINE clause. | {a1 b1} {a2 b2} {a3 b2} |
| A.followedByAny(B) | (A {- C* -} B) | Non-deterministic loose contiguity: further relaxes loose contiguity so that some matching events can also be skipped, which produces additional matches. C is any event not defined in the DEFINE clause. | {a1 b1} {a2 b2} {a3 b2} |
| A.notNext(B) | (A [^B]) | Strict non-contiguity: the event immediately following A must not be B. | {a2} |
| A.notFollowedBy(B) | (A {- C*? -} [^B]) | Loose non-contiguity: B must not appear anywhere between two other events. When used at the end of a pattern, add a WITHIN clause to define the time boundary. C is any event not defined in the DEFINE clause. | No match |
The followedByAny result in the table is based on the default SKIP TO NEXT ROW policy. The Flink CEP Java API defaults to NO SKIP. To use NO SKIP in SQL, see The AFTER MATCH NO SKIP policy.
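To make the table concrete, the following Java sketch enumerates complete matches for the two-event patterns and then applies a skip policy. It is a hypothetical toy, and ContiguitySketch is not a Flink class. It omits notFollowedBy because that policy needs a WITHIN boundary at the end of a pattern. Its SKIP TO NEXT ROW filter is a simplification that is only assumed to hold for short patterns like these.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical toy enumerator for the two-event rows of the contiguity table; not Flink's NFA.
final class ContiguitySketch {
    enum Kind { NEXT, FOLLOWED_BY, FOLLOWED_BY_ANY, NOT_NEXT }

    // A complete match: index of A, index of B (-1 for notNext), index of the completing event.
    static final class Match {
        final int start;
        final int b;
        final int end;

        Match(int start, int b, int end) {
            this.start = start;
            this.b = b;
            this.end = end;
        }
    }

    static boolean isType(String event, char type) {
        return event.charAt(0) == type;
    }

    static List<String> matches(List<String> events, Kind kind, boolean skipToNextRow) {
        List<Match> all = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isType(events.get(i), 'a')) continue;
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = isType(events.get(j), 'b');
                if (kind == Kind.NEXT) {           // (A B): only the very next event counts
                    if (isB) all.add(new Match(i, j, j));
                    break;
                }
                if (kind == Kind.NOT_NEXT) {       // (A [^B]): the next event must not be B
                    if (!isB) all.add(new Match(i, -1, j));
                    break;
                }
                if (isB) {                         // loose variants skip non-B events
                    all.add(new Match(i, j, j));
                    if (kind == Kind.FOLLOWED_BY) break; // followedBy keeps only the first B
                }
            }
        }
        // Emit matches in completion order; ties go to the earlier start.
        all.sort(Comparator.<Match>comparingInt(m -> m.end).thenComparingInt(m -> m.start));
        List<String> out = new ArrayList<>();
        int nextRow = 0;
        for (Match m : all) {
            // SKIP TO NEXT ROW: drop matches that start before the row after the last emitted start.
            if (skipToNextRow && m.start < nextRow) continue;
            String a = events.get(m.start);
            out.add(m.b < 0 ? "{" + a + "}" : "{" + a + " " + events.get(m.b) + "}");
            nextRow = m.start + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a1", "b1", "a2", "a3", "b2", "b3");
        for (Kind kind : Kind.values()) {
            System.out.println(kind + " " + matches(events, kind, true));
        }
    }
}
```

Calling matches(events, Kind.FOLLOWED_BY_ANY, false) disables the skip filter, which approximates NO SKIP. In this toy model, followedByAny then yields seven overlapping matches instead of three.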
Contiguity and greedy matching in looping patterns
CEP SQL does not support non-deterministic loose contiguity in looping patterns.
The open-source Flink CEP Java API lets you control both contiguity and greediness for looping patterns. By default, Apache Flink CEP SQL uses strict contiguity and greedy matching: for pattern A+, no events other than A can appear between matched A events, and the pattern matches as many A events as possible.
To specify contiguity and greediness, append one, two, or three ? symbols after a quantifier such as *, +, or {3, }.
The following table shows the four combinations, using the event sequence a1, b1, a2, a3, c1 with A AS type = 'a' and C AS type = 'a' OR type = 'c'.
All examples use the default AFTER MATCH SKIP TO NEXT ROW policy. For details, see After Match Strategy.
| Identifier | Contiguity | Greediness | Example pattern | Equivalent Java API | Match sequence |
|---|---|---|---|---|---|
| (none) | Strict | Greedy | A+ C | A.oneOrMore().greedy().consecutive().next(C) | {a2 a3 c1} {a3 c1} |
| ? | Strict | Non-greedy | A+? C | A.oneOrMore().consecutive().next(C) | {a2 a3} {a3 c1} |
| ?? | Loose | Greedy | A+?? C | A.oneOrMore().greedy().next(C) | {a1 a2 a3 c1} {a2 a3 c1} {a3 c1} |
| ??? | Loose | Non-greedy | A+??? C | A.oneOrMore().next(C) | {a1 a2 a3} {a2 a3} {a3 c1} |
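The four combinations can be reproduced with a small Java model of A+ C. This is an illustrative sketch, not Flink's NFA, and LoopingSketch is a hypothetical name. Each start event keeps only its first completed match, which is a simplification of SKIP TO NEXT ROW that is assumed to hold for this input. Greediness is modeled as the loop keeping any event that matches both A and C.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical toy model of PATTERN (A<quantifier> C); not Flink's NFA.
final class LoopingSketch {
    static List<String> matches(List<String> events, Predicate<String> a, Predicate<String> c,
                                boolean loose, boolean greedy) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!a.test(events.get(i))) continue;     // every A event may start a match
            List<String> match = new ArrayList<>();
            match.add(events.get(i));
            int last = i;                              // index of the last event taken by A
            for (int j = i + 1; j < events.size(); j++) {
                String e = events.get(j);
                boolean adjacent = j == last + 1;
                // next(C) is strict: C must directly follow the last A event.
                // A greedy loop keeps an event that matches both A and C for itself.
                if (adjacent && c.test(e) && !(greedy && a.test(e))) {
                    match.add(e);
                    out.add("{" + String.join(" ", match) + "}");
                    break;                             // first completed match per start
                }
                if (a.test(e) && (adjacent || loose)) {
                    match.add(e);                      // extend the loop
                    last = j;
                } else if (!loose) {
                    break;                             // strict loop broken: no match
                }                                      // loose loop: skip the event
            }
        }
        return out;
    }

    // Table input: a1, b1, a2, a3, c1 with A AS type = 'a' and C AS type = 'a' OR type = 'c'.
    static List<String> demo(boolean loose, boolean greedy) {
        List<String> events = List.of("a1", "b1", "a2", "a3", "c1");
        return matches(events,
                e -> e.startsWith("a"),
                e -> e.startsWith("a") || e.startsWith("c"),
                loose, greedy);
    }

    public static void main(String[] args) {
        System.out.println("A+ C    " + demo(false, true));
        System.out.println("A+? C   " + demo(false, false));
        System.out.println("A+?? C  " + demo(true, true));
        System.out.println("A+??? C " + demo(true, false));
    }
}
```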
Specify a stop condition for looping patterns
The open-source Flink CEP Java API provides until(condition) to stop a looping pattern early. When an event satisfies the until condition, that event is not accepted into the loop: the loop ends immediately, and matching continues from that event onward.
In Realtime Compute for Apache Flink SQL jobs, express the same semantics with {CONDITION} placed after a quantifier such as +, *, or {3, }.
The following table shows how the stop condition affects matching, using the event sequence a1, d1, a2, b1, a3, c1 and the conditions A AS A.type = 'a' OR A.type = 'b', B AS B.type = 'b', C AS C.type = 'c'.
All examples use the AFTER MATCH SKIP TO NEXT ROW policy. For details, see After Match Strategy.
| Pattern | Equivalent Java API | Match sequence | Description |
|---|---|---|---|
| A+ C | A.oneOrMore().consecutive().greedy().next(C) | {a2 b1 a3 c1} {b1 a3 c1} {a3 c1} | Strict contiguity within A and between A and C. Because d1 sits between a1 and a2, the match cannot start from a1. |
| A+{B} C | A.oneOrMore().consecutive().greedy().until(B).next(C) | {a3 c1} | The loop starting at a2 must end at b1, so strict contiguity with c1 cannot be met. |
| A+{B} {- X*? -} C | A.oneOrMore().consecutive().greedy().until(B).followedBy(C) | {a2 c1} {a3 c1} | Loose contiguity between A and C. The loop from a2 ends at b1, then skips b1 and a3 to match c1. |
| A+??{B} {- X*? -} C | A.oneOrMore().greedy().until(B).followedBy(C) | {a1 a2 c1} {a2 c1} {a3 c1} | Loose contiguity within A. The pattern skips d1, ends the loop at b1, and matches a1 and a2. |
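The first two rows can be checked with a Java sketch of a strict, greedy loop followed by next(C). It is a hypothetical toy model (UntilSketch is not part of Flink), and it does not cover the followedBy rows. Following the table, it assumes that an event satisfying the stop condition is never taken by A and that the loop ends at that event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical toy model of A+ C and A+{B} C (strict, greedy, next(C)); not Flink's NFA.
final class UntilSketch {
    static List<String> matches(List<String> events, Predicate<String> a, Predicate<String> c,
                                Predicate<String> until) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            String first = events.get(i);
            // An event that satisfies the stop condition is never taken by A,
            // consistent with b1 starting no match in the A+{B} C row.
            if (!a.test(first) || until.test(first)) continue;
            List<String> match = new ArrayList<>();
            match.add(first);
            for (int j = i + 1; j < events.size(); j++) {
                String e = events.get(j);
                if (a.test(e) && !until.test(e)) {
                    match.add(e);                 // greedy, consecutive loop keeps going
                    continue;
                }
                // The loop ended at e; next(C) requires e itself to be C.
                if (c.test(e)) {
                    match.add(e);
                    out.add("{" + String.join(" ", match) + "}");
                }
                break;
            }
        }
        return out;
    }

    // Table input: a1, d1, a2, b1, a3, c1.
    static List<String> demo(boolean withUntil) {
        List<String> events = List.of("a1", "d1", "a2", "b1", "a3", "c1");
        return matches(events,
                e -> e.startsWith("a") || e.startsWith("b"), // A AS type = 'a' OR type = 'b'
                e -> e.startsWith("c"),                      // C AS type = 'c'
                e -> withUntil && e.startsWith("b"));        // {B} with B AS type = 'b', if enabled
    }

    public static void main(String[] args) {
        System.out.println("A+ C    " + demo(false));
        System.out.println("A+{B} C " + demo(true));
    }
}
```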
Group patterns
The open-source Flink CEP Java API supports group patterns: you can combine multiple patterns into a group and loop the entire group using next(), followedBy(), or followedByAny(). In Realtime Compute for Apache Flink SQL jobs, use (...) to define a group pattern and apply quantifiers such as +, *, and {3, }.
Define a group pattern
In PATTERN (A (B C*)+? D), (B C*) is a group pattern. The + quantifier loops the group one or more times, and ? makes the match non-greedy. The equivalent Java API code is:
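Pattern.<String>begin("A").where(...)
    .next(
        // Group pattern (B C*): one B followed by zero or more C events
        Pattern.<String>begin("B").where(...)
            .next("C").where(...).oneOrMore().optional().greedy().consecutive())
    // + on the group; no greedy() because +? is non-greedy
    .oneOrMore().consecutive()
    .next("D").where(...)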
Extract data from group pattern matches
Use the MEASURES clause to extract specific events from a group match. Suppose the group (B C*) is matched three times: b1, then b2 c1, then b3 c2 c3. Retrieve events by their position within the group:
- FIRST(B.id) retrieves b from the first matched group.
- FIRST(B.id, 1) retrieves b from the second group.
- FIRST(B.id, 2) retrieves b from the third group.
The output is b1, b2, b3.
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
FIRST(B.id) AS b1_id,
FIRST(B.id,1) AS b2_id,
FIRST(B.id,2) AS b3_id
PATTERN (A (B C*)+? D)
DEFINE
A AS type = 'A',
B AS type = 'B',
C AS type = 'C',
D AS type = 'D'
) AS T
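The following Java sketch shows how the three group iterations line up with the FIRST offsets. It is a hypothetical illustration (GroupFirstSketch is not a Flink API) that works on the rows already matched by the group. It assumes FIRST(B.id, n) counts from the first row mapped to B.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink code: the rows matched by the group (B C*)+?, in order.
final class GroupFirstSketch {
    // Splits the group rows into iterations. Every iteration starts with a B row,
    // because the first pattern in a group cannot be optional.
    static List<List<String>> iterations(List<String> groupRows) {
        List<List<String>> groups = new ArrayList<>();
        for (String row : groupRows) {
            if (row.startsWith("b")) {
                groups.add(new ArrayList<>());       // start a new (B C*) iteration
            }
            groups.get(groups.size() - 1).add(row); // assumes the first row is a B row
        }
        return groups;
    }

    // FIRST(B.id, n): the (n + 1)-th row mapped to B, counting from the first one.
    static String firstB(List<String> groupRows, int n) {
        List<String> bRows = new ArrayList<>();
        for (String row : groupRows) {
            if (row.startsWith("b")) bRows.add(row);
        }
        return bRows.get(n);                        // toy: throws if there is no such row
    }

    public static void main(String[] args) {
        List<String> rows = List.of("b1", "b2", "c1", "b3", "c2", "c3");
        System.out.println(iterations(rows));
        System.out.println(firstB(rows, 0) + ", " + firstB(rows, 1) + ", " + firstB(rows, 2));
    }
}
```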
Contiguity between a group pattern and its preceding pattern
A contiguity declaration between a group pattern and its preceding pattern applies to the first pattern within the group, not the entire group. For example, in PATTERN (A {- X*? -} (B C)), the followedBy relationship applies between A and B — not between A and the entire (B C) group.
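For the sequence a1 b1 d1 b2 c1, this pattern produces no output. The followedBy relationship binds A to the first B event, b1, and the engine then enters the group and tries to match C. Because contiguity inside the group is strict and the next event, d1, does not match C, the sequence fails.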
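A short Java sketch makes the rule concrete: followedBy links A only to B, while B and C inside the group stay strictly contiguous. GroupContiguitySketch is a hypothetical toy model for this single pattern, not Flink code. For comparison, it also accepts a1 d1 b1 c1, where the skipped event sits between A and B rather than inside the group.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of PATTERN (A {- X*? -} (B C)); not Flink's NFA.
final class GroupContiguitySketch {
    static List<String> matches(List<String> events) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) continue;
            // followedBy links A to the group's first pattern, B: skip non-B events, take the first B.
            int b = -1;
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).startsWith("b")) {
                    b = j;
                    break;
                }
            }
            // Inside the group the default strict contiguity applies: C must directly follow B.
            if (b >= 0 && b + 1 < events.size() && events.get(b + 1).startsWith("c")) {
                out.add("{" + events.get(i) + " " + events.get(b) + " " + events.get(b + 1) + "}");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(matches(List.of("a1", "b1", "d1", "b2", "c1"))); // prints []
        System.out.println(matches(List.of("a1", "d1", "b1", "c1")));       // prints [{a1 b1 c1}]
    }
}
```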
Limitations
- Greedy matching is not supported for looping group patterns. For example, PATTERN ((A B)+) is invalid.
- Group patterns cannot be used in until or notNext syntax. For example, PATTERN (A+{(B C)}) and PATTERN (A [^(B C)]) are invalid.
- The first pattern in a group cannot be optional. For example, PATTERN (A (B? C)) is invalid.
The AFTER MATCH NO SKIP policy
In the Flink CEP Java API, the default AFTER MATCH policy is NO_SKIP. In CEP SQL, the default is SKIP TO NEXT ROW. Realtime Compute for Apache Flink extends the standard SQL AFTER MATCH statement with AFTER MATCH NO SKIP, which declares the NO_SKIP policy: when a sequence match completes, other matching procedures that have already started are not stopped or discarded.
A common use case combines AFTER MATCH NO SKIP with followedByAny to capture all overlapping matches. Consider the sequence a1 b1 b2 b3 c1 and the pattern PATTERN (A {- X* -} B {- Y*? -} C), equivalent to Pattern.begin("A").followedByAny("B").followedBy("C").
| Policy | Result |
|---|---|
| AFTER MATCH SKIP TO NEXT ROW (default) | {a1 b1 c1}: after a1 b1 c1 is matched, all other sequences starting with a1 are discarded. |
| AFTER MATCH NO SKIP | {a1 b1 c1}, {a1 b2 c1}, {a1 b3 c1}: all overlapping matches are retained. |
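The difference between the two policies can be sketched in Java. The sketch enumerates every complete match and then optionally applies a SKIP TO NEXT ROW filter. AfterMatchSketch is a hypothetical toy model, not Flink's implementation. In particular, the tie-breaking order for matches that complete on the same event is an assumption chosen to reproduce the table.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical toy model of PATTERN (A {- X* -} B {- Y*? -} C); not Flink's NFA.
final class AfterMatchSketch {
    static List<String> matches(List<String> events, boolean noSkip) {
        List<int[]> all = new ArrayList<>();                   // {aIndex, bIndex, cIndex}
        for (int a = 0; a < events.size(); a++) {
            if (!events.get(a).startsWith("a")) continue;
            for (int b = a + 1; b < events.size(); b++) {
                if (!events.get(b).startsWith("b")) continue;  // followedByAny: every later B branches
                for (int c = b + 1; c < events.size(); c++) {
                    if (events.get(c).startsWith("c")) {       // followedBy: first C after that B
                        all.add(new int[] {a, b, c});
                        break;
                    }
                }
            }
        }
        // Completion order; ties are broken by A index, then B index (assumed, to match the table).
        all.sort(Comparator.<int[]>comparingInt(m -> m[2])
                .thenComparingInt(m -> m[0])
                .thenComparingInt(m -> m[1]));
        List<String> out = new ArrayList<>();
        int nextRow = 0;
        for (int[] m : all) {
            // SKIP TO NEXT ROW discards matches that start before the row after the last emitted start.
            if (!noSkip && m[0] < nextRow) continue;
            out.add("{" + events.get(m[0]) + " " + events.get(m[1]) + " " + events.get(m[2]) + "}");
            nextRow = m[0] + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a1", "b1", "b2", "b3", "c1");
        System.out.println("SKIP TO NEXT ROW: " + matches(events, false));
        System.out.println("NO SKIP:          " + matches(events, true));
    }
}
```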