All Products
Search
Document Center

Realtime Compute for Apache Flink:Aturan CEP dinamis: Format JSON dan penggunaan

Last Updated:Mar 10, 2026

Topik ini menjelaskan aturan yang didefinisikan dalam format JSON pada Flink Complex Event Processing (CEP) dinamis.

Audien yang dituju

  • Pengembang platform kontrol risiko: Pengembang platform yang telah memahami Flink CEP dinamis dapat mempelajari format yang dijelaskan dalam topik ini dan menentukan apakah perlu melakukan enkapsulasi lebih lanjut berdasarkan kebutuhan platform.

  • Personel strategi kontrol risiko: Personel yang hanya memahami strategi kontrol risiko tertentu namun tidak memiliki pengalaman pengembangan Java dapat mempelajari dan menggunakan format ini untuk menulis aturan baru berdasarkan konsep CEP serta menerapkan aturan tersebut dalam penerapan kontrol risiko online.

Definisi format JSON

Pola dalam urutan event dapat dipandang sebagai sebuah graf. Setiap node dalam graf merepresentasikan pola untuk event tertentu. Sisi (edge) antar node merepresentasikan strategi seleksi event, yang mendefinisikan transisi dari satu pola yang cocok ke pola berikutnya. Setiap graf juga dapat menjadi node anak dari graf yang lebih besar, sehingga memungkinkan pola bersarang (nested). Berdasarkan konsep ini, Real-time Compute for Apache Flink menyediakan spesifikasi JSON untuk menggambarkan aturan CEP. Pendekatan ini menyederhanakan penyimpanan dan modifikasi aturan. Bagian-bagian berikut menjelaskan bidang-bidang dalam spesifikasi tersebut.

  • Definisi Node

    Node merepresentasikan pola lengkap dan berisi properti-properti berikut.

    Field

    Description

    Type

    Required

    Notes

    name

    Nama pola.

    string

    Yes

    String unik.

    Catatan

    Nama node harus unik.

    type

    Tipe node.

    enum(string)

    Yes

    • COMPOSITE untuk node dengan pola anak.

    • ATOMIC untuk node tanpa pola anak.

    quantifier

    Menjelaskan cara mencocokkan pola, misalnya hanya mencocokkan sekali.

    dict

    Yes

    Untuk informasi lebih lanjut, lihat bagian definisi Quantifier.

    condition

    Kondisi.

    dict

    No

    Untuk informasi lebih lanjut, lihat bagian definisi Condition.

  • Definisi Quantifier

    Quantifier menjelaskan cara mencocokkan event yang memenuhi pola. Misalnya, untuk pola "A*" , field `properties` dari quantifier bernilai LOOPING, dan strategi seleksi event dalam pola tersebut adalah SKIP_TILL_ANY.

    Field

    Description

    Type

    Required

    Notes

    consumingStrategy

    Strategi seleksi event.

    enum(string)

    Yes

    Nilai valid:

    • STRICT

    • SKIP_TILL_NEXT

    • SKIP_TILL_ANY

    Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat bagian definisi Contiguity.

    times

    Jumlah kali pola harus dicocokkan.

    dict

    No

    Contoh:

    "times": {
              "from": 3,
              "to": 3,
              "windowTime": {
              "unit": "MINUTES",
              "size": 12
              }
            },

    Field `from` dan `to` bertipe integer. Satuan (`unit`) untuk `windowTime` dapat berupa DAYS, HOURS, MINUTES, SECONDS, atau MILLISECONDS.

    Catatan

    `windowTime` dapat diatur ke null: "windowTime": null.

    properties

    Properti quantifier.

    array of enumString

    Yes

    Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat bagian makna properti Quantifier.

    untilCondition

    Kondisi berhenti.

    Catatan

    Hanya dapat digunakan setelah pola yang memiliki quantifier LOOPING.

    dict

    No

    Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat bagian definisi Condition.

  • Definisi Kondisi

    Condition digunakan untuk memfilter event yang memenuhi persyaratan tertentu. Misalnya, jika Anda ingin memfilter pelanggan yang menjelajah selama lebih dari 5 menit, maka "menjelajah selama lebih dari 5 menit" merupakan condition tersebut.

    Field

    Description

    Type

    Required

    Notes

    type

    Tipe condition.

    enum(string)

    Yes

    Nilai valid untuk tipe condition:

    • CLASS: Condition yang ditentukan pengguna.

    • AVIATOR: Condition berdasarkan ekspresi AVIATOR.

    • GROOVY: Condition berdasarkan ekspresi GROOVY.

    ...

    Field kustom lain yang dapat diserialisasi.

    No

    ...

    Tipe-tipe Condition berikut didukung:

    • Kondisi Jenis Kelas

      Field Name

      Description

      Type

      Required

      Notes

      type

      Jenis kondisi.

      enum(string)

      Yes

      Nilainya tetap Class.

      className

      Nama kelas.

      string

      Yes

      Nama lengkap kelas, seperti com.alibaba.ververica.cep.demo.StartCondition.

    • Condition dengan parameter kustom

      Dengan condition tipe Class biasa, Anda hanya dapat mengirimkan nama kelas (`className`) dan tidak dapat mengirimkan parameter secara dinamis. Untuk membuat condition yang lebih ekspresif, CEP dinamis mendukung condition dengan parameter kustom (`CustomArgsCondition`). Ini memungkinkan Anda menetapkan parameter yang diperlukan untuk `CustomArgsCondition` sebagai array string dalam format JSON, sehingga Anda dapat secara dinamis membuat instance `CustomArgsCondition`. Fitur ini memungkinkan pembaruan parameter condition secara dinamis tanpa mengubah dan mengompilasi ulang kode Java.

      Field

      Description

      Type

      Required

      Notes

      type

      Tipe condition.

      enum(string)

      Yes

      Nilainya tetap Class.

      className

      Nama kelas.

      string

      Yes

      Nama lengkap kelas, seperti com.alibaba.ververica.cep.demo.CustomMiddleCondition.

      args

      Parameter kustom.

      array of string

      Yes

      Array string.

    • Condition berdasarkan ekspresi Aviator

      Aviator adalah mesin evaluasi ekspresi yang secara dinamis mengompilasi ekspresi menjadi bytecode. Untuk informasi lebih lanjut, lihat aviatorscript. Anda dapat menggunakan condition berbasis ekspresi Aviator dalam job, sehingga memungkinkan perubahan ambang batas condition secara dinamis tanpa memodifikasi, mengompilasi ulang, dan menjalankan ulang kode Java.

      Field

      Description

      Type

      Required

      Notes

      type

      Nama kelas.

      string

      Yes

      Nilainya tetap AVIATOR.

      expression

      String ekspresi.

      string

      Yes

      String ekspresi, seperti price > 10. Variabel price berasal dari field yang didefinisikan dalam kode Java.

      Anda dapat mengubah nilai string ini di database. Misalnya, ubah menjadi price > 20. Job Flink CEP kemudian akan memuat secara dinamis price > 20 dan membuat AviatorCondition baru untuk memproses event berikutnya.

    • Condition berdasarkan ekspresi Groovy

      Groovy adalah bahasa bertipe dinamis untuk Java Virtual Machine (JVM). Untuk informasi lebih lanjut tentang sintaks Groovy, lihat syntax. CEP dinamis mendukung ekspresi Groovy untuk mendefinisikan condition, sehingga memungkinkan perubahan ambang batas condition secara dinamis.

      Field

      Description

      Type

      Required

      Notes

      type

      Nama kelas.

      string

      Yes

      Nilainya tetap GROOVY.

      expression

      String ekspresi.

      string

      Yes

      String ekspresi, seperti price > 5.0 && name.contains("mid"). Variabel seperti price dan name berasal dari field yang didefinisikan dalam kode Java. Anda dapat mengubah nilai string ini di database. Misalnya, ubah menjadi price > 20 && name.contains("end"). Job Flink CEP kemudian akan memuat string Groovy baru secara dinamis dan membuat GroovyCondition baru untuk memproses event berikutnya.

  • Definisi Edge

    Field

    Description

    Type

    Required

    Notes

    source

    Nama pola sumber.

    string

    Yes

    None.

    target

    Nama pola target.

    string

    Yes

    None.

    type

    Strategi seleksi event.

    dict

    Yes

    Nilai valid:

    • STRICT

    • SKIP_TILL_NEXT

    • SKIP_TILL_ANY

    • NOT_FOLLOW

    • NOT_NEXT

    Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat bagian definisi Contiguity.

  • GraphNode memperluas definisi Node.

    `GraphNode` merepresentasikan urutan pola lengkap. Field `nodes`-nya berisi pola individual, sedangkan field `edges`-nya mendefinisikan transisi dari satu pola yang cocok ke pola berikutnya.

    Untuk mendukung pola bersarang (`GroupPattern`), `GraphNode` diperlakukan sebagai kelas turunan dari `Node`. Artinya, `GraphNode` dapat berfungsi sebagai `Node` dalam `GraphNode` yang lebih besar. Dibandingkan dengan `Node` dasar, `GraphNode` memiliki dua jenis field tambahan:

    • Field `nodes` dan `edges` untuk menggambarkan struktur graf.

    • Field `window` dan `afterMatchSkipStrategy` untuk menggambarkan kebijakan jendela waktu dan strategi lewati setelah pencocokan dalam graf.

    Tabel berikut menjelaskan field-field `GraphNode`.

    Field

    Description

    Type

    Required

    Notes

    name

    Nama pola komposit.

    String

    Yes

    String unik.

    Catatan

    Nama graf harus unik.

    type

    Tipe node.

    enum(string)

    Yes

    Nilainya tetap COMPOSITE.

    version

    Versi format JSON yang digunakan graf.

    Int

    Yes

    Nilai default adalah 1.

    nodes

    Pola anak yang bersarang dalam pola.

    array of Node

    Yes

    Array tidak kosong.

    edges

    Koneksi antara pola anak yang bersarang.

    array of Edge

    Yes

    Array yang boleh kosong.

    window

    • Jika `type` adalah `FIRST_AND_LAST`, ini adalah waktu maksimum antara awal dan akhir pencocokan lengkap untuk pola komposit.

    • Jika `type` adalah `PREVIOUS_AND_CURRENT`, ini adalah waktu maksimum antara pencocokan dua pola anak yang berdekatan.

    dict

    No

    Contoh:

    "window": {
       "type": "FIRST_AND_LAST",
       "time": {
       "unit": "DAYS",
       "size": 1
       }
    }

    Satuannya dapat berupa DAYS, HOURS, MINUTES, SECONDS, atau MILLISECONDS. Tipe datanya Long atau Integer.

    afterMatchSkipStrategy

    Strategi lewati yang digunakan setelah semua event dalam graf dicocokkan.

    dict

    Yes

    Untuk informasi lebih lanjut, lihat bagian definisi strategi lewati setelah pencocokan (AfterMatchSkipStrategy).

    quantifier

    Menjelaskan cara mencocokkan pola, seperti hanya mencocokkan sekali.

    dict

    Yes

    Untuk informasi lebih lanjut, lihat bagian definisi Quantifier.

  • Definisi strategi lewati setelah pencocokan (AfterMatchSkipStrategy)

    Field

    Description

    Type

    Required

    Notes

    type

    Tipe strategi.

    enum(string)

    Yes

    Nilai valid:

    • NO_SKIP (default): Setiap pencocokan sukses dipancarkan.

    • SKIP_TO_NEXT: Mengabaikan setiap pencocokan parsial yang dimulai dengan event yang sama.

    • SKIP_PAST_LAST_EVENT: Mengabaikan setiap pencocokan parsial yang dimulai antara awal dan akhir pencocokan ini.

    • SKIP_TO_FIRST: Mengabaikan setiap pencocokan parsial yang dimulai antara awal pencocokan ini dan kemunculan pertama event bernama PatternName.

    • SKIP_TO_LAST: Mengabaikan setiap pencocokan parsial yang dimulai antara awal pencocokan ini dan kemunculan terakhir event bernama PatternName.

    Untuk informasi lebih lanjut, lihat After Match Skip Strategy.

    patternName

    Nama pola tempat strategi ini diterapkan.

    string

    No

    String unik.

  • Definisi Contiguity

    Physical value

    Meaning

    STRICT

    Contiguity ketat. Tidak ada event yang tidak cocok yang boleh muncul di antara event yang cocok.

    SKIP_TILL_NEXT

    Contiguity longgar. Event yang tidak cocok dapat muncul di antara event yang cocok. Event yang tidak cocok tersebut diabaikan.

    SKIP_TILL_ANY

    Contiguity longgar non-deterministik. Contiguity yang lebih longgar yang memungkinkan Anda mengabaikan pencocokan tambahan untuk beberapa event yang cocok.

    NOT_NEXT

    Event berikutnya langsung tidak boleh berupa event tertentu.

    NOT_FOLLOW

    Event tertentu tidak muncul kemudian.

    Untuk informasi lebih lanjut, lihat FlinkCEP - Complex event processing for Flink.

  • Makna properti Quantifier

    Value

    Meaning

    SINGLE

    Pola muncul hanya sekali.

    LOOPING

    Pola merupakan pola looping. Pola ini dapat muncul beberapa kali, mirip dengan `*` dan `+` dalam ekspresi reguler.

    TIMES

    Pola muncul sejumlah kali yang ditentukan.

    GREEDY

    Saat mencocokkan pola ini, strategi pencocokan serakah digunakan untuk menemukan pencocokan terpanjang yang mungkin.

    OPTIONAL

    Pola bersifat opsional.

Contoh 1: Menggunakan pola umum

Contoh ini menjelaskan cara menggunakan Flink CEP dinamis untuk menyesuaikan strategi pemasaran bagi pelanggan yang memenuhi kondisi berikut dalam jendela waktu 10 menit selama acara promosi e-commerce real-time:

  • Mendapatkan kupon untuk suatu venue.

  • Menambahkan item ke keranjang belanja mereka lebih dari tiga kali.

  • Tidak menyelesaikan pembayaran.

Dalam kode contoh berikut, kondisi untuk mendapatkan kupon venue didefinisikan sebagai StartCondition, kondisi untuk menambahkan item ke keranjang belanja didefinisikan sebagai MiddleCondition, dan kondisi terkait penyelesaian pembayaran didefinisikan sebagai EndCondition. Pola berikut diabstraksikan: dalam jendela waktu 10 menit, event yang memenuhi StartCondition terjadi sekali, event yang memenuhi MiddleCondition terjadi tiga kali atau lebih, dan tidak ada event yang memenuhi EndCondition. Event yang memenuhi StartCondition bersifat opsional. Kode contoh berikut menunjukkan kode Java yang menggambarkan pola dalam contoh ini.

Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
            .where(new StartCondition())
            .optional()
            .followedBy("middle")
            .where(new MiddleCondition())
            .timesOrMore(3)
            .notFollowedBy("end")
            .where(new EndCondition())
            .within(Time.minutes(10));

Kode contoh berikut menunjukkan kode dalam format JSON yang menggambarkan pola dalam contoh ini.

{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}

Contoh 2: Menggunakan condition yang mencakup parameter kustom dalam pola

Contoh ini menjelaskan cara menentukan strategi pemasaran berbeda untuk pelanggan kelas berbeda selama acara promosi e-commerce real-time. Misalnya, Anda dapat menentukan strategi pemasaran yang mengirim pesan teks terkait pemasaran kepada pelanggan Kelas A, strategi pemasaran yang mengirim kupon kepada pelanggan Kelas B, dan strategi pemasaran yang tidak mengambil tindakan pemasaran untuk pelanggan lainnya. Anda dapat mendefinisikan condition kelas umum dalam penerapan Anda untuk memenuhi persyaratan tersebut. Namun, jika ingin menyesuaikan strategi pemasaran saat condition kelas umum digunakan dalam penerapan Anda, Anda harus menulis ulang kode penerapan dan mengompilasi serta menjalankannya kembali—misalnya, untuk mengubah strategi agar mengirim kupon kepada pelanggan Kelas C. Untuk menyederhanakan operasi tersebut, Anda dapat menggunakan condition yang mencakup parameter kustom. Setelah mendefinisikan cara menyesuaikan strategi berdasarkan parameter yang dikirim dalam kode, Anda hanya perlu mengubah nilai parameter tersebut di database. Nilai parameter yang dikirim merupakan nilai parameter `args` dari condition yang mencakup parameter kustom. Sebagai contoh, Anda dapat mengubah ["A", "B"] menjadi ["A", "B", "C"] untuk melakukan pembaruan dinamis strategi pemasaran.

Kode contoh berikut menunjukkan condition yang awalnya didefinisikan dalam pola.

"condition": {
    "args": [
        "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}

Anda dapat mengubah condition tersebut menjadi condition yang mencakup parameter kustom. Kode contoh berikut menunjukkan condition yang mencakup parameter kustom.

"condition": {
    "args": [
        "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}

Untuk informasi lebih lanjut tentang cara menggunakan condition yang mencakup parameter kustom dalam skenario bisnis tertentu, lihat Demo.

Catatan

aviatorscript dan Demo yang disebutkan dalam topik ini berasal dari situs web pihak ketiga. Saat Anda mengunjungi situs web tersebut, situs web mungkin gagal dibuka atau aksesnya tertunda.

Referensi

Quick Start untuk Flink CEP dinamis