All Products
Search
Document Center

Realtime Compute for Apache Flink:Pemrosesan Peristiwa Kompleks (CEP)

Last Updated:Mar 10, 2026

Topik ini menjelaskan pernyataan Pemrosesan Peristiwa Kompleks (Complex Event Processing/CEP) yang tersedia di Realtime Compute for Apache Flink.

Informasi latar belakang

Realtime Compute for Apache Flink meningkatkan CEP SQL standar Apache Flink dengan menambahkan dukungan untuk mengoutput timeout matches, loose contiguity (followedBy), serta penentuan contiguity antar event. Untuk informasi lebih lanjut mengenai fitur dasar CEP SQL Apache Flink, lihat Pattern Recognition.

Batasan

  • Hanya versi mesin komputasi waktu nyata vvr-6.0.2-flink-1.15 dan yang lebih baru yang mendukung sintaks CEP SQL yang diperluas.

  • Hanya versi mesin komputasi waktu nyata vvr-6.0.5-flink-1.15 dan yang lebih baru yang mendukung group patterns dan sintaks NO SKIP.

Batas waktu keluaran cocok

Pertimbangkan urutan input event berikut:

+----+------+------------------+
| id | type |          rowtime |
+----+------+------------------+
|  1 |    A | 2022-09-19 12:00 |
|  2 |    B | 2022-09-19 12:01 |
|  3 |    A | 2022-09-19 12:02 |
|  4 |    B | 2022-09-19 12:05 |
+----+------+------------------+

Untuk pola A B, Anda dapat membatasi rentang waktu seluruh pencocokan pola menjadi dua menit dengan menambahkan klausa WITHIN INTERVAL '2' MINUTES setelah pernyataan `PATTERN`. Contoh kode berikut mengilustrasikan hal tersebut.

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    A.rowtime AS atime,
    B.rowtime AS btime
  PATTERN (A B) WITHIN INTERVAL '2' MINUTES
  DEFINE
    A AS type = 'A',
    B AS type = 'B'
) AS T

Tanpa klausa `WITHIN`, ditemukan dua pencocokan: `id=1, id=2` dan `id=3, id=4`. Jika klausa `WITHIN` ditambahkan, interval waktu antara event A dan event B pada pencocokan kedua adalah 3 menit, yang melebihi batas 2 menit. Oleh karena itu, pernyataan SQL hanya mengembalikan pencocokan pertama, seperti yang ditunjukkan pada output berikut.

+-----+-----+------------------+------------------+
| aid | bid |            atime |            btime |
+-----+-----+------------------+------------------+
|   1 |   2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
+-----+-----+------------------+------------------+

Saat Anda mendefinisikan klausa `WITHIN`, pencocokan parsial yang melebihi batas waktu akan dibuang. Untuk mengoutput urutan timeout—yaitu urutan yang tidak sepenuhnya cocok dalam batas waktu—Anda dapat menggunakan pernyataan ONE ROW PER MATCH SHOW TIMEOUT MATCHES. Contoh berikut menunjukkan penggunaan pernyataan SQL tersebut untuk mengoutput urutan timeout.

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    A.rowtime AS atime,
    B.rowtime AS btime
  ONE ROW PER MATCH SHOW TIMEOUT MATCHES
  PATTERN (A B) WITHIN INTERVAL '2' MINUTES
  DEFINE
    A AS type = 'A',
    B AS type = 'B'
) AS T

Pernyataan ini mengoutput urutan yang tidak cocok. Hasilnya sebagai berikut.

+-----+--------+------------------+------------------+
| aid |    bid |            atime |            btime |
+-----+--------+------------------+------------------+
|   1 |      2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
|   3 | <NULL> | 2022-09-19 12:00 |           <NULL> |
+-----+--------+------------------+------------------+
Catatan

Karena event B dengan id=4 melebihi batas waktu `WITHIN`, event tersebut tidak termasuk dalam urutan pencocokan. Oleh karena itu, nilai `bid` dan `btime` bernilai `NULL`.

Contiguity antar event

open source Flink CEP Java API memungkinkan Anda menentukan kebijakan contiguity antar event, yaitu strict contiguity (next()), loose contiguity (followedBy()), non-deterministic loose contiguity (followedByAny()), strict non-contiguity (notNext()), dan loose non-contiguity (notFollowedBy()).

Secara default, Flink CEP SQL menggunakan kebijakan strict contiguity, yang mensyaratkan bahwa event yang cocok dengan pola harus muncul secara berurutan tanpa event lain di antaranya. Misalnya, pola `(A B)` menyatakan bahwa event A harus segera diikuti oleh event B. Realtime Compute for Apache Flink memperluas fitur ini sehingga kemampuan ekspresinya setara sepenuhnya dengan Java API.

Sebagai contoh, untuk urutan input event a1, b1, a2, a3, b2, b3, tabel berikut menunjukkan urutan pencocokan untuk berbagai pola.

Catatan

Selama proses pencocokan, klausa `AFTER MATCH` menggunakan kebijakan `SKIP TO NEXT ROW`. Untuk informasi lebih lanjut, lihat After Match Strategy.

Java API

SQL

Kebijakan

Urutan pencocokan

A.next(B)

(A B)

Strict contiguity: Mengharapkan semua event yang cocok muncul secara ketat satu demi satu, tanpa event yang tidak cocok di antaranya.

{a1 b1}
{a3 b2}

A.followedBy(B)

(A {- C*? -} B)

C adalah simbol yang tidak didefinisikan dalam klausa `DEFINE` dan menunjukkan pencocokan apa pun.

Loose contiguity: Mengabaikan event yang tidak cocok di antara event yang cocok.

{a1 b1}
{a2 b2}
{a3 b2}

A.followedByAny(B)

(A {- C* -} B)

C adalah simbol yang tidak didefinisikan dalam klausa `DEFINE` dan menunjukkan pencocokan apa pun.

Non-deterministic loose contiguity: Bentuk loose contiguity yang lebih longgar yang memungkinkan pencocokan tambahan dari beberapa event yang cocok untuk diabaikan.

{a1 b1}
{a2 b2}
{a3 b2}
Catatan

Hasil dalam contoh ini didasarkan pada kebijakan default `SKIP TO NEXT ROW` CEP SQL. Kebijakan default Flink CEP Java API adalah `NO SKIP`. Jika Anda ingin menggunakan `NO SKIP`, lihat Kebijakan AFTER MATCH NO SKIP.

A.notNext(B)

(A [^B])

Strict non-contiguity: Mengharapkan bahwa event lain tidak segera mengikuti suatu event.

{a2}

A.notFollowedBy(B)

(A {- C*? -} [^B]

C adalah simbol yang tidak didefinisikan dalam klausa `DEFINE` dan menunjukkan pencocokan apa pun.

Catatan

Saat menggunakan sintaks `notFollowedBy` di akhir pola, Anda harus menentukan kondisi `WITHIN`.

Loose non-contiguity: Mengharapkan bahwa suatu event tidak muncul di mana pun di antara dua event lain. Saat digunakan di akhir pola dengan kondisi `WITHIN`, ini menunjukkan bahwa tidak ada event jenis tertentu yang tiba dalam periode waktu tertentu.

Tidak ada pencocokan

Contiguity dan greedy matching dalam pola berulang

Penting

Saat ini, CEP SQL tidak mendukung non-deterministic loose contiguity dalam pola berulang.

open source Flink CEP Java API memungkinkan Anda menentukan kebijakan contiguity dan greedy untuk mencocokkan pola berulang. Secara default, Apache Flink CEP SQL menggunakan kebijakan strict dan greedy. Misalnya, untuk pola A+, tidak boleh ada event lain di antara event A yang cocok, dan pola tersebut mencocokkan sebanyak mungkin event A. Untuk menentukan kebijakan contiguity dan greedy, Anda dapat menambahkan simbol ? setelah kuantifier, seperti *, +, atau {3, }.

Sebagai contoh, untuk urutan event a1, b1, a2, a3, c1 dengan kondisi A AS type = 'a', C AS type = 'a' or type = 'c', tabel berikut menunjukkan urutan pencocokan untuk berbagai pola.

Catatan

Selama proses pencocokan, klausa `AFTER MATCH` menggunakan kebijakan `SKIP TO NEXT ROW`. Untuk informasi lebih lanjut, lihat After Match Strategy.

Identifier

Contiguity

Greediness

Pola contoh

Semantik ekuivalen

Urutan pencocokan

None

Strict contiguity

Greedy

A+ C

A.oneOrMore().greedy().consecutive().next(C)

{a2 a3 c1}
{a3 c1}

?

Strict contiguity

Non-greedy

A+? C

A.oneOrMore().consecutive().next(C)

{a2 a3}
{a3 c1}

??

Loose contiguity

Greedy

A+?? C

A.oneOrMore().greedy().next(C)

{a1 a2 a3 c1}
{a2 a3 c1}
{a3 c1}

???

Loose contiguity

Non-greedy

A+??? C

A.oneOrMore().next(C)

{a1 a2 a3}
{a2 a3}
{a3 c1}

Menentukan kondisi berhenti untuk pola berulang (Until)

open source Flink CEP Java API menyediakan fungsi until(condition) untuk menentukan kondisi berhenti bagi pola berulang. Saat mencocokkan pola berulang, jika event saat ini memenuhi kondisi `until`, pencocokan untuk loop tersebut segera berhenti, dan proses melanjutkan pencocokan pola berikutnya mulai dari event tersebut. Dalam pekerjaan SQL Realtime Compute for Apache Flink, Anda dapat menggunakan sintaks { CONDITION } setelah kuantifier, seperti +, *, atau {3, }, untuk menyatakan semantik `until`.

Sebagai contoh, untuk urutan event a1, d1, a2, b1, a3, c1 dan kondisi pencocokan DEFINE A AS A.type = 'a' OR A.type = 'b', B AS B.type = 'b', C AS C.type = 'c', tabel berikut menunjukkan urutan pencocokan untuk berbagai pola:

Catatan

Selama proses pencocokan, kebijakan AFTER MATCH SKIP TO NEXT ROW digunakan. Untuk informasi lebih lanjut, lihat After Match Strategy.

Pola

Semantik ekuivalen

Urutan pencocokan

Deskripsi

A+ C

A.oneOrMore().consecutive().greedy().next(C)

a2 b1 a3 c1
b1 a3 c1
a3 c1

Event yang dimulai dengan `a` atau `b` dapat mencocokkan pola A. Strict contiguity diterapkan dalam pola A dan antara A dengan C. Karena `d1` berada di antara `a1` dan `a2`, pencocokan tidak dapat dimulai dari `a1`.

A+{B} C

A.oneOrMore().consecutive().greedy().until(B).next(C)

a3 c1

Kondisi `until B` ditambahkan ke pola berulang A. Strict contiguity tetap diterapkan antara A dan C. Karena pola berulang yang dimulai dari `a2` harus berakhir di `b1`, persyaratan strict contiguity dengan `c1` tidak dapat dipenuhi.

A+{B} {- X*? -} C

A.oneOrMore().consecutive().greedy().until(B).followedBy(C)

a2 c1
a3 c1

Loose contiguity diterapkan antara A dan C. Pola berulang yang dimulai dari `a2` berakhir di `b1`, lalu melewati `b1` dan `a3` untuk mencocokkan `c1`.

A+??{B} {- X*? -} C

A.oneOrMore().greedy().until(B).followedBy(C)

a1 a2 c1
a2 c1
a3 c1

Loose contiguity diterapkan dalam pola berulang A. Pola ini dapat melewati `d1`, berakhir di `b1`, dan mencocokkan `a1` serta `a2`.

Pola kelompok

open source Flink CEP Java API mendukung pola kelompok. Anda dapat menggabungkan beberapa pola menjadi satu kelompok dan menggunakan kelompok tersebut dalam fungsi seperti next(), followedBy(), dan followedByAny(). Anda juga dapat mengulang seluruh kelompok tersebut. Dalam pekerjaan SQL Realtime Compute for Apache Flink, Anda dapat menggunakan sintaks SQL standar (...) untuk mendefinisikan pola kelompok dan menerapkan kuantifier seperti +, *, dan {3, }.

Sebagai contoh, dalam pola PATTERN (A (B C*)+? D), (B C*) adalah pola kelompok. Simbol `+` menentukan bahwa kelompok tersebut diulang satu kali atau lebih, dan `?` menunjukkan pencocokan non-greedy. Kode Java yang sesuai adalah sebagai berikut:

Pattern.<String>begin("A").where(...)
  .next(
  	Pattern.<String>begin("B").where(...)
  		.next("C").where(...).oneOrMore().optional().greedy().consecutive())
  .oneOrMore().consecutive()
  .next("D").where(...)

Saat pencocokan berhasil, klausa MEASURES dapat digunakan untuk mengekstrak informasi spesifik dari hasil pencocokan untuk output. Sebagai contoh, asumsikan pola kelompok mencocokkan hasil b1, b2 c1, dan b3 c2 c3. Klausa `MEASURES` dapat mengambil hanya sebagian informasi ini untuk output, seperti yang ditunjukkan pada pernyataan SQL berikut. Jika Anda hanya tertarik pada event b, Anda dapat menggunakan FIRST(B.id) untuk mengambil `b` dari kelompok pertama yang cocok, FIRST(B.id,1) untuk mengambil `b` dari kelompok kedua, dan seterusnya. Outputnya adalah b1, b2, b3.

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    FIRST(B.id) AS b1_id,
    FIRST(B.id,1) AS b2_id,
    FIRST(B.id,2) AS b3_id
  PATTERN (A (B C*)+? D)
  DEFINE
    A AS type = 'A',
    B AS type = 'B',
    C AS type = 'C',
    D AS type = 'D'
) AS T

Perhatikan bahwa deklarasi contiguity antara pola kelompok dan pola sebelumnya berlaku untuk pola pertama dalam kelompok tersebut, bukan seluruh pola kelompok. Sebagai contoh, dalam PATTERN (A {- X*? -} (B C)), hubungan `followedBy` digunakan antara pola A dan pola kelompok (B C). Deklarasi ini sebenarnya mendefinisikan hubungan `followedBy` antara A dan B. Artinya, hubungan `followedBy` berlaku antara `A` dan `B`, memungkinkan event lain muncul di antara keduanya. Hubungan ini tidak berlaku antara `A` dan seluruh kelompok `(B C)`. Untuk urutan a1 b1 d1 b2 c1, pola ini tidak menghasilkan output. Hal ini karena setelah `b1` muncul, proses pencocokan langsung memasuki pola kelompok `(B C)`. Namun, `d1` tidak cocok dengan pola `C`, sehingga pencocokan urutan gagal.

Penting
  • Greedy matching tidak didukung untuk pola kelompok berulang, misalnya, PATTERN ((A B)+).

  • Pola kelompok tidak didukung dalam sintaks `until` dan `notNext`, misalnya, PATTERN (A+{(B C)}) dan PATTERN (A [^(B C)]).

  • Pola pertama dalam pola kelompok tidak boleh dideklarasikan sebagai opsional, misalnya, PATTERN (A (B? C)).

Kebijakan AFTER MATCH NO SKIP

Dalam Flink CEP Java API, kebijakan `AFTER MATCH` default adalah `NO_SKIP`. Dalam CEP SQL, kebijakan default adalah `SKIP_TO_NEXT_ROW`. Realtime Compute for Apache Flink memperluas pernyataan SQL `AFTER MATCH` standar sehingga Anda dapat menggunakan AFTER MATCH NO SKIP untuk mendeklarasikan kebijakan `NO_SKIP`. Dengan kebijakan ini, setelah pencocokan urutan selesai, prosedur pencocokan lain yang telah dimulai tidak dihentikan atau dibuang.

Kasus penggunaan umum kebijakan `NO_SKIP` adalah menggabungkannya dengan `followedByAny` untuk mencapai pencocokan yang lebih longgar dengan melewati beberapa event. Sebagai contoh, pertimbangkan urutan a1 b1 b2 b3 c1 dan pola PATTERN (A {- X* -} B {- Y*? -} C), yang setara dengan Pattern.begin("A").followedByAny("B").followedBy("C"). Jika menggunakan kebijakan default AFTER MATCH SKIP TO NEXT ROW, hasilnya adalah a1 b1 c1, karena saat `a1 b1 c1` dicocokkan, semua urutan lain yang dimulai dengan `a1` dibuang. Namun, dengan AFTER MATCH NO SKIP, Anda dapat memperoleh semua urutan pencocokan: a1 b1 c1, a1 b2 c1, dan a1 b3 c1.