Topik ini menjelaskan kueri yang didukung oleh Realtime Compute for Apache Flink.
Realtime Compute for Apache Flink kompatibel dengan kueri asli dari Apache Flink. Bentuk Backus-Naur (BNF) berikut menggambarkan superset dari kueri SQL streaming dan batch yang didukung:
query:
values
| WITH withItem [ , withItem ]* query
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
withItem:
name
[ '(' column [, column ]* ')' ]
AS '(' query ')'
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| [ LATERAL ] '(' query ')'
| UNNEST '(' expression ')'
tablePath:
[ [ catalogName . ] databaseName . ] tableName
systemTimePeriod:
FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
/*+ OPTIONS(key=val [, key=val]*) */
key:
stringLiteral
val:
stringLiteral
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'Pengenal
Mirip dengan Java, Flink SQL menggunakan kebijakan sintaksis berikut untuk pengenal seperti nama tabel, nama kolom, dan nama fungsi:
Definisi pengenal bersifat peka huruf besar/kecil, terlepas apakah diapit tanda backticks (`).
Pencocokan pengenal bersifat peka huruf besar/kecil.
Berbeda dengan Java, Flink SQL mengizinkan karakter non-alfanumerik dalam pengenal. Contoh:
SELECT a AS `my field` FROM tKonstanta string
Konstanta string harus diapit tanda kutip tunggal (') bukan tanda kutip ganda ("). Contoh:
SELECT 'Hello World' Duplikasi tanda kutip tunggal (') dalam string digunakan untuk pelolosan. Contoh:
Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
| EXPR$0 | EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in setNilai Unicode didukung dalam konstanta string. Untuk menyertakan nilai Unicode dalam konstanta string, gunakan salah satu metode berikut:
Gunakan karakter pelolos default dengan menambahkan garis miring terbalik (\) sebelum nilai Unicode.
SELECT U&'\263A'Gunakan karakter pelolos kustom. Contoh:
SELECT U&'#263A' UESCAPE '#' -- Tanda pagar (#) digunakan sebagai karakter pelolos.
Tabel berikut menjelaskan kueri yang didukung oleh Apache Flink 1.15 dan memberikan referensi yang sesuai.
Untuk melihat referensi versi lain dari Apache Flink, Anda dapat beralih ke versi tersebut di situs web Apache Flink.
Kueri | Referensi |
Petunjuk | |
Klausa WITH | |
Klausa SELECT dan WHERE | |
SELECT DISTINCT | |
Fungsi jendela | |
Agregasi jendela | |
Agregasi grup | |
Agregasi over | |
Gabung | |
Gabungan jendela | |
Operasi himpunan | |
Klausa ORDER BY | |
Klausa LIMIT | |
Top-N | |
Top-N Jendela | |
Penghapusan duplikat | |
Penghapusan duplikat jendela | |
Pengenalan pola |
Ekseskusi Kueri
Dalam mode streaming, aliran masukan dapat dikategorikan menjadi aliran pembaruan dan aliran non-pembaruan. Aliran non-pembaruan hanya berisi peristiwa jenis INSERT, sedangkan aliran pembaruan mencakup jenis peristiwa lainnya. Sebagai contoh, sumber change data capture (CDC) menghasilkan aliran pembaruan dari sistem eksternal. Operasi tertentu dalam Flink, seperti agregasi grup dan komputasi Top-N, juga menghasilkan peristiwa pembaruan. Dalam banyak kasus, operasi yang menghasilkan peristiwa pembaruan dilakukan oleh operator stateful. Operator stateful menggunakan state untuk mengelola pembaruan. Namun, tidak semua operator stateful dapat mengonsumsi aliran pembaruan. Sebagai contoh, operator untuk agregasi over dan gabungan interval tidak mendukung aliran pembaruan sebagai input.
Tabel berikut menjelaskan informasi waktu proses dari setiap kueri yang didukung, termasuk nama operator, apakah operator bersifat stateful, apakah operator dapat mengonsumsi aliran pembaruan, dan apakah peristiwa pembaruan dihasilkan. Informasi ini berlaku untuk Ververica Runtime (VVR) 6.0.X dan versi lebih baru.
Kueri | Operator waktu proses | Menggunakan data state | Mengonsumsi aliran pembaruan | Menghasilkan peristiwa pembaruan | Catatan |
SELECT dan WHERE | Calc | Tidak | Ya | Tidak | N/A |
Lookup Join | LookupJoin | Tidak* | Ya | Tidak | Untuk VVR 8.0.1 atau lebih baru, jika Anda mengatur parameter table.optimizer.non-deterministic-update.strategy ke TRY_RESOLVE dan masalah non-deterministik terdeteksi dalam kueri, data state secara otomatis digunakan untuk menyelesaikan masalah tersebut. Jika Anda ingin menonaktifkan penggunaan data state, atur parameter table.optimizer.non-deterministic-update.strategy ke IGNORE. Perhatikan bahwa setelah Anda mengubah nilai parameter ini, ketidaksesuaian mungkin terjadi. Dalam hal ini, Anda harus mengeksekusi ulang kueri. |
Fungsi Tabel | Correlate | Tidak | Ya | Tidak | N/A |
SELECT DISTINCT | GroupAggregate | Ya | Ya | Ya | N/A |
Agregasi grup | GroupAggregate LocalGroupAggregate GlobalGroupAggregate IncrementalGroupAggregate | Ya* | Ya | Ya | Operator pra-agregasi LocalGroupAggregate tidak menggunakan data state. |
Agregasi over | OverAggregate | Ya | Tidak | Tidak | N/A |
Agregasi jendela | GroupWindowAggregate WindowAggregate LocalWindowAggregate GlobalWindowAggregate | Ya* | Ya* | Tidak* |
|
Gabungan pada dua aliran (Regular join) | Join | Ya | Ya | Ya* | Jika Anda menggunakan outer join, seperti LEFT JOIN, RIGHT JOIN, atau FULL OUTER JOIN, peristiwa pembaruan dihasilkan. |
Interval Join | IntervalJoin | Ya | Tidak | Tidak | N/A |
Temporal Join | TemporalJoin | Ya | Ya | Tidak | N/A |
Window Join | WindowJoin | Ya | Tidak | Tidak | N/A |
Top-N | Rank | Ya | Ya | Ya | Kueri Top-N tidak mendukung peringkat berdasarkan waktu pemrosesan. Anda dapat menggunakan fungsi bawaan lainnya untuk peringkat, seperti CURRENT_TIMESTAMP. Peringatan Jika Anda menentukan bidang waktu pemrosesan dalam klausa ORDER BY dari kueri Top-N, kesalahan data mungkin terjadi. Masalah ini tidak dilaporkan selama pemeriksaan sintaksis di VVR 8.0.7 atau lebih lama. Kami sarankan Anda menggunakan fungsi bawaan lainnya, seperti CURRENT_TIMESTAMP. |
Top-N Jendela | WindowRank | Ya | Tidak | Tidak | N/A |
Penghapusan duplikat | Deduplicate | Ya | Tidak | Ya* | Jika Anda menggunakan kebijakan Deduplicate Keep FirstRow untuk menghapus duplikat berdasarkan waktu pemrosesan (Proctime), tidak ada peristiwa pembaruan yang dihasilkan. |
Penghapusan duplikat jendela | WindowDeduplicate | Ya | Tidak | Tidak | N/A |
Operator tanpa state hanya meneruskan jenis peristiwa, memastikan bahwa peristiwa keluaran memiliki jenis yang sama dengan peristiwa masukan. Operator tanpa state tidak menghasilkan peristiwa pembaruan, terlepas dari apakah inputnya adalah aliran pembaruan.