本文將向您介紹如何在Flink CDC資料攝入作業中使用髒資料收集器。
功能概述
在即時資料同步情境中,源端資料可能因格式錯誤、編碼異常或 schema 不相容等問題導致解析失敗。這類無法正常處理的資料稱為 髒資料(Dirty Data)。
資料攝入自VVR 11.5版本開始支援髒資料收集,資料攝入的Kafka資料來源已經支援該功能。當連接器遇到無法解析的資料時,系統會將未經處理資料和異常資訊寫入收集器。結合連接器配置項,您可以將作業配置為忽略錯誤、記錄詳情、同時保持正常運行。
當連接器遇到無法解析的資料時,系統將自動捕獲原始訊息和異常資訊,並將其寫入指定的收集器。結合配置策略,可以實現:
容忍少量髒資料,避免整條鏈路中斷。
記錄完整上下文,便於後續定位與修複。
設定閾值控制,防止異常泛濫。
典型使用情境
使用情境 | 目標說明 |
日誌採集管道 (如App 日誌等非結構化資料來源) | 資料品質參差不齊,允許跳過少量壞資料,保障主流程持續運行 |
核心業務表同步 (如訂單、賬戶變更等關鍵系統) | 對一致性要求高,發現即警示,及時介入處理 |
資料探查與調研階段 | 快速跑通全量資料,先瞭解整體分布,再回頭治理髒資料 |
使用限制與注意事項
在使用前,請務必瞭解當前功能的能力邊界和潛在風險:
Connector 支援範圍:當前僅 Kafka 資料來源已接入該能力,其他 Source 正在逐步覆蓋。
支援的收集器類型:當前僅支援
logger類型,即將髒資料寫入記錄檔。
此功能適合用於調試期和生產初期;若長期出現大量髒資料,建議推動上遊系統進行資料治理。
文法結構
啟用髒資料收集器
髒資料收集器在Pipeline模組中定義。文法如下所示:
pipeline:
dirty-data.collector:
name: Logger Dirty Data Collector
type: logger參數 | 說明 |
| 收集器名稱,建議命名有意義(如 |
| 收集器類型,可選值如下:
|
未定義此配置項時,即使開啟容錯,髒資料也不會被記錄。
在資料來源中配置容錯策略
配置髒資料收集並無法讓解析報錯被跳過,建議結合Kafka的容錯策略配合使用,詳情可見Kafka連接器文檔。配置樣本如下:
yamlsource:
type: kafka
# 跳過前 100 次出現的解析異常;若超過 100 次則作業失敗。
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100參數 | 預設值 | 說明 |
|
| 是否忽略解析錯誤。 設定為 |
|
| 最大容忍髒資料條數。 在 |
Logger髒資料收集器
Logger髒資料收集器會將髒資料存放區到單獨的記錄檔中。您可以按照下面的方法查看對應的髒資料記錄檔:
進入作業營運頁面,點擊作業日誌選項卡;
點擊作業記錄,選擇運行Task Managers二級選項卡並選擇對應運算元的TM節點;
點擊日誌列表,並在下方列表中點擊名為
yaml-dirty-data.out的記錄檔即可查詢、儲存收集到的髒資料記錄。
目前會為髒資料記錄以下中繼資料資訊:
處理該條髒資料的時間戳記
發出髒資料記錄的運算元及Subtask Index
原始髒資料內容
造成處理失敗的異常資訊
髒資料記錄格式樣本
每條記錄包含以下中繼資料資訊:
text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---欄位 | 說明 |
時間戳記 | 髒資料被捕獲的時間 |
Operator & Subtask | 出錯的具體運算元及並行執行個體編號 |
Raw Data | 原始未解析的訊息內容(Base64 或字串形式) |
Exception | 解析失敗的異常類型與堆棧摘要 |
常見問題
髒資料會影響 Checkpoint 嗎?
不會。髒資料在進入狀態更新前就被攔截,因此不影響 Checkpoint 的成功與否。
和 Flink SQL 的側輸出資料流有什麼區別?
髒資料收集器:用於處理“無法還原序列化或解析失敗”的資料;
側輸出資料流(Side Output):用於處理“能解析但不符合商務規則”的資料。