全部产品
Search
文档中心

Realtime Compute for Apache Flink:Panduan penanganan pengecualian

更新时间:Oct 24, 2025

Panduan ini memberikan panduan yang jelas dan dapat ditindaklanjuti untuk penanganan pengecualian bagi pengembang. Panduan ini memastikan kode Anda berfungsi secara efektif dengan toleransi kesalahan Flink, meningkatkan stabilitas pekerjaan dan observabilitas.

Latar Belakang

Apache Flink menyediakan toleransi kesalahan yang andal melalui checkpointing, restart otomatis, dan semantik tepat-sekali.

Namun, penanganan pengecualian yang tidak tepat dalam kode pengguna, seperti TaskCancelledException, OutOfMemoryError, atau ClassCastException di pekerjaan DataStream dan UDF pada pekerjaan SQL, dapat mengganggu mekanisme-mekanisme tersebut. Penanganan pengecualian yang buruk dapat menghambat pemulihan Flink, menyebabkan ketidakkonsistenan state atau kehilangan data.

Prinsip inti

Delegasikan pemulihan kesalahan tingkat sistem kepada Flink dan fokus pada penanganan pengecualian spesifik bisnis.

Jenis-jenis pengecualian dan strategi penanganan yang direkomendasikan

Jenis Pengecualian

Contoh

Strategi Penanganan yang Direkomendasikan

Pengecualian Bisnis (Dapat Dipulihkan)

Kegagalan dalam penguraian JSON, bidang data yang hilang, atau pelanggaran aturan bisnis.

Tangkap, log, dan keluarkan catatan yang salah ke Side Outputs. Ini menjaga kontinuitas aliran data utama.

Pengecualian Dependensi Eksternal (Sebagian Dapat Dipulihkan)

Masalah dengan timeout HTTP, konektivitas database, atau kesalahan API pihak ketiga (misalnya, 5xx).

Gunakan strategi retry terbatas dengan kebijakan backoff, dan eskalasi ke pengecualian jika retry gagal.

Pengecualian Sistem (Tidak Dapat Dipulihkan)

`TaskCancelledException`, `OutOfMemoryError`, `ClassCastException`, dan masalah akses state.

Jangan tangkap ini. Biarkan sistem pemulihan kesalahan bawaan Flink mengelola dan menangani kegagalan kritis ini.

Praktik terbaik

1. Hindari menangkap pengecualian generik

Jangan:

Menangkap jenis Exception yang luas dapat menyembunyikan kesalahan internal Flink yang penting (seperti masalah checkpointing atau pembatalan tugas). Hal ini dapat mengakibatkan pekerjaan yang tampak beroperasi tetapi gagal memproses data.

try {
    // logika pengguna
} catch (Exception e) {
    LOG.warn("Ada yang salah", e);
}

Lakukan:

Tangkap hanya pengecualian bisnis spesifik yang dapat dipulihkan dan gunakan Side Outputs untuk mengeluarkan catatan kesalahan.

try {
    processRecord(value);
} catch (JsonParseException e) {
    LOG.warn("Catatan input tidak valid: {}", value, e);
    ctx.output(ERROR_TAG, new ErrorRecord(value, e.getMessage()));
}

2. Delegasikan kesalahan tingkat sistem ke Flink

Jika Anda menemukan kesalahan status yang tidak dapat dipulihkan (misalnya, status belum diinisialisasi, kegagalan deserialisasi), lempar pengecualian. Ini memicu mekanisme failover Flink.

if (state.value() == null) {
    // State tidak diinisialisasi dengan benar
    throw new IllegalStateException("State tidak diinisialisasi dengan benar");
}

Flink kemudian akan secara otomatis memulihkan pekerjaan menggunakan strategi restart yang dikonfigurasi dan checkpoint berhasil terakhir.

3. Batasi retry untuk panggilan eksternal

Saat berinteraksi dengan sistem eksternal (misalnya, database, layanan HTTP), hindari menerapkan mekanisme retry tak terbatas.

int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
    try {
        callExternalService(record);
        return;
    } catch (IOException e) {
        if (i == maxRetries - 1) {
            throw new RuntimeException("Gagal setelah " + maxRetries + " percobaan", e);
        }
        Thread.sleep(1000 * (i + 1)); // Gunakan backoff eksponensial.
    }
}

4. Pertahankan konteks pengecualian lengkap

Mencatat hanya e.getMessage() sering kali tidak memadai untuk pemecahan masalah. Sebagai alternatif:

  • Catat jejak stack lengkap.

  • Pertahankan data input asli.

  • Gunakan Side Outputs untuk mengeluarkan event kesalahan terstruktur untuk pemantauan downstream atau replay.

ctx.output(ERROR_TAG, new ErrorRecord(
    originalInput,
    Instant.now(),
    e.getClass().getSimpleName(),
    e.getMessage(),
    ExceptionUtils.stringifyException(e)
));

5. Validasi jalur penanganan pengecualian

  • Sertakan skenario pengecualian dalam unit dan tes integrasi Anda.

  • Selama tinjauan kode, periksa secara khusus logika yang menggunakan catch (Exception) yang luas atau mengabaikan pengecualian secara diam-diam.

  • Untuk setiap blok penanganan pengecualian, tanyakan: "Apakah kesalahan ini benar-benar dapat dipulihkan?" dan "Jika dapat dipulihkan, apakah logika pemulihan memengaruhi konsistensi state?"

Penanganan pengecualian yang efektif bukan hanya tentang mencegah crash pekerjaan. Penanganan pengecualian yang tepat memastikan kesalahan dapat dilacak, dapat dipulihkan, dan mempertahankan konsistensi state. Selalu percayai kemampuan toleransi kesalahan Flink dan berhati-hatilah untuk tidak mengganggu proses pemulihannya. Integrasikan praktik-praktik ini ke dalam pipeline CI/CD Anda dengan menggunakan alat analisis kode statis seperti aturan SonarQube untuk secara otomatis menandai penanganan pengecualian yang tidak tepat.