TopN 文は、リアルタイムデータのインジケーターの最大または最小の N データレコードを計算するために使用されます。 Flink SQL では、TopN の計算を柔軟に実装するために、OVER
ウィンドウ関数を使用することができます。
構文
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
-
ROW_NUMBER()
: 行数を計算するためのOVER
ウィンドウ関数を指定します。 値は 1 から始まります。 -
PARTITION BY col1[, col2..]
: パーティションに使用する列を指定します。 このパラメーターは省略可能です。 -
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]
: ソートに使用する列と、各列のソート順を指定します。
上記の構文で示すように、TopN には 2 つのレベルのクエリが必要です。
- サブクエリでは、
ROW_NUMBER()
ウィンドウ関数を使用して、指定した列でデータをソートし、データをランク付けします。 - 外部クエリでは、ランキング一覧の最初の N 件のデータレコードのみが取得されます。 たとえば、N = 10 の場合、最初の 10 件のデータレコードが取得されます。
実行中に、Flink SQL はソートキーに基づいて入力データストリームをソートします。 パーティション内の最初の N 件のデータレコードが変更されると、更新されたデータは更新ストリームとしてダウンストリームに送信されます。
WHERE 条件の制約
Flink SQL で TopN クエリを識別できるようにするには、外部ループで rownum <= N
形式を使用して、最初の N 件のデータを指定します。 このときに rownum
を rownum - 5 <= N
のような表現では記載しないでください。 WHERE 条件には、他の条件を AND
で結合して含めることもできます。
例 1
次の例では、各キーワードが照会される回数は、時間と都市に基づいて算出されます。 次に、最も照会された上位 100 件のキーワードがエクスポートされます。 出力テーブルの時間、都市、およびランク付け列の組み合わせにより、一意のレコードが識別されます。 したがって、3 つの列を複合キーとして宣言する必要があります。 (また、キーを外部ストレージに設定する必要があります。)
CREATE TABLE rds_output (
rownum BIGINT,
start_time BIGINT,
city VARCHAR,
keyword VARCHAR,
pv BIGINT,
PRIMARY KEY (rownum, start_time, city)
) WITH (
type = 'rds',
...
)
INSERT INTO rds_output
SELECT rownum, start_time, city, keyword, pv
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY start_time, city ORDER BY pv desc) AS rownum
FROM (
SELECT SUBSTRING(time_str,1,12) AS start_time,
keyword,
count(1) AS pv,
city
FROM tmp_search
GROUP BY SUBSTRING(time_str,1,12), keyword, city
) a
)
WHERE rownum <= 100
例 2
- テストデータ
ip (VARCHAR) time (VARCHAR) 192.168.1.1
100000000 192.168.1.2
100000000 192.168.1.2
100000000 192.168.1.3
100030000 192.168.1.3
100000000 192.168.1.3
100000000 - テスト文
CREATE TABLE source_table ( IP VARCHAR, `TIME` VARCHAR )WITH( type='datahub', endPoint='xxxxxxx', project='xxxxxxx', topic='xxxxxxx', accessId='xxxxxxx', accessKey='xxxxxxx' ); CREATE TABLE result_table ( rownum BIGINT, start_time VARCHAR, IP VARCHAR, cc BIGINT, PRIMARY KEY (start_time, IP) ) WITH ( type = 'rds', url='xxxxxxx', tableName='blink_rds_test', userName='xxxxxxx', password='xxxxxxx' ); INSERT INTO result_table SELECT rownum,start_time,IP,cc FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY cc DESC) AS rownum FROM ( SELECT SUBSTRING(`TIME`,1,2) AS start_time, -- You can specify a value based on the actual time. The data specified in this example is test data. COUNT(IP) AS cc, IP FROM source_table GROUP BY SUBSTRING(`TIME`,1,2), IP )a ) WHERE rownum <= 3 -- You can specify a value based on the number of data records you want to obtain. The data specified in this example is test data.
- テスト結果
rownum (BIGINT) start_time (VARCHAR) ip (VARCHAR) cc (BIGINT) 1 10 192.168.1.3
6 2 10 192.168.1.2
4 3 10 192.168.1.1
2
ランク付けなし
- ランク付けをしないことにより、データの肥大化問題を解決することができます。
-
データの肥大化問題
TopN 構文に基づいて、rownum
フィールドはプライマリキーの 1 つとして結果テーブルに書き込まれます。 これはデータの肥大化につながる可能性があります。 たとえば、更新後にレコードのランク付けが 9 位から 1 位に向上すると、1 位から 9 位にランク付けされたすべてのレコードが変更されます。 変更は結果テーブルで更新する必要があります。 その結果、データの肥大化が発生します。 結果テーブルは過剰な量のデータを受信することになるため、更新速度が低下する場合があります。 -
ランク付けなしの方法
データの肥大化を防ぐには、結果テーブルからrownum
を除外し、フロントエンドでrownum
を計算します。 通常は、上位 N 件のデータレコード量はあまり多くなく、フロントエンドで上位 100 件のデータレコードをすばやくソートすることができます。 この場合、更新後にレコードのランク付けが 9 位から 1 位に改善されても、そのレコードのみを配信するだけで済みます。 これにより、結果テーブルの更新速度が大幅に向上します。
-
- ランク付けなしの構文
SELECT col1, col2, col3 FROM ( SELECT col1, col2, col3 ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum FROM table_name) WHERE rownum <= N [AND conditions]
構文は、元の TopN 構文と似ています。 必要なことは、rownum フィールドを外部クエリから除外するだけです。
注 rownum を除外する場合は、結果テーブルのプライマリキーの定義に特に注意してください。 定義が間違っていると、TopN クエリの結果も正しくないものとなります。 rownum を除外する場合、プライマリキーは、TopN 文の前の GROUP BY ノードのキーリストに含まれているものでなければなりません。 - ランク付けなしの例
こちらは、ビデオ業界の顧客のケースを単純化した例です。 各ビデオが配信されるときに大量のトラフィックが発生します。 ビデオトラフィックに基づいて、最も人気のあるビデオを特定できます。 次の例では、1 分あたりのトラフィックが最も多い上位 5 件の動画を特定します。
- テスト文
-- Read the original data storage table from Log Service. CREATE TABLE sls_cdnlog_stream ( vid VARCHAR, -- video id rowtime TIMESTAMP, -- Identify the time when the videos are watched. response_size BIGINT, -- Identify the traffic for watching the videos. WATERMARK FOR rowtime as withOffset(rowtime, 0) ) WITH ( type='sls', ... ); -- Compute the consumed bandwidth by video ID in the 1-minute window. CREATE VIEW cdnvid_group_view AS SELECT vid, TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS start_time, SUM(response_size) AS rss FROM sls_cdnlog_stream GROUP BY vid, TUMBLE(rowtime, INTERVAL '1' MINUTE); -- Create the result table. CREATE TABLE hbase_out_cdnvidtoplog ( vid VARCHAR, rss BIGINT, start_time VARCHAR, -- Do not store the rownum field in the result table. -- Pay special attention to the definition of the primary keys. The primary keys must be those in the key list at the GROUP BY node before the TopN statement. PRIMARY KEY(start_time, vid) ) WITH ( type='RDS', ... ); -- Identify and export the IDs of the top 5 videos that consume the most traffic per minute. INSERT INTO hbase_out_cdnvidtoplog -- The outer query cannot include the rownum field. SELECT vid, rss, start_time FROM ( SELECT vid, start_time, rss, ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY rss DESC) as rownum, FROM cdnvid_group_view ) WHERE rownum <= 5;
- テストデータ
vid (VARCHAR) rowtime (Timestamp) response_size (BIGINT) 10000 2017-12-18 15:00:10
2000 10000 2017-12-18 15:00:15
4000 10000 2017-12-18 15:00:20
3000 10001 2017-12-18 15:00:20
3000 10002 2017-12-18 15:00:20
4000 10003 2017-12-18 15:00:20
1000 10004 2017-12-18 15:00:30
1000 10005 2017-12-18 15:00:30
5000 10006 2017-12-18 15:00:40
6000 10007 2017-12-18 15:00:50
8000 - テスト結果
start_time (VARCHAR) vid (VARCHAR) rss (BIGINT) 2017-12-18 15:00:00
10000 9000 2017-12-18 15:00:00
10007 8000 2017-12-18 15:00:00
10006 6000 2017-12-18 15:00:00
10005 5000 2017-12-18 15:00:00
10002 4000
- テスト文