You can remove duplicates by using functions such as FIRST_VALUE and LAST_VALUE or the DISTINCT keyword. This topic describes how to use TopN statements to remove duplicates and the considerations that apply to deduplication. The following deduplication policies are supported:
- Keep the first row of duplicate rows.
- Keep the last row of duplicate rows.
Syntax
Use the ROW_NUMBER OVER WINDOW statement of Flink SQL to remove duplicates. ROW_NUMBER OVER WINDOW is similar to a TopN statement and can be considered a special TopN statement. For more information about TopN statements, see TopN.

SELECT *
FROM (
   SELECT *,
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY timeAttributeCol [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1;
- ROW_NUMBER(): calculates the number of each row within its partition. Row numbers start from 1.
- PARTITION BY col1[, col2...]: optional. Specifies one or more partition key columns. These columns are used as the keys for removing duplicates: rows that have the same values in them are treated as duplicates.
- ORDER BY timeAttributeCol [asc|desc]: specifies the column that is used to sort data. This column must be a time attribute column, and the time attribute can be processing time or event time. If you sort in ascending order, the first row of duplicate rows is retained. If you sort in descending order, the last row of duplicate rows is retained.

Take note of the following points:
- In the outer query, the condition on rownum must be either rownum = 1 or rownum <= 1.
- In the outer query, conditions can be combined only by using the logical operator AND, and you cannot use nondeterministic user-defined functions (UDFs).
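To make the semantics of the syntax concrete, the following Python sketch imitates what the query computes over a finite batch of rows. It numbers the rows in each partition by the time attribute, as ROW_NUMBER() does, and then keeps only the rows numbered 1. The function name `dedup_by_row_number` and the row layout (plain dicts) are hypothetical. This illustrates the semantics only and is not how Flink executes the query.

```python
from itertools import groupby
from operator import itemgetter


def dedup_by_row_number(rows, key_fields, time_field, descending=False):
    """Keep one row per partition key, imitating ROW_NUMBER() ... = 1.

    Mirrors: SELECT * FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY
    key_fields ORDER BY time_field [DESC]) AS rownum FROM t) WHERE rownum = 1
    """

    def partition_key(row):
        return tuple(row[f] for f in key_fields)

    result = []
    # groupby only merges adjacent rows, so sort by the partition key first.
    for _, group in groupby(sorted(rows, key=partition_key), key=partition_key):
        # Subquery: order the partition by the time attribute and number the
        # rows starting from 1, as ROW_NUMBER() does.
        ordered = sorted(group, key=itemgetter(time_field), reverse=descending)
        numbered = [{**row, "rownum": n} for n, row in enumerate(ordered, start=1)]
        # Outer query: keep only the row with rownum = 1.
        result.extend(r for r in numbered if r["rownum"] == 1)
    return result
```

With the default ascending order, each partition keeps its earliest row (keep first row). With `descending=True`, each partition keeps its latest row (keep last row).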
A deduplication statement consists of two parts:
- Subquery: calls the ROW_NUMBER() function to number the rows of each partition in the order of the time attribute column.
- Outer query: keeps only the row whose row number is 1 for each partition key and removes the other duplicate rows.

The time attribute column can be sorted in either of the following orders:
- Ascending: Deduplicate Keep First Row.
- Descending: Deduplicate Keep Last Row.
Deduplicate Keep First Row
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
FROM T
)
WHERE rowNum = 1;
In this example, the system removes duplicates from table T based on field b and keeps the first row of duplicate rows based on the system time. proctime is a field that has the processing time attribute. To remove duplicates based on the system time, you can also call the PROCTIME() function instead of declaring the proctime field.
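On an unbounded stream, keeping the first row does not require sorting. The following minimal Python sketch assumes that rows arrive in processing-time order. The operator remembers which keys it has already seen and forwards only the first row for each key, so its output is append-only and never needs to be retracted. The class name `KeepFirstRow` is hypothetical. A real streaming engine would keep the per-key information in fault-tolerant state rather than in a Python set.

```python
class KeepFirstRow:
    """Simplified keep-first-row deduplication over a stream (processing time)."""

    def __init__(self, key_fields):
        self.key_fields = key_fields
        self.seen = set()  # keys for which a row has already been emitted

    def process(self, row):
        """Return the rows to emit for one input row (an empty list means dropped)."""
        key = tuple(row[f] for f in self.key_fields)
        if key in self.seen:
            return []  # a later duplicate: discard it
        self.seen.add(key)
        return [row]  # first row for this key: emit it once and never retract it
```

For example, feeding rows with b = "x", "y", "x" emits only the first "x" row and the "y" row.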
Deduplicate Keep Last Row
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY proctime DESC) as rowNum
FROM T
)
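WHERE rowNum = 1;
In this example, the system removes duplicates from table T based on fields b and d. Because proctime is sorted in descending order, the system keeps the last row of duplicate rows based on the system time.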
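Keeping the last row behaves differently: each new row for a key replaces the previously emitted one, so the result is an updating stream rather than an append-only one. The following Python sketch is a simplified model with hypothetical names. It stores the latest row per key. When a key repeats, it emits a retraction of the old row followed by the new row. The `+I`, `-U`, and `+U` labels borrow the short notation of Flink changelog row kinds (insert, update-before, update-after).

```python
class KeepLastRow:
    """Simplified keep-last-row deduplication that emits a changelog."""

    def __init__(self, key_fields):
        self.key_fields = key_fields
        self.latest = {}  # key -> last row emitted for that key

    def process(self, row):
        """Return (kind, row) changelog entries for one input row."""
        key = tuple(row[f] for f in self.key_fields)
        previous = self.latest.get(key)
        self.latest[key] = row
        if previous is None:
            return [("+I", row)]  # first row for this key: plain insert
        # The key repeats: retract the old result, then emit the new one.
        return [("-U", previous), ("+U", row)]
```

Because downstream operators must apply these retractions, the keep-last-row policy produces more output and needs per-key state for as long as a key can reappear.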
FAQ
What do I do if the following error is reported when ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY now() as time DESC) is executed?
java.lang.RuntimeException: Can not retract a non-existent record: 38c30001,1b800000008,1c000000013,85000035343a3731,5d304013. This should never happen.
- Cause: The now() function in the statement may cause this error. TopN does not support nondeterministic sort fields. The now() function is nondeterministic and returns a different value each time it is invoked. As a result, the previously emitted record cannot be found when it needs to be retracted.
  Solution: Use a deterministic time attribute field, such as an event time attribute field. You can also use a processing time attribute field that is declared in the source table.
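A toy model helps show why a nondeterministic sort field breaks retraction. In the hypothetical Python sketch below, a TopN-style state indexes each stored record by its sort value. When a record must be retracted, its sort value is recomputed. With a deterministic field, the lookup succeeds. With a now()-like function, the recomputed value differs from the stored one, so the record cannot be found. `FakeClock` stands in for now() so that the example is reproducible. None of these names are Flink APIs, and the exception only mimics the message quoted above.

```python
class FakeClock:
    """Stand-in for now(): returns a new, larger value on every call."""

    def __init__(self):
        self.t = 0

    def now(self):
        self.t += 1
        return self.t


class SortedState:
    """Toy TopN state: records are stored under (partition key, sort value)."""

    def __init__(self, sort_value):
        self.sort_value = sort_value  # function that computes the sort field
        self.records = {}

    def add(self, key, record):
        self.records[(key, self.sort_value(record))] = record

    def retract(self, key, record):
        # The sort value is recomputed from the record being retracted.
        entry = (key, self.sort_value(record))
        if entry not in self.records:
            # Mimics the error message quoted in the question above.
            raise RuntimeError("Can not retract a non-existent record")
        del self.records[entry]
```

With `SortedState(lambda r: r["ts"])`, adding and then retracting a record works. With `SortedState(lambda r: clock.now())`, the retraction raises the error.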
- Cause: The value of the blink.state.ttl.ms or state.backend.niagara.ttl.ms parameter is too small. Expired state is cleared, so the record to be retracted may no longer exist.
  Solution: Use the default value of the parameter or increase the time-to-live (TTL) value.
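A small TTL leads to the same error for a different reason. The sort field is deterministic, but the stored record has already been cleared from state when the retraction arrives. The following Python sketch models per-entry TTL with an explicit timestamp argument. The class and function names and the millisecond values are hypothetical and unrelated to the defaults of blink.state.ttl.ms or state.backend.niagara.ttl.ms.

```python
class TtlState:
    """Toy keyed state whose entries expire ttl_ms after they were written."""

    def __init__(self, ttl_ms):
        self.ttl_ms = ttl_ms
        self.entries = {}  # key -> (value, write time in ms)

    def put(self, key, value, now_ms):
        self.entries[key] = (value, now_ms)

    def get(self, key, now_ms):
        item = self.entries.get(key)
        if item is None:
            return None
        value, written_ms = item
        if now_ms - written_ms >= self.ttl_ms:
            del self.entries[key]  # expired: behave as if it was never stored
            return None
        return value


def retract(state, key, now_ms):
    """Retract the record stored for key; fails if the entry has expired."""
    if state.get(key, now_ms) is None:
        raise RuntimeError("Can not retract a non-existent record")
    del state.entries[key]
```

In this model, a retraction arriving 5,000 ms after the write fails with a 1,000 ms TTL and succeeds with a 60,000 ms TTL. This is why increasing the TTL resolves the error.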