When a Kafka source connector encounters data it cannot parse, the dirty data collector captures the raw message and exception details and writes them to a log file — letting the job continue running instead of failing immediately.
How it works
During data ingestion, source data can fail to parse for various reasons: format errors, encoding issues, or schema mismatches. Data that cannot be processed is called dirty data.
The dirty data collector is a two-part configuration:
- Pipeline-level collector — defines where to write dirty data (currently logger, which writes to a log file)
- Source-level fault tolerance — controls whether the job skips parsing errors and how many it tolerates before triggering a failover
Both parts must be configured together. Defining the collector alone does not enable error skipping; configuring fault tolerance without a collector means dirty data is silently dropped with no record.
Dirty data collection is supported since Ververica Runtime (VVR) 11.5 and is currently available for Kafka data sources only.
Use cases
| Scenario | Approach |
|---|---|
| Log collection pipelines (e.g., app logs from unstructured sources) | Skip a small amount of dirty data to keep the main pipeline running |
| Core business table sync (e.g., orders, account changes) | Set a low threshold and trigger an alert when dirty data is detected for prompt intervention |
| Data exploration phase | Process the full dataset to understand overall data distribution, then handle dirty data separately |
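As a rough illustration of the first two scenarios, the fault-tolerance settings might differ only in the threshold. The thresholds below are placeholders, not recommendations; adapt them to your own data quality targets:

```yaml
# Log collection pipeline: tolerate a moderate amount of dirty data
# so the main pipeline keeps running.
source:
  type: kafka
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 1000
---
# Core business table sync: a low threshold surfaces dirty data quickly
# so the team can intervene.
source:
  type: kafka
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 10
```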
Limitations
- Connector support: Only the Kafka data source is currently supported. Support for other sources is being gradually added.
- Collector types: Only the logger type is supported. It writes dirty data to a log file named yaml-dirty-data.out.

This feature is suited for debugging and early production stages. If a large amount of dirty data persists, perform data governance on the upstream system.
Configure the dirty data collector
The following example shows a complete pipeline configuration with dirty data collection and fault tolerance enabled. Copy and adapt it to your deployment:
```yaml
pipeline:
  dirty-data.collector:
    name: Kafka-DQ-Collector
    type: logger

source:
  type: kafka
  # Skip the first 100 parsing errors. The job fails if the count exceeds 100.
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100
```
Pipeline module parameters
The dirty data collector is defined in the Pipeline module under dirty-data.collector.
| Parameter | Description |
|---|---|
| name | The name of the collector. Use a meaningful name, such as Kafka-DQ-Collector. |
| type | The collector type. Currently only logger is supported. It writes dirty data to a log file. |
If dirty-data.collector is not defined, dirty data is not recorded even if fault tolerance is enabled on the source.
Source fault tolerance parameters
Fault tolerance is configured on the Kafka source. For the full list of Kafka connector options, see the Kafka connector document.
| Parameter | Default | Description |
|---|---|---|
| ingestion.ignore-errors | false | Controls how the job handles parsing errors. If false (default), the job fails immediately on the first parsing error. If true, the job skips the error and continues processing. |
| ingestion.error-tolerance.max-count | -1 (unlimited) | The maximum number of dirty data records to tolerate. When ingestion.ignore-errors is true and the number of skipped records exceeds this value, a failover is triggered and the job stops. |
Setting ingestion.ignore-errors to true without configuring ingestion.error-tolerance.max-count means the job tolerates an unlimited number of parsing errors. Set a threshold appropriate to your data quality requirements to avoid silent data loss at scale.
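The contrast can be sketched as follows; the threshold value of 500 is an illustrative placeholder, not a recommendation:

```yaml
# Risky: errors are skipped without bound, so dirty data can be
# dropped silently at scale.
source:
  type: kafka
  ingestion.ignore-errors: true
  # ingestion.error-tolerance.max-count not set: defaults to -1 (unlimited)
---
# Safer: cap skipped records so a data quality regression surfaces
# as a failover instead of silent loss.
source:
  type: kafka
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 500
```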
View dirty data logs
Dirty data is written to yaml-dirty-data.out on the Task Manager (TM) node where the error occurred. To access the file:
1. Go to the Job O&M page and click the Job Log tab.
2. Click Operational Log, go to the Running Task Managers sub-tab, and select the TM node for the operator.
3. Click Log List and select yaml-dirty-data.out to query and download the dirty data records.
Dirty data record format
Each record follows this format:
```
[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---
```
| Field | Description |
|---|---|
| Timestamp | The time when the dirty data was caught |
| Operator & Subtask | The operator and parallel instance number where the error occurred |
| Raw Data | The raw, unparsed message content (in Base64 or string format) |
| Exception | The exception type and stack summary for the parsing failure |
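Downloaded yaml-dirty-data.out files can be post-processed offline. As a minimal sketch (assuming the record layout shown above, with records separated by `---` lines; the script and its function names are hypothetical, not part of the product), the following counts dirty records per operator and per exception class:

```python
import re
from collections import Counter

# Matches the header line of each record, e.g.
# [2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
HEADER = re.compile(r"^\[(?P<ts>[^\]]+)\] \[Operator: (?P<op>.+?) -> Subtask: (?P<sub>\d+)\]")

def summarize(log_text: str):
    """Count dirty-data records per operator and per exception class."""
    by_operator = Counter()
    by_exception = Counter()
    for record in log_text.split("---"):
        lines = [ln for ln in record.strip().splitlines() if ln]
        if not lines:
            continue
        m = HEADER.match(lines[0])
        if not m:
            continue
        by_operator[m.group("op")] += 1
        for line in lines:
            if line.startswith("Exception: "):
                # Keep only the exception class name before the first colon
                by_exception[line[len("Exception: "):].split(":")[0]] += 1
    return by_operator, by_exception

sample = """[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---
"""
ops, excs = summarize(sample)
print(ops)   # Counter({'SourceKafka': 1})
```

A summary like this can help decide whether dirty data is concentrated in one operator (pointing at a specific topic or schema) before escalating to upstream data governance.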
FAQ
Does dirty data affect checkpoints?
No. Dirty data is intercepted before the state is updated, so it does not affect checkpoint success.
What is the difference between dirty data collector and Flink SQL side output?
- Dirty data collector: Handles data that fails deserialization or parsing — the connector cannot read the message at all.
- Side output: Handles data that parses successfully but does not conform to business rules — the data is readable but logically invalid.