Topik ini menjelaskan aturan yang didefinisikan dalam format JSON pada Flink Complex Event Processing (CEP) dinamis.
Audien yang dituju
Pengembang platform kontrol risiko: Pengembang platform yang telah memahami Flink CEP dinamis dapat mempelajari format yang dijelaskan dalam topik ini dan menentukan apakah perlu melakukan enkapsulasi lebih lanjut berdasarkan kebutuhan platform.
Personel strategi kontrol risiko: Personel yang hanya memahami strategi kontrol risiko tertentu namun tidak memiliki pengalaman pengembangan Java dapat mempelajari dan menggunakan format ini untuk menulis aturan baru berdasarkan konsep CEP serta menerapkan aturan tersebut dalam penerapan kontrol risiko online.
Definisi format JSON
Pola dalam urutan event dapat dipandang sebagai sebuah graf. Setiap node dalam graf merepresentasikan pola untuk event tertentu. Sisi (edge) antar node merepresentasikan strategi seleksi event, yang mendefinisikan transisi dari satu pola yang cocok ke pola berikutnya. Setiap graf juga dapat menjadi node anak dari graf yang lebih besar, sehingga memungkinkan pola bersarang (nested). Berdasarkan konsep ini, Real-time Compute for Apache Flink menyediakan spesifikasi JSON untuk menggambarkan aturan CEP. Pendekatan ini menyederhanakan penyimpanan dan modifikasi aturan. Bagian-bagian berikut menjelaskan bidang-bidang dalam spesifikasi tersebut.
Definisi Node
Node merepresentasikan pola lengkap dan berisi properti-properti berikut.
Field
Description
Type
Required
Notes
name
Nama pola.
string
Yes
String unik.
CatatanNama node harus unik.
type
Tipe node.
enum(string)
Yes
COMPOSITE untuk node dengan pola anak.
ATOMIC untuk node tanpa pola anak.
quantifier
Menjelaskan cara mencocokkan pola, misalnya hanya mencocokkan sekali.
dict
Yes
Untuk informasi lebih lanjut, lihat bagian definisi Quantifier.
condition
Kondisi.
dict
No
Untuk informasi lebih lanjut, lihat bagian definisi Condition.
Definisi Quantifier
Quantifier menjelaskan cara mencocokkan event yang memenuhi pola. Misalnya, untuk pola
"A*", field `properties` dari quantifier bernilai LOOPING, dan strategi seleksi event dalam pola tersebut adalah SKIP_TILL_ANY.Field
Description
Type
Required
Notes
consumingStrategy
Strategi seleksi event.
enum(string)
Yes
Nilai valid:
STRICT
SKIP_TILL_NEXT
SKIP_TILL_ANY
Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat bagian definisi Contiguity.
times
Jumlah kali pola harus dicocokkan.
dict
No
Contoh:
"times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } },Field `from` dan `to` bertipe integer. Satuan (`unit`) untuk `windowTime` dapat berupa DAYS, HOURS, MINUTES, SECONDS, atau MILLISECONDS.
Catatan`windowTime` dapat diatur ke null:
"windowTime": null.properties
Properti quantifier.
array of enumString
Yes
Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat bagian makna properti Quantifier.
untilCondition
Kondisi berhenti.
CatatanHanya dapat digunakan setelah pola yang memiliki quantifier LOOPING.
dict
No
Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat bagian definisi Condition.
Definisi Kondisi
Condition digunakan untuk memfilter event yang memenuhi persyaratan tertentu. Misalnya, jika Anda ingin memfilter pelanggan yang menjelajah selama lebih dari 5 menit, maka "menjelajah selama lebih dari 5 menit" merupakan condition tersebut.
Field
Description
Type
Required
Notes
type
Tipe condition.
enum(string)
Yes
Nilai valid untuk tipe condition:
CLASS: Condition yang ditentukan pengguna.
AVIATOR: Condition berdasarkan ekspresi AVIATOR.
GROOVY: Condition berdasarkan ekspresi GROOVY.
...
Field kustom lain yang dapat diserialisasi.
No
...
Tipe-tipe Condition berikut didukung:
Kondisi Jenis Kelas
Field Name
Description
Type
Required
Notes
type
Jenis kondisi.
enum(string)
Yes
Nilainya tetap Class.
className
Nama kelas.
string
Yes
Nama lengkap kelas, seperti
com.alibaba.ververica.cep.demo.StartCondition.Condition dengan parameter kustom
Dengan condition tipe Class biasa, Anda hanya dapat mengirimkan nama kelas (`className`) dan tidak dapat mengirimkan parameter secara dinamis. Untuk membuat condition yang lebih ekspresif, CEP dinamis mendukung condition dengan parameter kustom (`CustomArgsCondition`). Ini memungkinkan Anda menetapkan parameter yang diperlukan untuk `CustomArgsCondition` sebagai array string dalam format JSON, sehingga Anda dapat secara dinamis membuat instance `CustomArgsCondition`. Fitur ini memungkinkan pembaruan parameter condition secara dinamis tanpa mengubah dan mengompilasi ulang kode Java.
Field
Description
Type
Required
Notes
type
Tipe condition.
enum(string)
Yes
Nilainya tetap Class.
className
Nama kelas.
string
Yes
Nama lengkap kelas, seperti
com.alibaba.ververica.cep.demo.CustomMiddleCondition.args
Parameter kustom.
array of string
Yes
Array string.
Condition berdasarkan ekspresi Aviator
Aviator adalah mesin evaluasi ekspresi yang secara dinamis mengompilasi ekspresi menjadi bytecode. Untuk informasi lebih lanjut, lihat aviatorscript. Anda dapat menggunakan condition berbasis ekspresi Aviator dalam job, sehingga memungkinkan perubahan ambang batas condition secara dinamis tanpa memodifikasi, mengompilasi ulang, dan menjalankan ulang kode Java.
Field
Description
Type
Required
Notes
type
Nama kelas.
string
Yes
Nilainya tetap AVIATOR.
expression
String ekspresi.
string
Yes
String ekspresi, seperti price > 10. Variabel price berasal dari field yang didefinisikan dalam kode Java.
Anda dapat mengubah nilai string ini di database. Misalnya, ubah menjadi price > 20. Job Flink CEP kemudian akan memuat secara dinamis price > 20 dan membuat AviatorCondition baru untuk memproses event berikutnya.
Condition berdasarkan ekspresi Groovy
Groovy adalah bahasa bertipe dinamis untuk Java Virtual Machine (JVM). Untuk informasi lebih lanjut tentang sintaks Groovy, lihat syntax. CEP dinamis mendukung ekspresi Groovy untuk mendefinisikan condition, sehingga memungkinkan perubahan ambang batas condition secara dinamis.
Field
Description
Type
Required
Notes
type
Nama kelas.
string
Yes
Nilainya tetap GROOVY.
expression
String ekspresi.
string
Yes
String ekspresi, seperti price > 5.0 && name.contains("mid"). Variabel seperti price dan name berasal dari field yang didefinisikan dalam kode Java. Anda dapat mengubah nilai string ini di database. Misalnya, ubah menjadi price > 20 && name.contains("end"). Job Flink CEP kemudian akan memuat string Groovy baru secara dinamis dan membuat GroovyCondition baru untuk memproses event berikutnya.
Definisi Edge
Field
Description
Type
Required
Notes
source
Nama pola sumber.
string
Yes
None.
target
Nama pola target.
string
Yes
None.
type
Strategi seleksi event.
dict
Yes
Nilai valid:
STRICT
SKIP_TILL_NEXT
SKIP_TILL_ANY
NOT_FOLLOW
NOT_NEXT
Untuk informasi lebih lanjut tentang nilai-nilai tersebut, lihat bagian definisi Contiguity.
GraphNode memperluas definisi Node.
`GraphNode` merepresentasikan urutan pola lengkap. Field `nodes`-nya berisi pola individual, sedangkan field `edges`-nya mendefinisikan transisi dari satu pola yang cocok ke pola berikutnya.
Untuk mendukung pola bersarang (`GroupPattern`), `GraphNode` diperlakukan sebagai kelas turunan dari `Node`. Artinya, `GraphNode` dapat berfungsi sebagai `Node` dalam `GraphNode` yang lebih besar. Dibandingkan dengan `Node` dasar, `GraphNode` memiliki dua jenis field tambahan:
Field `nodes` dan `edges` untuk menggambarkan struktur graf.
Field `window` dan `afterMatchSkipStrategy` untuk menggambarkan kebijakan jendela waktu dan strategi lewati setelah pencocokan dalam graf.
Tabel berikut menjelaskan field-field `GraphNode`.
Field
Description
Type
Required
Notes
name
Nama pola komposit.
String
Yes
String unik.
CatatanNama graf harus unik.
type
Tipe node.
enum(string)
Yes
Nilainya tetap COMPOSITE.
version
Versi format JSON yang digunakan graf.
Int
Yes
Nilai default adalah 1.
nodes
Pola anak yang bersarang dalam pola.
array of Node
Yes
Array tidak kosong.
edges
Koneksi antara pola anak yang bersarang.
array of Edge
Yes
Array yang boleh kosong.
window
Jika `type` adalah `FIRST_AND_LAST`, ini adalah waktu maksimum antara awal dan akhir pencocokan lengkap untuk pola komposit.
Jika `type` adalah `PREVIOUS_AND_CURRENT`, ini adalah waktu maksimum antara pencocokan dua pola anak yang berdekatan.
dict
No
Contoh:
"window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 } }Satuannya dapat berupa DAYS, HOURS, MINUTES, SECONDS, atau MILLISECONDS. Tipe datanya Long atau Integer.
afterMatchSkipStrategy
Strategi lewati yang digunakan setelah semua event dalam graf dicocokkan.
dict
Yes
Untuk informasi lebih lanjut, lihat bagian definisi strategi lewati setelah pencocokan (AfterMatchSkipStrategy).
quantifier
Menjelaskan cara mencocokkan pola, seperti hanya mencocokkan sekali.
dict
Yes
Untuk informasi lebih lanjut, lihat bagian definisi Quantifier.
Definisi strategi lewati setelah pencocokan (AfterMatchSkipStrategy)
Field
Description
Type
Required
Notes
type
Tipe strategi.
enum(string)
Yes
Nilai valid:
NO_SKIP (default): Setiap pencocokan sukses dipancarkan.
SKIP_TO_NEXT: Mengabaikan setiap pencocokan parsial yang dimulai dengan event yang sama.
SKIP_PAST_LAST_EVENT: Mengabaikan setiap pencocokan parsial yang dimulai antara awal dan akhir pencocokan ini.
SKIP_TO_FIRST: Mengabaikan setiap pencocokan parsial yang dimulai antara awal pencocokan ini dan kemunculan pertama event bernama PatternName.
SKIP_TO_LAST: Mengabaikan setiap pencocokan parsial yang dimulai antara awal pencocokan ini dan kemunculan terakhir event bernama PatternName.
Untuk informasi lebih lanjut, lihat After Match Skip Strategy.
patternName
Nama pola tempat strategi ini diterapkan.
string
No
String unik.
Definisi Contiguity
Physical value
Meaning
STRICT
Contiguity ketat. Tidak ada event yang tidak cocok yang boleh muncul di antara event yang cocok.
SKIP_TILL_NEXT
Contiguity longgar. Event yang tidak cocok dapat muncul di antara event yang cocok. Event yang tidak cocok tersebut diabaikan.
SKIP_TILL_ANY
Contiguity longgar non-deterministik. Contiguity yang lebih longgar yang memungkinkan Anda mengabaikan pencocokan tambahan untuk beberapa event yang cocok.
NOT_NEXT
Event berikutnya langsung tidak boleh berupa event tertentu.
NOT_FOLLOW
Event tertentu tidak muncul kemudian.
Untuk informasi lebih lanjut, lihat FlinkCEP - Complex event processing for Flink.
Makna properti Quantifier
Value
Meaning
SINGLE
Pola muncul hanya sekali.
LOOPING
Pola merupakan pola looping. Pola ini dapat muncul beberapa kali, mirip dengan `*` dan `+` dalam ekspresi reguler.
TIMES
Pola muncul sejumlah kali yang ditentukan.
GREEDY
Saat mencocokkan pola ini, strategi pencocokan serakah digunakan untuk menemukan pencocokan terpanjang yang mungkin.
OPTIONAL
Pola bersifat opsional.
Contoh 1: Menggunakan pola umum
Contoh ini menjelaskan cara menggunakan Flink CEP dinamis untuk menyesuaikan strategi pemasaran bagi pelanggan yang memenuhi kondisi berikut dalam jendela waktu 10 menit selama acara promosi e-commerce real-time:
Mendapatkan kupon untuk suatu venue.
Menambahkan item ke keranjang belanja mereka lebih dari tiga kali.
Tidak menyelesaikan pembayaran.
Dalam kode contoh berikut, kondisi untuk mendapatkan kupon venue didefinisikan sebagai StartCondition, kondisi untuk menambahkan item ke keranjang belanja didefinisikan sebagai MiddleCondition, dan kondisi terkait penyelesaian pembayaran didefinisikan sebagai EndCondition. Pola berikut diabstraksikan: dalam jendela waktu 10 menit, event yang memenuhi StartCondition terjadi sekali, event yang memenuhi MiddleCondition terjadi tiga kali atau lebih, dan tidak ada event yang memenuhi EndCondition. Event yang memenuhi StartCondition bersifat opsional. Kode contoh berikut menunjukkan kode Java yang menggambarkan pola dalam contoh ini.
Pattern<Event, Event> pattern =
Pattern.<Event>begin("start")
.where(new StartCondition())
.optional()
.followedBy("middle")
.where(new MiddleCondition())
.timesOrMore(3)
.notFollowedBy("end")
.where(new EndCondition())
.within(Time.minutes(10));Kode contoh berikut menunjukkan kode dalam format JSON yang menggambarkan pola dalam contoh ini.
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": null,
"nodes": [
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "middle",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"LOOPING"
],
"times": {
"from": 3,
"to": 3,
"windowTime": null
},
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE",
"OPTIONAL"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
"type": "CLASS"
},
"type": "ATOMIC"
}
],
"edges": [
{
"source": "middle",
"target": "end",
"type": "NOT_FOLLOW"
},
{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
}
],
"window": {
"type": "FIRST_AND_LAST",
"time": {
"unit": "MINUTES",
"size": 10
}
},
"afterMatchStrategy": {
"type": "NO_SKIP",
"patternName": null
},
"type": "COMPOSITE",
"version": 1
}Contoh 2: Menggunakan condition yang mencakup parameter kustom dalam pola
Contoh ini menjelaskan cara menentukan strategi pemasaran berbeda untuk pelanggan kelas berbeda selama acara promosi e-commerce real-time. Misalnya, Anda dapat menentukan strategi pemasaran yang mengirim pesan teks terkait pemasaran kepada pelanggan Kelas A, strategi pemasaran yang mengirim kupon kepada pelanggan Kelas B, dan strategi pemasaran yang tidak mengambil tindakan pemasaran untuk pelanggan lainnya. Anda dapat mendefinisikan condition kelas umum dalam penerapan Anda untuk memenuhi persyaratan tersebut. Namun, jika ingin menyesuaikan strategi pemasaran saat condition kelas umum digunakan dalam penerapan Anda, Anda harus menulis ulang kode penerapan dan mengompilasi serta menjalankannya kembali—misalnya, untuk mengubah strategi agar mengirim kupon kepada pelanggan Kelas C. Untuk menyederhanakan operasi tersebut, Anda dapat menggunakan condition yang mencakup parameter kustom. Setelah mendefinisikan cara menyesuaikan strategi berdasarkan parameter yang dikirim dalam kode, Anda hanya perlu mengubah nilai parameter tersebut di database. Nilai parameter yang dikirim merupakan nilai parameter `args` dari condition yang mencakup parameter kustom. Sebagai contoh, Anda dapat mengubah ["A", "B"] menjadi ["A", "B", "C"] untuk melakukan pembaruan dinamis strategi pemasaran.
Kode contoh berikut menunjukkan condition yang awalnya didefinisikan dalam pola.
"condition": {
"args": [
"A", "B"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}Anda dapat mengubah condition tersebut menjadi condition yang mencakup parameter kustom. Kode contoh berikut menunjukkan condition yang mencakup parameter kustom.
"condition": {
"args": [
"A", "B", "C"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}Untuk informasi lebih lanjut tentang cara menggunakan condition yang mencakup parameter kustom dalam skenario bisnis tertentu, lihat Demo.
aviatorscript dan Demo yang disebutkan dalam topik ini berasal dari situs web pihak ketiga. Saat Anda mengunjungi situs web tersebut, situs web mungkin gagal dibuka atau aksesnya tertunda.