TopN 文は、リアルタイムデータのインジケーターの最大または最小の N データレコードを計算するために使用されます。 Flink SQL では、TopN の計算を柔軟に実装するために、OVER ウィンドウ関数を使用することができます。

構文

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
    ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]
  • ROW_NUMBER(): 行数を計算するための OVER ウィンドウ関数を指定します。 値は 1 から始まります。
  • PARTITION BY col1[, col2..]: パーティションに使用する列を指定します。 このパラメーターは省略可能です。
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]: ソートに使用する列と、各列のソート順を指定します。

上記の構文で示すように、TopN には 2 つのレベルのクエリが必要です。

  • サブクエリでは、ROW_NUMBER() ウィンドウ関数を使用して、指定した列でデータをソートし、データをランク付けします。
  • 外部クエリでは、ランキング一覧の最初の N 件のデータレコードのみが取得されます。 たとえば、N = 10 の場合、最初の 10 件のデータレコードが取得されます。

実行中に、Flink SQL はソートキーに基づいて入力データストリームをソートします。 パーティション内の最初の N 件のデータレコードが変更されると、更新されたデータは更新ストリームとしてダウンストリームに送信されます。

したがって、TopN データを外部ストレージにエクスポートする場合は、ターゲットの結果テーブルにプライマリキーが含まれている必要があります。

WHERE 条件の制約

Flink SQL で TopN クエリを識別できるようにするには、外部ループで rownum <= N 形式を使用して、最初の N 件のデータを指定します。 このときに rownumrownum - 5 <= N のような表現では記載しないでください。 WHERE 条件には、他の条件を AND で結合して含めることもできます。

例 1

次の例では、各キーワードが照会される回数は、時間と都市に基づいて算出されます。 次に、最も照会された上位 100 件のキーワードがエクスポートされます。 出力テーブルの時間、都市、およびランク付け列の組み合わせにより、一意のレコードが識別されます。 したがって、3 つの列を複合キーとして宣言する必要があります。 (また、キーを外部ストレージに設定する必要があります。)

CREATE TABLE rds_output (
  rownum BIGINT,
  start_time BIGINT,
  city VARCHAR,
  keyword VARCHAR,
  pv BIGINT,
  PRIMARY KEY (rownum, start_time, city)
) WITH (
  type = 'rds',
  ...
)

INSERT INTO rds_output
SELECT rownum, start_time, city, keyword, pv
FROM (
  SELECT *,
     ROW_NUMBER() OVER (PARTITION BY start_time, city ORDER BY pv desc) AS rownum
  FROM (
        SELECT SUBSTRING(time_str,1,12) AS start_time, 
            keyword,
            count(1) AS pv,
            city
        FROM tmp_search
        GROUP BY SUBSTRING(time_str,1,12), keyword, city
    ) a 
) 
WHERE rownum <= 100

例 2

  • テストデータ
    ip (VARCHAR) time (VARCHAR)
    192.168.1.1 100000000
    192.168.1.2 100000000
    192.168.1.2 100000000
    192.168.1.3 100030000
    192.168.1.3 100000000
    192.168.1.3 100000000
  • テスト文
    CREATE TABLE source_table (
      IP VARCHAR,
      `TIME` VARCHAR 
    )WITH(
      type='datahub',
      endPoint='xxxxxxx',
      project='xxxxxxx',
      topic='xxxxxxx',
      accessId='xxxxxxx',
      accessKey='xxxxxxx'
    );
    
    CREATE TABLE result_table (
      rownum BIGINT,
      start_time VARCHAR,
      IP VARCHAR,
      cc BIGINT,
      PRIMARY KEY (start_time, IP)
    ) WITH (
      type = 'rds',
      url='xxxxxxx',
      tableName='blink_rds_test',
      userName='xxxxxxx',
      password='xxxxxxx'
    );
    INSERT INTO result_table
    SELECT rownum,start_time,IP,cc
    FROM (
      SELECT *,
         ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY cc DESC) AS rownum
      FROM (
            SELECT SUBSTRING(`TIME`,1,2) AS start_time, -- You can specify a value based on the actual time. The data specified in this example is test data.
            COUNT(IP) AS cc,
            IP
            FROM  source_table
            GROUP BY SUBSTRING(`TIME`,1,2), IP
        )a
    ) 
    WHERE rownum <= 3 -- You can specify a value based on the number of data records you want to obtain. The data specified in this example is test data.
    
    
    
  • テスト結果
    rownum (BIGINT) start_time (VARCHAR) ip (VARCHAR) cc (BIGINT)
    1 10 192.168.1.3 6
    2 10 192.168.1.2 4
    3 10 192.168.1.1 2

ランク付けなし

  • ランク付けをしないことにより、データの肥大化問題を解決することができます。
    • データの肥大化問題

      TopN 構文に基づいて、rownum フィールドはプライマリキーの 1 つとして結果テーブルに書き込まれます。 これはデータの肥大化につながる可能性があります。 たとえば、更新後にレコードのランク付けが 9 位から 1 位に向上すると、1 位から 9 位にランク付けされたすべてのレコードが変更されます。 変更は結果テーブルで更新する必要があります。 その結果、データの肥大化が発生します。 結果テーブルは過剰な量のデータを受信することになるため、更新速度が低下する場合があります。
    • ランク付けなしの方法

      データの肥大化を防ぐには、結果テーブルから rownum を除外し、フロントエンドで rownum を計算します。 通常は、上位 N 件のデータレコード量はあまり多くなく、フロントエンドで上位 100 件のデータレコードをすばやくソートすることができます。 この場合、更新後にレコードのランク付けが 9 位から 1 位に改善されても、そのレコードのみを配信するだけで済みます。 これにより、結果テーブルの更新速度が大幅に向上します。
  • ランク付けなしの構文
    SELECT col1, col2, col3
    FROM (
     SELECT col1, col2, col3
       ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
     FROM table_name)
    WHERE rownum <= N [AND conditions]
    
    

    構文は、元の TopN 構文と似ています。 必要なことは、rownum フィールドを外部クエリから除外するだけです。

    rownum を除外する場合は、結果テーブルのプライマリキーの定義に特に注意してください。 定義が間違っていると、TopN クエリの結果も正しくないものとなります。 rownum を除外する場合、プライマリキーは、TopN 文の前の GROUP BY ノードのキーリストに含まれているものでなければなりません。
  • ランク付けなしの例

    こちらは、ビデオ業界の顧客のケースを単純化した例です。 各ビデオが配信されるときに大量のトラフィックが発生します。 ビデオトラフィックに基づいて、最も人気のあるビデオを特定できます。 次の例では、1 分あたりのトラフィックが最も多い上位 5 件の動画を特定します。

    • テスト文
      -- Read the original data storage table from Log Service.
      CREATE TABLE sls_cdnlog_stream (
      vid VARCHAR, -- video id
      rowtime TIMESTAMP, -- Identify the time when the videos are watched.
      response_size BIGINT, -- Identify the traffic for watching the videos.
      WATERMARK FOR rowtime as withOffset(rowtime, 0)
      ) WITH (
      type='sls',
      ...
      );
      
      -- Compute the consumed bandwidth by video ID in the 1-minute window.
      CREATE VIEW cdnvid_group_view AS
      SELECT vid,
      TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS start_time,
      SUM(response_size) AS rss
      FROM sls_cdnlog_stream
      GROUP BY vid, TUMBLE(rowtime, INTERVAL '1' MINUTE);
      
      -- Create the result table.
      CREATE TABLE hbase_out_cdnvidtoplog (
      vid VARCHAR,
      rss BIGINT,
      start_time VARCHAR,
         -- Do not store the rownum field in the result table.
         -- Pay special attention to the definition of the primary keys. The primary keys must be those in the key list at the GROUP BY node before the TopN statement.
      PRIMARY KEY(start_time, vid)
      ) WITH (
      type='RDS',
      ...
      );
      
      -- Identify and export the IDs of the top 5 videos that consume the most traffic per minute.
      INSERT INTO hbase_out_cdnvidtoplog
      
      -- The outer query cannot include the rownum field.
      SELECT vid, rss, start_time FROM
      (
      SELECT
      vid, start_time, rss,
      ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY rss DESC) as rownum,
      FROM
      cdnvid_group_view
      )
      WHERE rownum <= 5;
      
      
    • テストデータ
      vid (VARCHAR) rowtime (Timestamp) response_size (BIGINT)
      10000 2017-12-18 15:00:10 2000
      10000 2017-12-18 15:00:15 4000
      10000 2017-12-18 15:00:20 3000
      10001 2017-12-18 15:00:20 3000
      10002 2017-12-18 15:00:20 4000
      10003 2017-12-18 15:00:20 1000
      10004 2017-12-18 15:00:30 1000
      10005 2017-12-18 15:00:30 5000
      10006 2017-12-18 15:00:40 6000
      10007 2017-12-18 15:00:50 8000
    • テスト結果
      start_time (VARCHAR) vid (VARCHAR) rss (BIGINT)
      2017-12-18 15:00:00 10000 9000
      2017-12-18 15:00:00 10007 8000
      2017-12-18 15:00:00 10006 6000
      2017-12-18 15:00:00 10005 5000
      2017-12-18 15:00:00 10002 4000