全部產品
Search
文件中心

Realtime Compute for Apache Flink:髒資料收集

更新時間:Jan 10, 2026

本文將向您介紹如何在Flink CDC資料攝入作業中使用髒資料收集器。

功能概述

在即時資料同步情境中,源端資料可能因格式錯誤、編碼異常或 schema 不相容等問題導致解析失敗。這類無法正常處理的資料稱為 髒資料(Dirty Data)。

資料攝入自VVR 11.5版本開始支援髒資料收集,資料攝入的Kafka資料來源已經支援該功能。當連接器遇到無法解析的資料時,系統會將未經處理資料和異常資訊寫入收集器。結合連接器配置項,您可以將作業配置為忽略錯誤、記錄詳情、同時保持正常運行。

當連接器遇到無法解析的資料時,系統將自動捕獲原始訊息和異常資訊,並將其寫入指定的收集器。結合配置策略,可以實現:

  • 容忍少量髒資料,避免整條鏈路中斷。

  • 記錄完整上下文,便於後續定位與修複。

  • 設定閾值控制,防止異常泛濫。

典型使用情境

使用情境

目標說明

日誌採集管道

(如App 日誌等非結構化資料來源)

資料品質參差不齊,允許跳過少量壞資料,保障主流程持續運行

核心業務表同步

(如訂單、賬戶變更等關鍵系統)

對一致性要求高,發現即警示,及時介入處理

資料探查與調研階段

快速跑通全量資料,先瞭解整體分布,再回頭治理髒資料

使用限制與注意事項

在使用前,請務必瞭解當前功能的能力邊界和潛在風險:

  • Connector 支援範圍:當前僅 Kafka 資料來源已接入該能力,其他 Source 正在逐步覆蓋。

  • 支援的收集器類型:當前僅支援 logger 類型,即將髒資料寫入記錄檔。

說明

此功能適合用於調試期和生產初期;若長期出現大量髒資料,建議推動上遊系統進行資料治理。

文法結構

啟用髒資料收集器

髒資料收集器在Pipeline模組中定義。文法如下所示:

pipeline:
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger

參數

說明

name

收集器名稱,建議命名有意義(如 Kafka-DQ-Collector

type

收集器類型,可選值如下:

  • logger:將髒資料寫入記錄檔。

說明

未定義此配置項時,即使開啟容錯,髒資料也不會被記錄。

在資料來源中配置容錯策略

配置髒資料收集並無法讓解析報錯被跳過,建議結合Kafka的容錯策略配合使用,詳情可見Kafka連接器文檔。配置樣本如下:

yamlsource:
  type: kafka
  # 跳過前 100 次出現的解析異常;若超過 100 次則作業失敗。
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

參數

預設值

說明

ingestion.ignore-errors

false

是否忽略解析錯誤。

設定為 true 時,跳過資料處理;設定為 false 時立即失敗。

ingestion.error-tolerance.max-count

-1(不限)

最大容忍髒資料條數。

ingestion.ignore-errorstrue時,如果收集到的髒資料超過此數值則觸發Failover停止作業。

Logger髒資料收集器

Logger髒資料收集器會將髒資料存放區到單獨的記錄檔中。您可以按照下面的方法查看對應的髒資料記錄檔:

  1. 進入作業營運頁面,點擊作業日誌選項卡;

  2. 點擊作業記錄,選擇運行Task Managers二級選項卡並選擇對應運算元的TM節點;

  3. 點擊日誌列表,並在下方列表中點擊名為yaml-dirty-data.out的記錄檔即可查詢、儲存收集到的髒資料記錄。

目前會為髒資料記錄以下中繼資料資訊:

  • 處理該條髒資料的時間戳記

  • 發出髒資料記錄的運算元及Subtask Index

  • 原始髒資料內容

  • 造成處理失敗的異常資訊

髒資料記錄格式樣本

每條記錄包含以下中繼資料資訊:

text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---

欄位

說明

時間戳記

髒資料被捕獲的時間

Operator & Subtask

出錯的具體運算元及並行執行個體編號

Raw Data

原始未解析的訊息內容(Base64 或字串形式)

Exception

解析失敗的異常類型與堆棧摘要

常見問題

髒資料會影響 Checkpoint 嗎?

不會。髒資料在進入狀態更新前就被攔截,因此不影響 Checkpoint 的成功與否。

和 Flink SQL 的側輸出資料流有什麼區別?

  • 髒資料收集器:用於處理“無法還原序列化或解析失敗”的資料;

  • 側輸出資料流(Side Output):用於處理“能解析但不符合商務規則”的資料。