このトピックでは、Realtime Compute for Apache Flink でサポートされている複合イベント処理 (CEP) ステートメントについて説明します。
背景情報
Apache Flink の CEP SQL と比較して、Realtime Compute for Apache Flink の CEP ステートメントは、特定の時間間隔内に到着しない一致イベントの出力、followedBy() を使用した緩和された連続性、イベント間の連続性パターンの設定など、強化された機能を提供します。 Apache Flink の CEP SQL の基本機能の詳細については、「パターン認識」をご参照ください。
制限事項
エンジンバージョンが vvr-6.0.2-flink-1.15 以降の Realtime Compute for Apache Flink のみ、拡張 CEP SQL 構文をサポートしています。
エンジンバージョンが vvr-6.0.5-flink-1.15 以降の Realtime Compute for Apache Flink のみ、グループパターンと AFTER MATCH NO SKIP 構文をサポートしています。
特定の時間間隔内に到着しない一致イベントの出力
次の例は、入力イベントのシーケンスを示しています。
+----+------+------------------+
| id | type | rowtime |
+----+------+------------------+
| 1 | A | 2022-09-19 12:00 |
| 2 | B | 2022-09-19 12:01 |
| 3 | A | 2022-09-19 12:02 |
| 4 | B | 2022-09-19 12:05 |
+----+------+------------------+パターン A B 内のイベントの時間間隔を 2 分以内に指定する場合、PATTERN ステートメントの後に WITHIN INTERVAL '2' MINUTES を追加できます。サンプルコード:
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
A.id AS aid,
B.id AS bid,
A.rowtime AS atime,
B.rowtime AS btime
PATTERN (A B) WITHIN INTERVAL '2' MINUTES
DEFINE
A AS type = 'A',
B AS type = 'B'
) AS TWITHIN 句を追加しない場合、2 つの一致シーケンス id=1, id=2 と id=3, id=4 が取得されます。 WITHIN 句を追加した後、最初の一致シーケンスのみが取得されます。これは、2 番目に一致するシーケンスのイベント A とイベント B の時間間隔が 3 分であるためです。この時間間隔は、WITHIN 句で指定された 2 分よりも長くなっています。次の出力が返されます。
+-----+-----+------------------+------------------+
| aid | bid | atime | btime |
+-----+-----+------------------+------------------+
| 1 | 2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
+-----+-----+------------------+------------------+WITHIN 句が追加されると、WITHIN 句で指定された時間間隔内にイベントが到着しない一致イベントシーケンスは、一致に失敗したイベントシーケンスと見なされ、破棄されます。指定された時間間隔内にイベントが到着しない一致イベントシーケンスを取得する場合は、ONE ROW PER MATCH SHOW TIMEOUT MATCHES ステートメントを使用できます。サンプルコード:
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
A.id AS aid,
B.id AS bid,
A.rowtime AS atime,
B.rowtime AS btime
ONE ROW PER MATCH SHOW TIMEOUT MATCHES
PATTERN (A B) WITHIN INTERVAL '2' MINUTES
DEFINE
A AS type = 'A',
B AS type = 'B'
) AS T次の出力が返されます。出力には、指定された時間間隔内にイベントが到着しない一致イベントシーケンスが含まれています。
+-----+--------+------------------+------------------+
| aid | bid | atime | btime |
+-----+--------+------------------+------------------+
| 1 | 2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
| 3 | <NULL> | 2022-09-19 12:00 | <NULL> |
+-----+--------+------------------+------------------+id が 4 のイベント B が到着する時間は、WITHIN 句で指定された時間間隔を超えています。このイベントは、一致シーケンスには含まれていません。したがって、このイベントの bid フィールドと btime フィールドはどちらも NULL 許容フィールドです。
イベント間の連続性パターン
Apache Flink の CEP Java API は、next() を使用した厳密な連続性、followedBy() を使用した緩和された連続性、followedByAny() を使用した非決定的な緩和された連続性、notNext() を使用した厳密な非連続性、notFollowedBy() を使用した緩和された非連続性など、イベント間の次の連続性パターンをサポートしています。
デフォルトでは、Apache Flink の CEP SQL は厳密な連続性を使用します。この連続性パターンでは、すべての一致イベントが厳密に連続して出現する必要があり、一致しないイベントが間に現れることはできません。前の例では、パターン (A B) は、イベント A とイベント B が厳密に連続して出現する必要があることを指定しています。 Realtime Compute for Apache Flink は、この機能を拡張して、Java API と完全に同等の表現機能をサポートしています。
次の表は、入力イベントシーケンスが a1, b1, a2, a3, b2, b3 の場合の、異なるパターンにおける一致シーケンスを示しています。
マッチングプロセス中に、AFTER MATCH SKIP 句は SKIP TO NEXT ROW 戦略を使用します。 AFTER MATCH SKIP 句の戦略の詳細については、「After Match Strategy」をご参照ください。
Java API | SQL | 戦略 | 一致シーケンス |
|
| 厳密な連続性:すべての一致イベントが厳密に連続して出現することを想定し、一致しないイベントが間に現れることはできません。 | |
|
C は DEFINE 句で定義されていない文字であり、任意のマッチを示すために使用されます。 | 緩和された連続性:一致イベントの間に現れる、一致しないイベントを無視します。 | |
|
C は DEFINE 句で定義されていない文字であり、任意のマッチを示すために使用されます。 | 非決定的な緩和された連続性:連続性をさらに緩和し、特定の一致イベントを無視する追加のマッチを許可します。 | 説明 この例のマッチングシーケンスは、CEP SQL のデフォルトの AFTER MATCH 戦略である SKIP TO NEXT ROW 戦略を使用して取得されます。 Apache Flink の CEP Java API のデフォルトの AFTER MATCH 戦略は NO SKIP です。 AFTER MATCH NO SKIP 戦略の使用方法の詳細については、このトピックの AFTER MATCH NO SKIP 戦略 セクションを参照してください。 |
|
| 厳密な非連続性:一致イベントの後に一致イベントが現れないことを想定します。 | |
|
C は DEFINE 句で定義されていない文字であり、任意のマッチを示すために使用されます。 説明 パターンの最後に notFollowedBy() を使用する場合は、パターンに WITHIN 句を追加する必要があります。 | 緩和された非連続性:2 つの一致イベントの間に一致イベントが現れないことを想定します。この構文をパターンの最後に WITHIN 句と一緒に使用すると、特定のタイプの マッチング イベントが特定の期間内に現れません。 | 一致なし |
ループパターン内の連続性と貪欲マッチング
CEP SQL は、ループパターン内の非決定的な緩和された連続性をサポートしていません。
Apache Flink の CEP Java API では、ループパターン内の連続性と貪欲マッチング戦略を指定できます。デフォルトでは、Apache Flink の CEP SQL は厳密な連続性と貪欲マッチングを使用します。たとえば、A+ パターンでは、複数のイベント A の間に他のイベントは許可されず、イベント A はできるだけ多くマッチングされます。ループ数量詞 (*, +, or {3, }) の後に 1 つ以上の疑問符 (?) を追加して、連続性と貪欲マッチング戦略を指定できます。
次の表は、入力イベントシーケンスが a1, b1, a2, a3, c1 で、条件が A AS type = 'a', C AS type = 'a' or type = 'c' の場合の、異なるマッチングパターンにおける一致シーケンスを示しています。
マッチングプロセス中に、AFTER MATCH SKIP 句は SKIP TO NEXT ROW 戦略を使用します。 AFTER MATCH SKIP 句の戦略の詳細については、「After Match Strategy」をご参照ください。
識別子 | 連続性 | 貪欲マッチング戦略 | サンプルパターン | 同等のセマンティクス | 一致シーケンス |
なし | 厳密な連続性 | 貪欲 |
|
| |
? | 厳密な連続性 | 非貪欲 |
|
| |
?? | 緩和された連続性 | 貪欲 |
|
| |
??? | 緩和された連続性 | 非貪欲 |
|
| |
ループパターン内の until(condition)
Apache Flink の CEP Java API では、until(condition) 関数を使用して、ループパターンの until 条件を指定できます。ループパターン内の現在のイベントが until(condition) 関数で指定された条件を満たしている場合、現在のループパターンのマッチングはすぐに終了し、後続のパターンのマッチングは現在のイベントから開始されます。 Realtime Compute for Apache Flink の SQL デプロイでは、{ CONDITION } 構文をループ数量詞 (+、*、{3, } など) に追加して、until セマンティクスを表現できます。
次の表は、入力イベントシーケンスが a1, d1, a2, b1, a3, c1 で、条件が DEFINE A AS A.type = 'a' OR A.type = 'b', B AS B.type = 'b', C AS C.type = 'c' の場合の、異なるパターンにおける一致シーケンスを示しています。
マッチングプロセス中に、AFTER MATCH SKIP 句は SKIP TO NEXT ROW 戦略を使用します。 AFTER MATCH SKIP 句の戦略の詳細については、「After Match Strategy」をご参照ください。
パターン | 同等のセマンティクス | 一致シーケンス | 説明 |
|
| | a または b で始まるイベントはループパターン A と一致する可能性があり、ループパターン A 内のイベントとパターン A および C の間には厳密な連続性が適用されます。入力イベントシーケンスの a1 と a2 の間には d1 が存在します。その結果、マッチは a1 から始めることができません。 |
|
| | until(B) 条件がループパターン A に追加され、パターン A と C の間には依然として厳密な連続性が適用されます。 a2 から始まるループパターンは b1 で終了します。したがって、a2 と c1 は厳密な連続性の要件を満たしていません。 |
|
| | パターン A と C の間には緩和された連続性が適用されます。 a2 から始まるループパターンは b1 で終了し、b1 と a3 をスキップして c1 と一致します。 |
|
| | ループパターン A 内のイベントには緩和された連続性が適用されます。パターンは d1 をスキップし、b1 で終了して a1 と a2 と一致します。 |
グループパターン
Apache Flink の CEP Java API は、グループパターンをサポートしています。グループパターンでは、複数のパターンが組み合わされ、next()、followedBy()、または followedByAny() 関数で使用されます。グループパターンは全体としてループさせることができます。 Realtime Compute for Apache Flink の SQL デプロイでは、SQL 標準の (...) 構文を使用してグループパターンを定義できます。 +、*、{3, } などのループ数量詞を使用できます。
たとえば、PATTERN (A (B C*)+? D) パターンでは、(B C*) はグループパターンであり、グループパターンは複数回出現すると宣言されています。疑問符 (?) は、非貪欲マッチング戦略が使用されていることを示します。 Java サンプルコード:
// グループパターンは複数回出現すると宣言されています。
Pattern.<String>begin("A").where(...)
.next(
Pattern.<String>begin("B").where(...)
.next("C").where(...).oneOrMore().optional().greedy().consecutive())
.oneOrMore().consecutive()
.next("D").where(...)MEASURES 句は、一致するグループパターンの出力に何が含まれるかを定義します。たとえば、指定されたグループパターンが一致するたびに一致シーケンスが b1、b2 c1、b3 c2 c3 の場合、MEASURES 句を使用して、一致結果の一部のみを表示できます。出力にイベント b のみを表示する必要がある場合は、FIRST(B.id) を使用して最初の一致シーケンスのイベント b を取得し、FIRST(B.id,1) を使用して 2 番目に一致するシーケンスのイベント b を取得できます。同じ方法を使用して、3 番目に一致するシーケンスのイベント b を取得します。このようにして、グループパターンの出力は b1, b2, b3 になります。サンプルコード:
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
FIRST(B.id) AS b1_id,
FIRST(B.id,1) AS b2_id,
FIRST(B.id,2) AS b3_id
PATTERN (A (B C*)+? D)
DEFINE
A AS type = 'A',
B AS type = 'B',
C AS type = 'C',
D AS type = 'D'
) AS Tグループパターンとその前のパターンとの間で宣言された連続性は、グループパターン全体ではなく、グループパターンの最初のパターンに適用されることに注意してください。たとえば、PATTERN (A {- X*? -} (B C)) パターンでは、パターン A とグループパターン (B C) の間に followedBy が使用されます。これは、パターン A とパターン B の間に followedBy 連続性を宣言します。この場合、パターン B と一致しないいくつかのイベントがパターン A とグループパターン (B C) の間に存在する可能性がありますが、グループパターン (B C) と一致しないイベントは存在できません。入力イベントシーケンス a1 b1 d1 b2 c1 に基づいて PATTERN (A {- X*? -} (B C)) パターンの出力が生成されない場合、これは、b1 が出現した後にマッチングプロセスがすぐにグループパターン (B C) に入り、d1 がパターン C と一致しないためです。その結果、シーケンスマッチングは失敗します。
PATTERN ((A B)+)パターンなどのループグループパターンは、貪欲マッチングをサポートしていません。PATTERN (A+{(B C)})やPATTERN (A [^(B C)])パターンなどのグループパターンは、until または notNext 構文で使用できません。PATTERN (A (B? C))パターンなどのグループパターンの最初のパターンをオプションとして宣言することはできません。
AFTER MATCH NO SKIP 戦略
Apache Flink の CEP Java API では、デフォルトの AFTER MATCH 戦略は NO_SKIP です。 Apache Flink の CEP SQL では、デフォルトの AFTER MATCH 戦略は SKIP_TO_NEXT_ROW です。 Realtime Compute for Apache Flink は、SQL 標準の AFTER MATCH 句を拡張しています。 AFTER MATCH NO SKIP 句を使用して、NO_SKIP 戦略を宣言できます。 NO_SKIP 戦略を使用する場合、シーケンスのマッチングが完了しても、既存のマッチングプロセスは終了または破棄されません。
ほとんどの場合、NO_SKIP 戦略は followedByAny と一緒に使用して、緩和された連続性のために特定の マッチング イベントをスキップします。たとえば、入力イベントシーケンスが a1 b1 b2 b3 c1 の場合、デフォルトの戦略 AFTER MATCH SKIP TO NEXT ROW を使用すると、PATTERN (A {- X* -} B {- Y*? -} C) パターンの出力は a1 b1 c1 になります。 PATTERN (A {- X* -} B {- Y*? -} C) パターンは、Pattern.begin("A").followedByAny("B").followedBy("C") と同等です。これは、a1 b1 c1 のマッチングが完了すると、a1 で始まるすべてのシーケンスが破棄されるためです。ただし、AFTER MATCH NO SKIP を使用すると、すべての一致シーケンスを取得できます。この場合、a1 b1 c1、a1 b2 c1、a1 b3 c1 が返されます。