This topic describes the JSON format that is used to define rules in dynamic Flink complex event processing (CEP).

Intended audience

  • Risk control platform developers: The platform developers who are familiar with dynamic Flink CEP can learn the format that is described in this topic and determine whether further encapsulation is required based on the platform requirements.
  • Risk control strategy personnel: The personnel who are familiar with only specific risk control strategies but do not have Java development experience can learn and use this format to write new rules based on CEP concepts and apply the rules in online risk control deployments.

Definitions of CEP rules in the JSON format

A pattern in an event sequence can be viewed as a graph. Each node in the graph is a pattern for specific events. Each edge between nodes is an event selection strategy that determines how matching moves from one pattern to the next. A graph can itself be a child node of a larger graph, which is how pattern nesting is supported. Alibaba Cloud Realtime Compute for Apache Flink defines a set of JSON-based specifications to describe CEP rules so that the rules are easy to store and modify. The following tables describe each field in the specifications.
  • Node
    A node indicates a complete pattern. The following table describes the fields in a pattern.
    Field | Description | Data type | Required | Remarks
    name | The name of the pattern. | STRING | Yes | A unique string. The name of each node must be unique.
    type | The type of the node. | ENUM(STRING) | Yes | COMPOSITE for a node that includes a child pattern. ATOMIC for a node that does not include a child pattern.
    quantifier | A quantifier that describes how to match the pattern. For example, you can specify the quantifier to match a pattern only once. | Dictionary | Yes | For more information, see "Quantifier" in this topic.
    condition | The condition. | Dictionary | No | For more information, see "Condition" in this topic.
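    As an illustration of the fields above, the following sketch shows what a single ATOMIC node might look like. The node name and the choice of a SINGLE quantifier are hypothetical values picked for this example. The field layout follows the table above and the nodes in Example 1, and the class name is the one used as a sample in "Condition" in this topic.

    ```json
    {
      "name": "start",
      "type": "ATOMIC",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": ["SINGLE"],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "type": "CLASS",
        "className": "com.alibaba.ververica.cep.demo.StartCondition"
      }
    }
    ```

    A node like this is not a rule on its own. It appears as one element of the nodes array of a GraphNode.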
  • Quantifier
    A quantifier is used to describe how to match events with the pattern. For example, the quantifier property of the pattern "A*" is LOOPING and the consumingStrategy field of the pattern is set to SKIP_TILL_ANY.
    Field | Description | Data type | Required | Remarks
    consumingStrategy | The event selection strategy. | ENUM(STRING) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, and SKIP_TILL_ANY. For more information about the values and their meanings, see "Contiguity" in this topic.
    times | The number of times the pattern needs to be matched. | Dictionary | No | The values of from and to must be of the INTEGER data type. The unit of windowTime can be DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. windowTime can be set to null, in which case it is written as "windowTime": null. See the example that follows this table.
    properties | The properties of the quantifier. | Array of enumeration strings | Yes | For more information about the values and their meanings, see "Properties of a quantifier" in this topic.
    untilCondition | The condition that is used to stop matching the pattern. This field can be configured only for a pattern that uses the LOOPING quantifier. | Dictionary | No | For more information about the values and their meanings, see "Condition" in this topic.

    Example of the times field:
    "times": {
      "from": 3,
      "to": 3,
      "windowTime": {
        "unit": "MINUTES",
        "size": 12
      }
    }
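    To show how the quantifier fields fit together, the following sketch describes a looping pattern that stops when an until condition is met. It is an assumption-laden example, not a reference serialization: pairing LOOPING with from and to both set to 3 mirrors the "middle" node in Example 1 (three or more occurrences), and the Aviator expression in untilCondition is hypothetical.

    ```json
    {
      "consumingStrategy": "SKIP_TILL_ANY",
      "properties": ["LOOPING"],
      "times": {
        "from": 3,
        "to": 3,
        "windowTime": {
          "unit": "MINUTES",
          "size": 12
        }
      },
      "untilCondition": {
        "type": "AVIATOR",
        "expression": "price > 100"
      }
    }
    ```

    Because untilCondition applies only to looping patterns, a quantifier whose properties do not include LOOPING would normally leave it set to null, as the nodes in Example 1 do.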
  • Condition
    Conditions are used to identify events that meet specific requirements. For example, if you want to identify users whose browsing duration is longer than 5 minutes, you can specify a condition that the browsing duration is longer than 5 minutes.
    Field | Description | Data type | Required | Remarks
    type | The type of the condition. | ENUM(STRING) | Yes | Valid values: CLASS (a custom condition), AVIATOR (a condition based on an Aviator expression), and GROOVY (a condition based on a Groovy expression).
    ... | Other custom fields that can be serialized. |  | No | ...
    The following conditions are supported:
    • Condition of the CLASS type
      Field | Description | Data type | Required | Remarks
      type | The type of the condition. | ENUM(STRING) | Yes | Set the value to CLASS.
      className | The name of the class. | STRING | Yes | The full name of the class, such as com.alibaba.ververica.cep.demo.StartCondition.
    • Condition that includes a custom parameter

      A condition of the CLASS type that specifies only the className parameter cannot receive parameters dynamically. To make conditions more expressive, dynamic Flink CEP supports CustomArgsCondition, a condition that includes a custom parameter. You configure the required parameters as an array of JSON strings, and CustomArgsCondition instances are constructed dynamically from that array. This lets you update the parameters of the condition without modifying the Java code.

      Field | Description | Data type | Required | Remarks
      type | The type of the condition. | ENUM(STRING) | Yes | Set the value to CLASS.
      className | The name of the class. | STRING | Yes | The full name of the class, such as com.alibaba.ververica.cep.demo.CustomMiddleCondition.
      args | The custom parameter. | Array of strings | Yes | The value is an array of strings.
    • Condition based on an Aviator expression
      Aviator is an expression evaluation engine that dynamically compiles expressions into bytecode. For more information, see aviatorscript. Because expressions are compiled at run time, you can use an Aviator-based condition in a deployment to modify the threshold of the condition dynamically, without modifying, recompiling, and rerunning the Java code.
      Field | Description | Data type | Required | Remarks
      type | The type of the condition. | STRING | Yes | Set the value to AVIATOR.
      expression | An expression string. | STRING | Yes | An expression string in a form similar to price > 10. The price variable is a field defined in Java code.

      You can change the value of the string in the database. For example, if you change the value of the string to price > 20, the price > 20 expression is dynamically loaded in a dynamic Flink CEP deployment to construct a new Aviator condition that processes subsequent events.
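      Based on the fields above, an Aviator condition object might be written as in the following sketch. The expression is the sample from the table, and price is assumed to be a field of the event class defined in your Java code.

      ```json
      {
        "type": "AVIATOR",
        "expression": "price > 10"
      }
      ```

      To raise the threshold, only the expression string changes, for example to "price > 20". The rest of the rule stays the same.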

    • Condition based on a Groovy expression
      Groovy is a dynamic language for the Java virtual machine (JVM) platform. For more information about the Groovy syntax, see Syntax. Dynamic Flink CEP allows you to use Groovy expressions to define conditions so that the thresholds of conditions can be modified dynamically.
      Field | Description | Data type | Required | Remarks
      type | The type of the condition. | STRING | Yes | Set the value to GROOVY.
      expression | An expression string. | STRING | Yes | An expression string in a form similar to price > 5.0 && name.contains("mid"). The variables, such as price and name, are fields defined in Java code.

      You can change the value of the string in the database. For example, if you change the value of the string to price > 20 && name.contains("end"), the new Groovy string is dynamically loaded in a dynamic Flink CEP deployment to construct a new Groovy condition that processes subsequent events.
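      A Groovy condition object might look like the following sketch, which uses the sample expression from the table. The variables price and name are assumed to be fields of the event class defined in your Java code. Because the expression is stored as a JSON string, the double quotation marks inside the Groovy expression must be escaped with a backslash.

      ```json
      {
        "type": "GROOVY",
        "expression": "price > 5.0 && name.contains(\"mid\")"
      }
      ```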
  • Edge
    Field | Description | Data type | Required | Remarks
    source | The name of the source pattern. | STRING | Yes | N/A.
    target | The name of the destination pattern. | STRING | Yes | N/A.
    type | The event selection strategy. | ENUM(STRING) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, and NOT_NEXT.

    For more information about the values and their meanings, see "Contiguity" in this topic.
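    As a sketch of how edges connect patterns, the following array could serve as the value of the edges field of a graph. The pattern names are borrowed from Example 1, and the strategy choices are hypothetical: the first edge requires "middle" to immediately follow "start", and the second requires that the event immediately after "middle" does not match "end".

    ```json
    [
      {
        "source": "start",
        "target": "middle",
        "type": "STRICT"
      },
      {
        "source": "middle",
        "target": "end",
        "type": "NOT_NEXT"
      }
    ]
    ```

    Each source and target value must match the name of a node in the same graph.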

  • GraphNode extends Node

    A GraphNode indicates a complete pattern sequence. Each node of a GraphNode indicates an independent pattern, and each edge indicates how matching moves from one pattern to the next.

    A GraphNode is a subclass of Node and can be used as a Node in a larger GraphNode. This is how GroupPattern, which represents pattern nesting, is supported. Compared with Node, GraphNode provides the following additional fields:
    • The nodes and edges fields, which describe the structure of the graph.
    • The window field, which describes the time window policy of the graph, and the afterMatchSkipStrategy field, which describes the skipping strategy used after events are matched.
    The following table describes the fields of GraphNode.
    Field | Description | Data type | Required | Remarks
    name | The name of the composite pattern. | STRING | Yes | A unique string. The name of each graph must be unique.
    type | The type of the node. | ENUM(STRING) | Yes | Set the value to COMPOSITE.
    version | The version of the JSON format that is used by the graph. | INTEGER | Yes | Default value: 1.
    nodes | The child patterns that are nested in the pattern. | Array of nodes | Yes | The value of this field is an array. The array cannot be empty.
    edges | The connection relationships between the nested child patterns. | Array of edges | Yes | The value of this field is an array. The array can be empty.
    window | If the type of the window is FIRST_AND_LAST, the maximum time interval between the start and end of a complete match of the composite pattern. If the type of the window is PREVIOUS_AND_CURRENT, the maximum time interval between the matches of two adjacent child patterns. | Dictionary | No | The unit can be DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. The value of size can be of the LONG or INTEGER data type. See the example that follows this table.
    afterMatchSkipStrategy | The skipping strategy after all events in the graph are matched. | Dictionary | Yes | For more information, see "AfterMatchSkipStrategy" in this topic.
    quantifier | A quantifier that describes how to match the pattern. For example, you can specify the quantifier to match a pattern only once. | Dictionary | Yes | For more information, see "Quantifier" in this topic.

    Example of the window field:
    "window": {
      "type": "FIRST_AND_LAST",
      "time": {
        "unit": "DAYS",
        "size": 1
      }
    }
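    To show how the GraphNode fields combine, the following sketch describes a small graph with two atomic patterns joined by a STRICT edge and a PREVIOUS_AND_CURRENT window of 30 seconds. Several details are assumptions made for illustration: the Aviator expressions and the window size are hypothetical, the graph reuses the name of its last node as Example 1 does, and the skip strategy key is written as afterMatchStrategy because that is the key that appears in Example 1. Verify the key against the version you deploy.

    ```json
    {
      "name": "end",
      "type": "COMPOSITE",
      "version": 1,
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": ["SINGLE"],
        "times": null,
        "untilCondition": null
      },
      "condition": null,
      "nodes": [
        {
          "name": "start",
          "type": "ATOMIC",
          "quantifier": {
            "consumingStrategy": "SKIP_TILL_NEXT",
            "properties": ["SINGLE"],
            "times": null,
            "untilCondition": null
          },
          "condition": {
            "type": "AVIATOR",
            "expression": "price > 10"
          }
        },
        {
          "name": "end",
          "type": "ATOMIC",
          "quantifier": {
            "consumingStrategy": "SKIP_TILL_NEXT",
            "properties": ["SINGLE"],
            "times": null,
            "untilCondition": null
          },
          "condition": {
            "type": "AVIATOR",
            "expression": "price > 20"
          }
        }
      ],
      "edges": [
        {
          "source": "start",
          "target": "end",
          "type": "STRICT"
        }
      ],
      "window": {
        "type": "PREVIOUS_AND_CURRENT",
        "time": {
          "unit": "SECONDS",
          "size": 30
        }
      },
      "afterMatchStrategy": {
        "type": "NO_SKIP",
        "patternName": null
      }
    }
    ```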
  • AfterMatchSkipStrategy
    Field | Description | Data type | Required | Remarks
    type | The type of the strategy. | ENUM(STRING) | Yes | See the valid values that follow this table.
    patternName | The name of the pattern that uses the strategy. | STRING | No | A unique string.

    Valid values of type:
    • NO_SKIP: returns each successful match in the output. This is the default value.
    • SKIP_TO_NEXT: discards all partial matches that start with the same event.
    • SKIP_PAST_LAST_EVENT: discards all partial matches that start between the beginning and end of the match.
    • SKIP_TO_FIRST: discards all partial matches that start between the beginning of the match and the first occurrence of an event that is mapped to the pattern specified by patternName.
    • SKIP_TO_LAST: discards all partial matches that start between the beginning of the match and the last occurrence of an event that is mapped to the pattern specified by patternName.

    For more information, see After Match Skip Strategy.
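    As a sketch of a strategy that depends on patternName, the following object skips to the first event matched by a pattern named "middle". The pattern name is borrowed from Example 1 and is only illustrative. For NO_SKIP, Example 1 shows patternName set to null.

    ```json
    {
      "type": "SKIP_TO_FIRST",
      "patternName": "middle"
    }
    ```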
  • Contiguity
    Valid value | Description
    STRICT | Strict contiguity. Unmatched events cannot appear between matched events.
    SKIP_TILL_NEXT | Relaxed contiguity. Unmatched events can appear between matched events. The unmatched events are ignored.
    SKIP_TILL_ANY | Non-deterministic relaxed contiguity. This value relaxes contiguity further and allows additional matches that ignore some matching events.
    NOT_NEXT | The event that immediately follows a matched event cannot be the specified event.
    NOT_FOLLOW | The specified event cannot appear at any point after a matched event.

    For more information, see FlinkCEP - Complex event processing for Flink.

  • Properties of a quantifier
    Valid value | Description
    SINGLE | The pattern occurs only once.
    LOOPING | The pattern is a looping pattern and may occur multiple times. This quantifier is similar to the asterisk (*) and plus sign (+) in regular expressions.
    TIMES | The pattern occurs a specified number of times.
    GREEDY | The greedy matching strategy is used to match the pattern to obtain the maximum number of matches.
    OPTIONAL | The pattern is optional.

Example 1: Use a common pattern

This example describes how to use dynamic Flink CEP to adjust marketing strategies during a real-time e-commerce promotional event for customers who meet the following conditions within a 10-minute time window:
  • Obtained coupons for a venue.
  • Added items to their shopping carts three or more times.
  • Did not complete the payments.
In the following sample code, the condition for obtaining coupons for a venue is defined as StartCondition, the condition for adding items to the shopping cart is defined as MiddleCondition, and the condition related to payment completion is defined as EndCondition. The pattern is abstracted as follows: in a 10-minute time window, an event that meets StartCondition occurs once, an event that meets MiddleCondition occurs three or more times, and no event that meets EndCondition occurs. The event that meets StartCondition is optional. The following sample code shows the Java code that describes the pattern in this example.
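// Within a 10-minute window: an optional "start" event, followed by three or more
// "middle" events, with no "end" event afterward.
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
            .where(new StartCondition())   // coupon obtained for a venue
            .optional()                    // the start event may be absent
            .followedBy("middle")          // SKIP_TILL_NEXT edge in the JSON
            .where(new MiddleCondition())  // item added to the shopping cart
            .timesOrMore(3)
            .notFollowedBy("end")          // NOT_FOLLOW edge in the JSON
            .where(new EndCondition())     // payment completed
            .within(Time.minutes(10));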
The following sample code shows the JSON-formatted code that describes the pattern in this example.
{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}

Example 2: Use the condition that includes a custom parameter in a pattern

This example describes how to specify different marketing strategies for customers of different classes during a real-time e-commerce promotional event. For example, you can send marketing-related text messages to customers of Class A, send coupons to customers of Class B, and take no marketing action for other customers.

You can meet these requirements with a condition of the common class in your deployment. However, if you later want to adjust the marketing strategies, for example, to also send coupons to customers of Class C, you must rewrite the deployment code and then recompile and rerun the deployment.

To simplify the operation, you can use the condition that includes a custom parameter. After you define in the code how the strategies are adjusted based on the passed parameter, you only need to change the value of the passed parameter in the database. The passed parameter is the args parameter of the condition that includes a custom parameter. For example, you can change ["A", "B"] to ["A", "B", "C"] to dynamically update the marketing strategies.

The following sample code shows the condition as it is initially defined in the pattern. The args parameter lists Class A and Class B.
"condition": {
    "args": [
        "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
To extend the strategies to customers of Class C, change only the value of the args parameter. The following sample code shows the updated condition.
"condition": {
    "args": [
        "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
For more information about how to use the condition that includes a custom parameter in specific business scenarios, see Demo.
Note aviatorscript and Demo that are mentioned in this topic are from third-party websites. When you visit the websites, the websites may fail to open or the access may be delayed.

References

Getting started with dynamic Flink CEP