All Products
Search
Document Center

Realtime Compute for Apache Flink:Checkpoint FAQ

Last Updated:Mar 10, 2026

Topik ini menjawab pertanyaan umum (FAQ) mengenai checkpoint sistem dan snapshot pekerjaan di Realtime Compute for Apache Flink.

Mengapa tidak ada data baru yang diperbarui setelah table.exec.state.ttl kedaluwarsa saat minibatch diaktifkan?

Saat minibatch diaktifkan, data diproses secara batch dan disimpan dalam state. Data dalam state tersebut didasarkan pada hasil komputasi penuh sebelumnya. Jika state tersebut dihapus karena masa hidup data (TTL) telah kedaluwarsa, hasil komputasi terakumulasi sebelumnya juga hilang. Akibatnya, data tidak dapat diperbarui berdasarkan hasil minibatch.

Sebaliknya, jika minibatch dinonaktifkan, ketika state kedaluwarsa karena TTL, data untuk kunci yang kedaluwarsa akan diakumulasi ulang dan di-output. Hal ini memastikan pembaruan data tetap berlanjut. Namun, peningkatan frekuensi pembaruan data dapat menyebabkan masalah lain, seperti penundaan pemrosesan data.

Oleh karena itu, Anda harus mengonfigurasi minibatch dan TTL sesuai dengan skenario bisnis Anda.

Bagaimana cara menghitung waktu mulai checkpoint periodik berikutnya?

Dua parameter memengaruhi waktu mulai checkpoint berikutnya: interval checkpoint dan jeda minimum antar checkpoint. Checkpoint berikutnya dipicu ketika kedua kondisi berikut terpenuhi:

  • Interval checkpoint: Selisih waktu minimum antara waktu mulai checkpoint sebelumnya dan waktu mulai checkpoint berikutnya.

  • Jeda minimum: Selisih waktu minimum antara waktu selesai checkpoint sebelumnya dan waktu mulai checkpoint berikutnya.

Dua skenario berikut mengilustrasikan hal ini. Dalam kedua skenario, interval checkpoint adalah 3 menit, jeda minimum adalah 3 menit, dan timeout adalah 10 menit.

  • Skenario 1: Pekerjaan berjalan normal dan setiap checkpoint berhasil.

    Checkpoint pertama dimulai pukul 12:00:00 dan berhasil pada 12:00:02. Checkpoint kedua dimulai pukul 12:03:00.

  • Skenario 2: Pekerjaan berjalan tidak normal. Misalnya, checkpoint mengalami timeout atau gagal.

    Checkpoint pertama dimulai pukul 12:00:00 dan berhasil pada 12:00:02. Checkpoint kedua dimulai pukul 12:03:00, tetapi mengalami timeout dan gagal pada 12:13:00. Checkpoint ketiga dimulai pukul 12:16:00.

Untuk informasi lebih lanjut tentang pengaturan jeda minimum antar checkpoint, lihat Tuning Checkpointing.

Apa perbedaan antara GeminiStateBackend yang digunakan di VVR 8.x dan VVR 6.x?

Mesin komputasi Realtime Compute for Apache Flink Ververica Runtime (VVR) 6.x menggunakan versi V3 GeminiStateBackend secara default. VVR 8.x menggunakan versi V4 GeminiStateBackend secara default.

Kategori

Detail

Fitur dasar

  • Versi lama (V3): Mendukung fitur seperti pemisahan key-value, pemisahan penyimpanan dan komputasi, snapshot pekerjaan dalam format standar atau native, serta lazy state loading.

  • Versi baru (V4): Arsitektur inti dan fitur GeminiStateBackend versi lama ditingkatkan berdasarkan karakteristik skenario komputasi aliran. Versi baru mendukung semua fitur versi lama dan memberikan kinerja akses state yang lebih baik serta penskalaan yang lebih cepat.

Parameter lazy state loading

  • Versi baru: state.backend.gemini.file.cache.download.type: LazyDownloadOnRestore

  • Versi lama: state.backend.gemini.file.cache.lazy-restore: ON

Perbedaan dalam penggunaan Managed Memory

Satu-satunya perbedaan terletak pada metrik Resident Set Size (RSS):

  • Versi baru: Memori diminta dari sistem operasi dan tercermin dalam metrik RSS hanya saat benar-benar digunakan.

  • Versi lama: Langsung meminta managed memory state * 80% dari sistem operasi dan mengelola memori tersebut sendiri. Jumlah ini langsung tercermin dalam metrik RSS sejak pekerjaan dimulai.

Catatan

Untuk informasi lebih lanjut tentang Managed Memory, lihat TaskManager Memory.

Apakah normal jika ukuran checkpoint penuh sama dengan checkpoint inkremental?

Jika Anda mengamati bahwa ukuran checkpoint penuh sama dengan checkpoint inkremental saat menggunakan Flink, lakukan langkah-langkah berikut:

  • Periksa apakah snapshot inkremental dikonfigurasi dengan benar dan berlaku.

  • Perilaku ini bisa normal dalam situasi tertentu. Contohnya:

    1. Sebelum ingesti data (sebelum pukul 18:29), pekerjaan belum memproses data apa pun. Checkpoint hanya berisi informasi state awal dari sumber. Karena tidak ada data state lainnya, checkpoint ini merupakan checkpoint penuh.

    2. Pada pukul 18:29, satu juta entri data diingesti. Asumsikan data tersebut sepenuhnya diproses dalam interval checkpoint berikutnya (3 menit), dan tidak ada data tambahan yang diingesti selama periode tersebut. Checkpoint inkremental pertama akan berisi seluruh informasi state yang dihasilkan oleh satu juta entri data tersebut.

    Dalam kasus ini, wajar jika ukuran checkpoint penuh dan checkpoint inkremental sama. Hal ini karena checkpoint inkremental pertama harus berisi seluruh state data agar seluruh state dapat dipulihkan dari titik tersebut. Secara efektif, checkpoint ini menjadi checkpoint penuh.

    Manfaat checkpoint inkremental biasanya mulai terlihat sejak checkpoint kedua dan seterusnya. Ketika input data stabil dan tidak terjadi perubahan state berskala besar, checkpoint inkremental berikutnya diharapkan lebih kecil. Hal ini menunjukkan bahwa sistem berhasil membuat snapshot hanya untuk bagian state yang bertambah. Jika ukurannya tetap sama, Anda perlu meninjau kembali state dan perilaku sistem untuk mengidentifikasi potensi masalah.

Apa yang harus saya lakukan jika checkpoint lambat untuk pekerjaan Python?

  • Penyebab

    Operator Python memiliki cache internal. Saat checkpoint dilakukan, seluruh data dalam cache harus diproses. Oleh karena itu, jika kinerja user-defined function (UDF) Python buruk, waktu checkpoint meningkat dan memengaruhi eksekusi pekerjaan.

  • Solusi

    Untuk mengurangi ukuran cache, atur parameter berikut di bagian Additional Configurations. Untuk informasi lebih lanjut, lihat How do I configure custom runtime parameters for a job?.

    # Nilai default adalah 100000. Satuannya adalah jumlah entri.
    python.fn-execution.bundle.size
    # Nilai default adalah 1000. Satuannya adalah milidetik.
    python.fn-execution.bundle.time

    Untuk informasi lebih lanjut tentang parameter tersebut, lihat Flink Python Configuration.

Bagaimana Cara Memecahkan Masalah Checkpoint Abnormal dalam Suatu Pekerjaan?

  1. Mendiagnosis Jenis Eksepsi

    Di tab Monitoring and Alerts atau di State Management, Anda dapat melihat riwayat checkpoint untuk mengidentifikasi jenis exception, seperti timeout checkpoint atau kegagalan penulisan.

    image

  2. Klasifikasi, Identifikasi, dan Penanganan

    • Skenario 1: Timeout checkpoint sering terjadi. Periksa apakah pekerjaan mengalami tekanan balik (backpressure). Anda dapat menganalisis akar penyebab tekanan balik, menemukan operator yang lambat, dan mengambil tindakan yang sesuai, seperti menyesuaikan sumber daya atau konfigurasi. Untuk informasi lebih lanjut, lihat How do I troubleshoot job backpressure?

    • Skenario 2: Kegagalan penulisan checkpoint. Anda dapat menemukan log TaskManager (TM) dengan langkah-langkah berikut. Kemudian, analisis log tersebut untuk menentukan penyebabnya.

      1. Di halaman log pekerjaan, pada tab Checkpoints, klik Checkpoints History.

        image

      2. Klik tanda plus (+) di sebelah kiri checkpoint abnormal untuk melihat status checkpoint operator tersebut.

      3. Klik tanda plus (+) di sebelah kiri operator abnormal. Lalu, klik ID subtask abnormal untuk menuju TM yang sesuai.

        image

Error: You are using the new V4 state engine to restore old state data from a checkpoint

  • Detail Kesalahan

    Saat Anda melakukan peningkatan dari VVR 6.x ke VVR 8.x, muncul error berikut: You are using the new V4 state engine to restore old state data from a checkpoint.

  • Penyebab

    Versi backend state Gemini yang digunakan oleh VVR 6.x dan 8.x berbeda, sehingga checkpoint-nya tidak kompatibel.

  • Solusi

    Anda dapat menggunakan salah satu metode berikut untuk mengatasi masalah ini:

    • Buat snapshot pekerjaan dalam format standar dan mulai pekerjaan dari state tersebut. Untuk informasi lebih lanjut, lihat Manually create a job snapshot dan Start a job.

    • Mulai ulang pekerjaan tanpa state.

    • (Tidak direkomendasikan) Terus gunakan versi lama Gemini. Anda harus mengonfigurasi state.backend.gemini.engine.type: STREAMING dan mulai ulang pekerjaan agar perubahan berlaku. Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter, lihat How do I configure runtime parameters for a job?

    • (Tidak direkomendasikan) Terus gunakan mesin VVR 6.x untuk menjalankan pekerjaan.

Error: java.lang.NegativeArraySizeException

  • Detail kesalahan

    Saat pekerjaan menggunakan List State, exception berikut terjadi selama runtime.

    Caused by: java.lang.NegativeArraySizeException
      at com.alibaba.gemini.engine.rm.GUnPooledByteBuffer.newTempBuffer(GUnPooledByteBuffer.java:270)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:85)
      at com.alibaba.gemini.engine.page.bmap.BinaryValue.merge(BinaryValue.java:75)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.internalGet(PageStoreImpl.java:428)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:271)
      at com.alibaba.gemini.engine.pagestore.PageStoreImpl.get(PageStoreImpl.java:112)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:118)
      at com.alibaba.gemini.engine.table.BinaryKListTable.get(BinaryKListTable.java:57)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.getOrDefault(GeminiSubKeyedListStateImpl.java:97)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:88)
      at com.alibaba.flink.statebackend.gemini.subkeyed.GeminiSubKeyedListStateImpl.get(GeminiSubKeyedListStateImpl.java:47)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:60)
      at com.alibaba.flink.statebackend.gemini.context.ContextSubKeyedListState.get(ContextSubKeyedListState.java:44)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
      at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:289)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1435)
  • Penyebab

    Data state untuk satu kunci tunggal dalam List State terlalu besar dan melebihi 2 GB. Proses yang menyebabkan data state berukuran besar adalah sebagai berikut:

    1. Selama operasi pekerjaan normal, nilai-nilai yang ditambahkan di bawah satu kunci dalam List State digabung melalui operasi merge, misalnya dalam List State yang berisi operator window. Hal ini menyebabkan data state terus terakumulasi.

    2. Ketika data state terakumulasi hingga titik tertentu, pertama-tama akan memicu error kehabisan memori (OOM). Setelah pekerjaan pulih dari kegagalan tersebut, proses merge List State dapat menyebabkan ukuran array byte sementara yang diminta oleh backend state melebihi batas yang tersedia, sehingga memicu exception ini.

    Catatan

    RocksDBStateBackend juga dapat mengalami masalah serupa dan memicu ArrayIndexOutOfBoundsException atau segmentation fault. Untuk informasi lebih lanjut, lihat The EmbeddedRocksDBStateBackend.

  • Solusi

    • Jika operator window menyebabkan data state menjadi terlalu besar, Anda dapat mengurangi ukuran window.

    • Jika logika pekerjaan tidak efisien, Anda dapat menyesuaikan logikanya. Misalnya, Anda dapat membagi kunci tersebut.

Error: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots.

  • Detail error

    org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints
  • Penyebab

    Error ini disebabkan oleh beberapa kegagalan checkpoint berturut-turut saat Kafka digunakan sebagai sink.

  • Solusi

    Anda dapat menyesuaikan durasi timeout checkpoint menggunakan parameter execution.checkpointing.timeout untuk memastikan checkpoint tidak gagal karena timeout. Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter, lihat How do I configure custom runtime parameters for a job?

Error: Exceeded checkpoint tolerable failure threshold

  • Detail kesalahan

    org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold.
      at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
  • Penyebab

    Jumlah kegagalan checkpoint yang dapat ditoleransi terlalu rendah. Ketika jumlah checkpoint yang gagal melebihi ambang batas ini, pekerjaan akan memicu failover. Jika parameter ini tidak diatur, secara default tidak ada kegagalan checkpoint yang ditoleransi.

  • Solusi

    Anda dapat mengatur nilai `num` pada parameter execution.checkpointing.tolerable-failed-checkpoints: num untuk menyesuaikan jumlah kegagalan checkpoint yang dapat ditoleransi oleh pekerjaan. Nilai `num` harus berupa 0 atau bilangan bulat positif. Jika `num` bernilai 0, tidak ada exception atau kegagalan checkpoint yang diizinkan. Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter, lihat How do I configure custom runtime parameters for a job?