
Realtime Compute for Apache Flink: Dirty data collection

Last Updated: Mar 26, 2026

When a Kafka source connector encounters data it cannot parse, the dirty data collector captures the raw message and exception details and writes them to a log file — letting the job continue running instead of failing immediately.

How it works

During data ingestion, source data can fail to parse for various reasons: format errors, encoding issues, or schema mismatches. Data that cannot be processed is called dirty data.

The dirty data collector is a two-part configuration:

  1. Pipeline-level collector — defines where to write dirty data (currently logger, which writes to a log file)

  2. Source-level fault tolerance — controls whether the job skips parsing errors and how many it tolerates before triggering a failover

Both parts must be configured together. Defining the collector alone does not enable error skipping; configuring fault tolerance without a collector means dirty data is silently dropped with no record.

Dirty data collection is supported since Ververica Runtime (VVR) 11.5 and is currently available for Kafka data sources only.

Use cases

| Scenario | Approach |
| --- | --- |
| Log collection pipelines (e.g., app logs from unstructured sources) | Skip a small amount of dirty data to keep the main pipeline running |
| Core business table sync (e.g., orders, account changes) | Set a low threshold and trigger an alert when dirty data is detected for prompt intervention |
| Data exploration phase | Process the full dataset to understand overall data distribution, then handle dirty data separately |

Limitations

  • Connector support: Only the Kafka data source is currently supported. Support for other sources is being gradually added.

  • Collector types: Only the logger type is supported. It writes dirty data to a log file named yaml-dirty-data.out.

This feature is suited for debugging and early production stages. If a large amount of dirty data persists, perform data governance on the upstream system.

Configure the dirty data collector

The following example shows a complete pipeline configuration with dirty data collection and fault tolerance enabled. Copy and adapt it to your deployment:

```yaml
pipeline:
  dirty-data.collector:
    name: Kafka-DQ-Collector
    type: logger

source:
  type: kafka
  # Skip the first 100 parsing errors. The job fails if the count exceeds 100.
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100
```
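The interaction between the two source options above can be sketched as follows. This is a simplified model for illustration only, not the connector's actual implementation; the `parse` callable and the returned `dirty` list stand in for the connector's deserializer and the logger collector.

```python
class DirtyDataThresholdExceeded(Exception):
    """Raised when skipped records exceed the configured tolerance."""


def ingest(records, parse, ignore_errors=False, max_count=-1):
    """Parse records, skipping failures per the tolerance settings.

    ignore_errors=False -> fail on the first parse error (default).
    max_count=-1        -> unlimited tolerance when ignore_errors is True.
    """
    parsed, dirty = [], []
    for raw in records:
        try:
            parsed.append(parse(raw))
        except Exception as exc:
            if not ignore_errors:
                raise                    # default behavior: fail immediately
            dirty.append((raw, exc))     # the collector would log this record
            if 0 <= max_count < len(dirty):
                raise DirtyDataThresholdExceeded(
                    f"skipped {len(dirty)} records, tolerance is {max_count}")
    return parsed, dirty
```

Note that the job fails only when the skipped count *exceeds* `max_count`: with a tolerance of 100, the first 100 dirty records are skipped and the 101st triggers the failover.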

Pipeline module parameters

The dirty data collector is defined in the Pipeline module under dirty-data.collector.

| Parameter | Description |
| --- | --- |
| name | The name of the collector. Use a meaningful name, such as Kafka-DQ-Collector. |
| type | The collector type. Currently only logger is supported. It writes dirty data to a log file. |

If dirty-data.collector is not defined, dirty data is not recorded even if fault tolerance is enabled on the source.

Source fault tolerance parameters

Fault tolerance is configured on the Kafka source. For the full list of Kafka connector options, see the Kafka connector document.

| Parameter | Default | Description |
| --- | --- | --- |
| ingestion.ignore-errors | false | Controls how the job handles parsing errors. If false (default), the job fails immediately on the first parsing error. If true, the job skips the error and continues processing. |
| ingestion.error-tolerance.max-count | -1 (unlimited) | The maximum number of dirty data records to tolerate. When ingestion.ignore-errors is true and the number of skipped records exceeds this value, a failover is triggered and the job stops. |

Setting ingestion.ignore-errors to true without configuring ingestion.error-tolerance.max-count means the job tolerates an unlimited number of parsing errors. Set a threshold appropriate to your data quality requirements to avoid silent data loss at scale.

View dirty data logs

Dirty data is written to yaml-dirty-data.out on the Task Manager (TM) node where the error occurred. To access the file:

  1. Go to the Job O&M page and click the Job Log tab.

  2. Click Operational Log, go to the Running Task Managers sub-tab, and select the TM node for the operator.

  3. Click Log List and select yaml-dirty-data.out to query and download the dirty data records.

Dirty data record format

Each record follows this format:

```text
[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---
```
| Field | Description |
| --- | --- |
| Timestamp | The time when the dirty data was caught |
| Operator & Subtask | The operator and parallel instance number where the error occurred |
| Raw Data | The raw, unparsed message content (in Base64 or string format) |
| Exception | The exception type and stack summary for the parsing failure |
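When post-processing a downloaded yaml-dirty-data.out file, records in this format can be split into their fields with a small parser. The sketch below follows the documented example layout; treat the exact field order and labels as an assumption if your runtime version formats records differently.

```python
import re

# Matches one record of the documented format:
# [timestamp] [Operator: <name> -> Subtask: <n>]
# Raw Data: <payload>
# Exception: <message>
RECORD_RE = re.compile(
    r"\[(?P<timestamp>[^\]]+)\]\s+"
    r"\[Operator:\s*(?P<operator>.+?)\s*->\s*Subtask:\s*(?P<subtask>\d+)\]\s*\n"
    r"Raw Data:\s*(?P<raw_data>.*)\n"
    r"Exception:\s*(?P<exception>.*)"
)


def parse_dirty_record(text):
    """Return the record's fields as a dict, or None if the text doesn't match."""
    m = RECORD_RE.search(text)
    return m.groupdict() if m else None
```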

FAQ

Does dirty data affect checkpoints?

No. Dirty data is intercepted before the state is updated, so it does not affect checkpoint success.

What is the difference between dirty data collector and Flink SQL side output?

  • Dirty data collector: Handles data that fails deserialization or parsing — the connector cannot read the message at all.

  • Side output: Handles data that parses successfully but does not conform to business rules — the data is readable but logically invalid.
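The distinction can be illustrated with JSON messages: a message that fails deserialization is dirty data, while a message that parses but violates a business rule belongs in a side output. The rule below (a hypothetical "amount must be positive" check) and the route names are made up for illustration; they are not part of the Flink API.

```python
import json


def route(raw_message):
    """Classify a raw message the way the two mechanisms would."""
    try:
        record = json.loads(raw_message)      # deserialization step
    except json.JSONDecodeError:
        return "dirty-data-collector"         # connector cannot read it at all
    if record.get("amount", 0) <= 0:          # hypothetical business rule
        return "side-output"                  # readable but logically invalid
    return "main-stream"
```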