すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:動的 CEP ルール:JSON フォーマットと使用方法

最終更新日:Mar 10, 2026

このトピックでは、動的 Flink 複合イベント処理 (CEP) において JSON フォーマットで定義されるルールについて説明します。

対象読者

  • リスク管理プラットフォームの開発者:動的 Flink CEP に精通しているプラットフォーム開発者は、このトピックで説明するフォーマットを学び、プラットフォームの要件に基づいてさらなるカプセル化が必要かどうかを判断できます。

  • リスク管理戦略担当者:特定のリスク管理戦略にのみ精通し、Java 開発経験がない担当者は、このフォーマットを学び、CEP の概念に基づいて新しいルールを作成し、オンラインのリスク管理デプロイメントに適用できます。

JSON フォーマットの定義

イベントシーケンス内のパターンは、グラフと見なすことができます。グラフ内の各ノードは、特定のイベントのパターンを表します。ノード間のエッジは、イベント選択戦略を表します。この戦略は、あるマッチしたパターンから次のパターンへの遷移を定義します。各グラフは、より大きなグラフの子ノードになることもでき、これによりネストされたパターンが可能になります。この概念に基づき、Real-time Compute for Apache Flink は CEP ルールを記述するための JSON 仕様を提供します。このアプローチにより、ルールのストレージと変更が簡素化されます。以降のセクションでは、これらの仕様のフィールドについて説明します。

  • ノードの定義

    ノードは完全なパターンを表し、以下のプロパティを含みます。

    フィールド

    説明

    タイプ

    必須

    注意

    name

    パターンの名前。

    string

    はい

    一意の文字列。

    説明

    ノード名は一意である必要があります。

    type

    ノードのタイプ。

    enum(string)

    はい

    • 子パターンを持つノードの場合は COMPOSITE。

    • 子パターンを持たないノードの場合は ATOMIC。

    quantifier

    1 回だけマッチするなど、パターンのマッチ方法を記述します。

    dict

    はい

    詳細については、「量指定子の定義」セクションをご参照ください。

    condition

    条件。

    dict

    いいえ

    詳細については、「条件の定義」セクションをご参照ください。

  • 量指定子の定義

    量指定子は、パターンに一致するイベントをどのようにマッチングするかを記述します。たとえば、パターン "A*" の場合、量指定子の `properties` フィールドは LOOPING であり、パターン内のイベント選択戦略は SKIP_TILL_ANY です。

    フィールド

    説明

    タイプ

    必須

    注意

    consumingStrategy

    イベント選択戦略。

    enum(string)

    はい

    有効な値:

    • STRICT

    • SKIP_TILL_NEXT

    • SKIP_TILL_ANY

    値の詳細については、「隣接性の定義」セクションをご参照ください。

    times

    パターンがマッチしなければならない回数。

    dict

    いいえ

    例:

    "times": {
              "from": 3,
              "to": 3,
              "windowTime": {
              "unit": "MINUTES",
              "size": 12
              }
            },

    `from` と `to` フィールドは整数です。`windowTime` の `unit` は DAYS、HOURS、MINUTES、SECONDS、または MILLISECONDS にすることができます。

    説明

    `windowTime` は null に設定できます:"windowTime": null

    properties

    量指定子のプロパティ。

    array of enumString

    はい

    値の詳細については、「量指定子のプロパティの意味」セクションをご参照ください。

    untilCondition

    停止条件。

    説明

    これは、LOOPING 量指定子を持つパターンの後でのみ使用できます。

    dict

    いいえ

    値の詳細については、「条件の定義」セクションをご参照ください。

  • 条件の定義

    条件は、特定の要件を満たすイベントをフィルターするために使用されます。たとえば、5 分以上閲覧した顧客をフィルターしたい場合、「5 分以上の閲覧」が条件になります。

    フィールド

    説明

    タイプ

    必須

    注意

    type

    条件のタイプ。

    enum(string)

    はい

    条件タイプに有効な値:

    • CLASS:ユーザー定義の条件。

    • AVIATOR:AVIATOR 式に基づく条件。

    • GROOVY:GROOVY 式に基づく条件。

    ...

    シリアル化可能なその他のカスタムフィールド。

    いいえ

    ...

    以下のタイプの Condition がサポートされています:

    • Class タイプの Condition

      フィールド名

      説明

      タイプ

      必須

      注意

      type

      条件のタイプ。

      enum(string)

      はい

      値は Class に固定されます。

      className

      クラスの名前。

      string

      はい

      クラスの完全名。例:com.alibaba.ververica.cep.demo.StartCondition

    • カスタムパラメーターを持つ Condition

      通常の Class タイプの条件では、クラス名 (`className`) のみを渡すことができ、パラメーターを動的に渡すことはできません。より表現力豊かな条件を作成するために、動的 CEP はカスタムパラメーター (`CustomArgsCondition`) を持つ条件をサポートします。これにより、`CustomArgsCondition` の必須パラメーターを JSON フォーマットの文字列配列として設定できます。その後、`CustomArgsCondition` インスタンスを動的に構築できます。この機能により、Java コードを変更して再コンパイルすることなく、条件パラメーターを動的に更新できます。

      フィールド

      説明

      タイプ

      必須

      注意

      type

      条件のタイプ。

      enum(string)

      はい

      値は Class に固定されます。

      className

      クラスの名前。

      string

      はい

      クラスの完全名。例:com.alibaba.ververica.cep.demo.CustomMiddleCondition

      args

      カスタムパラメーター。

      array of string

      はい

      文字列の配列。

    • Aviator 式に基づく Condition

      Aviator は、式を動的にバイトコードにコンパイルする式評価エンジンです。詳細については、aviatorscript をご参照ください。ジョブで Aviator 式に基づく条件を使用できます。これにより、Java コードを変更、再コンパイル、再実行することなく、条件のしきい値を動的に変更できます。

      フィールド

      説明

      タイプ

      必須

      注意

      type

      クラスの名前。

      string

      はい

      値は AVIATOR に固定されます。

      expression

      式の文字列。

      string

      はい

      式文字列(例: price > 10)です。この price 変数は、Java コードで定義されたフィールドから取得されます。

      データベースでこの文字列の値を変更できます。たとえば、price > 20 に変更します。Flink CEP ジョブは、動的に price > 20 をロードし、後のイベントを処理するために新しい AviatorCondition を作成します。

    • Groovy 式に基づく Condition

      Groovy は、Java 仮想マシン (JVM) 向けの動的型付け言語です。Groovy の構文の詳細については、構文をご参照ください。動的 CEP は、条件を定義するために Groovy 式をサポートします。これにより、条件のしきい値を動的に変更できます。

      フィールド

      説明

      タイプ

      必須

      注意

      type

      クラスの名前。

      string

      はい

      値は GROOVY に固定されます。

      expression

      式の文字列。

      string

      はい

      price > 5.0 && name.contains("mid") のような式文字列です。pricename などの変数は、Java コードで定義されたフィールドに由来します。この文字列の値はデータベースで変更できます。たとえば、price > 20 && name.contains("end") に変更できます。その後、Flink CEP ジョブは新しい Groovy 文字列を動的にロードし、後続のイベントを処理するための新しい GroovyCondition を作成します。

  • エッジの定義

    フィールド

    説明

    タイプ

    必須

    注意

    source

    ソースパターンの名前。

    string

    はい

    なし。

    target

    ターゲットパターンの名前。

    string

    はい

    なし。

    type

    イベント選択戦略。

    dict

    はい

    有効な値:

    • STRICT

    • SKIP_TILL_NEXT

    • SKIP_TILL_ANY

    • NOT_FOLLOW

    • NOT_NEXT

    値の詳細については、「隣接性の定義」セクションをご参照ください。

  • GraphNode は Node の定義を拡張

    `GraphNode` は完全なパターンシーケンスを表します。その `nodes` は個々のパターンであり、`edges` はあるマッチしたパターンから次のパターンへの遷移を定義します。

    ネストされたパターン (`GroupPattern`) をサポートするために、`GraphNode` は `Node` の子クラスとして扱われます。これは、`GraphNode` がより大きな `GraphNode` 内の `Node` として機能できることを意味します。基本的な `Node` と比較して、`GraphNode` には 2 つの追加タイプのフィールドがあります:

    • グラフ構造を記述するための `nodes` および `edges` フィールド。

    • グラフ内のタイムウィンドウポリシーとマッチ後のスキップ戦略を記述するための `window` および `afterMatchSkipStrategy` フィールド。

    次の表は、`GraphNode` のフィールドを説明しています。

    フィールド

    説明

    タイプ

    必須

    注意

    name

    複合パターンの名前。

    String

    はい

    一意の文字列。

    説明

    グラフ名は一意である必要があります。

    type

    ノードのタイプ。

    enum(string)

    はい

    値は COMPOSITE に固定されます。

    version

    グラフが使用する JSON フォーマットのバージョン。

    Int

    はい

    デフォルト値は 1 です。

    nodes

    パターン内にネストされた子パターン。

    array of Node

    はい

    空でない配列。

    edges

    ネストされた子パターン間の接続。

    array of Edge

    はい

    空にすることができる配列。

    window

    • `type` が `FIRST_AND_LAST` の場合、これは複合パターンの完全なマッチの開始と終了の間の最大時間です。

    • `type` が `PREVIOUS_AND_CURRENT` の場合、これは 2 つの隣接する子パターンのマッチ間の最大時間です。

    dict

    いいえ

    例:

    "window": {
       "type": "FIRST_AND_LAST",
       "time": {
       "unit": "DAYS",
       "size": 1
       }
    }

    unit は DAYS、HOURS、MINUTES、SECONDS、または MILLISECONDS にすることができます。データの型は Long または Integer です。

    afterMatchSkipStrategy

    グラフ内のすべてのイベントがマッチした後に使用するスキップ戦略。

    dict

    はい

    詳細については、「マッチ後のスキップ戦略 (AfterMatchSkipStrategy) の定義」セクションをご参照ください。

    quantifier

    1 回だけマッチするなど、パターンのマッチ方法を記述します。

    dict

    はい

    詳細については、「量指定子の定義」セクションをご参照ください。

  • マッチ後のスキップ戦略 (AfterMatchSkipStrategy) の定義

    フィールド

    説明

    タイプ

    必須

    注意

    type

    戦略のタイプ。

    enum(string)

    はい

    有効な値:

    • NO_SKIP (デフォルト):すべての成功したマッチが出力されます。

    • SKIP_TO_NEXT:同じイベントで開始されたすべての部分的なマッチを破棄します。

    • SKIP_PAST_LAST_EVENT:このマッチの開始と終了の間に開始されたすべての部分的なマッチを破棄します。

    • SKIP_TO_FIRST:このマッチの開始と PatternName という名前のイベントの最初の発生の間に開始されたすべての部分的なマッチを破棄します。

    • SKIP_TO_LAST:このマッチの開始と PatternName という名前のイベントの最後の発生の間に開始されたすべての部分的なマッチを破棄します。

    詳細については、「マッチ後のスキップ戦略」をご参照ください。

    patternName

    戦略が適用されるパターンの名前。

    string

    いいえ

    一意の文字列。

  • 隣接性の定義

    物理値

    意味

    STRICT

    厳密な隣接性。マッチしたイベント間にマッチしないイベントが現れることはありません。

    SKIP_TILL_NEXT

    緩和された隣接性。マッチしたイベント間にマッチしないイベントが現れることがあります。マッチしないイベントは無視されます。

    SKIP_TILL_ANY

    非決定的な緩和された隣接性。一部のマッチしたイベントに対する追加のマッチを無視できる、より緩和された隣接性です。

    NOT_NEXT

    直後のイベントが特定のイベントであってはなりません。

    NOT_FOLLOW

    特定のイベントが後で現れません。

    詳細については、「」と「FlinkCEP - Flink 用の複合イベント処理」をご参照ください。

  • 量指定子のプロパティの意味

    意味

    SINGLE

    パターンは 1 回だけ現れます。

    LOOPING

    パターンはループパターンです。正規表現の `*` や `+` のように、複数回現れることがあります。

    TIMES

    パターンは指定された回数現れます。

    GREEDY

    このパターンをマッチングする際、可能な限り最長のマッチを見つけるために貪欲マッチング戦略が使用されます。

    OPTIONAL

    パターンはオプションです。

例 1:共通パターンの使用

この例では、リアルタイムの E コマースプロモーションイベント中に、10 分のタイムウィンドウ内で以下の条件を満たす顧客に対して、動的 Flink CEP を使用してマーケティング戦略を調整する方法について説明します:

  • 会場のクーポンを取得した。

  • ショッピングカートに商品を 3 回以上追加した。

  • 支払いを完了しなかった。

以下のサンプルコードでは、会場のクーポンを取得する条件は StartCondition として定義され、ショッピングカートに商品を追加する条件は MiddleCondition として定義され、支払いの完了に関連する条件は EndCondition として定義されています。次のパターンが抽象化されます:10 分のタイムウィンドウ内で、StartCondition を満たすイベントが 1 回発生し、MiddleCondition を満たすイベントが 3 回以上発生し、EndCondition を満たすイベントは発生しません。StartCondition を満たすイベントはオプションです。以下のサンプルコードは、この例のパターンを記述する Java コードを示しています。

Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
            .where(new StartCondition())
            .optional()
            .followedBy("middle")
            .where(new MiddleCondition())
            .timesOrMore(3)
            .notFollowedBy("end")
            .where(new EndCondition())
            .within(Time.minutes(10));

以下のサンプルコードは、この例のパターンを記述する JSON フォーマットのコードを示しています。

{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}

例 2:パターンでカスタムパラメーターを含む条件の使用

この例では、リアルタイムの E コマースプロモーションイベント中に、異なるクラスの顧客に対して異なるマーケティング戦略を指定する方法について説明します。たとえば、クラス A の顧客にはマーケティング関連のテキストメッセージを送信する戦略、クラス B の顧客にはクーポンを送信する戦略、その他の顧客にはマーケティングアクションを取らない戦略を指定できます。デプロイメントで共通クラスの条件を定義して、前述の要件を満たすことができます。デプロイメントで共通クラスの条件が使用されている場合にマーケティング戦略を調整したい場合は、デプロイメントコードを書き換えて再コンパイルし、デプロイメントを再実行する必要があります。たとえば、クラス C の顧客にクーポンを送信するマーケティング戦略を変更できます。操作を簡素化するために、カスタムパラメーターを含む条件を使用できます。コードで渡されたパラメーターに基づいて戦略を調整する方法を定義した後、データベースで渡されたパラメーターの値を変更するだけで済みます。渡されたパラメーターの値は、カスタムパラメーターを含む条件の args パラメーターの値です。たとえば、["A", "B"] を ["A", "B", "C"] に変更して、マーケティング戦略の動的な更新を実行できます。

以下のサンプルコードは、パターンで最初に定義された条件を示しています。

"condition": {
    "args": [
        "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}

前述の条件を、カスタムパラメーターを含む条件に変更できます。以下のサンプルコードは、カスタムパラメーターを含む条件を示しています。

"condition": {
    "args": [
        "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}

特定のビジネスシナリオでカスタムパラメーターを含む条件を使用する方法の詳細については、「デモ」をご参照ください。

説明

このトピックで言及されている aviatorscript および デモ は、サードパーティのウェブサイトからのものです。ウェブサイトにアクセスする際、ウェブサイトが開かなかったり、アクセスが遅延したりする場合があります。

参考資料

動的 Flink CEP のクイックスタート