A TopN clause is used to compute the largest or smallest N records of a metric in real-time data. Flink SQL uses an OVER window function to implement TopN queries in a flexible way.
Syntax
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
- ROW_NUMBER(): specifies an OVER window function that computes the row number of each record. The value starts from 1.
- PARTITION BY col1[, col2..]: specifies one or more partitioning columns. This parameter is optional.
- ORDER BY col1 [asc|desc][, col2 [asc|desc]...]: specifies the columns by which the data is sorted and the sort order of each column.

The TopN query works as follows:
- In the subquery, the ROW_NUMBER() window function sorts the data by the specified columns and assigns a ranking to each record.
- In the outer query, only the first N records in the ranking are obtained. For example, if N is set to 10, the first 10 records are obtained.
During execution, Flink SQL sorts the input data stream based on the sort key. If the first N records of a partition change, the changed records are sent downstream as an update stream.
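For example, the following sketch returns the top 3 items by sales volume in each category. The shop_sales table and its columns are hypothetical and serve only to illustrate the syntax above.
-- Hypothetical source table: shop_sales(category VARCHAR, item VARCHAR, sales_volume BIGINT).
-- Rank items by sales_volume within each category and keep the top 3 per category.
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales_volume DESC) AS rownum
  FROM shop_sales
) t
WHERE rownum <= 3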
WHERE
To enable Flink SQL to identify a TopN query, use rownum <= N in the outer query to obtain the first N records. Do not place rownum in an expression, such as rownum - 5 <= N. If you specify multiple conditions in the WHERE clause, you must use AND to join them.
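For example, the following query, which reuses the hypothetical shop_sales table from the sketch above, is identified as a TopN query because rownum is compared directly and the additional condition is joined with AND. A filter such as rownum - 5 <= 3 would not be identified.
-- Identified as a TopN query: rownum is compared directly and the extra
-- condition is joined with AND.
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales_volume DESC) AS rownum
  FROM shop_sales
) t
WHERE rownum <= 3 AND category IS NOT NULL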
Example 1
You can use the following example to return the top 100 keywords that are queried the most in each city within a specific hour. The hour, city, and ranking columns in the output table identify a unique record. Therefore, you must declare the three columns as a composite key. The key must also be configured in the external storage.
CREATE TABLE rds_output (
rownum BIGINT,
start_time BIGINT,
city VARCHAR,
keyword VARCHAR,
pv BIGINT,
PRIMARY KEY (rownum, start_time, city)
) WITH (
type = 'rds',
...
);
INSERT INTO rds_output
SELECT rownum, start_time, city, keyword, pv
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY start_time, city ORDER BY pv desc) AS rownum
FROM (
SELECT SUBSTRING(time_str,1,12) AS start_time,
keyword,
count(1) AS pv,
city
FROM tmp_search
GROUP BY SUBSTRING(time_str,1,12), keyword, city
) a
) t
WHERE rownum <= 100
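The composite key must also be configured in the external storage, as mentioned above. The following is a minimal sketch of the matching table on the RDS side, assuming a MySQL-compatible database and hypothetical column lengths.
-- Hypothetical MySQL-side table that backs rds_output. The composite primary
-- key must match the PRIMARY KEY declared in the Flink DDL above.
CREATE TABLE rds_output (
  rownum     BIGINT,
  start_time BIGINT,
  city       VARCHAR(64),
  keyword    VARCHAR(255),
  pv         BIGINT,
  PRIMARY KEY (rownum, start_time, city)
);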
Example 2
- Test data
| ip (VARCHAR) | time (VARCHAR) |
| ------------ | -------------- |
| 192.168.1.1  | 100,000,000    |
| 192.168.1.2  | 100,000,000    |
| 192.168.1.2  | 100,000,000    |
| 192.168.1.3  | 100,030,000    |
| 192.168.1.3  | 100,000,000    |
| 192.168.1.3  | 100,000,000    |
- Test statements
CREATE TABLE source_table (
  IP VARCHAR,
  `TIME` VARCHAR
) WITH (
  type='datahub',
  endPoint='<yourEndpoint>',
  project='<yourProjectName>',
  topic='<yourTopicName>',
  accessId='<yourAccessId>',
  accessKey='<yourAccessSecret>'
);

CREATE TABLE result_table (
  rownum BIGINT,
  start_time VARCHAR,
  IP VARCHAR,
  cc BIGINT,
  PRIMARY KEY (start_time, IP)
) WITH (
  type = 'rds',
  url='<yourDatabaseAddress>',
  tableName='blink_rds_test',
  userName='<yourDatabaseUserName>',
  password='<yourDatabasePassword>'
);

INSERT INTO result_table
SELECT rownum, start_time, IP, cc
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY cc DESC) AS rownum
  FROM (
    SELECT SUBSTRING(`TIME`,1,2) AS start_time, -- You can specify a value based on the actual time. The data specified here is an example.
      COUNT(IP) AS cc,
      IP
    FROM source_table
    GROUP BY SUBSTRING(`TIME`,1,2), IP
  ) a
) t
WHERE rownum <= 3 -- You can specify a value based on the number of data records you want to obtain. The data specified here is an example.
- Test results
| rownum (BIGINT) | start_time (VARCHAR) | ip (VARCHAR) | cc (BIGINT) |
| --------------- | -------------------- | ------------ | ----------- |
| 1               | 10                   | 192.168.1.3  | 3           |
| 2               | 10                   | 192.168.1.2  | 2           |
| 3               | 10                   | 192.168.1.1  | 1           |
No ranking number optimization
- You can use no ranking number optimization to resolve the data bloat issue.
- Data bloat
  Based on the TopN syntax, the rownum field is written to the result table as one of its primary keys, which may lead to data bloat. For example, if the ranking of a record rises from the ninth to the first after a data update, the rankings of the records that were ranked first through ninth all change, and every change must be written to the result table. This inflates the volume of output data, and updates to the result table may slow down because too much data is received.
- Method of no ranking number optimization
  To avoid data bloat, remove rownum from the result table and compute rownum at the front end when the data is displayed (a front-end query sketch is provided after the test results at the end of this example). The volume of TopN data is not large, so even the top 100 records can be obtained quickly at the front end. In this case, if the ranking of a record rises from the ninth to the first after an update, only that record needs to be written to the result table, which greatly accelerates updates to the result table.
- Syntax
SELECT col1, col2, col3
FROM (
  SELECT col1, col2, col3,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
      ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]
The syntax is similar to the original TopN syntax. You only need to remove the rownum field from the outer query.
Note: If the rownum field is removed, pay attention to the definition of the primary keys in the result table. If the definition is incorrect, the TopN query returns incorrect results. The primary keys must be the keys of the GROUP BY node that precedes the TopN clause.
- Example
This example is a case from a customer in the video industry. Heavy traffic is generated each time a video is distributed. You can identify the most popular videos based on the video traffic. The following example identifies the top 5 videos that generate the most traffic per minute.
- Test statements
-- Read the original data storage table from Log Service.
CREATE TABLE sls_cdnlog_stream (
  vid VARCHAR,           -- The video ID.
  rowtime TIMESTAMP,     -- The time when the video was watched.
  response_size BIGINT,  -- The traffic generated for watching the video.
  WATERMARK FOR rowtime as withOffset(rowtime, 0)
) WITH (
  type='sls',
  ...
);

-- Compute the consumed bandwidth by video ID in a 1-minute window.
CREATE VIEW cdnvid_group_view AS
SELECT vid,
  TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS start_time,
  SUM(response_size) AS rss
FROM sls_cdnlog_stream
GROUP BY vid, TUMBLE(rowtime, INTERVAL '1' MINUTE);

-- Create a result table.
CREATE TABLE hbase_out_cdnvidtoplog (
  vid VARCHAR,
  rss BIGINT,
  start_time VARCHAR,
  -- Do not store the rownum field in the result table.
  -- Pay attention to the definition of the primary keys. The primary keys must be the keys of the GROUP BY node that precedes the TopN clause.
  PRIMARY KEY(start_time, vid)
) WITH (
  type='RDS',
  ...
);

-- Identify and export the IDs of the top 5 videos that generate the most traffic per minute.
INSERT INTO hbase_out_cdnvidtoplog
-- Do not include the rownum field in the outer query.
SELECT vid, rss, start_time
FROM (
  SELECT vid, start_time, rss,
    ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY rss DESC) as rownum
  FROM cdnvid_group_view
)
WHERE rownum <= 5;
- Test data
| vid (VARCHAR) | rowtime (TIMESTAMP) | response_size (BIGINT) |
| ------------- | ------------------- | ---------------------- |
| 10000         | 2017-12-18 15:00:10 | 2000                   |
| 10000         | 2017-12-18 15:00:15 | 4000                   |
| 10000         | 2017-12-18 15:00:20 | 3000                   |
| 10001         | 2017-12-18 15:00:20 | 3000                   |
| 10002         | 2017-12-18 15:00:20 | 4000                   |
| 10003         | 2017-12-18 15:00:20 | 1000                   |
| 10004         | 2017-12-18 15:00:30 | 1000                   |
| 10005         | 2017-12-18 15:00:30 | 5000                   |
| 10006         | 2017-12-18 15:00:40 | 6000                   |
| 10007         | 2017-12-18 15:00:50 | 8000                   |
- Test results
| start_time (VARCHAR) | vid (VARCHAR) | rss (BIGINT) |
| -------------------- | ------------- | ------------ |
| 2017-12-18 15:00:00  | 10000         | 9000         |
| 2017-12-18 15:00:00  | 10007         | 8000         |
| 2017-12-18 15:00:00  | 10006         | 6000         |
| 2017-12-18 15:00:00  | 10005         | 5000         |
| 2017-12-18 15:00:00  | 10002         | 4000         |
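Because the result table no longer stores rownum, the ranking is recomputed when the data is read for display. The following is a minimal sketch of such a front-end query; it assumes that the physical table on the RDS side has the same name and columns as the hbase_out_cdnvidtoplog table defined above, and the start_time value is only an example.
-- Recompute the ranking at read time instead of storing rownum in the table.
SELECT vid, rss
FROM hbase_out_cdnvidtoplog
WHERE start_time = '2017-12-18 15:00:00'
ORDER BY rss DESC
LIMIT 5;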