All Products
Search
Document Center

Realtime Compute for Apache Flink:FAQ Konektor

Last Updated:Mar 10, 2026

Jawaban atas pertanyaan umum mengenai konektor di Realtime Compute for Apache Flink, dikelompokkan berdasarkan jenis konektornya.

Indeks gejala cepat

Gunakan tabel ini untuk menemukan entri FAQ yang tepat ketika Anda mengetahui gejalanya tetapi tidak tahu konektor mana yang menyebabkannya.

Gejala

Entri terkait

Tidak ada output data

Kafka: jendela event time tidak menghasilkan output / Hudi: Tidak ada data di penyimpanan / Tablestore: join dimensi tidak mengembalikan data

Data duplikat

MaxCompute: Penulisan data duplikat / Hudi: data duplikat

Out of memory (OOM)

Paimon: Timeout heartbeat TaskManager / Simple Log Service: OOM saat memulihkan

Masalah checkpoint

Hologres: checkpoint vs. visibilitas data / Paimon: checkpoint dan visibilitas data

Startup atau pembacaan lambat

MaxCompute: pekerjaan tetap dalam status starting/ MaxCompute: sumber inkremental lambat memulai pembacaan

Kesalahan izin

MaxCompute: Authorization Failed 4019 / Hologres: izin ditolak untuk database

Ruang disk

Paimon: tidak ada ruang tersisa di perangkat / Paimon: file besar di OSS

Kesalahan koneksi

Kafka: terhubung tetapi tidak dapat membaca/menulis / Hologres: slot koneksi tersisa


Kafka

Mengurai data JSON dari Kafka

Untuk JSON standar, gunakan format JSON dalam DDL Anda.

Untuk JSON bersarang, petakan objek JSON ke tipe ROW dan array JSON ke tipe ARRAY. Contoh dengan input berikut:

{
    "a": "abc",
    "b": 1,
    "c": {
        "e": ["1", "2", "3", "4"],
        "f": {"m": "567"}
    }
}

DDL tabel sumber:

CREATE TEMPORARY TABLE kafka_table (
  `a` VARCHAR,
  b INT,
  `c` ROW<e ARRAY<VARCHAR>, f ROW<m VARCHAR>>  -- Objek JSON = ROW, array JSON = ARRAY
) WITH (
  'connector' = 'kafka',
  'topic' = '<your-topic>',
  'properties.bootstrap.servers' = '<broker-list>',
  'properties.group.id' = '<group-id>',
  'format' = 'json',
  'scan.startup.mode' = '<startup-mode>'
);

DDL tabel sink:

CREATE TEMPORARY TABLE sink (
  `a` VARCHAR,
  b INT,
  e VARCHAR,
  `m` VARCHAR
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

DML untuk mengekstrak field bersarang:

INSERT INTO sink
  SELECT
    `a`,
    b,
    c.e[1],   -- Pengindeksan array dimulai dari 1 di Flink
    c.f.m
  FROM kafka_table;

Terhubung ke Kafka tetapi tidak dapat membaca atau menulis data

Penyebab

Ketika proxy atau pemetaan port berada di antara Realtime Compute for Apache Flink dan Kafka, klien Kafka mengambil endpoint broker dari metadata Kafka, bukan menggunakan alamat proxy. Meskipun koneksi awal berhasil, operasi data selanjutnya gagal karena Flink mencoba mengakses broker secara langsung.

Pemecahan masalah

  1. Hubungkan ke layanan ZooKeeper kluster Kafka Anda menggunakan zkCli.sh atau zookeeper-shell.sh.

  2. Jalankan get /brokers/ids/0 untuk mengambil metadata broker. Perhatikan field endpoints.

  3. Dari lingkungan Flink, jalankan ping atau telnet terhadap endpoint tersebut. Jika pengujian konektivitas gagal, berarti proxy atau pemetaan port sedang digunakan.

Solusi

  • Buat koneksi jaringan langsung antara Flink dan Kafka, melewati proxy.

  • Atau, konfigurasikan advertised.listeners pada broker Kafka agar menggunakan alamat proxy sehingga metadata broker mengembalikan alamat yang dapat dijangkau.

advertised.listeners tersedia di Kafka 0.10.2.0 dan versi setelahnya. Untuk detailnya, lihat KIP-103: Separation of Internal and External traffic dan Masalah koneksi jaringan Kafka.

Mengapa jendela event time tidak menghasilkan output dari tabel sumber Kafka?

Partisi yang idle tanpa data masuk menghambat kemajuan watermark, sehingga mencegah pemicuan jendela event time.

Solusi

  1. Pastikan semua partisi tabel sumber Kafka berisi data.

  2. Aktifkan deteksi sumber idle dengan menambahkan parameter berikut ke field Other Configuration pada tab Configuration: Untuk detail parameter, lihat Konfigurasi Flink.

       table.exec.source.idle-timeout: 5

Commit offset di Kafka

Realtime Compute for Apache Flink melakukan commit offset baca konsumen ke Kafka pada setiap checkpoint. Hal ini mencatat posisi data yang telah diproses dan mencegah duplikasi atau kehilangan data saat pemulihan.

Jika checkpointing dinonaktifkan atau interval checkpoint terlalu lama, offset mungkin tidak di-commit, sehingga menyebabkan duplikasi atau kehilangan data setelah kegagalan.

Mengurai JSON bersarang dengan UDTF

Ketika konektor Kafka membaca data seperti {"data":[{"cola":"test1","colb":"test2"}, ...]}, array bersarang diurai sebagai ARRAY<ROW<cola VARCHAR, colb VARCHAR>>. Proses elemen array tersebut menggunakan user-defined table-valued function (UDTF).

Terhubung ke kluster Kafka yang diamankan

Tambahkan properti keamanan ke klausa WITH DDL tabel Kafka Anda. Tambahkan awalan properties. pada setiap properti keamanan Kafka.

Penting

Modul login JAAS menggunakan path kelas yang di-shade (org.apache.flink.kafka.shaded.org.apache.kafka...) di Realtime Compute for Apache Flink, yang berbeda dari path kelas Apache Kafka standar.

Contoh SASL/PLAIN:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";'
);

Contoh SASL_SSL dengan SCRAM-SHA-256:

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /* Konfigurasi SSL */
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = '<truststore-password>',
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = '<keystore-password>',
  /* Konfigurasi SASL */
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<username>\" password=\"<password>\";'
);
Cocokkan kelas modul login JAAS dengan mekanisme SASL Anda:

Unggah semua file yang diperlukan (sertifikat, keystores) melalui Additional Dependencies dalam penerapan. File yang diunggah disimpan di /flink/usrlib/. Untuk petunjuknya, lihat Menyebar pekerjaan.

Penting

Jika broker Kafka menggunakan SASL_SSL tetapi klien dikonfigurasi dengan SASL_PLAINTEXT, terjadi error OutOfMemory selama validasi SQL. Perbarui protokol keamanan klien agar sesuai dengan broker.

Menyelesaikan konflik nama field antara key dan value

Ketika pesan Kafka memiliki field dengan nama yang sama di key dan value (misalnya, id), gunakan properti key.fields-prefix untuk membedakannya:

CREATE TABLE kafka_table (
  key_id INT,
  value_id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'key.format' = 'json',
  'key.fields' = 'id',
  'value.format' = 'json',
  'value.fields' = 'id, name',
  'key.fields-prefix' = 'key_'
);

Hasil query:

key_id: 1,
value_id: 100,
name: flink

Awalan key_ ditambahkan ke nama field key, sehingga field key id menjadi key_id di tabel Flink, menghindari konflik dengan field value id (diakses sebagai value_id).

Latensi tak terduga 50 tahun saat membaca dari Kafka

Metrik currentEmitEventTimeLag menunjukkan latensi lebih dari 50 tahun ketika timestamp pada pesan Kafka adalah 0 atau null. Metrik ini dihitung sebagai current_time - message_timestamp, sehingga timestamp nol menghasilkan latensi sejak epoch Unix.

Langkah pemecahan masalah:

  1. Untuk penerapan JAR, verifikasi bahwa dependensi Kafka dalam file POM adalah dependensi bawaan Realtime Compute for Apache Flink. Dependensi Kafka pihak ketiga mungkin tidak melaporkan metrik latensi.

  2. Pastikan semua partisi topik Kafka hulu menerima data.

  3. Periksa apakah timestamp pesan adalah 0 atau null:

    • Untuk penerapan SQL, definisikan kolom metadata untuk mengekstrak timestamp pesan: ``sql CREATE TEMPORARY TABLE kafka_source ( timestamp BIGINT, ts_meta TIMESTAMP METADATA FROM 'timestamp', ts AS TO_TIMESTAMP( FROM_UNIXTIME(timestamp, 'yyyy-MM-dd HH:mm:ss') ), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = '<your-topic>', 'properties.bootstrap.servers' = '<broker-list>', 'properties.group.id' = '<group-id>', 'format' = 'json', 'scan.startup.mode' = 'latest-offset', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' ); ``

    • Untuk penerapan JAR, tulis program Java sederhana menggunakan KafkaConsumer untuk memeriksa timestamp pesan.

Perbaiki "tabel upsert-kafka memerlukan deklarasi kendala PRIMARY KEY"

Konektor Upsert Kafka memerlukan primary key untuk mempartisi event changelog (INSERT, UPDATE_AFTER, DELETE) agar pesan dengan key yang sama masuk ke partisi yang sama dan diproses secara berurutan.

Tambahkan deklarasi PRIMARY KEY ... NOT ENFORCED ke DDL tabel sink:

CREATE TABLE upsert_kafka_sink (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  ...
);

DataHub

Lanjutkan penerapan setelah topik DataHub di-split atau diskala-masuk

Penerapan gagal dan tidak dapat dilanjutkan setelah topik DataHub yang dibacanya di-split atau diskala-masuk. Batalkan penerapan dan mulai lagi.

Apakah topik DataHub yang dikonsumsi dapat dihapus?

Tidak. Topik DataHub yang sedang dikonsumsi tidak dapat dihapus atau dibuat ulang.


MaxCompute

endPoint vs. tunnelEndpoint

Untuk detail endpoint, lihat Endpoint. Konfigurasi yang salah dalam Virtual Private Cloud (VPC) menyebabkan masalah berikut:

Parameter

Gejala

endPoint salah

Tugas berhenti pada progres 91%

tunnelEndpoint salah

Tugas gagal dijalankan

Cara tabel sumber MaxCompute membaca data

Baik tabel sumber MaxCompute penuh maupun inkremental membaca data melalui MaxCompute Tunnel. Kecepatan baca dibatasi oleh bandwidth Tunnel.

Data yang ditambahkan tidak dapat dibaca setelah penerapan dimulai

Setelah penerapan dimulai, data yang ditambahkan ke partisi atau tabel yang sedang dibaca tidak dapat dibaca dan mungkin memicu failover.

Tabel sumber MaxCompute penuh dan inkremental menggunakan ODPS DOWNLOAD SESSION untuk membaca data. Saat sesi unduh dibuat, MaxCompute menghasilkan file indeks berdasarkan data pada saat itu. Pembacaan selanjutnya menggunakan indeks ini. Data yang ditambahkan setelah sesi dibuat tidak termasuk.

Jika data baru ditulis setelah sesi dimulai:

  • Pembacaan Tunnel mungkin gagal dengan: ErrorCode=TableModified, ErrorMessage=The specified table has been modified since the download initiated.

  • Jika penerapan pulih dari failover, akurasi data tidak dapat dijamin: data yang ada mungkin dibaca ulang sedangkan data baru mungkin tidak lengkap.

Ubah parallelisme setelah menangguhkan penerapan MaxCompute

Untuk penerapan streaming dengan useNewApi=true (default), parallelisme dapat diubah setelah penangguhan. Data di partisi berikutnya didistribusikan menggunakan parallelisme baru, sedangkan partisi saat ini tetap menggunakan distribusi aslinya. Meningkatkan parallelisme saat membaca partisi besar berarti hanya beberapa operator yang memproses data partisi tersebut.

Perubahan parallelisme tidak didukung untuk penerapan batch atau penerapan dengan useNewApi=false.

Offset awal tidak berlaku untuk MaxCompute

Offset awal hanya berlaku untuk sumber antrian pesan (misalnya, DataHub), bukan MaxCompute. Setelah memulai penerapan:

  • Untuk tabel berpartisi, Flink membaca semua partisi yang ada.

  • Untuk tabel non-partisi, Flink membaca semua data yang ada.

Data partisi tidak lengkap di sumber MaxCompute inkremental

Tidak ada mekanisme yang memeriksa kelengkapan data partisi. Saat partisi baru terdeteksi, sumber segera mulai membaca.

Gunakan INSERT OVERWRITE agar partisi dan datanya tersedia secara bersamaan:

INSERT OVERWRITE TABLE T PARTITION (ds='20191010') ...
Penting

Jangan buat partisi terlebih dahulu lalu tulis data ke dalamnya. Sumber inkremental mulai membaca begitu mendeteksi partisi, sehingga berpotensi membaca data yang belum lengkap.

Otorisasi Gagal [4019] untuk MaxCompute

Identitas pengguna dalam DDL MaxCompute tidak memiliki izin akses. Lakukan autentikasi menggunakan akun Alibaba Cloud, pengguna RAM, atau peran RAM. Untuk detailnya, lihat Autentikasi pengguna.

Konfigurasi startPartition untuk sumber MaxCompute inkremental

Buat nilai startPartition sebagai berikut:

Langkah

Aksi

Contoh

1

Gabungkan setiap kolom kunci partisi dan nilainya dengan =.

dt=20220901

2

Urutkan berdasarkan level partisi (ascending) dan pisahkan dengan koma. Tanpa spasi.

dt=20220901,hh=08,mm=10

Tentukan semua level partisi atau hanya beberapa level pertama.

Perbandingan partisi: Sumber membandingkan semua partisi secara alfabetis terhadap startPartition dan membaca partisi dengan nilai yang sama atau lebih besar.

Contoh partisi:

  • ds=20191201,type=a

  • ds=20191201,type=b

  • ds=20191202,type=a

  • ds=20191202,type=b

  • ds=20191202,type=c

  • ds=20191203,type=a

startPartition value

Partisi yang dibaca

ds=20191202

ds=20191202,type=a, type=b, type=c, ds=20191203,type=a

ds=20191202,type=b

ds=20191202,type=b, type=c, ds=20191203,type=a

Partisi yang ditentukan oleh startPartition tidak perlu ada.

Sumber MaxCompute inkremental lambat memulai pembacaan

Terlalu banyak partisi yang cocok dengan kondisi startPartition, atau terlalu banyak file kecil di partisi tersebut, menunda startup karena sumber harus mengurutkan metadata partisi sebelum membaca.

  • Hindari membaca data historis berlebihan. Gunakan penerapan batch untuk pemrosesan data historis.

  • Kurangi jumlah file kecil di partisi historis.

Konfigurasi parameter partition untuk baca dan tulis

Baca dari partisi statis

Langkah

Action

Contoh

1

Gabungkan kunci partisi dan nilainya dengan =. Tabel dimensi memerlukan nilai tetap. Tabel sumber mendukung wildcard (*).

dt=20220901, dt=202209*, dt=2022*01, dt=*

2

Urutkan berdasarkan level partisi, pisahkan dengan koma.

dt=20220901,hh=08,mm=10

Untuk pemfilteran yang lebih fleksibel, gunakan klausa WHERE dengan pushdown partisi. Deklarasikan kolom partisi menggunakan PARTITIONED BY untuk mengaktifkan pengoptimal SQL:

CREATE TABLE maxcompute_table (
  content VARCHAR,
  dt VARCHAR,
  hh VARCHAR
) PARTITIONED BY (dt, hh) WITH (
  'connector' = 'odps',
  ...
);

SELECT content, dt, hh FROM maxcompute_table
WHERE dt >= '20220901' AND dt <= '20220903' AND hh >= '09' AND hh <= '17';

Baca partisi terbaru

Fungsi

Perilaku

max_pt()

Mengembalikan partisi yang berada di urutan pertama secara alfabetis (biasanya yang terbaru).

max_two_pt()

Mengembalikan dua partisi pertama secara alfabetis.

max_pt_with_done()

Mengembalikan partisi pertama yang memiliki partisi pasangan .done yang sesuai.

Untuk tabel sumber, max_pt() membaca data dari partisi yang cocok satu kali lalu berhenti. Fungsi ini tidak memantau partisi baru. Gunakan tabel sumber inkremental untuk pembacaan berkelanjutan. Untuk tabel dimensi, setiap refresh memeriksa partisi terbaru.

Gunakan max_pt_with_done() ketika partisi terbaru mungkin masih dalam proses pemuatan. Buat partisi .done kosong (misalnya, dt=20220901.done) setelah persiapan data selesai. Tabel dimensi hanya membaca dari partisi yang memiliki pasangan .done.

Tulis ke partisi statis

Konfigurasikan parameter partition dengan cara yang sama seperti untuk membaca, tetapi wildcard (*) tidak didukung di tabel sink.

Tulis ke partisi dinamis

Daftarkan nama kolom kunci partisi dalam urutan ascending berdasarkan level partisi, dipisahkan koma:

'partition' = 'dt,hh,mm'

max_pt() vs. max_pt_with_done()

Diberikan partisi-partisi berikut:

  • ds=20190101

  • ds=20190101.done

  • ds=20190102

  • ds=20190102.done

  • ds=20190103

Fungsi

Mengembalikan

max_pt()

ds=20190103 (yang pertama secara alfabetis)

max_pt_with_done()

ds=20190102 (yang pertama dengan pasangan .done)

Penerapan MaxCompute tetap dalam status starting atau generasi data lambat

Kemungkinan penyebab:

  • File kecil: Terlalu banyak file kecil di tabel sumber MaxCompute.

  • Akses lintas-wilayah: Kluster penyimpanan MaxCompute dan kluster komputasi Flink berada di wilayah berbeda, menyebabkan latensi jaringan. Sebarkan keduanya di wilayah yang sama.

  • Izin tidak valid: Tabel sumber MaxCompute memerlukan izin Download.

Pilih terowongan data: Batch Tunnel vs. Streaming Tunnel

Pertimbangan

Batch Tunnel

Streaming Tunnel

Konsistensi

at-least-once. Duplikasi hanya terjadi jika terjadi error checkpoint dan data ditulis ke beberapa partisi secara bersamaan.

at-least-once. Duplikasi mungkin terjadi pada pengecualian apa pun.

Throughput

Lebih rendah. Data di-commit selama checkpoint, membuat file di server.

Lebih tinggi. Tidak ada commit saat checkpoint. Dengan numFlushThreads > 1, flush data sambil menerima data hulu.

Jika checkpoint Batch Tunnel lambat atau timeout dan penyimpanan hilir mentoleransi duplikasi, beralihlah ke Streaming Tunnel.

Data duplikat ditulis ke MaxCompute

Periksa hal-hal berikut secara berurutan:

  1. Logika SQL: Tabel non-transaksional MaxCompute tidak menegakkan keunikan primary key, bahkan jika primary key dideklarasikan dalam DDL. Jika SQL Anda menghasilkan duplikat, data tersebut ditulis apa adanya.

  2. Beberapa penerapan: Beberapa penerapan yang menulis ke tabel MaxCompute yang sama menghasilkan baris duplikat.

  3. Batch Tunnel + kegagalan checkpoint: Jika penerapan gagal selama checkpointing, data yang telah di-commit mungkin ditulis ulang saat penerapan dilanjutkan dari checkpoint sebelumnya.

  4. Streaming Tunnel + failover: Data antar checkpoint di-commit segera. Setelah failover, data dari checkpoint terakhir hingga titik kegagalan mungkin ditulis ulang. Pertimbangkan beralih ke Batch Tunnel untuk deduplikasi yang lebih ketat.

  5. Batch Tunnel + batalkan/mulai ulang (VVR < vvr-6.0.7-flink-1.15): Data di-commit sebelum konektor dimatikan selama pembatalan (misalnya, oleh optimasi Autopilot). Tingkatkan ke VVR vvr-6.0.7-flink-1.15 atau versi setelahnya untuk memperbaiki masalah ini.

Error "Invalid partition spec" di sink MaxCompute

Nilai kunci partisi dalam data tidak valid. Nilai tidak valid meliputi string kosong, null, dan nilai yang mengandung =, ,, atau /. Periksa data untuk nilai-nilai tidak valid ini.

Error "No more available blockId" di sink MaxCompute

Jumlah blok yang ditulis melebihi batas, artinya setiap flush menulis data terlalu sedikit terlalu sering. Tingkatkan batchSize dan flushIntervalMs untuk mengurangi frekuensi flush.

Gunakan petunjuk SHUFFLE_HASH untuk tabel dimensi

Secara default, setiap subtask menyimpan cache seluruh tabel dimensi. Untuk tabel dimensi besar, gunakan SHUFFLE_HASH untuk mendistribusikan data ke subtask, mengurangi konsumsi memori heap JVM:

-- Tabel sumber dan tabel dimensi
CREATE TABLE source_table (k VARCHAR, v VARCHAR) WITH ( ... );
CREATE TABLE dim_1 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_2 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_3 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );

-- Terapkan SHUFFLE_HASH ke dim_1 dan dim_3; dim_2 tetap di-cache penuh per subtask
SELECT /*+ SHUFFLE_HASH(dim_1), SHUFFLE_HASH(dim_3) */
  k, s.v, d1.v, d2.v, d3.v
FROM source_table AS s
INNER JOIN dim_1 FOR SYSTEM_TIME AS OF PROCTIME() AS d1 ON s.k = d1.k
LEFT JOIN dim_2 FOR SYSTEM_TIME AS OF PROCTIME() AS d2 ON s.k = d2.k
LEFT JOIN dim_3 FOR SYSTEM_TIME AS OF PROCTIME() AS d3 ON s.k = d3.k;

Konfigurasi CacheReloadTimeBlackList

Parameter ini memblokir reload cache tabel dimensi selama periode waktu tertentu.

  • Tipe data: String

  • Format: start_time -> end_time, dipisahkan koma untuk beberapa periode.

  • Format waktu: YYYY-MM-DD HH:mm. Hilangkan tanggal untuk interval harian berulang.

Contoh:

'cacheReloadTimeBlackList' = '14:00 -> 15:00,23:00 -> 01:00'

Skenario

Nilai

Interval harian

14:00 -> 15:00

Beberapa interval harian

14:00 -> 15:00,23:00 -> 01:00

Interval tanggal tertentu

14:00 -> 15:00,23:00 -> 01:00,2025-10-01 22:00 -> 2025-10-01 23:00


MySQL / ApsaraDB RDS for MySQL

Error "SSL peer shut down incorrectly"

Ini terjadi ketika SSL diaktifkan di server MySQL tetapi klien tidak menggunakan SSL. Tambahkan characterEncoding=utf-8&useSSL=false ke URL JDBC:

'url' = 'jdbc:mysql://<host>:<port>/<database>?characterEncoding=utf-8&useSSL=false'

BIGINT UNSIGNED dikonversi ke DECIMAL, lalu ke TEXT di Hologres

Flink tidak mendukung BIGINT UNSIGNED, sehingga mengonversi tipe tersebut ke DECIMAL berdasarkan batasan rentang nilai. Saat menyinkronkan ke Hologres melalui CREATE TABLE AS, primary key selanjutnya dikonversi ke TEXT karena Hologres tidak mendukung DECIMAL sebagai primary key.

Sesuaikan tipe data primary key selama pengembangan. Untuk mempertahankan kolom DECIMAL, buat manual tabel Hologres dan konfigurasikan primary key yang berbeda atau hapus primary key sepenuhnya. Tanpa primary key yang tepat, tangani deduplikasi di tingkat aplikasi.

Perilaku Upsert vs. insert untuk tabel sink ApsaraDB RDS

DDL memiliki primary key?

Perilaku penulisan

Ya

INSERT ... ON DUPLICATE KEY UPDATE -- insert jika key baru, update jika sudah ada

Tidak

INSERT INTO -- selalu insert baris baru

GROUP BY dengan indeks unik di sink ApsaraDB RDS

  • Deklarasikan indeks unik dalam klausa GROUP BY.

  • Primary key auto-increment tidak dapat dideklarasikan sebagai primary key dalam SQL.

Pemetaan tipe INT UNSIGNED di Flink SQL

Driver JDBC MySQL mengonversi tipe data sebelum mencapai Flink:

Tipe fisik MySQL

Tipe driver JDBC

Tipe Flink SQL

INT UNSIGNED

LONG

BIGINT

BIGINT UNSIGNED

BIGINTEGER

DECIMAL(20, 0)

Error "Incorrect string value"

Data berisi karakter yang tidak dapat diurai oleh encoding database. Tambahkan characterEncoding=UTF-8 ke URL JDBC:

'url' = 'jdbc:mysql://<host>:<port>/<database>?characterEncoding=UTF-8'

Deadlock saat menulis ke MySQL melalui konektor ApsaraDB RDS atau TDDL

Kunci baris InnoDB beroperasi pada indeks, bukan catatan individual. Ketika beberapa transaksi mengakses rentang indeks yang tumpang tindih, deadlock dapat terjadi.

Contoh: Transaksi T1 memegang kunci rentang pada (-inf, 2] dan mencoba mengambil (-inf, 1]. Transaksi T2 sedang menunggu (-inf, 2]. Kedua transaksi saling menghalangi.

RDS/TDDL vs. Tablestore:

Penyimpanan

Granularitas kunci

Dampak

RDS/TDDL (InnoDB)

Kunci rentang indeks

Konflik memengaruhi seluruh rentang

Tablestore

Kunci baris tunggal

Tidak berdampak pada baris lain

Solusi: Untuk skenario QPS tinggi atau konkurensi tinggi, gunakan Tablestore sebagai tabel sink.

Jika database relasional diperlukan:

  • Isolasi penerapan dari operasi baca/tulis sistem lain.

  • Gunakan penulisan single-concurrent untuk volume data kecil.

  • Hindari kunci unik jika memungkinkan. Jika diperlukan, urutkan kunci unik berdasarkan diferensiasi (tertinggi terlebih dahulu).

  • Shard database dan tabel untuk menghindari bottleneck tabel tunggal.

Perubahan skema tidak disebarkan ke tabel hilir

Sinkronisasi skema dipicu oleh perbedaan skema antara catatan data berturut-turut, bukan hanya oleh pernyataan DDL. Jika tidak ada data yang ditulis setelah perubahan DDL, pembaruan skema hilir tidak dipicu. Untuk detailnya, lihat bagian "Kebijakan sinkronisasi perubahan skema tabel" di CREATE TABLE AS (CTAS).

"finish split response timeout" di sumber CDC MySQL

Pemanfaatan CPU tinggi mencegah sumber merespons permintaan RPC koordinator. Tingkatkan core CPU TaskManager pada tab Resources.

Perubahan skema selama pembacaan data penuh tabel CDC MySQL

Perubahan skema selama pembacaan data penuh dapat menyebabkan error atau mencegah sinkronisasi skema. Batalkan penerapan, hapus tabel hilir, dan mulai ulang tanpa state.

Perubahan skema tidak didukung selama sinkronisasi CTAS/CDAS

Batalkan penerapan, hapus tabel hilir, dan mulai ulang tanpa state. Hindari modifikasi skema yang tidak kompatibel. Untuk perubahan yang didukung, lihat Pernyataan CREATE TABLE AS.


ClickHouse

Retract data yang diperbarui dari tabel sink ClickHouse

Retraksi dimungkinkan ketika primary key dideklarasikan dan ignoreDelete diatur ke false, tetapi performa turun signifikan. ClickHouse adalah sistem OLAP; operasi ALTER TABLE UPDATE dan ALTER TABLE DELETE secara inheren lambat.

Visibilitas data di tabel sink ClickHouse

exactly-once diaktifkan?

Data menjadi terlihat ketika...

Tidak (default)

Buffer penulisan mencapai batchSize atau waktu tunggu melebihi flushIntervalMs

Ya

Checkpoint selesai


Print

Lihat output konektor print

Metode 1: Konsol Realtime Compute for Apache Flink

  1. Buka O&M > Deployments dan klik nama penerapan.

  2. Klik tab Logs.

  3. Pilih pekerjaan yang sedang berjalan dari daftar drop-down Job.

  4. Klik tab Running Task Managers, lalu klik nilai di kolom Path, ID.

  5. Klik tab Logs untuk melihat hasil print.

Metode 2: UI Flink

  1. Buka O&M > Deployments dan klik nama penerapan.

  2. Pada tab Status, klik Flink UI di kolom Actions.

  3. Di Apache Flink Dashboard, klik Task Managers.

  4. Klik nilai di kolom Path, ID.

  5. Klik tab Logs untuk melihat hasil print.


Tablestore

Join dimensi tidak mengembalikan data (Tablestore)

Verifikasi bahwa tipe dan nama kolom dalam DDL sesuai persis dengan skema tabel fisik.


ApsaraMQ for RocketMQ

"IllegalArgumentException: timeout value is negative"

Nilai default pullIntervalMs adalah -1. Ketika tidak ada pesan yang tiba untuk sementara waktu, thread konsumen tidur selama durasi ini, menyebabkan error. Atur pullIntervalMs ke nilai non-negatif (misalnya, 0).

Perilaku penemuan partisi

Versi VVR

Interval deteksi

Perilaku

Delay

Sebelum 6.0.2

5--10 menit

Memicu failover setelah 3 deteksi berturut-turut

10--30 menit

6.0.2 atau setelahnya

5 menit

Operator sumber membaca partisi baru secara langsung, tanpa failover

1--5 menit


Hologres

Error "BackPressure Exceed reject Limit" (Hologres)

Instans Hologres mengalami beban tulis tinggi. Hubungi dukungan teknis Hologres untuk meningkatkan instans.

Error "remaining connection slots" (Hologres)

Jumlah koneksi melebihi batas instans Hologres.

  1. Periksa app_name untuk setiap node frontend dan hitung koneksi flink-connector.

  2. Verifikasi apakah penerapan lain terhubung ke instans yang sama.

  3. Lepaskan koneksi idle. Untuk detailnya, lihat Mengelola koneksi.

"no table is defined in publication" setelah pembuatan ulang tabel (Hologres)

Publikasi yang terkait dengan tabel yang di-drop tidak dibersihkan.

  1. Query publikasi yang terlantar:

       SELECT * FROM pg_publication
       WHERE pubname NOT IN (SELECT pubname FROM pg_publication_tables);
  2. Hapus publikasi yang terlantar:

       DROP PUBLICATION <publication_name>;
  3. Mulai ulang penerapan.

Interval checkpoint dan visibilitas data Hologres

Interval checkpoint tidak secara langsung terkait dengan visibilitas data. Konektor Hologres secara berkala melakukan flush data ke Hologres terlepas dari checkpoint. Jika buffer memenuhi kondisi flush, data menjadi terlihat sebelum checkpoint berikutnya.

Namun, konektor tidak menyediakan konsistensi transaksional. Checkpoint memaksa flush untuk toleransi kesalahan dan pemulihan, tetapi data mungkin sudah sebagian terlihat di antara checkpoint.

"permission denied for database" (Hologres)

Mulai VVR 8.0.4, Flink menggunakan mode JDBC untuk mengonsumsi log biner dari Hologres V2.0 dan setelahnya, yang memerlukan izin khusus untuk akun non-superuser.

Model otorisasi PostgreSQL standar:

GRANT CREATE ON DATABASE <db_name> TO <user_name>;
ALTER ROLE <user_name> REPLICATION;

Model Izin Sederhana (SPM):

CALL spm_grant('<db_name>_admin', '<user_name>');
ALTER ROLE <user_name> REPLICATION;

Ganti <user_name> dengan ID akun Alibaba Cloud atau nama pengguna RAM Anda. Untuk detailnya, lihat Ikhtisar akun.

"table id parsed from checkpoint is different from the current table id"

Di VVR 8.0.5 hingga 8.0.8, Flink memverifikasi konsistensi ID tabel selama pemulihan checkpoint untuk pekerjaan log biner Hologres. Pengecualian ini menunjukkan tabel Hologres telah dibangun ulang (misalnya, melalui TRUNCATE).

Solusi:

  • Tingkatkan ke VVR 8.0.9 atau versi setelahnya untuk menonaktifkan verifikasi ID tabel.

  • Hindari membangun ulang tabel sumber. Membangun ulang menghapus log biner historis, dan membaca dari tabel baru dengan offset lama menyebabkan inkonsistensi data.

Ketidakcocokan presisi desimal dalam mode JDBC log biner Hologres

Di VVR 8.0.10 atau sebelumnya, ketidakcocokan presisi desimal antara tabel sumber Flink dan tabel Hologres menyebabkan error.

Solusi:

  • Tingkatkan ke VVR 8.0.11 atau versi setelahnya.

  • Pastikan presisi field desimal konsisten antara DDL Flink dan tabel Hologres untuk mencegah kehilangan data akibat pemotongan presisi.

"no table is defined in publication" atau "The table xxx has no slot named xxx" pada tabel yang dibuat ulang (Hologres)

Publikasi dari tabel yang di-drop tidak dihapus.

Solusi 1:

  1. Query publikasi yang terlantar:

       SELECT * FROM pg_publication
       WHERE pubname NOT IN (SELECT pubname FROM pg_publication_tables);
  2. Hapus publikasi:

       DROP PUBLICATION <publication_name>;
  3. Mulai ulang pekerjaan.

Solusi 2:

Tingkatkan ke VVR 8.0.5 atau versi setelahnya. Konektor secara otomatis membersihkan publikasi yang terlantar.


Simple Log Service

LogSizeTooLargeException (Simple Log Service)

Satu baris log melebihi 8 MB (MAX_BATCH_SIZE_IN_BYTES = 8388608). Ubah offset awal penerapan untuk melewati data yang terlalu besar. Untuk petunjuknya, lihat Memulai penerapan pekerjaan.

OOM saat memulihkan dari sumber Simple Log Service

Konektor Simple Log Service mengambil data dalam batch yang dikontrol oleh batchGetSize (default: 100). Setelah failover, backlog besar terakumulasi. Jika single_log_group_size * 100 melebihi memori heap JVM yang tersedia, terjadi error OOM.

Kurangi nilai batchGetSize.


Paimon

Tentukan offset konsumen untuk tabel sumber Paimon

Konfigurasikan scan.mode untuk mengontrol posisi awal:

Mode pemindaian

Batch Read

Baca streaming

default

Ditentukan oleh parameter lain: scan.timestamp-millis memicu perilaku from-timestamp; scan.snapshot-id memicu perilaku from-snapshot; jika tidak, berperilaku sebagai latest-full.

Logika yang sama seperti batch untuk snapshot awal, lalu pembacaan inkremental berkelanjutan.

latest-full

Membaca snapshot terbaru.

Membaca snapshot terbaru saat startup, lalu menghasilkan data inkremental.

compacted-full

Membaca snapshot setelah kompaksi penuh terbaru.

Membaca snapshot pasca-kompaksi saat startup, lalu menghasilkan data inkremental.

latest

Sama dengan latest-full.

Hanya menghasilkan data inkremental, tanpa snapshot awal.

from-timestamp

Membaca snapshot terbaru pada atau sebelum scan.timestamp-millis.

Menghasilkan data inkremental dari scan.timestamp-millis, tanpa snapshot awal.

from-snapshot

Membaca snapshot yang ditentukan oleh scan.snapshot-id.

Menghasilkan data inkremental dari scan.snapshot-id, tanpa snapshot awal.

from-snapshot-full

Sama dengan from-snapshot.

Membaca snapshot yang ditentukan saat startup, lalu menghasilkan data inkremental.

Konfigurasi kedaluwarsa partisi otomatis (Paimon)

Paimon dapat secara otomatis menghapus partisi yang kedaluwarsa berdasarkan waktu yang berlalu sejak timestamp partisi.

Parameter:

Parameter

Tujuan

Contoh

partition.timestamp-pattern

Mengonversi nilai partisi ke string waktu. Gunakan $column_name untuk setiap kunci partisi.

$year-$month-$day $hour:00:00

partition.timestamp-formatter

Pola untuk mengurai string waktu menjadi timestamp. Default-nya yyyy-MM-dd HH:mm:ss atau yyyy-MM-dd. Menerima pola DateTimeFormatter Java apa pun.

yyyy-MM-dd HH:mm:ss

partition.expiration-time

Usia maksimum sebelum partisi dihapus.

7d

Contoh: Untuk partisi year=2023,month=04,day=21,hour=17 dengan pola $year-$month-$day $hour:00:00, timestamp yang diselesaikan adalah 2023-04-21 17:00:00.

Timeout heartbeat TaskManager saat menulis ke Paimon

Ini biasanya menunjukkan memori heap TaskManager tidak mencukupi. Paimon menggunakan memori heap untuk:

  1. Buffer penulisan per tugas konkuren: Setiap operator penulis memiliki buffer berukuran write-buffer-size (default: 256 MB).

  2. Buffer format ORC: Mengonversi data dalam memori ke format kolom. Berukuran orc.write.batch-size (default: 1024 baris). Catatan besar memperbesar konsumsi memori (misalnya, field JSON 4 MB per baris menggunakan 4 MB x 1024 = 4 GB).

  3. Penulis per bucket: Setiap bucket yang dimodifikasi mendapatkan objek penulis khusus.

Solusi berdasarkan penyebab:

  • Nilai write-buffer-size besar: Kurangi nilainya. Nilai yang terlalu kecil menyebabkan penulisan disk dan kompaksi sering, menurunkan performa tulis.

  • Catatan individual besar: Kurangi orc.write.batch-size, atau beralih ke format Avro dengan statistik dinonaktifkan: > Catatan: file.format dan metadata.stats-mode hanya dapat diatur saat pembuatan tabel dan tidak dapat diubah nanti melalui ALTER TABLE atau petunjuk SQL.

      'file.format' = 'avro',
      'metadata.stats-mode' = 'none'
  • Terlalu banyak partisi/bucket: Verifikasi konfigurasi kunci partisi dan jumlah bucket. Targetkan 2–5 GB per bucket. Untuk penyesuaian bucket, lihat Tabel primary key dan tabel append-only.

"Sink materializer must not be used with Paimon sink"

Operator sink materializer (dirancang untuk memperbaiki data out-of-order dari JOIN bertingkat) menyebabkan hasil agregasi salah dan overhead tidak perlu dengan Paimon.

Nonaktifkan:

SET 'table.exec.sink.upsert-materialize' = 'false';

Untuk alternatif penanganan out-of-order, lihat Tabel primary key dan tabel append-only.

"File deletion conflicts detected" atau "LSM conflicts detected" (Paimon)

Kemungkinan penyebab:

  1. Beberapa penerapan menulis ke partisi yang sama: Mulai ulang penerapan yang gagal. Kejadian sesekali adalah normal.

  2. Dilanjutkan dari state lama: Error berulang terus-menerus. Lanjutkan dari state terbaru atau mulai ulang tanpa state.

  3. Beberapa pernyataan INSERT dalam satu penerapan: Paimon tidak mendukung ini. Gunakan UNION ALL untuk menggabungkan aliran data sebelum menulis.

  4. Paralelisme Global Committer atau Compaction Coordinator > 1: Keduanya harus diatur ke 1 untuk konsistensi data.

"File xxx not found" saat membaca dari sumber Paimon

File snapshot telah kedaluwarsa. Efisiensi konsumsi terlalu rendah atau waktu kedaluwarsa snapshot terlalu singkat.

Untuk memeriksa snapshot yang tersedia, lihat bagian "Tabel Snapshots" di Tabel sistem.

"No space left on device" (Paimon)

Untuk join lookup atau changelog-producer=lookup: Konfigurasikan parameter ini melalui petunjuk SQL:

Parameter

Tujuan

Nilai yang direkomendasikan

lookup.cache-max-disk-size

Batasi penggunaan disk untuk cache lookup

256 MB, 512 MB, atau 1 GB

lookup.cache-file-retention

Periode retensi file cache

15 menit atau 30 menit

Untuk penulisan umum: Konfigurasikan write-buffer-spill.max-disk-size melalui petunjuk SQL untuk membatasi ukuran file sementara.

Jumlah besar file Paimon di OSS

  1. Sesuaikan kebijakan retensi: Paimon menyimpan file historis untuk akses time-travel. Kurangi periode retensi. Untuk detailnya, lihat Membersihkan data kedaluwarsa.

  2. Tinjau konfigurasi partisi dan bucket: Targetkan 2–5 GB per bucket. Untuk panduan bucketing, lihat Tabel primary key dan tabel append-only.

  3. Aktifkan kompresi: Tambahkan 'file.compression' = 'zstd' saat pembuatan tabel untuk menggunakan kompresi Zstandard. > Catatan: file.compression hanya dapat diatur saat pembuatan tabel dan tidak dapat diubah nanti melalui ALTER TABLE atau petunjuk SQL.

Apakah interval checkpoint memengaruhi visibilitas data Paimon?

Ya. Paimon melakukan commit data dan membuatnya terlihat bagi konsumen hilir hanya pada checkpoint. Sebelum checkpoint, data di-flush ke sistem file remote tetapi sistem hilir tidak diberi tahu.

Memory leak di pekerjaan Paimon jangka panjang

Dua kemungkinan penyebab:

  1. Perilaku yang diharapkan: Penggunaan memori meningkat sebanding dengan peningkatan laju permintaan (RPS).

  2. Masalah diketahui: Terjadi memory leak di pekerjaan jangka panjang yang menggunakan katalog Paimon dengan tipe metastore filesystem. Untuk mengatasi ini saat membaca dari atau menulis ke OSS, konfigurasikan parameter ini:

    • fs.oss.endpoint

    • fs.oss.accessKeyId

    • fs.oss.accessKeySecret


Hudi

Tidak ada data di penyimpanan (Hudi)

Data di-flush ke penyimpanan ketika:

  • Bucket mencapai 64 MB di memori.

  • Total buffer mencapai 1 GB.

  • Checkpoint dipicu.

Untuk penulisan streaming, pastikan checkpointing diaktifkan.

Data duplikat (Hudi)

Dalam partisi (COW): Atur write.insert.drop.duplicates ke true untuk mengaktifkan deduplikasi. Untuk merge-on-read (MOR), deduplikasi otomatis ketika primary key didefinisikan.

Di Hudi 0.10.0 dan setelahnya, write.insert.drop.duplicates diganti nama menjadi write.precombined dan default-nya true.

Lintas partisi: Atur index.global.enabled ke true.

Data lama (melebihi TTL indeks): Tingkatkan index.state.ttl (satuan: hari, default: 1,5). Mengatur ke kurang dari 0 menyimpan indeks secara permanen.

Di Hudi 0.10.0 dan setelahnya, index.state.ttl default-nya 0 (permanen).

Hanya file log yang dihasilkan dalam mode MOR (Hudi)

Hudi menghasilkan file Parquet hanya setelah kompaksi. Dalam mode MOR, kompaksi asinkron berjalan setiap 5 commit secara default. Kurangi compaction.delta_commits untuk memicu kompaksi lebih cepat.


AnalyticDB for MySQL 3.0

Error "multi-statement be found"

Masalah kompatibilitas antara MySQL JDBC 8.x dan ALLOW_MULTI_QUERIES=true di AnalyticDB for MySQL.

Solusi:

  1. Hubungi dukungan teknis untuk konektor AnalyticDB for MySQL V3.0 kustom yang dibangun dengan MySQL JDBC 5.1.46. Untuk penggunaannya, lihat Mengelola konektor kustom.

  2. Tambahkan allowMultiQueries=true ke URL JDBC:

       jdbc:mysql://<host>.ads.aliyuncs.com:3306/<database>?allowMultiQueries=true

Konektor kustom

"No suitable driver found for..."

Konektor kustom tidak dapat menemukan driver JDBC-nya.

Solusi (pilih salah satu):