すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:複雑なイベント処理 (CEP)

最終更新日:Mar 10, 2026

このトピックでは、Realtime Compute for Apache Flink で利用可能な複雑なイベント処理 (CEP) ステートメントについて説明します。

背景情報

Realtime Compute for Apache Flink は、Apache Flink の標準 CEP SQL を強化します。これらの強化には、タイムアウトマッチの出力、緩やかな連続性 (followedBy) のサポート、およびイベント間の連続性の指定が含まれます。Apache Flink CEP SQL の基本機能の詳細については、「パターン認識」をご参照ください。

制限事項

  • リアルタイム計算エンジン バージョン vvr-6.0.2-flink-1.15 以降のみが、拡張 CEP SQL 構文をサポートしています。

  • リアルタイム計算エンジン バージョン vvr-6.0.5-flink-1.15 以降のみが、グループパターンと NO SKIP 構文をサポートしています。

タイムアウトマッチの出力

次の入力イベントシーケンスを検討します。

+----+------+------------------+
| id | type |          rowtime |
+----+------+------------------+
|  1 |    A | 2022-09-19 12:00 |
|  2 |    B | 2022-09-19 12:01 |
|  3 |    A | 2022-09-19 12:02 |
|  4 |    B | 2022-09-19 12:05 |
+----+------+------------------+

パターン A B の場合、`PATTERN` ステートメントの後に WITHIN INTERVAL '2' MINUTES を宣言することで、パターン全体の一致の時間範囲を 2 分に制限できます。次のコードは例を示しています。

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    A.rowtime AS atime,
    B.rowtime AS btime
  PATTERN (A B) WITHIN INTERVAL '2' MINUTES
  DEFINE
    A AS type = 'A',
    B AS type = 'B'
) AS T

`WITHIN` 句がない場合、`id=1, id=2` と `id=3, id=4` の 2 つの一致が見つかります。`WITHIN` 句を追加すると、2 番目の一致におけるイベント A とイベント B の間の時間間隔が 3 分となり、2 分の制限を超えます。したがって、SQL ステートメントは、次の出力に示すように、最初の一致のみを返します。

+-----+-----+------------------+------------------+
| aid | bid |            atime |            btime |
+-----+-----+------------------+------------------+
|   1 |   2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
+-----+-----+------------------+------------------+

`WITHIN` ステートメントを定義すると、時間制限を超える部分一致は破棄されます。時間制限内に完全に一致しないシーケンスであるこれらのタイムアウトシーケンスを出力するには、ONE ROW PER MATCH SHOW TIMEOUT MATCHES ステートメントを使用できます。たとえば、次の SQL ステートメントはタイムアウトシーケンスを出力します。

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    A.rowtime AS atime,
    B.rowtime AS btime
  ONE ROW PER MATCH SHOW TIMEOUT MATCHES
  PATTERN (A B) WITHIN INTERVAL '2' MINUTES
  DEFINE
    A AS type = 'A',
    B AS type = 'B'
) AS T

このステートメントは、不一致シーケンスを出力します。出力は次のとおりです。

+-----+--------+------------------+------------------+
| aid |    bid |            atime |            btime |
+-----+--------+------------------+------------------+
|   1 |      2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
|   3 | <NULL> | 2022-09-19 12:00 |           <NULL> |
+-----+--------+------------------+------------------+
説明

id=4 のイベント B は `WITHIN` 時間制限を超えているため、一致シーケンスには含まれません。したがって、`bid` と `btime` の値は `NULL` です。

イベント間の連続性

オープンソース Flink CEP Java API を使用すると、イベント間の連続性ポリシーを定義できます。これらのポリシーには、厳密な連続性 (next())、緩やかな連続性 (followedBy())、非決定的な緩やかな連続性 (followedByAny())、厳密な非連続性 (notNext())、および緩やかな非連続性 (notFollowedBy()) が含まれます。

デフォルトでは、Flink CEP SQL は厳密な連続性ポリシーを使用します。このポリシーでは、パターンに一致するイベントが、間に他のイベントを挟まずに連続して出現する必要があります。たとえば、パターン `(A B)` は、イベント A の直後にイベント B が続くことを指定します。Realtime Compute for Apache Flink は、この機能を拡張して、Java API と完全に同等の表現機能を提供します。

たとえば、入力イベントシーケンス a1, b1, a2, a3, b2, b3 の場合、次の表は異なるパターンの一致シーケンスを示しています。

説明

マッチングプロセス中、`AFTER MATCH` 句は `SKIP TO NEXT ROW` ポリシーを使用します。詳細については、「After Match Strategy」をご参照ください。

Java API

SQL

ポリシー

一致シーケンス

A.next(B)

(A B)

厳密な連続性: 一致するすべてのイベントが、間に不一致イベントを挟まずに厳密に連続して出現することを期待します。

{a1 b1}
{a3 b2}

A.followedBy(B)

(A {- C*? -} B)

C は `DEFINE` 句で未定義のシンボルであり、任意の一致を示します。

緩やかな連続性: 一致するイベント間に不一致イベントを無視します。

{a1 b1}
{a2 b2}
{a3 b2}

A.followedByAny(B)

(A {- C* -} B)

C は `DEFINE` 句で未定義のシンボルであり、任意の一致を示します。

非決定的な緩やかな連続性: 一致するイベントの追加の一致が無視されることを許可する、より緩やかな連続性の形式です。

{a1 b1}
{a2 b2}
{a3 b2}
説明

この例の結果は、CEP SQL のデフォルトの `SKIP TO NEXT ROW` ポリシーに基づいています。Flink CEP Java API のデフォルトポリシーは `NO SKIP` です。`NO SKIP` を使用したい場合は、「AFTER MATCH NO SKIP ポリシー」をご参照ください。

A.notNext(B)

(A [^B])

厳密な非連続性: あるイベントの直後に別のイベントが続かないことを期待します。

{a2}

A.notFollowedBy(B)

(A {- C*? -} [^B]

C は `DEFINE` 句で未定義のシンボルであり、任意の一致を示します。

説明

パターンの最後に `notFollowedBy` 構文を使用する場合、`WITHIN` 条件を指定する必要があります。

緩やかな非連続性: あるイベントが他の2つのイベントの間にどこにも出現しないことを期待します。`WITHIN` 条件を持つパターンの最後に使用される場合、特定のタイプのイベントが一定期間内に到着しないことを示します。

一致なし

ループパターンにおける連続性と貪欲マッチング

重要

現在、CEP SQL はループパターンにおける非決定的な緩やかな連続性をサポートしていません。

オープンソース Flink CEP Java API を使用すると、ループパターンをマッチングするための連続性ポリシーと貪欲ポリシーを定義できます。デフォルトでは、Apache Flink CEP SQL は厳密かつ貪欲なポリシーを使用します。たとえば、パターン A+ の場合、一致する A イベント間に他のイベントが出現することはできず、パターンは可能な限り多くの A イベントに一致します。連続性ポリシーと貪欲ポリシーを指定するには、? シンボルを *, +, または {3, } などの量指定子の後に追加します。

たとえば、イベントシーケンス a1, b1, a2, a3, c1 と条件 A AS type = 'a', C AS type = 'a' or type = 'c' の場合、次の表は異なるパターンの一致シーケンスを示しています。

説明

マッチングプロセス中、`AFTER MATCH` 句は `SKIP TO NEXT ROW` ポリシーを使用します。詳細については、「After Match Strategy」をご参照ください。

識別子

連続性

貪欲性

パターン例

同等のセマンティクス

一致シーケンス

なし

厳密な連続性

貪欲

A+ C

A.oneOrMore().greedy().consecutive().next(C)

{a2 a3 c1}
{a3 c1}

?

厳密な連続性

非貪欲

A+? C

A.oneOrMore().consecutive().next(C)

{a2 a3}
{a3 c1}

??

緩い隣接性

貪欲

A+?? C

A.oneOrMore().greedy().next(C)

{a1 a2 a3 c1}
{a2 a3 c1}
{a3 c1}

???

緩やかな連続性

非貪欲

A+??? C

A.oneOrMore().next(C)

{a1 a2 a3}
{a2 a3}
{a3 c1}

ループパターンの停止条件の指定 (Until)

オープンソース Flink CEP Java API は、ループパターンの停止条件を指定するための until(condition) 関数を提供します。ループパターンをマッチングする際、現在のイベントが `until` 条件を満たす場合、そのループのマッチングは直ちに停止します。その後、現在のイベントから開始して、後続のパターンに一致するプロセスが続行されます。Realtime Compute for Apache Flink の SQL ジョブでは、+*、または {3, } などの量指定子の後に { CONDITION } 構文を使用して、`until` セマンティクスを表現できます。

たとえば、イベントシーケンス a1, d1, a2, b1, a3, c1 と一致条件 DEFINE A AS A.type = 'a' OR A.type = 'b', B AS B.type = 'b', C AS C.type = 'c' の場合、次の表は異なるパターンの一致シーケンスを示しています。

説明

マッチングプロセス中、AFTER MATCH SKIP TO NEXT ROW ポリシーが使用されます。詳細については、「After Match Strategy」をご参照ください。

パターン

同等のセマンティクス

一致シーケンス

説明

A+ C

A.oneOrMore().consecutive().greedy().next(C)

a2 b1 a3 c1
b1 a3 c1
a3 c1

`a` または `b` で始まるイベントはパターン A に一致できます。厳密な連続性はパターン A 内および A と C の間に適用されます。`a1` と `a2` の間に `d1` が存在するため、`a1` から一致を開始できません。

A+{B} C

A.oneOrMore().consecutive().greedy().until(B).next(C)

a3 c1

ループパターン A に `until B` 条件が追加されます。厳密な連続性は引き続き A と C の間に適用されます。`a2` から始まるループパターンは `b1` で終了する必要があるため、`c1` との厳密な連続性要件は満たされません。

A+{B} {- X*? -} C

A.oneOrMore().consecutive().greedy().until(B).followedBy(C)

a2 c1
a3 c1

緩やかな連続性は A と C の間に適用されます。`a2` から始まるループパターンは `b1` で終了し、その後 `b1` と `a3` をスキップして `c1` に一致します。

A+??{B} {- X*? -} C

A.oneOrMore().greedy().until(B).followedBy(C)

a1 a2 c1
a2 c1
a3 c1

緩やかな連続性はループパターン A 内に適用されます。`d1` をスキップし、`b1` で終了し、`a1` と `a2` に一致できます。

グループパターン

オープンソース Flink CEP Java API はグループパターンをサポートしています。複数のパターンをグループに結合し、next()followedBy()followedByAny() などの関数でグループを使用できます。グループ全体をループすることもできます。Realtime Compute for Apache Flink SQL ジョブでは、標準 SQL 構文 (...) を使用してグループパターンを定義し、+*、および {3, } などの量指定子を適用できます。

たとえば、パターン PATTERN (A (B C*)+? D) では、(B C*) はグループパターンです。`+` はグループが 1 回以上ループすることを指定し、`?` は非貪欲な一致を示します。対応する Java コードは次のとおりです。

Pattern.<String>begin("A").where(...)
  .next(
  	Pattern.<String>begin("B").where(...)
  		.next("C").where(...).oneOrMore().optional().greedy().consecutive())
  .oneOrMore().consecutive()
  .next("D").where(...)

一致が成功した場合、MEASURES 句を使用して、出力用の一致結果から特定の情報を抽出できます。たとえば、グループパターンが結果 b1b2 c1、および b3 c2 c3 に一致すると仮定します。`MEASURES` 句は、次の SQL ステートメントに示すように、この情報の一部のみを出力用に取得できます。イベント b にのみ関心がある場合は、FIRST(B.id) を使用して最初の一致グループから `b` を取得し、FIRST(B.id,1) を使用して 2 番目のグループから `b` を取得します。出力は b1, b2, b3 になります。

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    FIRST(B.id) AS b1_id,
    FIRST(B.id,1) AS b2_id,
    FIRST(B.id,2) AS b3_id
  PATTERN (A (B C*)+? D)
  DEFINE
    A AS type = 'A',
    B AS type = 'B',
    C AS type = 'C',
    D AS type = 'D'
) AS T

グループパターンとその先行パターンの間の連続性宣言は、グループ内の最初のパターンに適用され、グループ全体のパターンには適用されません。たとえば、PATTERN (A {- X*? -} (B C))では、パターン A とグループパターン (B C) の間に `followedBy` 関係が使用されます。この宣言は実際には、A と B の間に `followedBy` 関係を定義します。つまり、`followedBy` 関係は `A` と `B` の間に適用され、それらの間に他のイベントが発生することを許容します。`A` と全体の `(B C)` グループの間には適用されません。シーケンス a1 b1 d1 b2 c1 に対して、このパターンは出力を生成しません。これは、`b1` が出現した後、マッチング処理が直ちにグループパターン `(B C)` に入ることによるものです。しかし、`d1` はパターン `C` に一致しないため、シーケンスマッチが失敗します。

重要
  • 貪欲マッチングは、PATTERN ((A B)+) のようなループグループパターンではサポートされていません。

  • グループパターンは、PATTERN (A+{(B C)})PATTERN (A [^(B C)]) のような `until` および `notNext` 構文ではサポートされていません。

  • グループパターン内の最初のパターンは、PATTERN (A (B? C)) のようにオプションとして宣言できません。

AFTER MATCH NO SKIP ポリシー

Flink CEP Java API」では、デフォルトの `AFTER MATCH` ポリシーは `NO_SKIP` です。CEP SQL では、デフォルトのポリシーは `SKIP_TO_NEXT_ROW` です。Realtime Compute for Apache Flink は、標準 SQL の `AFTER MATCH` 文を拡張しています。`AFTER MATCH NO SKIP` 文を使用して、`NO_SKIP` ポリシーを宣言できます。シーケンスマッチが完了した場合、`NO_SKIP` ポリシーは、すでに開始されている他のマッチング手順を停止または破棄しません。

`NO_SKIP` ポリシーの一般的なユースケースは、`followedByAny` と組み合わせて、一部のイベントをスキップすることでより緩やかな一致を実現することです。たとえば、シーケンス a1 b1 b2 b3 c1 とパターン PATTERN (A {- X* -} B {- Y*? -} C) を検討します。これは Pattern.begin("A").followedByAny("B").followedBy("C") と同等です。デフォルトの AFTER MATCH SKIP TO NEXT ROW ポリシーが使用される場合、結果は a1 b1 c1 になります。これは、`a1 b1 c1` が一致した場合、`a1` で始まる他のすべてのシーケンスが破棄されるためです。ただし、AFTER MATCH NO SKIP を使用すると、a1 b1 c1a1 b2 c1、および a1 b3 c1 のすべての一致シーケンスを取得できます。