このトピックでは、動的 Flink 複合イベント処理 (CEP) における JSON 形式で定義されたルールについて説明します。
対象読者
- リスク管理プラットフォーム開発者: 動的 Flink CEP に精通しているプラットフォーム開発者は、このトピックで説明されている形式を学習し、プラットフォームの要件に基づいてさらなるカプセル化が必要かどうかを判断できます。
- リスク管理戦略担当者: 特定のリスク管理戦略のみに精通しており、Java 開発経験がない担当者は、この形式を学習して使用し、CEP の概念に基づいて新しいルールを作成し、オンラインのリスク管理デプロイメントでルールを適用できます。
JSON 形式での CEP ルールの定義
- ノードノードは完全なパターンを示します。次の表は、パターン内のフィールドを示しています。
フィールド 説明 データ型 必須 備考 name パターンの名前。 STRING はい 一意の文字列。 説明 各ノードの名前は一意である必要があります。type ノードのタイプ。 ENUM(STRING) はい - 子パターンを含むノードの場合、このフィールドの値は COMPOSITE です。
- 子パターンを含まないノードの場合、このフィールドの値は ATOMIC です。
quantifier パターンを照合する方法を記述する数量詞。たとえば、パターンを 1 回だけ照合するように数量詞を指定できます。 辞書 はい 詳細については、このトピックの「数量詞」をご参照ください。 condition 条件。 辞書 いいえ 詳細については、このトピックの「条件」をご参照ください。 - 数量詞数量詞は、イベントをパターンと照合する方法を記述するために使用されます。たとえば、パターン
"A*"の quantifier プロパティは LOOPING で、パターンの consumingStrategy フィールドは SKIP_TILL_ANY に設定されています。フィールド 説明 データ型 必須 備考 consumingStrategy イベント選択戦略。 ENUM(STRING) はい 有効な値: - STRICT
- SKIP_TILL_NEXT
- SKIP_TILL_ANY
値とその意味の詳細については、このトピックの「連続性」をご参照ください。
times パターンを照合する必要がある回数。 辞書 いいえ 例:"times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } },from と to の値は INTEGER データ型である必要があります。 windowTime の単位は、DAYS、HOURS、MINUTES、SECONDS、または MILLISECONDS です。説明 windowTime は null に設定できます。このプロパティは、"windowTime": nullの形式で表示されます。properties 数量詞のプロパティ。 列挙文字列の配列 はい 値とその意味の詳細については、このトピックの「数量詞」をご参照ください。 untilCondition パターンの照合を停止するために使用される条件。 説明 このパラメータは、LOOPING 数量詞を使用するパターンの後にのみ構成できます。辞書 いいえ 値とその意味の詳細については、このトピックの「条件」をご参照ください。 - 条件条件は、特定の要件を満たすイベントを識別するために使用されます。たとえば、閲覧時間が 5 分を超えるユーザーを識別する場合、閲覧時間が 5 分を超えるという条件を指定できます。
フィールド 説明 データ型 必須 備考 type 条件のタイプ。 ENUM(STRING) はい 有効な値: - CLASS: カスタム条件。
- AVIATOR: aviator 式に基づく条件。
- GROOVY: Groovy 式に基づく条件。
... シリアル化できるその他のカスタムフィールド。 いいえ ... 次の条件がサポートされています:- Class タイプの条件
フィールド 説明 データ型 必須 備考 type 条件のタイプ。 ENUM(STRING) はい 値を Class に設定します。 className クラスの名前。 STRING はい com.alibaba.ververica.cep.demo.StartConditionなどのクラスの完全名。 - カスタムパラメータを含む条件
Class タイプの条件を使用する場合、className パラメータのみを指定できます。パラメータを動的に渡すことはできません。条件の表現機能を向上させるために、動的 Flink CEP はカスタムパラメータを含む CustomArgsCondition 条件をサポートしています。このようにして、JSON 文字列の配列の形式で CustomArgsCondition 条件に必要なパラメータを構成し、CustomArgsCondition インスタンスを動的に構築できます。この場合、Java コードを変更せずに条件のパラメータを動的に更新できます。
フィールド 説明 データ型 必須 備考 type 条件のタイプ。 ENUM(STRING) はい 値を Class に設定します。 className クラスの名前。 STRING はい com.alibaba.ververica.cep.demo.CustomMiddleConditionなどのクラスの完全名。args カスタムパラメータ。 文字列の配列 はい 値は文字列の配列です。 - aviator 式に基づく条件Aviator は、式をバイトコードに動的にコンパイルする式評価エンジンです。詳細については、aviatorscript をご参照ください。したがって、デプロイメントで aviator 式に基づく条件を使用して、条件のしきい値を動的に変更できます。このようにして、Java コードを変更してコードを再コンパイルして実行する必要はありません。
フィールド 説明 データ型 必須 備考 type クラスの名前。 STRING はい 値を AVIATOR に設定します。 expression 式文字列。 STRING はい このフィールドは、price > 10 のような形式で式文字列を指定します。 price 変数は、Java コードで定義されたフィールドです。 データベース内の文字列の値を変更できます。たとえば、文字列の値を price > 20 に変更すると、price > 20 式が動的 Flink CEP デプロイメントに動的にロードされ、後続のイベントを処理するための新しい aviator 条件が構築されます。
- Groovy 式に基づく条件Groovy は、Java 仮想マシン (JVM) プラットフォームに基づく動的言語です。Groovy 構文の詳細については、構文 をご参照ください。動的 Flink CEP では、Groovy 式を使用して条件を定義できます。このようにして、条件のしきい値を動的に変更できます。
フィールド 説明 データ型 必須 備考 type クラスの名前。 STRING はい 値を GROOVY に設定します。 expression 式文字列。 STRING はい このフィールドは、price > 5.0 && name.contains("mid") のような形式で式文字列を指定します。 price や name などの変数は、Java コードで定義されたフィールドです。データベース内の文字列の値を変更できます。たとえば、文字列の値を price > 20 && name.contains("end") に変更すると、新しい Groovy 文字列が動的 Flink CEP デプロイメントに動的にロードされ、後続のイベントを処理するための新しい Groovy 条件が構築されます。
- エッジ
フィールド 説明 データ型 必須 備考 source ソースパターンの名前。 STRING はい 該当なし。 target デスティネーションパターンの名前。 STRING はい 該当なし。 type イベント選択戦略。 辞書 はい 有効な値: - STRICT
- SKIP_TILL_NEXT
- SKIP_TILL_ANY
- NOT_FOLLOW
- NOT_NEXT
値とその意味の詳細については、このトピックの「連続性」をご参照ください。
- GraphNode extends Node
GraphNode は、完全なパターンシーケンスを示します。 GraphNode の各ノードは、独立したパターンを示します。 GraphNode の各エッジは、イベントを照合するためにパターンを別のパターンに変換する方法を示します。
GraphNode は Node のサブクラスと見なされ、より大きな GraphNode の Node として使用できます。このようにして、パターンのネストを示す GroupPattern がサポートされます。 Node と比較して、GraphNode は次の追加タイプのフィールドを提供します:- グラフの構造を記述する nodes フィールドと edges フィールド。
- グラフの時間ウィンドウポリシーを記述する window フィールドと、イベント照合後に使用されるスキップ戦略を記述する afterMatchSkipStrategy フィールド。
次の表は、GraphNode のフィールドを示しています。フィールド 説明 データ型 必須 備考 name 複合パターンの名前。 STRING はい 一意の文字列。 説明 各グラフの名前は一意である必要があります。type ノードのタイプ。 ENUM(STRING) はい 値を COMPOSITE に設定します。 version グラフで使用される JSON 形式のバージョン。 INTEGER はい デフォルト値: 1。 nodes パターンにネストされている子パターン。 ノードの配列 はい このフィールドの値は配列です。配列は空にできません。 edges ネストされている子パターン間の接続関係。 エッジの配列 はい このフィールドの値は配列です。配列は空にできます。 window - ウィンドウのタイプが FIRST_AND_LAST の場合、このフィールドは複合パターンの完全一致の開始と終了の間の最大時間間隔を示します。
- ウィンドウのタイプが PREVIOUS_AND_CURRENT の場合、このフィールドは 2 つの隣接する子パターンの照合の間の最大時間間隔を示します。
辞書 いいえ 例: "window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 } }単位は、DAYS、HOURS、MINUTES、SECONDS、または MILLISECONDS です。データ型は LONG または INTEGER です。
afterMatchSkipStrategy グラフ内のすべてのイベントが照合された後のスキップ戦略。 辞書 はい 詳細については、このトピックの「AfterMatchSkipStrategy」をご参照ください。 quantifier パターンを照合する方法を記述する数量詞。たとえば、パターンを 1 回だけ照合するように数量詞を指定できます。 辞書 はい 詳細については、このトピックの「数量詞」をご参照ください。 - AfterMatchSkipStrategy
フィールド 説明 データ型 必須 備考 type 戦略のタイプ。 ENUM(STRING) はい 有効な値:- NO_SKIP: 出力に各成功一致を返します。これはデフォルト値です。
- SKIP_TO_NEXT: 同じイベントで始まるすべての部分一致を破棄します。
- SKIP_PAST_LAST_EVENT: 一致の開始と終了の間で始まるすべての部分一致を破棄します。
- SKIP_TO_FIRST: 一致の開始と PatternName という名前のイベントの最初の出現の間で始まるすべての部分一致を破棄します。
- SKIP_TO_LAST: 一致の開始と PatternName という名前のイベントの最後の出現の間で始まるすべての部分一致を破棄します。
詳細については、「」After Match Skip Strategy」をご参照ください。
patternName 戦略を使用するパターンの名前。 STRING いいえ 一意の文字列。 - 連続性
有効な値 説明 STRICT 厳密な連続性。一致しないイベントは、一致するイベントの間に表示できません。 SKIP_TILL_NEXT 緩和された連続性。一致しないイベントは、一致するイベントの間に表示できます。一致しないイベントは無視されます。 SKIP_TILL_ANY 非決定論的緩和連続性。この値は、さらに緩和された連続性を示します。この連続性モードでは、特定の一致するイベントの追加一致が無視される可能性があります。 NOT_NEXT イベントの後に発生する後続のイベントは、指定されたイベントにすることはできません。 NOT_FOLLOW 指定されたイベントは後続に表示できません。 詳細については、「」FlinkCEP - Flink の複合イベント処理」をご参照ください。
- 数量詞のプロパティ
有効な値 説明 SINGLE パターンは 1 回だけ発生します。 LOOPING パターンはループパターンであり、複数回発生する可能性があります。この数量詞は、正規表現のアスタリスク (*) とプラス記号 (+) に似ています。 TIMES パターンは指定された回数だけ発生する可能性があります。 GREEDY 貪欲マッチング戦略を使用して、最大数の一致を取得するためにパターンを照合します。 OPTIONAL パターンはオプションです。
例 1: 共通パターンを使用する
- 会場のクーポンを取得しました。
- 3 回以上ショッピングカートに商品を追加しました。
- 支払いを完了していません。
Pattern<Event, Event> pattern =
Pattern.<Event>begin("start")
.where(new StartCondition())
.optional()
.followedBy("middle")
.where(new MiddleCondition())
.timesOrMore(3)
.notFollowedBy("end")
.where(new EndCondition())
.within(Time.minutes(10));{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": null,
"nodes": [
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "middle",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"LOOPING"
],
"times": {
"from": 3,
"to": 3,
"windowTime": null
},
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE",
"OPTIONAL"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
"type": "CLASS"
},
"type": "ATOMIC"
}
],
"edges": [
{
"source": "middle",
"target": "end",
"type": "NOT_FOLLOW"
},
{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
}
],
"window": {
"type": "FIRST_AND_LAST",
"time": {
"unit": "MINUTES",
"size": 10
}
},
"afterMatchStrategy": {
"type": "NO_SKIP",
"patternName": null
},
"type": "COMPOSITE",
"version": 1
}例 2: パターンでカスタムパラメータを含む条件を使用する
この例では、リアルタイムの e コマースプロモーションイベント中に、異なるクラスの顧客に異なるマーケティング戦略を指定する方法について説明します。たとえば、クラス A の顧客にマーケティング関連のテキストメッセージを送信するマーケティング戦略、クラス B の顧客にクーポンを送信するマーケティング戦略、その他の顧客にはマーケティングアクションを実行しないマーケティング戦略を指定できます。前述の要件を満たすために、デプロイメントで共通クラスの条件を定義できます。デプロイメントで共通クラスの条件が使用されている場合にマーケティング戦略を調整する場合、デプロイメントコードを書き直し、デプロイメントを再コンパイルして実行する必要があります。たとえば、クラス C の顧客にクーポンを送信するマーケティング戦略を変更できます。操作を簡素化するために、カスタムパラメータを含む条件を使用できます。渡されたパラメータに基づいて戦略を調整する方法をコードで定義した後、データベースで渡されたパラメータの値を変更するだけで済みます。渡されたパラメータの値は、カスタムパラメータを含む条件の args パラメータの値です。たとえば、["A", "B"] を ["A", "B", "C"] に変更して、マーケティング戦略の動的更新を実行できます。
"condition": {
"args": [
"A", "B"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}"condition": {
"args": [
"A", "B", "C"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}特定のビジネスシナリオでカスタムパラメータを含む条件を使用する方法の詳細については、「デモ」をご参照ください。