すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:動的 Flink CEP における JSON 形式のルールの定義

最終更新日:Jan 08, 2025

このトピックでは、動的 Flink 複合イベント処理 (CEP) における JSON 形式で定義されたルールについて説明します。

対象読者

  • リスク管理プラットフォーム開発者: 動的 Flink CEP に精通しているプラットフォーム開発者は、このトピックで説明されている形式を学習し、プラットフォームの要件に基づいてさらなるカプセル化が必要かどうかを判断できます。
  • リスク管理戦略担当者: 特定のリスク管理戦略のみに精通しており、Java 開発経験がない担当者は、この形式を学習して使用し、CEP の概念に基づいて新しいルールを作成し、オンラインのリスク管理デプロイメントでルールを適用できます。

JSON 形式での CEP ルールの定義

イベントシーケンスのパターンはグラフと見なされます。グラフのノードは、特定のイベントのパターンを示します。ノード間のエッジは、イベントを照合するためにパターンを別のパターンに変換する方法を決定するイベント選択戦略を示します。各グラフは、より大きなグラフの子ノードと見なすこともできます。このようにして、パターンのネストがサポートされます。Alibaba Cloud Realtime Compute for Apache Flink は、CEP のルールを記述し、ルールの保存と変更を容易にする JSON ベースの仕様のセットを定義しています。次の表は、仕様の各フィールドの意味を示しています。
  • ノード
    ノードは完全なパターンを示します。次の表は、パターン内のフィールドを示しています。
    フィールド説明データ型必須備考
    nameパターンの名前。STRINGはい一意の文字列。
    説明 各ノードの名前は一意である必要があります。
    typeノードのタイプ。ENUM(STRING)はい
    • 子パターンを含むノードの場合、このフィールドの値は COMPOSITE です。
    • 子パターンを含まないノードの場合、このフィールドの値は ATOMIC です。
    quantifierパターンを照合する方法を記述する数量詞。たとえば、パターンを 1 回だけ照合するように数量詞を指定できます。辞書はい詳細については、このトピックの「数量詞」をご参照ください。
    condition条件。辞書いいえ詳細については、このトピックの「条件」をご参照ください。
  • 数量詞
    数量詞は、イベントをパターンと照合する方法を記述するために使用されます。たとえば、パターン "A*" の quantifier プロパティは LOOPING で、パターンの consumingStrategy フィールドは SKIP_TILL_ANY に設定されています。
    フィールド説明データ型必須備考
    consumingStrategyイベント選択戦略。ENUM(STRING)はい有効な値:
    • STRICT
    • SKIP_TILL_NEXT
    • SKIP_TILL_ANY

    値とその意味の詳細については、このトピックの「連続性」をご参照ください。

    timesパターンを照合する必要がある回数。辞書いいえ
    例:
    "times": {
              "from": 3,
              "to": 3,
              "windowTime": {
              "unit": "MINUTES",
              "size": 12
              }
            },
    from と to の値は INTEGER データ型である必要があります。 windowTime の単位は、DAYS、HOURS、MINUTES、SECONDS、または MILLISECONDS です。
    説明 windowTime は null に設定できます。このプロパティは、"windowTime": null の形式で表示されます。
    properties数量詞のプロパティ。列挙文字列の配列はい値とその意味の詳細については、このトピックの「数量詞」をご参照ください。
    untilConditionパターンの照合を停止するために使用される条件。
    説明 このパラメータは、LOOPING 数量詞を使用するパターンの後にのみ構成できます。
    辞書いいえ値とその意味の詳細については、このトピックの「条件」をご参照ください。
  • 条件
    条件は、特定の要件を満たすイベントを識別するために使用されます。たとえば、閲覧時間が 5 分を超えるユーザーを識別する場合、閲覧時間が 5 分を超えるという条件を指定できます。
    フィールド説明データ型必須備考
    type条件のタイプ。ENUM(STRING)はい有効な値:
    • CLASS: カスタム条件。
    • AVIATOR: aviator 式に基づく条件。
    • GROOVY: Groovy 式に基づく条件。
    ...シリアル化できるその他のカスタムフィールド。いいえ...
    次の条件がサポートされています:
    • Class タイプの条件
      フィールド説明データ型必須備考
      type条件のタイプ。ENUM(STRING)はい値を Class に設定します。
      classNameクラスの名前。STRINGはいcom.alibaba.ververica.cep.demo.StartCondition などのクラスの完全名。
    • カスタムパラメータを含む条件

      Class タイプの条件を使用する場合、className パラメータのみを指定できます。パラメータを動的に渡すことはできません。条件の表現機能を向上させるために、動的 Flink CEP はカスタムパラメータを含む CustomArgsCondition 条件をサポートしています。このようにして、JSON 文字列の配列の形式で CustomArgsCondition 条件に必要なパラメータを構成し、CustomArgsCondition インスタンスを動的に構築できます。この場合、Java コードを変更せずに条件のパラメータを動的に更新できます。

      フィールド説明データ型必須備考
      type条件のタイプ。ENUM(STRING)はい値を Class に設定します。
      classNameクラスの名前。STRINGはいcom.alibaba.ververica.cep.demo.CustomMiddleCondition などのクラスの完全名。
      argsカスタムパラメータ。文字列の配列はい値は文字列の配列です。
    • aviator 式に基づく条件
      Aviator は、式をバイトコードに動的にコンパイルする式評価エンジンです。詳細については、aviatorscript をご参照ください。したがって、デプロイメントで aviator 式に基づく条件を使用して、条件のしきい値を動的に変更できます。このようにして、Java コードを変更してコードを再コンパイルして実行する必要はありません。
      フィールド説明データ型必須備考
      typeクラスの名前。STRINGはい値を AVIATOR に設定します。
      expression式文字列。STRINGはいこのフィールドは、price > 10 のような形式で式文字列を指定します。 price 変数は、Java コードで定義されたフィールドです。

      データベース内の文字列の値を変更できます。たとえば、文字列の値を price > 20 に変更すると、price > 20 式が動的 Flink CEP デプロイメントに動的にロードされ、後続のイベントを処理するための新しい aviator 条件が構築されます。

    • Groovy 式に基づく条件
      Groovy は、Java 仮想マシン (JVM) プラットフォームに基づく動的言語です。Groovy 構文の詳細については、構文 をご参照ください。動的 Flink CEP では、Groovy 式を使用して条件を定義できます。このようにして、条件のしきい値を動的に変更できます。
      フィールド説明データ型必須備考
      typeクラスの名前。STRINGはい値を GROOVY に設定します。
      expression式文字列。STRINGはいこのフィールドは、price > 5.0 && name.contains("mid") のような形式で式文字列を指定します。 pricename などの変数は、Java コードで定義されたフィールドです。データベース内の文字列の値を変更できます。たとえば、文字列の値を price > 20 && name.contains("end") に変更すると、新しい Groovy 文字列が動的 Flink CEP デプロイメントに動的にロードされ、後続のイベントを処理するための新しい Groovy 条件が構築されます。
  • エッジ
    フィールド説明データ型必須備考
    sourceソースパターンの名前。STRINGはい該当なし。
    targetデスティネーションパターンの名前。STRINGはい該当なし。
    typeイベント選択戦略。辞書はい有効な値:
    • STRICT
    • SKIP_TILL_NEXT
    • SKIP_TILL_ANY
    • NOT_FOLLOW
    • NOT_NEXT

    値とその意味の詳細については、このトピックの「連続性」をご参照ください。

  • GraphNode extends Node

    GraphNode は、完全なパターンシーケンスを示します。 GraphNode の各ノードは、独立したパターンを示します。 GraphNode の各エッジは、イベントを照合するためにパターンを別のパターンに変換する方法を示します。

    GraphNode は Node のサブクラスと見なされ、より大きな GraphNode の Node として使用できます。このようにして、パターンのネストを示す GroupPattern がサポートされます。 Node と比較して、GraphNode は次の追加タイプのフィールドを提供します:
    • グラフの構造を記述する nodes フィールドと edges フィールド。
    • グラフの時間ウィンドウポリシーを記述する window フィールドと、イベント照合後に使用されるスキップ戦略を記述する afterMatchSkipStrategy フィールド。
    次の表は、GraphNode のフィールドを示しています。
    フィールド説明データ型必須備考
    name複合パターンの名前。STRINGはい一意の文字列。
    説明 各グラフの名前は一意である必要があります。
    typeノードのタイプ。ENUM(STRING)はい値を COMPOSITE に設定します。
    versionグラフで使用される JSON 形式のバージョン。INTEGERはいデフォルト値: 1。
    nodesパターンにネストされている子パターン。ノードの配列はいこのフィールドの値は配列です。配列は空にできません。
    edgesネストされている子パターン間の接続関係。エッジの配列はいこのフィールドの値は配列です。配列は空にできます。
    window
    • ウィンドウのタイプが FIRST_AND_LAST の場合、このフィールドは複合パターンの完全一致の開始と終了の間の最大時間間隔を示します。
    • ウィンドウのタイプが PREVIOUS_AND_CURRENT の場合、このフィールドは 2 つの隣接する子パターンの照合の間の最大時間間隔を示します。
    辞書いいえ例:
    "window": {
       "type": "FIRST_AND_LAST",
       "time": {
       "unit": "DAYS",
       "size": 1
       }
    }

    単位は、DAYS、HOURS、MINUTES、SECONDS、または MILLISECONDS です。データ型は LONG または INTEGER です。

    afterMatchSkipStrategyグラフ内のすべてのイベントが照合された後のスキップ戦略。辞書はい詳細については、このトピックの「AfterMatchSkipStrategy」をご参照ください。
    quantifierパターンを照合する方法を記述する数量詞。たとえば、パターンを 1 回だけ照合するように数量詞を指定できます。辞書はい詳細については、このトピックの「数量詞」をご参照ください。
  • AfterMatchSkipStrategy
    フィールド説明データ型必須備考
    type戦略のタイプ。ENUM(STRING)はい
    有効な値:
    • NO_SKIP: 出力に各成功一致を返します。これはデフォルト値です。
    • SKIP_TO_NEXT: 同じイベントで始まるすべての部分一致を破棄します。
    • SKIP_PAST_LAST_EVENT: 一致の開始と終了の間で始まるすべての部分一致を破棄します。
    • SKIP_TO_FIRST: 一致の開始と PatternName という名前のイベントの最初の出現の間で始まるすべての部分一致を破棄します。
    • SKIP_TO_LAST: 一致の開始と PatternName という名前のイベントの最後の出現の間で始まるすべての部分一致を破棄します。

    詳細については、「」After Match Skip Strategy」をご参照ください。

    patternName戦略を使用するパターンの名前。STRINGいいえ一意の文字列。
  • 連続性
    有効な値説明
    STRICT厳密な連続性。一致しないイベントは、一致するイベントの間に表示できません。
    SKIP_TILL_NEXT緩和された連続性。一致しないイベントは、一致するイベントの間に表示できます。一致しないイベントは無視されます。
    SKIP_TILL_ANY非決定論的緩和連続性。この値は、さらに緩和された連続性を示します。この連続性モードでは、特定の一致するイベントの追加一致が無視される可能性があります。
    NOT_NEXTイベントの後に発生する後続のイベントは、指定されたイベントにすることはできません。
    NOT_FOLLOW指定されたイベントは後続に表示できません。

    詳細については、「」FlinkCEP - Flink の複合イベント処理」をご参照ください。

  • 数量詞のプロパティ
    有効な値説明
    SINGLEパターンは 1 回だけ発生します。
    LOOPINGパターンはループパターンであり、複数回発生する可能性があります。この数量詞は、正規表現のアスタリスク (*) とプラス記号 (+) に似ています。
    TIMESパターンは指定された回数だけ発生する可能性があります。
    GREEDY貪欲マッチング戦略を使用して、最大数の一致を取得するためにパターンを照合します。
    OPTIONALパターンはオプションです。

例 1: 共通パターンを使用する

この例では、リアルタイムの e コマースプロモーションイベント中に 10 分の時間枠内で次の条件を満たす顧客のマーケティング戦略を調整するために、動的 Flink CEP を使用する方法について説明します。
  • 会場のクーポンを取得しました。
  • 3 回以上ショッピングカートに商品を追加しました。
  • 支払いを完了していません。
次のサンプルコードでは、会場のクーポンを取得するための条件は StartCondition として定義され、ショッピングカートに商品を追加するための条件は MiddleCondition として定義され、支払いの完了に関連する条件は EndCondition として定義されています。次のパターンが抽象化されています。10 分の時間枠内で、StartCondition を満たすイベントが 1 回発生し、MiddleCondition を満たすイベントが 3 回以上発生し、EndCondition を満たすイベントは発生しません。 StartCondition を満たすイベントはオプションです。次のサンプルコードは、この例のJavaコードを示しています。
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
            .where(new StartCondition())
            .optional()
            .followedBy("middle")
            .where(new MiddleCondition())
            .timesOrMore(3)
            .notFollowedBy("end")
            .where(new EndCondition())
            .within(Time.minutes(10));
次のサンプルコードは、この例の JSON 形式のコードを示しています。
{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}

例 2: パターンでカスタムパラメータを含む条件を使用する

この例では、リアルタイムの e コマースプロモーションイベント中に、異なるクラスの顧客に異なるマーケティング戦略を指定する方法について説明します。たとえば、クラス A の顧客にマーケティング関連のテキストメッセージを送信するマーケティング戦略、クラス B の顧客にクーポンを送信するマーケティング戦略、その他の顧客にはマーケティングアクションを実行しないマーケティング戦略を指定できます。前述の要件を満たすために、デプロイメントで共通クラスの条件を定義できます。デプロイメントで共通クラスの条件が使用されている場合にマーケティング戦略を調整する場合、デプロイメントコードを書き直し、デプロイメントを再コンパイルして実行する必要があります。たとえば、クラス C の顧客にクーポンを送信するマーケティング戦略を変更できます。操作を簡素化するために、カスタムパラメータを含む条件を使用できます。渡されたパラメータに基づいて戦略を調整する方法をコードで定義した後、データベースで渡されたパラメータの値を変更するだけで済みます。渡されたパラメータの値は、カスタムパラメータを含む条件の args パラメータの値です。たとえば、["A", "B"] を ["A", "B", "C"] に変更して、マーケティング戦略の動的更新を実行できます。

次のサンプルコードは、パターンで最初に定義された条件を示しています。
"condition": {
    "args": [
        "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
前述の条件を、カスタムパラメータを含む条件に変更できます。次のサンプルコードは、カスタムパラメータを含む条件を示しています。
"condition": {
    "args": [
        "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
特定のビジネスシナリオでカスタムパラメータを含む条件を使用する方法の詳細については、「デモ」をご参照ください。
説明 このトピックで言及されている aviatorscriptデモ は、サードパーティの Web サイトからのものです。Web サイトにアクセスすると、Web サイトが開かない場合やアクセスが遅延する場合があります。

関連情報

動的 Flink CEP を使用する