Topik ini menjelaskan cara menggunakan collector data kotor dalam pekerjaan ingest data Flink CDC.
Ikhtisar fungsi
Dalam skenario sinkronisasi data real-time, data sumber mungkin gagal diurai karena kesalahan format, masalah encoding, atau ketidakcocokan skema. Data semacam ini yang tidak dapat diproses dengan benar disebut data kotor.
Pengumpulan data kotor telah didukung dalam ingest data sejak Ververica Runtime (VVR) 11.5 dan tersedia untuk sumber data Kafka. Ketika sebuah connector menghadapi data yang tidak dapat diurai, sistem menuliskan data mentah beserta informasi pengecualian ke collector. Anda dapat menggunakan item konfigurasi connector untuk mengonfigurasi pekerjaan agar mengabaikan error, mencatat detailnya, dan terus berjalan.
Saat connector menghadapi data yang tidak dapat diurai, sistem secara otomatis menangkap pesan mentah dan informasi pengecualian, lalu menuliskannya ke collector yang ditentukan. Dengan kebijakan konfigurasi, Anda dapat:
Memberikan toleransi terhadap jumlah kecil data kotor untuk mencegah seluruh pipeline terganggu.
Mencatat konteks lengkap untuk troubleshooting dan resolusi di kemudian hari.
Menetapkan ambang batas untuk mencegah limpahan pengecualian.
Skenario khas
Skenario | Deskripsi |
Pipeline pengumpulan log (misalnya, dari sumber data tidak terstruktur seperti log aplikasi) | Kualitas data bisa tidak konsisten. Anda dapat melewatkan jumlah kecil data kotor untuk memastikan proses utama tetap berjalan. |
Sinkronisasi tabel bisnis inti (misalnya, untuk sistem kunci seperti pesanan atau perubahan akun) | Persyaratan konsistensi tinggi. Anda dapat memicu peringatan saat data kotor ditemukan agar intervensi cepat dapat dilakukan. |
Fase eksplorasi dan investigasi data | Memproses seluruh set data dengan cepat untuk memahami distribusi data secara keseluruhan terlebih dahulu, lalu menangani data kotornya. |
Batasan dan catatan
Sebelum menggunakan fitur ini, Anda harus memahami batasan dan potensi ancamannya:
Dukungan connector: Saat ini, hanya sumber data Kafka yang mendukung fitur ini. Dukungan untuk sumber lain sedang ditambahkan secara bertahap.
Jenis collector yang didukung: Saat ini, hanya tipe
loggeryang didukung. Tipe ini menulis data kotor ke file log.
Fitur ini cocok untuk tahap debugging dan produksi awal. Jika jumlah besar data kotor terus berlanjut, lakukan tata kelola data pada sistem hulu.
Sintaks
Aktifkan collector data kotor
Collector data kotor didefinisikan dalam modul Pipeline. Sintaksnya adalah sebagai berikut:
pipeline:
dirty-data.collector:
name: Logger Dirty Data Collector
type: loggerParameter | Deskripsi |
| Nama collector. Berikan nama yang bermakna, seperti |
| Tipe collector. Nilai berikut tersedia:
|
Jika item konfigurasi ini tidak didefinisikan, data kotor tidak akan dicatat, meskipun toleransi kesalahan diaktifkan.
Konfigurasikan kebijakan toleransi kesalahan di sumber data
Hanya mengonfigurasi pengumpulan data kotor saja tidak cukup untuk mengaktifkan pengabaian error parsing. Anda harus menggunakan fitur ini bersamaan dengan kebijakan toleransi kesalahan Kafka. Untuk informasi lebih lanjut, lihat dokumen connector Kafka. Contoh berikut menunjukkan konfigurasi sampel:
source:
type: kafka
# Lewati 100 pengecualian parsing pertama. Jika jumlah pengecualian melebihi 100, pekerjaan gagal.
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100Parameter | Default | Deskripsi |
|
| Menentukan apakah akan mengabaikan error parsing. Jika diatur ke |
|
| Jumlah maksimum catatan data kotor yang dapat ditoleransi. Jika |
Collector data kotor logger
Collector data kotor logger menyimpan data kotor dalam file log terpisah. Untuk melihat file log data kotor, lakukan langkah-langkah berikut:
Buka halaman Job O&M dan klik tab Job Log.
Klik Operational Log, buka sub-tab Running Task Managers, lalu pilih node task manager (TM) untuk operator tersebut.
Klik Log List dan pilih file log
yaml-dirty-data.outdari daftar untuk mengkueri dan menyimpan catatan data kotor.
Metadata berikut saat ini direkam untuk data kotor:
Timestamp saat data kotor diproses
Operator dan Indeks Subtask yang melaporkan catatan data kotor
Konten data kotor mentah
Informasi pengecualian yang menyebabkan kegagalan pemrosesan
Contoh format catatan data kotor
Setiap catatan berisi metadata berikut:
text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---Field | Deskripsi |
Timestamp | Waktu saat data kotor ditangkap. |
Operator & Subtask | Operator spesifik dan nomor instans paralel tempat error terjadi. |
Raw Data | Konten pesan mentah yang tidak terurai (dalam format Base64 atau string). |
Exception | Jenis pengecualian dan ringkasan stack untuk kegagalan parsing. |
FAQ
Apakah data kotor memengaruhi checkpoint?
Tidak. Data kotor dicegat sebelum state diperbarui sehingga tidak memengaruhi keberhasilan checkpoint.
Apa perbedaan antara fitur ini dan side output stream di Flink SQL?
Collector data kotor: Memproses data yang gagal dideserialisasi atau diurai.
Side Output: Memproses data yang dapat diurai tetapi tidak sesuai dengan aturan bisnis.