You can deduplicate data in several ways, for example by using FIRST_VALUE, LAST_VALUE, or DISTINCT. This topic describes how to use the TopN function to implement deduplication and the precautions to take.
- Keep the first value.
- Keep the last value.
Use the ROW_NUMBER OVER WINDOW statement of Flink SQL to implement deduplication. This use of ROW_NUMBER OVER WINDOW is similar to a TopN statement and can be regarded as a special case of it, one that keeps only the top-ranked row.
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]] ORDER BY timeAttributeCol [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1
ROW_NUMBER(): computes the row number of each row within the OVER window. Row numbers start from 1.
PARTITION BY col1[, col2..]: specifies the partition columns, that is, the primary key columns used to identify duplicate data. This parameter is optional.
ORDER BY timeAttributeCol [asc|desc]: specifies the column used for sorting. The column must be a time attribute, either processing time or event time. Sort in ascending order to keep the first row, or in descending order to keep the last row.
rownum: the filter on rownum in the outer query must be rownum = 1 or rownum <= 1. Any additional conditions must be combined with AND, and nondeterministic UDFs cannot be used in the conditions.
- Subquery: calls the ROW_NUMBER() window function to number the rows in each partition, sorted by the time attribute column.
- Outer query: keeps the first data record under the specified primary key and removes the remaining duplicate records. The time column can be sorted in ascending or descending order:
  - deduplicate keep first row: ascending order, so the first record, which is kept, is the earliest one.
  - deduplicate keep last row: descending order, so the first record, which is kept, is the latest one.
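The two query levels can be mimicked outside of Flink to see what each one contributes. The following Python sketch is only an illustration of the semantics on a small in-memory batch, not how Realtime Compute executes the statement. The function name `deduplicate`, the sample rows, and the integer `ts` column (standing in for a time attribute) are all assumptions made for this example.

```python
from collections import defaultdict


def deduplicate(rows, partition_by, order_by, descending=False):
    """Keep one row per partition key, mirroring ROW_NUMBER() ... WHERE rownum = 1."""
    # Subquery step 1: PARTITION BY groups rows that share the same key.
    partitions = defaultdict(list)
    for row in rows:
        key = tuple(row[col] for col in partition_by)
        partitions[key].append(row)

    kept = []
    for group in partitions.values():
        # Subquery step 2: ORDER BY the time column, ascending or descending.
        ordered = sorted(group, key=lambda r: r[order_by], reverse=descending)
        # Subquery step 3: ROW_NUMBER() numbers the ordered rows from 1.
        for rownum, row in enumerate(ordered, start=1):
            # Outer query: WHERE rownum = 1 keeps only the first row.
            if rownum == 1:
                kept.append(row)
    return kept


# Hypothetical input: key column b, payload v, and ts as the time column.
rows = [
    {"b": "k1", "v": "first", "ts": 1},
    {"b": "k2", "v": "only", "ts": 2},
    {"b": "k1", "v": "second", "ts": 3},
]

# Ascending order corresponds to "deduplicate keep first row".
keep_first = deduplicate(rows, ["b"], "ts")
# Descending order corresponds to "deduplicate keep last row".
keep_last = deduplicate(rows, ["b"], "ts", descending=True)

print([r["v"] for r in keep_first])
print([r["v"] for r in keep_last])
```

In both cases the outer filter is the same (rownum = 1). Only the sort direction decides whether the earliest or the latest record of each key survives.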
Deduplication policy: deduplicate keep first row
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum FROM T ) WHERE rowNum = 1
In this example, Realtime Compute removes duplicate data records in table T based on field b and keeps the first record for each value of b, ordered by system time. proctime is a processing time attribute of table T, and Realtime Compute sorts the records by this attribute. To remove duplicate data records based on the system time, you can also call the PROCTIME function instead of declaring the proctime attribute.
Deduplication policy: deduplicate keep last row
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY proctime DESC) as rowNum FROM T ) WHERE rowNum = 1
What do I do if the following error is returned when ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY now() as time DESC) is executed?
java.lang.RuntimeException: Can not retract a non-existent record: 38c30001,1b800000008,1c000000013,85000035343a3731,1d80000008d,2680000000c,8400000073616173,8500003130303333,8600a6bae9a7a4e5,0,0,27800000011,2900000001c,85000069616d6164,2b00000000c,0,0,8100000000000059,2c00000000a,2d000000016,0,0,0,2e800000016,3000000000c,3100000000c,32000000011,8765636976726573,3380000000c,8500004554554f52,3480000000c,3580000000a,8600a6bae9a7a4e5,2,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,5d304013. This should never happen.
- The now() function in the code is nondeterministic.
The TopN function does not support nondeterministic sort fields. Because now() returns a different value each time it is evaluated, the previously emitted value cannot be found when it is retracted. Make sure that you use a deterministic time field, such as the event time or a processing time attribute field declared in the source table.
- The value of the state.backend.niagara.ttl.ms parameter is too small.
Comment out the TTL parameter whose value is too small so that its default value can take effect, or set this parameter to a larger value.
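Both causes lead to the same symptom: a retraction arrives for a row that the operator can no longer find in its state. The following Python sketch is a deliberately simplified model of that situation, not the actual Realtime Compute implementation. The class `ToyRankState`, the helpers `replay` and `outcome`, and the counter-based `fake_now` (a stand-in for now()) are all hypothetical names introduced for this illustration.

```python
import itertools


class ToyRankState:
    """Toy stand-in for the rows an operator remembers having received."""

    def __init__(self):
        self.rows = set()

    def insert(self, row):
        self.rows.add(row)

    def retract(self, row):
        # A retraction only succeeds if it matches a stored row exactly.
        if row not in self.rows:
            raise RuntimeError(f"Can not retract a non-existent record: {row}")
        self.rows.remove(row)

    def expire_all(self):
        # Models state that was dropped because its TTL elapsed.
        self.rows.clear()


def replay(sort_value_of, expire_before_retract=False):
    """Insert one record, then retract it, recomputing the sort field each time."""
    state = ToyRankState()
    record = {"b": "k1", "d": "x", "event_time": 100}
    state.insert((record["b"], record["d"], sort_value_of(record)))
    if expire_before_retract:
        state.expire_all()
    state.retract((record["b"], record["d"], sort_value_of(record)))
    return len(state.rows)


def outcome(**kwargs):
    """Return the remaining row count, or the error message if retraction fails."""
    try:
        return replay(**kwargs)
    except RuntimeError as exc:
        return str(exc)


_ticks = itertools.count(1)


def fake_now(record):
    # Assumption: a counter imitates now(), which differs on every call.
    return next(_ticks)


def event_time(record):
    # A deterministic time field taken from the record itself.
    return record["event_time"]


deterministic = outcome(sort_value_of=event_time)
nondeterministic = outcome(sort_value_of=fake_now)
expired = outcome(sort_value_of=event_time, expire_before_retract=True)

print(deterministic)
print(nondeterministic)
print(expired)
```

With the deterministic field, the retracted row matches the stored one and the state ends up empty. With the stand-in for now(), the sort value differs between insert and retract, so the lookup fails. With a deterministic field but expired state, the lookup fails for the second reason, which is why a larger TTL value avoids the error in that case.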