Realtime Compute for Apache Flink: CEP statements

Last Updated: May 19, 2023

This topic describes the complex event processing (CEP) statements supported by fully managed Flink of Realtime Compute for Apache Flink.

Background information

Compared with the CEP SQL of Apache Flink, the CEP statements of Realtime Compute for Apache Flink provide enhanced capabilities, such as output of matching events that do not arrive within a specified time interval, relaxed contiguity by using followedBy(), and the configuration of the contiguity pattern between events. For more information about the basic capabilities of Apache Flink CEP SQL, see Pattern Recognition.

Limits

  • Only Realtime Compute for Apache Flink whose engine version is vvr-6.0.2-flink-1.15 or later supports the extended CEP SQL syntax.

  • Only Realtime Compute for Apache Flink whose engine version is vvr-6.0.5-flink-1.15 or later supports group patterns and the AFTER MATCH NO SKIP syntax.

Output of matching events that do not arrive within a specified time interval

The following example shows a sequence of input events.

+----+------+------------------+
| id | type |          rowtime |
+----+------+------------------+
|  1 |    A | 2022-09-19 12:00 |
|  2 |    B | 2022-09-19 12:01 |
|  3 |    A | 2022-09-19 12:02 |
|  4 |    B | 2022-09-19 12:05 |
+----+------+------------------+

If you want to require that the events in the pattern (A B) occur within 2 minutes of each other, add WITHIN INTERVAL '2' MINUTES after the PATTERN clause. Sample statement:

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    A.rowtime AS atime,
    B.rowtime AS btime
  PATTERN (A B) WITHIN INTERVAL '2' MINUTES
  DEFINE
    A AS type = 'A',
    B AS type = 'B'
) AS T

If the WITHIN clause is not added, two matching sequences id=1, id=2 and id=3, id=4 are obtained. After the WITHIN clause is added, only the first matching sequence is obtained. This is because the time interval between Event A and Event B in the second matching sequence is 3 minutes. This time interval is longer than 2 minutes, which is specified in the WITHIN clause. The following output is returned:

+-----+-----+------------------+------------------+
| aid | bid |            atime |            btime |
+-----+-----+------------------+------------------+
|   1 |   2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
+-----+-----+------------------+------------------+

After the WITHIN clause is added, event sequences whose events do not all arrive within the specified time interval are treated as failed matches and are discarded. If you want to obtain these timed-out sequences as well, use the ONE ROW PER MATCH SHOW TIMEOUT MATCHES clause. Sample statement:

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    A.rowtime AS atime,
    B.rowtime AS btime
  ONE ROW PER MATCH SHOW TIMEOUT MATCHES
  PATTERN (A B) WITHIN INTERVAL '2' MINUTES
  DEFINE
    A AS type = 'A',
    B AS type = 'B'
) AS T

The following output is returned. The output contains the matching event sequence in which events do not arrive within the specified time interval.

+-----+--------+------------------+------------------+
| aid |    bid |            atime |            btime |
+-----+--------+------------------+------------------+
|   1 |      2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
|   3 | <NULL> | 2022-09-19 12:02 |           <NULL> |
+-----+--------+------------------+------------------+

Note

Event B with the ID of 4 arrives after the time interval that is specified in the WITHIN clause has elapsed, so it is not included in the matching sequence. Therefore, both bid and btime in the second output row are NULL.
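The effect of the WITHIN clause and of SHOW TIMEOUT MATCHES on the sample events can be sketched in plain Java. This is only an illustrative simulation of the semantics described above, not the Flink API: the class WithinTimeoutSketch and its methods are hypothetical, and the handling of a gap of exactly 2 minutes is an assumption because this topic does not specify it.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of PATTERN (A B) WITHIN INTERVAL '2' MINUTES.
// Not the Flink API: all class and method names here are hypothetical.
class WithinTimeoutSketch {

    static final class Event {
        final int id;
        final String type;
        final LocalDateTime rowtime;

        Event(int id, String type, String rowtime) {
            this.id = id;
            this.type = type;
            this.rowtime = LocalDateTime.parse(rowtime);
        }
    }

    // The input events from the table in this section.
    static List<Event> sampleEvents() {
        return Arrays.asList(
            new Event(1, "A", "2022-09-19T12:00"),
            new Event(2, "B", "2022-09-19T12:01"),
            new Event(3, "A", "2022-09-19T12:02"),
            new Event(4, "B", "2022-09-19T12:05"));
    }

    // Returns one "aid -> bid" row per match; bid is NULL for a timed-out match.
    static List<String> match(List<Event> events, Duration within, boolean showTimeouts) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            Event a = events.get(i);
            Event b = events.get(i + 1);
            // Strict contiguity: B must be the event directly after A.
            if (!a.type.equals("A") || !b.type.equals("B")) {
                continue;
            }
            Duration gap = Duration.between(a.rowtime, b.rowtime);
            // Assumption: a gap of exactly 'within' counts as timed out.
            if (gap.compareTo(within) < 0) {
                rows.add(a.id + " -> " + b.id);
            } else if (showTimeouts) {
                rows.add(a.id + " -> NULL");
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Duration twoMinutes = Duration.ofMinutes(2);
        System.out.println(match(sampleEvents(), twoMinutes, false));
        System.out.println(match(sampleEvents(), twoMinutes, true));
    }
}
```

With showTimeouts set to false, only the pair of events 1 and 2 is returned. With showTimeouts set to true, the partial match that starts at event 3 is returned as well, with NULL in place of the missing Event B.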

Contiguity patterns between events

The CEP Java API of Apache Flink supports the following contiguity patterns between events: strict contiguity by using next(), relaxed contiguity by using followedBy(), non-deterministic relaxed contiguity by using followedByAny(), strict non-contiguity by using notNext(), and relaxed non-contiguity by using notFollowedBy().

By default, the CEP SQL of Apache Flink uses strict contiguity. In this contiguity pattern, all matching events must appear strictly one after the other, and non-matching events cannot appear in between. In the preceding example, the pattern (A B) specifies that Event A and Event B must appear strictly one after the other. Realtime Compute for Apache Flink extends the CEP SQL syntax so that it can express every contiguity pattern that the Java API supports.

For example, the input event sequence is a1, b1, a2, a3, b2, b3. The following table describes the matching sequences in different patterns.

Note

During the matching process, the AFTER MATCH SKIP clause uses the SKIP TO NEXT ROW strategy. For more information about the strategies of the AFTER MATCH SKIP clause, see After Match Strategy.

+--------------------+--------------------+--------------------------------------+---------------------------+
| Java API           | SQL                | Strategy                             | Matching sequence         |
+--------------------+--------------------+--------------------------------------+---------------------------+
| A.next(B)          | (A B)              | Strict contiguity                    | {a1 b1}, {a3 b2}          |
| A.followedBy(B)    | (A {- C*? -} B)    | Relaxed contiguity                   | {a1 b1}, {a2 b2}, {a3 b2} |
| A.followedByAny(B) | (A {- C* -} B)     | Non-deterministic relaxed contiguity | {a1 b1}, {a2 b2}, {a3 b2} |
| A.notNext(B)       | (A [^B])           | Strict non-contiguity                | {a2}                      |
| A.notFollowedBy(B) | (A {- C*? -} [^B]) | Relaxed non-contiguity               | No matching               |
+--------------------+--------------------+--------------------------------------+---------------------------+

In the SQL patterns, C is a variable that is not defined in the DEFINE clause and is used to indicate any match.

  • Strict contiguity: expects all matching events to appear strictly one after the other, with no non-matching events in between.

  • Relaxed contiguity: ignores non-matching events that appear in between the matching events.

  • Non-deterministic relaxed contiguity: further relaxes contiguity and allows additional matches that ignore specific matching events.

  • Strict non-contiguity: expects that the specified event does not appear directly after a matching event.

  • Relaxed non-contiguity: expects that the specified event does not appear between two matching events. When you use this syntax together with the WITHIN clause at the end of the pattern, it expresses that no event of a specific type appears within a specific period of time.

Note

The matching sequences of A.followedByAny(B) in this example are obtained by using the SKIP TO NEXT ROW strategy, which is the default AFTER MATCH strategy of the CEP SQL. The default AFTER MATCH strategy of the CEP Java API of Apache Flink is NO SKIP. For more information about how to use the AFTER MATCH NO SKIP strategy, see AFTER MATCH NO SKIP strategy.

Note

If you want to use notFollowedBy() at the end of a pattern, you must add the WITHIN clause to the pattern.
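The difference between strict contiguity, relaxed contiguity, and strict non-contiguity on the input a1, b1, a2, a3, b2, b3 can be sketched in plain Java. This is a simplified simulation under the SKIP TO NEXT ROW strategy (at most one match per starting row), not the Flink API. The class ContiguitySketch and its methods are hypothetical, and the sketch assumes that notNext needs a following event to evaluate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of three contiguity patterns; names are hypothetical.
class ContiguitySketch {

    static final List<String> INPUT = Arrays.asList("a1", "b1", "a2", "a3", "b2", "b3");

    static boolean isA(String e) { return e.startsWith("a"); }
    static boolean isB(String e) { return e.startsWith("b"); }

    // (A B), like A.next(B): B must directly follow A.
    static List<String> next(List<String> in) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i + 1 < in.size(); i++) {
            if (isA(in.get(i)) && isB(in.get(i + 1))) {
                out.add(in.get(i) + " " + in.get(i + 1));
            }
        }
        return out;
    }

    // (A {- C*? -} B), like A.followedBy(B): skip events up to the first B.
    static List<String> followedBy(List<String> in) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < in.size(); i++) {
            if (!isA(in.get(i))) {
                continue;
            }
            for (int j = i + 1; j < in.size(); j++) {
                if (isB(in.get(j))) {
                    out.add(in.get(i) + " " + in.get(j));
                    break; // one match per starting row
                }
            }
        }
        return out;
    }

    // (A [^B]), like A.notNext(B): the event directly after A must not be a B.
    static List<String> notNext(List<String> in) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i + 1 < in.size(); i++) {
            if (isA(in.get(i)) && !isB(in.get(i + 1))) {
                out.add(in.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("next:       " + next(INPUT));
        System.out.println("followedBy: " + followedBy(INPUT));
        System.out.println("notNext:    " + notNext(INPUT));
    }
}
```

For the sample input, the three methods reproduce the matching sequences listed for A.next(B), A.followedBy(B), and A.notNext(B).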

Contiguity and greedy matching within looping patterns

Important

The CEP SQL does not support non-deterministic relaxed contiguity in looping patterns.

The CEP Java API of Apache Flink allows you to specify the contiguity and greedy matching strategies within looping patterns. By default, the CEP SQL of Apache Flink uses strict contiguity and greedy matching. For example, in the A+ pattern, no other events are allowed between the events that match A, and as many events as possible are matched. You can add one or more question marks (?) after a looping quantifier, such as *, +, or {3, }, to specify the contiguity and greedy matching strategies.

For example, the input event sequence is a1, b1, a2, a3, c1, and the condition is A AS type = 'a', C AS type = 'a' or type = 'c'. The following table describes the matching sequences in different matching patterns.

Note

During the matching process, the AFTER MATCH SKIP clause uses the SKIP TO NEXT ROW strategy. For more information about the strategies of the AFTER MATCH SKIP clause, see After Match Strategy.

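+------------+--------------------+--------------------------+----------------+----------------------------------------------+------------------------------------+
| Identifier | Contiguity         | Greedy matching strategy | Sample pattern | Equivalent semantics                         | Matching sequence                  |
+------------+--------------------+--------------------------+----------------+----------------------------------------------+------------------------------------+
| None       | Strict contiguity  | Greedy                   | A+ C           | A.oneOrMore().greedy().consecutive().next(C) | {a2 a3 c1}, {a3 c1}                |
| ?          | Strict contiguity  | Non-greedy               | A+? C          | A.oneOrMore().consecutive().next(C)          | {a2 a3}, {a3 c1}                   |
| ??         | Relaxed contiguity | Greedy                   | A+?? C         | A.oneOrMore().greedy().next(C)               | {a1 a2 a3 c1}, {a2 a3 c1}, {a3 c1} |
| ???        | Relaxed contiguity | Non-greedy               | A+??? C        | A.oneOrMore().next(C)                        | {a1 a2 a3}, {a2 a3}, {a3 c1}       |
+------------+--------------------+--------------------------+----------------+----------------------------------------------+------------------------------------+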
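The four combinations of contiguity and greediness for the pattern A+ C can be sketched in plain Java on the input a1, b1, a2, a3, c1, where A matches type a and C matches type a or c. This is a simplified model and not the Flink matching engine. The class LoopQuantifierSketch is hypothetical, relaxed contiguity is modeled as "skip events that do not match A", and SKIP TO NEXT ROW is modeled as "keep the first match found for each starting row".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of A+ C with the ?, ??, and ??? modifiers; names are hypothetical.
class LoopQuantifierSketch {

    static final List<String> INPUT = Arrays.asList("a1", "b1", "a2", "a3", "c1");

    static boolean isA(String e) { return e.startsWith("a"); }
    static boolean isC(String e) { return e.startsWith("a") || e.startsWith("c"); }

    static List<String> match(List<String> in, boolean relaxed, boolean greedy) {
        List<String> out = new ArrayList<>();
        for (int start = 0; start < in.size(); start++) {
            if (!isA(in.get(start))) {
                continue;
            }
            // Indices that the loop A+ may consume, beginning at 'start'.
            List<Integer> loop = new ArrayList<>();
            for (int i = start; i < in.size(); i++) {
                if (isA(in.get(i))) {
                    loop.add(i);
                } else if (!relaxed) {
                    break; // strict contiguity: a non-A event ends the run
                }
            }
            // Try loop lengths: longest first if greedy, shortest first otherwise.
            int n = loop.size();
            for (int step = 0; step < n; step++) {
                int k = greedy ? n - step : step + 1;
                int cIdx = loop.get(k - 1) + 1; // next(C): directly after the last A
                if (cIdx < in.size() && isC(in.get(cIdx))) {
                    StringBuilder sb = new StringBuilder();
                    for (int j = 0; j < k; j++) {
                        sb.append(in.get(loop.get(j))).append(' ');
                    }
                    out.add(sb.append(in.get(cIdx)).toString());
                    break; // keep one match per starting row
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("A+ C    " + match(INPUT, false, true));
        System.out.println("A+? C   " + match(INPUT, false, false));
        System.out.println("A+?? C  " + match(INPUT, true, true));
        System.out.println("A+??? C " + match(INPUT, true, false));
    }
}
```

In the non-greedy cases, a3 is consumed by C rather than by the loop, because C also accepts type a. This is why {a2 a3} appears as a complete match.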

until(condition) within looping patterns

The CEP Java API of Apache Flink allows you to use the until(condition) function to specify an until condition for a looping pattern. If the current event in a looping pattern meets the condition that is specified by the until(condition) function, the matching of the current looping pattern is immediately terminated and the matching of the subsequent pattern starts from the current event. In SQL deployments of Alibaba Cloud Realtime Compute for Apache Flink, you can append the { CONDITION } syntax to a looping quantifier, such as +, *, and {3, }, to express the until semantics.

For example, the input event sequence is a1, d1, a2, b1, a3, c1, and the condition is DEFINE A AS A.type = 'a' OR A.type = 'b', B AS B.type = 'b', C AS C.type = 'c'. The following table describes the matching sequences in different patterns.

Note

During the matching process, the AFTER MATCH SKIP clause uses the SKIP TO NEXT ROW strategy. For more information about the strategies of the AFTER MATCH SKIP clause, see After Match Strategy.

+---------------------+-------------------------------------------------------------+------------------------------------+
| Pattern             | Equivalent semantics                                        | Matching sequence                  |
+---------------------+-------------------------------------------------------------+------------------------------------+
| A+ C                | A.oneOrMore().consecutive().greedy().next(C)                | {a2 b1 a3 c1}, {b1 a3 c1}, {a3 c1} |
| A+{B} C             | A.oneOrMore().consecutive().greedy().until(B).next(C)       | {a3 c1}                            |
| A+{B} {- X*? -} C   | A.oneOrMore().consecutive().greedy().until(B).followedBy(C) | {a2 c1}, {a3 c1}                   |
| A+??{B} {- X*? -} C | A.oneOrMore().greedy().until(B).followedBy(C)               | {a1 a2 c1}, {a2 c1}, {a3 c1}       |
+---------------------+-------------------------------------------------------------+------------------------------------+

Description of each pattern:

  • A+ C: Events of type a or b can match looping pattern A. Strict contiguity is applied both among the events in looping pattern A and between patterns A and C. Because d1 appears between a1 and a2 in the input event sequence, no match can start with a1.

  • A+{B} C: The until(B) condition is added to looping pattern A, and strict contiguity is still applied between patterns A and C. The looping pattern that starts from a2 ends at b1, so a2 and c1 do not meet the strict contiguity requirement.

  • A+{B} {- X*? -} C: Relaxed contiguity is applied between patterns A and C. The looping pattern that starts from a2 ends at b1, and b1 and a3 are skipped to match c1.

  • A+??{B} {- X*? -} C: Relaxed contiguity is applied to the events in looping pattern A. The looping pattern skips d1 and ends at b1, so a1 and a2 are matched.

Group pattern

The CEP Java API of Apache Flink supports group patterns. In a group pattern, multiple patterns are combined and used in the next(), followedBy(), or followedByAny() function. A group pattern can be looped as a whole. In SQL deployments of Alibaba Cloud Realtime Compute for Apache Flink, you can use the (...) syntax in the SQL standard to define a group pattern. Looping quantifiers, such as +, *, and {3, }, can be used.

For example, in the PATTERN (A (B C*)+? D) pattern, (B C*) is a group pattern, and the plus sign (+) declares that the group pattern appears one or more times. The question mark (?) indicates the non-greedy matching strategy. Sample Java code:

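Pattern.<String>begin("A").where(...)
  .next(
    // Group pattern (B C*)
    Pattern.<String>begin("B").where(...)
      // C*: zero or more events, greedy, strictly contiguous
      .next("C").where(...).oneOrMore().optional().greedy().consecutive())
  // +?: the group repeats one or more times, non-greedy
  .oneOrMore().consecutive()
  .next("D").where(...)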

If the matching is successful, each variable in MEASURES refers to all events that matched that variable across every iteration of the group pattern. For example, if the input event sequence is a1 b1 b2 c1 b3 c2 c3 d1, the group pattern (B C*) is matched three times, and the three iterations match b1, b2 c1, and b3 c2 c3. The output of the SQL statement in the following sample code is a1 b1 b2 c1 c3 d1.

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    A.id AS aid,
    FIRST(B.id) AS b1_id,
    FIRST(B.id, 1) AS b2_id,
    FIRST(C.id) AS c1_id,
    LAST(C.id) AS c3_id,
    D.id AS did
  PATTERN (A (B C*)+? D)
  DEFINE
    A AS type = 'A',
    B AS type = 'B',
    C AS type = 'C',
    D AS type = 'D'
) AS T
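How the FIRST and LAST measures select events across the iterations of the group pattern can be sketched in plain Java. This sketch is not the Flink API: the class GroupMeasuresSketch and its methods are hypothetical, and it assumes that an offset outside the mapped events yields null. The lists hold the events that the example input maps to variables B and C.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of FIRST(var, n) and LAST(var, n); names are hypothetical.
class GroupMeasuresSketch {

    // FIRST(var, n): the n-th event mapped to the variable, counted from the first.
    static String first(List<String> mapped, int n) {
        return n < mapped.size() ? mapped.get(n) : null;
    }

    // LAST(var, n): the n-th event counted backwards from the last mapped event.
    static String last(List<String> mapped, int n) {
        int idx = mapped.size() - 1 - n;
        return idx >= 0 ? mapped.get(idx) : null;
    }

    // Builds the output row for the input a1 b1 b2 c1 b3 c2 c3 d1.
    static String outputRow() {
        // Events mapped to B and C over the three iterations of (B C*).
        List<String> b = Arrays.asList("b1", "b2", "b3");
        List<String> c = Arrays.asList("c1", "c2", "c3");
        return String.join(" ",
            "a1",
            first(b, 0),  // FIRST(B.id)
            first(b, 1),  // FIRST(B.id, 1)
            first(c, 0),  // FIRST(C.id)
            last(c, 0),   // LAST(C.id)
            "d1");
    }

    public static void main(String[] args) {
        System.out.println(outputRow());
    }
}
```

The row contains b1 and b2 even though they belong to different iterations of the group, because the offsets count over all events mapped to B.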

Take note that the contiguity declared between a group pattern and its preceding pattern applies to the first pattern in the group pattern rather than to the entire group pattern. For example, in the PATTERN (A {- X*? -} (B C)) pattern, followedBy is used between pattern A and the group pattern (B C), which declares followedBy contiguity between pattern A and pattern B. In this case, events that do not match pattern B can appear between pattern A and the group pattern (B C), but after pattern B is matched, the remaining events must match the group pattern (B C) without interruption. For example, the PATTERN (A {- X*? -} (B C)) pattern generates no output for the input event sequence a1 b1 d1 b2 c1. This is because the matching process enters the group pattern (B C) as soon as b1 appears, and d1 then fails to match pattern C. As a result, the sequence matching fails.

Important
  • Looping group patterns, such as the PATTERN ((A B)+) pattern, do not support greedy matching.

  • Group patterns, such as the PATTERN (A+{(B C)}) and PATTERN (A [^(B C)]) patterns, cannot be used in the until or notNext syntax.

  • The first pattern in a group pattern, such as the PATTERN (A (B? C)) pattern, cannot be declared as optional.

AFTER MATCH NO SKIP strategy

In the CEP Java API of Apache Flink, the default AFTER MATCH strategy is NO_SKIP. In the CEP SQL of Apache Flink, the default AFTER MATCH strategy is SKIP_TO_NEXT_ROW. Alibaba Cloud Realtime Compute for Apache Flink extends the AFTER MATCH clause in the SQL standard. You can use the AFTER MATCH NO SKIP clause to declare the NO_SKIP strategy. When the NO_SKIP strategy is used, the existing matching processes are not terminated or discarded when the matching of a sequence is complete.

In most cases, the NO_SKIP strategy is used together with followedByAny, which skips specific matching events to achieve non-deterministic relaxed contiguity. For example, the PATTERN (A {- X* -} B {- Y*? -} C) pattern is equivalent to Pattern.begin("A").followedByAny("B").followedBy("C"). If the input event sequence is a1 b1 b2 b3 c1 and the default strategy AFTER MATCH SKIP TO NEXT ROW is used, the output for this pattern is a1 b1 c1. This is because all sequences that start with a1 are discarded when the matching for a1 b1 c1 is complete. If you use AFTER MATCH NO SKIP instead, all matching sequences are returned: a1 b1 c1, a1 b2 c1, and a1 b3 c1.
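The difference between the two strategies for A followedByAny B followedBy C can be sketched in plain Java on the input a1 b1 b2 b3 c1. This is a simplified enumeration and not the Flink engine. The class AfterMatchSketch is hypothetical, and SKIP TO NEXT ROW is modeled as "emit the first match that starts at a row and drop the other matches that start at the same row".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of AFTER MATCH NO SKIP vs. SKIP TO NEXT ROW; names are hypothetical.
class AfterMatchSketch {

    static final List<String> INPUT = Arrays.asList("a1", "b1", "b2", "b3", "c1");

    static List<String> match(List<String> in, boolean noSkip) {
        List<String> out = new ArrayList<>();
        for (int a = 0; a < in.size(); a++) {
            if (!in.get(a).startsWith("a")) {
                continue;
            }
            // followedByAny(B): every later B is a candidate, not only the first one.
            for (int b = a + 1; b < in.size(); b++) {
                if (!in.get(b).startsWith("b")) {
                    continue;
                }
                // followedBy(C): take the first C after the chosen B.
                int c = b + 1;
                while (c < in.size() && !in.get(c).startsWith("c")) {
                    c++;
                }
                if (c == in.size()) {
                    continue; // no C after this B
                }
                out.add(in.get(a) + " " + in.get(b) + " " + in.get(c));
                if (!noSkip) {
                    break; // SKIP TO NEXT ROW: drop other matches from this start row
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("SKIP TO NEXT ROW: " + match(INPUT, false));
        System.out.println("NO SKIP:          " + match(INPUT, true));
    }
}
```

With noSkip set to false, only a1 b1 c1 is returned. With noSkip set to true, the three sequences a1 b1 c1, a1 b2 c1, and a1 b3 c1 are returned.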