このトピックでは、動的 Flink 複合イベント処理 (CEP) において JSON フォーマットで定義されるルールについて説明します。
対象読者
リスク管理プラットフォームの開発者:動的 Flink CEP に精通しているプラットフォーム開発者は、このトピックで説明するフォーマットを学び、プラットフォームの要件に基づいてさらなるカプセル化が必要かどうかを判断できます。
リスク管理戦略担当者:特定のリスク管理戦略にのみ精通し、Java 開発経験がない担当者は、このフォーマットを学び、CEP の概念に基づいて新しいルールを作成し、オンラインのリスク管理デプロイメントに適用できます。
JSON フォーマットの定義
イベントシーケンス内のパターンは、グラフと見なすことができます。グラフ内の各ノードは、特定のイベントのパターンを表します。ノード間のエッジは、イベント選択戦略を表します。この戦略は、あるマッチしたパターンから次のパターンへの遷移を定義します。各グラフは、より大きなグラフの子ノードになることもでき、これによりネストされたパターンが可能になります。この概念に基づき、Real-time Compute for Apache Flink は CEP ルールを記述するための JSON 仕様を提供します。このアプローチにより、ルールのストレージと変更が簡素化されます。以降のセクションでは、これらの仕様のフィールドについて説明します。
ノードの定義
ノードは完全なパターンを表し、以下のプロパティを含みます。
フィールド
説明
タイプ
必須
注意
name
パターンの名前。
string
はい
一意の文字列。
説明ノード名は一意である必要があります。
type
ノードのタイプ。
enum(string)
はい
子パターンを持つノードの場合は COMPOSITE。
子パターンを持たないノードの場合は ATOMIC。
quantifier
1 回だけマッチするなど、パターンのマッチ方法を記述します。
dict
はい
詳細については、「量指定子の定義」セクションをご参照ください。
condition
条件。
dict
いいえ
詳細については、「条件の定義」セクションをご参照ください。
量指定子の定義
量指定子は、パターンに一致するイベントをどのようにマッチングするかを記述します。たとえば、パターン
"A*"の場合、量指定子の `properties` フィールドは LOOPING であり、パターン内のイベント選択戦略は SKIP_TILL_ANY です。フィールド
説明
タイプ
必須
注意
consumingStrategy
イベント選択戦略。
enum(string)
はい
有効な値:
STRICT
SKIP_TILL_NEXT
SKIP_TILL_ANY
値の詳細については、「隣接性の定義」セクションをご参照ください。
times
パターンがマッチしなければならない回数。
dict
いいえ
例:
"times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } },`from` と `to` フィールドは整数です。`windowTime` の `unit` は DAYS、HOURS、MINUTES、SECONDS、または MILLISECONDS にすることができます。
説明`windowTime` は null に設定できます:
"windowTime": null。properties
量指定子のプロパティ。
array of enumString
はい
値の詳細については、「量指定子のプロパティの意味」セクションをご参照ください。
untilCondition
停止条件。
説明これは、LOOPING 量指定子を持つパターンの後でのみ使用できます。
dict
いいえ
値の詳細については、「条件の定義」セクションをご参照ください。
条件の定義
条件は、特定の要件を満たすイベントをフィルターするために使用されます。たとえば、5 分以上閲覧した顧客をフィルターしたい場合、「5 分以上の閲覧」が条件になります。
フィールド
説明
タイプ
必須
注意
type
条件のタイプ。
enum(string)
はい
条件タイプに有効な値:
CLASS:ユーザー定義の条件。
AVIATOR:AVIATOR 式に基づく条件。
GROOVY:GROOVY 式に基づく条件。
...
シリアル化可能なその他のカスタムフィールド。
いいえ
...
以下のタイプの Condition がサポートされています:
Class タイプの Condition
フィールド名
説明
タイプ
必須
注意
type
条件のタイプ。
enum(string)
はい
値は Class に固定されます。
className
クラスの名前。
string
はい
クラスの完全名。例:
com.alibaba.ververica.cep.demo.StartCondition。カスタムパラメーターを持つ Condition
通常の Class タイプの条件では、クラス名 (`className`) のみを渡すことができ、パラメーターを動的に渡すことはできません。より表現力豊かな条件を作成するために、動的 CEP はカスタムパラメーター (`CustomArgsCondition`) を持つ条件をサポートします。これにより、`CustomArgsCondition` の必須パラメーターを JSON フォーマットの文字列配列として設定できます。その後、`CustomArgsCondition` インスタンスを動的に構築できます。この機能により、Java コードを変更して再コンパイルすることなく、条件パラメーターを動的に更新できます。
フィールド
説明
タイプ
必須
注意
type
条件のタイプ。
enum(string)
はい
値は Class に固定されます。
className
クラスの名前。
string
はい
クラスの完全名。例:
com.alibaba.ververica.cep.demo.CustomMiddleCondition。args
カスタムパラメーター。
array of string
はい
文字列の配列。
Aviator 式に基づく Condition
Aviator は、式を動的にバイトコードにコンパイルする式評価エンジンです。詳細については、aviatorscript をご参照ください。ジョブで Aviator 式に基づく条件を使用できます。これにより、Java コードを変更、再コンパイル、再実行することなく、条件のしきい値を動的に変更できます。
フィールド
説明
タイプ
必須
注意
type
クラスの名前。
string
はい
値は AVIATOR に固定されます。
expression
式の文字列。
string
はい
式文字列(例: price > 10)です。この price 変数は、Java コードで定義されたフィールドから取得されます。
データベースでこの文字列の値を変更できます。たとえば、price > 20 に変更します。Flink CEP ジョブは、動的に price > 20 をロードし、後のイベントを処理するために新しい AviatorCondition を作成します。
Groovy 式に基づく Condition
Groovy は、Java 仮想マシン (JVM) 向けの動的型付け言語です。Groovy の構文の詳細については、構文をご参照ください。動的 CEP は、条件を定義するために Groovy 式をサポートします。これにより、条件のしきい値を動的に変更できます。
フィールド
説明
タイプ
必須
注意
type
クラスの名前。
string
はい
値は GROOVY に固定されます。
expression
式の文字列。
string
はい
price > 5.0 && name.contains("mid") のような式文字列です。price や name などの変数は、Java コードで定義されたフィールドに由来します。この文字列の値はデータベースで変更できます。たとえば、price > 20 && name.contains("end") に変更できます。その後、Flink CEP ジョブは新しい Groovy 文字列を動的にロードし、後続のイベントを処理するための新しい GroovyCondition を作成します。
エッジの定義
フィールド
説明
タイプ
必須
注意
source
ソースパターンの名前。
string
はい
なし。
target
ターゲットパターンの名前。
string
はい
なし。
type
イベント選択戦略。
dict
はい
有効な値:
STRICT
SKIP_TILL_NEXT
SKIP_TILL_ANY
NOT_FOLLOW
NOT_NEXT
値の詳細については、「隣接性の定義」セクションをご参照ください。
GraphNode は Node の定義を拡張
`GraphNode` は完全なパターンシーケンスを表します。その `nodes` は個々のパターンであり、`edges` はあるマッチしたパターンから次のパターンへの遷移を定義します。
ネストされたパターン (`GroupPattern`) をサポートするために、`GraphNode` は `Node` の子クラスとして扱われます。これは、`GraphNode` がより大きな `GraphNode` 内の `Node` として機能できることを意味します。基本的な `Node` と比較して、`GraphNode` には 2 つの追加タイプのフィールドがあります:
グラフ構造を記述するための `nodes` および `edges` フィールド。
グラフ内のタイムウィンドウポリシーとマッチ後のスキップ戦略を記述するための `window` および `afterMatchSkipStrategy` フィールド。
次の表は、`GraphNode` のフィールドを説明しています。
フィールド
説明
タイプ
必須
注意
name
複合パターンの名前。
String
はい
一意の文字列。
説明グラフ名は一意である必要があります。
type
ノードのタイプ。
enum(string)
はい
値は COMPOSITE に固定されます。
version
グラフが使用する JSON フォーマットのバージョン。
Int
はい
デフォルト値は 1 です。
nodes
パターン内にネストされた子パターン。
array of Node
はい
空でない配列。
edges
ネストされた子パターン間の接続。
array of Edge
はい
空にすることができる配列。
window
`type` が `FIRST_AND_LAST` の場合、これは複合パターンの完全なマッチの開始と終了の間の最大時間です。
`type` が `PREVIOUS_AND_CURRENT` の場合、これは 2 つの隣接する子パターンのマッチ間の最大時間です。
dict
いいえ
例:
"window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 } }unit は DAYS、HOURS、MINUTES、SECONDS、または MILLISECONDS にすることができます。データの型は Long または Integer です。
afterMatchSkipStrategy
グラフ内のすべてのイベントがマッチした後に使用するスキップ戦略。
dict
はい
詳細については、「マッチ後のスキップ戦略 (AfterMatchSkipStrategy) の定義」セクションをご参照ください。
quantifier
1 回だけマッチするなど、パターンのマッチ方法を記述します。
dict
はい
詳細については、「量指定子の定義」セクションをご参照ください。
マッチ後のスキップ戦略 (AfterMatchSkipStrategy) の定義
フィールド
説明
タイプ
必須
注意
type
戦略のタイプ。
enum(string)
はい
有効な値:
NO_SKIP (デフォルト):すべての成功したマッチが出力されます。
SKIP_TO_NEXT:同じイベントで開始されたすべての部分的なマッチを破棄します。
SKIP_PAST_LAST_EVENT:このマッチの開始と終了の間に開始されたすべての部分的なマッチを破棄します。
SKIP_TO_FIRST:このマッチの開始と PatternName という名前のイベントの最初の発生の間に開始されたすべての部分的なマッチを破棄します。
SKIP_TO_LAST:このマッチの開始と PatternName という名前のイベントの最後の発生の間に開始されたすべての部分的なマッチを破棄します。
詳細については、「マッチ後のスキップ戦略」をご参照ください。
patternName
戦略が適用されるパターンの名前。
string
いいえ
一意の文字列。
隣接性の定義
物理値
意味
STRICT
厳密な隣接性。マッチしたイベント間にマッチしないイベントが現れることはありません。
SKIP_TILL_NEXT
緩和された隣接性。マッチしたイベント間にマッチしないイベントが現れることがあります。マッチしないイベントは無視されます。
SKIP_TILL_ANY
非決定的な緩和された隣接性。一部のマッチしたイベントに対する追加のマッチを無視できる、より緩和された隣接性です。
NOT_NEXT
直後のイベントが特定のイベントであってはなりません。
NOT_FOLLOW
特定のイベントが後で現れません。
詳細については、「」と「FlinkCEP - Flink 用の複合イベント処理」をご参照ください。
量指定子のプロパティの意味
値
意味
SINGLE
パターンは 1 回だけ現れます。
LOOPING
パターンはループパターンです。正規表現の `*` や `+` のように、複数回現れることがあります。
TIMES
パターンは指定された回数現れます。
GREEDY
このパターンをマッチングする際、可能な限り最長のマッチを見つけるために貪欲マッチング戦略が使用されます。
OPTIONAL
パターンはオプションです。
例 1:共通パターンの使用
この例では、リアルタイムの E コマースプロモーションイベント中に、10 分のタイムウィンドウ内で以下の条件を満たす顧客に対して、動的 Flink CEP を使用してマーケティング戦略を調整する方法について説明します:
会場のクーポンを取得した。
ショッピングカートに商品を 3 回以上追加した。
支払いを完了しなかった。
以下のサンプルコードでは、会場のクーポンを取得する条件は StartCondition として定義され、ショッピングカートに商品を追加する条件は MiddleCondition として定義され、支払いの完了に関連する条件は EndCondition として定義されています。次のパターンが抽象化されます:10 分のタイムウィンドウ内で、StartCondition を満たすイベントが 1 回発生し、MiddleCondition を満たすイベントが 3 回以上発生し、EndCondition を満たすイベントは発生しません。StartCondition を満たすイベントはオプションです。以下のサンプルコードは、この例のパターンを記述する Java コードを示しています。
Pattern<Event, Event> pattern =
Pattern.<Event>begin("start")
.where(new StartCondition())
.optional()
.followedBy("middle")
.where(new MiddleCondition())
.timesOrMore(3)
.notFollowedBy("end")
.where(new EndCondition())
.within(Time.minutes(10));以下のサンプルコードは、この例のパターンを記述する JSON フォーマットのコードを示しています。
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": null,
"nodes": [
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "middle",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"LOOPING"
],
"times": {
"from": 3,
"to": 3,
"windowTime": null
},
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE",
"OPTIONAL"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
"type": "CLASS"
},
"type": "ATOMIC"
}
],
"edges": [
{
"source": "middle",
"target": "end",
"type": "NOT_FOLLOW"
},
{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
}
],
"window": {
"type": "FIRST_AND_LAST",
"time": {
"unit": "MINUTES",
"size": 10
}
},
"afterMatchStrategy": {
"type": "NO_SKIP",
"patternName": null
},
"type": "COMPOSITE",
"version": 1
}例 2:パターンでカスタムパラメーターを含む条件の使用
この例では、リアルタイムの E コマースプロモーションイベント中に、異なるクラスの顧客に対して異なるマーケティング戦略を指定する方法について説明します。たとえば、クラス A の顧客にはマーケティング関連のテキストメッセージを送信する戦略、クラス B の顧客にはクーポンを送信する戦略、その他の顧客にはマーケティングアクションを取らない戦略を指定できます。デプロイメントで共通クラスの条件を定義して、前述の要件を満たすことができます。デプロイメントで共通クラスの条件が使用されている場合にマーケティング戦略を調整したい場合は、デプロイメントコードを書き換えて再コンパイルし、デプロイメントを再実行する必要があります。たとえば、クラス C の顧客にクーポンを送信するマーケティング戦略を変更できます。操作を簡素化するために、カスタムパラメーターを含む条件を使用できます。コードで渡されたパラメーターに基づいて戦略を調整する方法を定義した後、データベースで渡されたパラメーターの値を変更するだけで済みます。渡されたパラメーターの値は、カスタムパラメーターを含む条件の args パラメーターの値です。たとえば、["A", "B"] を ["A", "B", "C"] に変更して、マーケティング戦略の動的な更新を実行できます。
以下のサンプルコードは、パターンで最初に定義された条件を示しています。
"condition": {
"args": [
"A", "B"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}前述の条件を、カスタムパラメーターを含む条件に変更できます。以下のサンプルコードは、カスタムパラメーターを含む条件を示しています。
"condition": {
"args": [
"A", "B", "C"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}特定のビジネスシナリオでカスタムパラメーターを含む条件を使用する方法の詳細については、「デモ」をご参照ください。
このトピックで言及されている aviatorscript および デモ は、サードパーティのウェブサイトからのものです。ウェブサイトにアクセスする際、ウェブサイトが開かなかったり、アクセスが遅延したりする場合があります。