This topic describes how to use the dirty data collector in Flink CDC data ingestion jobs.
Function overview
In real-time data synchronization scenarios, source data may fail to parse because of format errors, encoding issues, or incompatible schemas. This type of data that cannot be processed is called dirty data.
The data ingestion feature has supported dirty data collection since Ververica Runtime (VVR) 11.5. This feature is currently available only for the Kafka data source. When a connector encounters data that it cannot parse, the system automatically catches the raw message and the exception information and writes them to a specified collector. With configuration policies, you can:
Tolerate a small amount of dirty data to prevent the entire pipeline from breaking.
Record the full context for easier troubleshooting and resolution.
Set a tolerance threshold so that the job fails instead of silently skipping an unbounded number of exceptions.
Typical use scenarios
Scenario | Objective |
Log collection pipelines (for example, unstructured data from sources such as app logs) | Handle inconsistent data quality by skipping small amounts of bad data to ensure that the main process continues to run. |
Core business table synchronization (for example, key systems such as those for orders or account changes) | Data consistency requirements are high. The goal is to trigger an alert immediately when dirty data is found so that engineers can intervene promptly. |
Data exploration and investigation phase | Quickly process all data to understand the overall distribution, and then handle the dirty data later. |
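As a rough illustration of how these objectives map to configuration, the following sketch uses the fault tolerance parameters that are described later in this topic. The threshold value of 100 is only an example.

```yaml
# Log collection pipeline: tolerate a bounded amount of dirty data and keep running.
source:
  type: kafka
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100
```

For core business tables, leaving ingestion.ignore-errors unset means that the first parsing error fails the job, which surfaces the problem immediately through the normal job failure path.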
Limits and notes
Before you use this feature, understand its capabilities and potential risks:
Connector support: Currently, only the Kafka data source supports this feature. Support for other data sources is being gradually added.
Supported collector types: Currently, only the logger type is supported. This type writes dirty data to a log file.
This feature is suitable for debugging and early production stages. If a large amount of dirty data persists, perform data governance on the upstream system.
Syntax structure
Enable the dirty data collector
The dirty data collector is defined in the Pipeline module. The syntax is as follows:
```yaml
pipeline:
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger
```

Parameter | Description |
name | The name of the collector. Use a meaningful name, such as Logger Dirty Data Collector. |
type | The type of the collector. Only the value logger is available. This type writes dirty data to a log file. |
If you do not define this configuration item, dirty data is not recorded, even if you enable fault tolerance.
Configure a fault tolerance policy in the data source
Enabling the dirty data collector does not automatically skip parsing errors. To skip errors, you must use this feature with the Kafka fault tolerance policy. For more information, see the Kafka connector documentation. The following example shows the configuration:
```yaml
source:
  type: kafka
  # Skip the first 100 parsing exceptions. If the number of exceptions exceeds 100, the job fails.
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100
```

Parameter | Default value | Description |
ingestion.ignore-errors | | Specifies whether to ignore parsing errors. If you set this parameter to true, the job skips records that fail to parse and continues running. |
ingestion.error-tolerance.max-count | | The maximum number of dirty data records to tolerate. If the number of dirty data records exceeds this value, the job fails. |
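Putting the two pieces together, a complete ingestion job might look like the following sketch. The sink block and the job name are illustrative placeholders, and the required Kafka connection parameters (brokers, topic, format, and so on) are omitted.

```yaml
source:
  type: kafka
  name: Kafka Source
  # Required Kafka connection parameters (brokers, topic, format) are omitted here.
  # Fault tolerance: skip up to 100 records that fail to parse, then fail the job.
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

sink:
  # Illustrative placeholder; replace with your actual sink type and parameters.
  type: hologres
  name: Hologres Sink

pipeline:
  name: Kafka ingestion job with dirty data collection
  # Record every skipped record and its exception in the log file.
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger
```

Without the pipeline block, records that fail to parse are still skipped, but nothing is recorded, as noted above.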
Logger dirty data collector
The Logger dirty data collector stores dirty data in a separate log file. To view the dirty data log file, follow these steps:
Go to the Job O&M page and click the Job Logs tab.
Click Operational Logs, click the Running Task Managers subtab, and select the TaskManager (TM) node of the corresponding operator.
Click Log List, and then click the log file named dirty-data.out in the list to query and save the collected dirty data records.
The following metadata is recorded for dirty data:
The timestamp when the dirty data was processed
The operator and Subtask Index that generated the dirty data record
The content of the raw dirty data
The exception information that caused the processing failure
Dirty data record format example
The following example shows a dirty data record and the fields that it contains:
```text
[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---
```

Field | Description |
Timestamp | The time when the dirty data was caught. |
Operator & Subtask | The specific operator and parallel instance number that caused the error. |
Raw Data | The content of the raw, unparsed message (in Base64 or string format). |
Exception | The exception type and stack summary for the parsing failure. |
FAQ
Does dirty data affect checkpoints?
No, it does not. Dirty data is intercepted before the state is updated, so it does not affect the success of checkpoints.
What is the difference between this feature and the side output stream in Flink SQL?
Dirty data collector: Processes data that fails to deserialize or parse.
Side Output: Processes data that can be parsed but does not meet business rules.
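As a rough illustration with made-up messages (the field names are hypothetical):

```text
# Cannot be deserialized at all (truncated JSON): intercepted by the dirty data collector.
{"order_id": 1001, "amount": 9.9

# Deserializes successfully but violates a business rule (negative amount):
# this is a case for a side output or a filter in Flink SQL, not for this feature.
{"order_id": 1002, "amount": -3.5}
```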