You can deduplicate data by using several functions and statements, such as FIRST_VALUE, LAST_VALUE, and DISTINCT. This topic describes how to use the TopN statement to deduplicate data and lists the relevant precautions.

You can deduplicate data in either of the following two ways:
  • Keep the first value.
  • Keep the last value.
Note The time attribute that is used in ORDER BY must be defined in the source table.

Syntax

Flink SQL does not provide a dedicated deduplication statement. Therefore, Realtime Compute uses the ROW_NUMBER OVER WINDOW statement of Flink SQL to implement deduplication. Deduplication can be regarded as a special TopN statement in which N is 1 and rows are ordered by a time attribute.
SELECT *
FROM (
   SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
      ORDER BY timeAttributeCol [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1
Parameters:
  • ROW_NUMBER(): an over window function that assigns a sequential row number to each row in its partition. The row number starts from 1.
  • PARTITION BY col1[, col2...]: specifies the partition columns, that is, the deduplication key. Rows that have the same values in these columns are treated as duplicates. This parameter is optional.
  • ORDER BY timeAttributeCol [asc|desc]: specifies the column used for sorting. The column must be a time attribute, either processing time or event time. Sort in ascending order of the time attribute to keep the first row, or in descending order to keep the last row.
  • rownum: the outer query must filter on rownum = 1 or rownum <= 1. Any other conditions in the WHERE clause must be combined with AND, and nondeterministic UDFs are not allowed.
As the preceding syntax shows, deduplication uses a two-level query:
  • Subquery: calls the ROW_NUMBER() window function to number the rows in each partition in the order of the time attribute column.
  • Outer query: keeps only the row numbered 1 for each key and discards the other duplicates. In ascending order, the row numbered 1 is the earliest record, which is the deduplicate keep first row policy. In descending order, the row numbered 1 is the latest record, which is the deduplicate keep last row policy.
If you sort by a processing time attribute, Flink SQL deduplicates records based on the system time at which they are processed, so the result may differ each time the job runs. If you sort by an event time attribute, Flink SQL deduplicates records based on the business time carried in the data, so the result is the same each time the job runs.
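The two-level query can be modeled outside Flink to see what each clause contributes. The following Python sketch is an illustration under stated assumptions, not how Realtime Compute executes the query: it processes a finite list in batch, `dedup` and `assign_proctime` are hypothetical helpers, and processing time is approximated by each row's arrival position. It shows that sorting by event time keeps the same row regardless of arrival order, while sorting by processing time does not.

```python
from operator import itemgetter
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]


def dedup(rows: Iterable[Row], key: str, time_of: Callable[[Row], Any],
          keep: str = "first") -> Dict[Any, Row]:
    """Hypothetical batch model of
        SELECT * FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY key
            ORDER BY time ASC|DESC) AS rownum FROM t) WHERE rownum = 1
    keep="first" sorts ascending, keep="last" sorts descending."""
    partitions: Dict[Any, List[Row]] = {}
    for row in rows:                                   # PARTITION BY key
        partitions.setdefault(row[key], []).append(row)
    result: Dict[Any, Row] = {}
    for k, part in partitions.items():
        ordered = sorted(part, key=time_of, reverse=(keep == "last"))  # ORDER BY
        for rownum, r in enumerate(ordered, start=1):  # ROW_NUMBER()
            if rownum == 1:                            # WHERE rownum = 1
                result[k] = r
    return result


def assign_proctime(arrivals: List[Row]) -> List[Row]:
    """Hypothetical helper: use the arrival position as the processing time."""
    return [dict(row, proctime=i) for i, row in enumerate(arrivals)]


rows = [
    {"b": "x", "v": 1, "ts": 10},
    {"b": "x", "v": 2, "ts": 20},
    {"b": "y", "v": 3, "ts": 5},
]
by_event_time = itemgetter("ts")
by_proctime = itemgetter("proctime")

# Event time: the same row is kept no matter in which order the rows arrive.
print(dedup(rows, "b", by_event_time)["x"]["v"])                   # 1
print(dedup(rows[::-1], "b", by_event_time)["x"]["v"])             # 1
print(dedup(rows, "b", by_event_time, keep="last")["x"]["v"])      # 2

# Processing time: the kept row depends on arrival order.
print(dedup(assign_proctime(rows), "b", by_proctime)["x"]["v"])        # 1
print(dedup(assign_proctime(rows[::-1]), "b", by_proctime)["x"]["v"])  # 2
```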

Deduplication policy: deduplicate keep first row

With this policy, Realtime Compute keeps the first data record for each key and discards the later duplicates. The state stores only the keys, which provides better performance. An example is as follows:
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
  FROM T
)
WHERE rowNum = 1

In this example, Realtime Compute deduplicates the records in table T based on field b and, for each value of b, keeps the first record based on the system time. proctime is a processing time attribute, and Realtime Compute sorts the records in table T by this attribute. To deduplicate records based on the system time, you can also call the PROCTIME function instead of declaring the proctime attribute.
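A minimal sketch of why keep first row needs only keys in state, assuming rows arrive in processing-time order so that the first row seen for a key is the one numbered 1. The Python function `keep_first_row` is hypothetical and models the semantics only; it is not the Realtime Compute operator.

```python
from typing import Any, Dict, Iterable, Iterator, Set, Tuple

Row = Dict[str, Any]


def keep_first_row(stream: Iterable[Row], key_cols: Tuple[str, ...]) -> Iterator[Row]:
    """Hypothetical model of keep-first-row deduplication on processing time."""
    seen: Set[Tuple[Any, ...]] = set()   # state holds only the keys
    for row in stream:
        key = tuple(row[c] for c in key_cols)
        if key in seen:
            continue                      # later duplicate: discarded
        seen.add(key)
        yield row                         # first row for this key: emitted once, never retracted


stream = [{"b": 1, "v": "a"}, {"b": 1, "v": "b"}, {"b": 2, "v": "c"}]
print([row["v"] for row in keep_first_row(stream, ("b",))])  # ['a', 'c']
```

Because a row that has been emitted can never be displaced by a later row in this model, the output is append-only.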

Note In versions later than Blink 3.3.1, the deduplicate keep first row policy allows the OVER window to be ordered by an event time attribute without triggering retractions.

Deduplication policy: deduplicate keep last row

Notice This policy does not allow the OVER window to be ordered by an event time attribute.
With this policy, Realtime Compute keeps only the last data record for each key. Its performance is slightly better than that of the LAST_VALUE function. An example is as follows:
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY proctime DESC) as rowNum
  FROM T
)
WHERE rowNum = 1
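For comparison, a hypothetical Python model of keep last row, under the same assumption that rows arrive in processing-time order. Because each new row for a key replaces the previous last row, the model keeps the full row in state and emits a retraction ("-") for the replaced row before the new row ("+"). The function name `keep_last_row` and the "+"/"-" changelog encoding are illustrative assumptions, not the Realtime Compute output format.

```python
from typing import Any, Dict, Iterable, Iterator, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]   # ("+", row) = insert, ("-", row) = retraction


def keep_last_row(stream: Iterable[Row], key_cols: Tuple[str, ...]) -> Iterator[Change]:
    """Hypothetical model of keep-last-row deduplication on processing time."""
    last: Dict[Tuple[Any, ...], Row] = {}   # state holds the full latest row per key
    for row in stream:
        key = tuple(row[c] for c in key_cols)
        previous = last.get(key)
        if previous is not None:
            yield ("-", previous)           # retract the row that is no longer the last
        last[key] = row
        yield ("+", row)                    # emit the new last row


stream = [
    {"b": 1, "d": "p", "v": 1},
    {"b": 1, "d": "p", "v": 2},
    {"b": 1, "d": "q", "v": 3},
]
print([(op, row["v"]) for op, row in keep_last_row(stream, ("b", "d"))])
# [('+', 1), ('-', 1), ('+', 2), ('+', 3)]
```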

FAQ

Q: What do I do if the following error occurs when ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY now() as time DESC) is executed?
java.lang.RuntimeException: Can not retract a non-existent record:
    38c30001,1b800000008,1c000000013,85000035343a3731,1d80000008d,2680000000c,8400000073616173,8500003130303333,8600a6bae9a7a4e5,0,0,27800000011,2900000001c,85000069616d6164,2b00000000c,0,0,8100000000000059,2c00000000a,2d000000016,0,0,0,2e800000016,3000000000c,3100000000c,32000000011,8765636976726573,3380000000c,8500004554554f52,3480000000c,3580000000a,8600a6bae9a7a4e5,2,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,5d304013.
    This should never happen.
A: This error may have either of the following causes:
  • The now() function in the code is nondeterministic.

    TopN does not support nondeterministic sort fields. Because now() returns a different value each time it is evaluated, the sort key of a record changes, and the previously emitted record cannot be found when it must be retracted. Use a deterministic time field instead, such as an event time field or a processing time attribute field declared in the source table.

  • The value of the blink.state.ttl.ms or state.backend.niagara.ttl.ms parameter is too small.

    If the TTL is too small, the state that holds a previously emitted record can expire before that record needs to be retracted. Comment out the TTL parameter so that its default value takes effect, or set this parameter to a larger value.
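Both causes lead to the same failure: a retraction arrives for a record that the state cannot find. The toy Python model below reproduces that condition under stated assumptions. `RetractableState`, `retract_ok`, and `fake_now` are hypothetical names, and the model does not reflect how Realtime Compute actually stores state.

```python
import itertools
from typing import Any, Dict, List

Row = Dict[str, Any]


class RetractableState:
    """Toy model: a record can be retracted only if an identical record is still stored."""

    def __init__(self) -> None:
        self.records: List[Row] = []

    def add(self, record: Row) -> None:
        self.records.append(record)

    def retract(self, record: Row) -> None:
        if record not in self.records:
            raise RuntimeError("Can not retract a non-existent record")
        self.records.remove(record)

    def expire(self) -> None:
        # Stand-in for state TTL cleanup: everything stored is dropped.
        self.records.clear()


def retract_ok(state: RetractableState, record: Row) -> bool:
    try:
        state.retract(record)
        return True
    except RuntimeError:
        return False


_clock = itertools.count(100)


def fake_now() -> int:
    """Hypothetical now(): returns a new value on every call."""
    return next(_clock)


row = {"b": 1, "d": "p"}

# Cause 1: the sort field is recomputed with now(), so the retraction
# carries a different value than the record that was stored.
s1 = RetractableState()
s1.add(dict(row, time=fake_now()))                  # stored with time=100
print(retract_ok(s1, dict(row, time=fake_now())))   # False (time=101)

# Deterministic alternative: the time field comes from the source row.
s2 = RetractableState()
stored = dict(row, time=42)
s2.add(stored)
print(retract_ok(s2, dict(stored)))                 # True

# Cause 2: the state expired (TTL too small) before the retraction arrived.
s3 = RetractableState()
s3.add(stored)
s3.expire()
print(retract_ok(s3, stored))                       # False
```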