Jawaban atas pertanyaan umum mengenai konektor di Realtime Compute for Apache Flink, dikelompokkan berdasarkan jenis konektornya.
Indeks gejala cepat
Gunakan tabel ini untuk menemukan entri FAQ yang tepat ketika Anda mengetahui gejalanya tetapi tidak tahu konektor mana yang menyebabkannya.
Gejala | Entri terkait |
Tidak ada output data | Kafka: jendela event time tidak menghasilkan output / Hudi: Tidak ada data di penyimpanan / Tablestore: join dimensi tidak mengembalikan data |
Data duplikat | |
Out of memory (OOM) | Paimon: Timeout heartbeat TaskManager / Simple Log Service: OOM saat memulihkan |
Masalah checkpoint | Hologres: checkpoint vs. visibilitas data / Paimon: checkpoint dan visibilitas data |
Startup atau pembacaan lambat | MaxCompute: pekerjaan tetap dalam status starting/ MaxCompute: sumber inkremental lambat memulai pembacaan |
Kesalahan izin | MaxCompute: Authorization Failed 4019 / Hologres: izin ditolak untuk database |
Ruang disk | Paimon: tidak ada ruang tersisa di perangkat / Paimon: file besar di OSS |
Kesalahan koneksi | Kafka: terhubung tetapi tidak dapat membaca/menulis / Hologres: slot koneksi tersisa |
Kafka
Mengurai data JSON dari Kafka
Untuk JSON standar, gunakan format JSON dalam DDL Anda.
Untuk JSON bersarang, petakan objek JSON ke tipe ROW dan array JSON ke tipe ARRAY. Contoh dengan input berikut:
{
"a": "abc",
"b": 1,
"c": {
"e": ["1", "2", "3", "4"],
"f": {"m": "567"}
}
}DDL tabel sumber:
CREATE TEMPORARY TABLE kafka_table (
`a` VARCHAR,
b INT,
`c` ROW<e ARRAY<VARCHAR>, f ROW<m VARCHAR>> -- Objek JSON = ROW, array JSON = ARRAY
) WITH (
'connector' = 'kafka',
'topic' = '<your-topic>',
'properties.bootstrap.servers' = '<broker-list>',
'properties.group.id' = '<group-id>',
'format' = 'json',
'scan.startup.mode' = '<startup-mode>'
);DDL tabel sink:
CREATE TEMPORARY TABLE sink (
`a` VARCHAR,
b INT,
e VARCHAR,
`m` VARCHAR
) WITH (
'connector' = 'print',
'logger' = 'true'
);DML untuk mengekstrak field bersarang:
INSERT INTO sink
SELECT
`a`,
b,
c.e[1], -- Pengindeksan array dimulai dari 1 di Flink
c.f.m
FROM kafka_table;Terhubung ke Kafka tetapi tidak dapat membaca atau menulis data
Penyebab
Ketika proxy atau pemetaan port berada di antara Realtime Compute for Apache Flink dan Kafka, klien Kafka mengambil endpoint broker dari metadata Kafka, bukan menggunakan alamat proxy. Meskipun koneksi awal berhasil, operasi data selanjutnya gagal karena Flink mencoba mengakses broker secara langsung.
Pemecahan masalah
Hubungkan ke layanan ZooKeeper kluster Kafka Anda menggunakan
zkCli.shatauzookeeper-shell.sh.Jalankan
get /brokers/ids/0untuk mengambil metadata broker. Perhatikan fieldendpoints.Dari lingkungan Flink, jalankan
pingatautelnetterhadap endpoint tersebut. Jika pengujian konektivitas gagal, berarti proxy atau pemetaan port sedang digunakan.
Solusi
Buat koneksi jaringan langsung antara Flink dan Kafka, melewati proxy.
Atau, konfigurasikan
advertised.listenerspada broker Kafka agar menggunakan alamat proxy sehingga metadata broker mengembalikan alamat yang dapat dijangkau.
advertised.listeners tersedia di Kafka 0.10.2.0 dan versi setelahnya. Untuk detailnya, lihat KIP-103: Separation of Internal and External traffic dan Masalah koneksi jaringan Kafka.Mengapa jendela event time tidak menghasilkan output dari tabel sumber Kafka?
Partisi yang idle tanpa data masuk menghambat kemajuan watermark, sehingga mencegah pemicuan jendela event time.
Solusi
Pastikan semua partisi tabel sumber Kafka berisi data.
Aktifkan deteksi sumber idle dengan menambahkan parameter berikut ke field Other Configuration pada tab Configuration: Untuk detail parameter, lihat Konfigurasi Flink.
table.exec.source.idle-timeout: 5
Commit offset di Kafka
Realtime Compute for Apache Flink melakukan commit offset baca konsumen ke Kafka pada setiap checkpoint. Hal ini mencatat posisi data yang telah diproses dan mencegah duplikasi atau kehilangan data saat pemulihan.
Jika checkpointing dinonaktifkan atau interval checkpoint terlalu lama, offset mungkin tidak di-commit, sehingga menyebabkan duplikasi atau kehilangan data setelah kegagalan.
Mengurai JSON bersarang dengan UDTF
Ketika konektor Kafka membaca data seperti {"data":[{"cola":"test1","colb":"test2"}, ...]}, array bersarang diurai sebagai ARRAY<ROW<cola VARCHAR, colb VARCHAR>>. Proses elemen array tersebut menggunakan user-defined table-valued function (UDTF).
Terhubung ke kluster Kafka yang diamankan
Tambahkan properti keamanan ke klausa WITH DDL tabel Kafka Anda. Tambahkan awalan properties. pada setiap properti keamanan Kafka.
Modul login JAAS menggunakan path kelas yang di-shade (org.apache.flink.kafka.shaded.org.apache.kafka...) di Realtime Compute for Apache Flink, yang berbeda dari path kelas Apache Kafka standar.
Contoh SASL/PLAIN:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";'
);Contoh SASL_SSL dengan SCRAM-SHA-256:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* Konfigurasi SSL */
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = '<truststore-password>',
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = '<keystore-password>',
/* Konfigurasi SASL */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"<username>\" password=\"<password>\";'
);Cocokkan kelas modul login JAAS dengan mekanisme SASL Anda:
Unggah semua file yang diperlukan (sertifikat, keystores) melalui Additional Dependencies dalam penerapan. File yang diunggah disimpan di /flink/usrlib/. Untuk petunjuknya, lihat Menyebar pekerjaan.
Jika broker Kafka menggunakan SASL_SSL tetapi klien dikonfigurasi dengan SASL_PLAINTEXT, terjadi error OutOfMemory selama validasi SQL. Perbarui protokol keamanan klien agar sesuai dengan broker.
Menyelesaikan konflik nama field antara key dan value
Ketika pesan Kafka memiliki field dengan nama yang sama di key dan value (misalnya, id), gunakan properti key.fields-prefix untuk membedakannya:
CREATE TABLE kafka_table (
key_id INT,
value_id INT,
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'key.format' = 'json',
'key.fields' = 'id',
'value.format' = 'json',
'value.fields' = 'id, name',
'key.fields-prefix' = 'key_'
);Hasil query:
key_id: 1,
value_id: 100,
name: flinkAwalan key_ ditambahkan ke nama field key, sehingga field key id menjadi key_id di tabel Flink, menghindari konflik dengan field value id (diakses sebagai value_id).
Latensi tak terduga 50 tahun saat membaca dari Kafka
Metrik currentEmitEventTimeLag menunjukkan latensi lebih dari 50 tahun ketika timestamp pada pesan Kafka adalah 0 atau null. Metrik ini dihitung sebagai current_time - message_timestamp, sehingga timestamp nol menghasilkan latensi sejak epoch Unix.
Langkah pemecahan masalah:
Untuk penerapan JAR, verifikasi bahwa dependensi Kafka dalam file POM adalah dependensi bawaan Realtime Compute for Apache Flink. Dependensi Kafka pihak ketiga mungkin tidak melaporkan metrik latensi.
Pastikan semua partisi topik Kafka hulu menerima data.
Periksa apakah timestamp pesan adalah
0ataunull:Untuk penerapan SQL, definisikan kolom metadata untuk mengekstrak timestamp pesan: ``
sql CREATE TEMPORARY TABLE kafka_source (timestampBIGINT,ts_metaTIMESTAMP METADATA FROM 'timestamp', ts AS TO_TIMESTAMP( FROM_UNIXTIME(timestamp, 'yyyy-MM-dd HH:mm:ss') ), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = '<your-topic>', 'properties.bootstrap.servers' = '<broker-list>', 'properties.group.id' = '<group-id>', 'format' = 'json', 'scan.startup.mode' = 'latest-offset', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' );``Untuk penerapan JAR, tulis program Java sederhana menggunakan
KafkaConsumeruntuk memeriksa timestamp pesan.
Perbaiki "tabel upsert-kafka memerlukan deklarasi kendala PRIMARY KEY"
Konektor Upsert Kafka memerlukan primary key untuk mempartisi event changelog (INSERT, UPDATE_AFTER, DELETE) agar pesan dengan key yang sama masuk ke partisi yang sama dan diproses secara berurutan.
Tambahkan deklarasi PRIMARY KEY ... NOT ENFORCED ke DDL tabel sink:
CREATE TABLE upsert_kafka_sink (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
...
);DataHub
Lanjutkan penerapan setelah topik DataHub di-split atau diskala-masuk
Penerapan gagal dan tidak dapat dilanjutkan setelah topik DataHub yang dibacanya di-split atau diskala-masuk. Batalkan penerapan dan mulai lagi.
Apakah topik DataHub yang dikonsumsi dapat dihapus?
Tidak. Topik DataHub yang sedang dikonsumsi tidak dapat dihapus atau dibuat ulang.
MaxCompute
endPoint vs. tunnelEndpoint
Untuk detail endpoint, lihat Endpoint. Konfigurasi yang salah dalam Virtual Private Cloud (VPC) menyebabkan masalah berikut:
Parameter | Gejala |
| Tugas berhenti pada progres 91% |
| Tugas gagal dijalankan |
Cara tabel sumber MaxCompute membaca data
Baik tabel sumber MaxCompute penuh maupun inkremental membaca data melalui MaxCompute Tunnel. Kecepatan baca dibatasi oleh bandwidth Tunnel.
Data yang ditambahkan tidak dapat dibaca setelah penerapan dimulai
Setelah penerapan dimulai, data yang ditambahkan ke partisi atau tabel yang sedang dibaca tidak dapat dibaca dan mungkin memicu failover.
Tabel sumber MaxCompute penuh dan inkremental menggunakan ODPS DOWNLOAD SESSION untuk membaca data. Saat sesi unduh dibuat, MaxCompute menghasilkan file indeks berdasarkan data pada saat itu. Pembacaan selanjutnya menggunakan indeks ini. Data yang ditambahkan setelah sesi dibuat tidak termasuk.
Jika data baru ditulis setelah sesi dimulai:
Pembacaan Tunnel mungkin gagal dengan:
ErrorCode=TableModified, ErrorMessage=The specified table has been modified since the download initiated.Jika penerapan pulih dari failover, akurasi data tidak dapat dijamin: data yang ada mungkin dibaca ulang sedangkan data baru mungkin tidak lengkap.
Ubah parallelisme setelah menangguhkan penerapan MaxCompute
Untuk penerapan streaming dengan useNewApi=true (default), parallelisme dapat diubah setelah penangguhan. Data di partisi berikutnya didistribusikan menggunakan parallelisme baru, sedangkan partisi saat ini tetap menggunakan distribusi aslinya. Meningkatkan parallelisme saat membaca partisi besar berarti hanya beberapa operator yang memproses data partisi tersebut.
Perubahan parallelisme tidak didukung untuk penerapan batch atau penerapan dengan useNewApi=false.
Offset awal tidak berlaku untuk MaxCompute
Offset awal hanya berlaku untuk sumber antrian pesan (misalnya, DataHub), bukan MaxCompute. Setelah memulai penerapan:
Untuk tabel berpartisi, Flink membaca semua partisi yang ada.
Untuk tabel non-partisi, Flink membaca semua data yang ada.
Data partisi tidak lengkap di sumber MaxCompute inkremental
Tidak ada mekanisme yang memeriksa kelengkapan data partisi. Saat partisi baru terdeteksi, sumber segera mulai membaca.
Gunakan INSERT OVERWRITE agar partisi dan datanya tersedia secara bersamaan:
INSERT OVERWRITE TABLE T PARTITION (ds='20191010') ...Jangan buat partisi terlebih dahulu lalu tulis data ke dalamnya. Sumber inkremental mulai membaca begitu mendeteksi partisi, sehingga berpotensi membaca data yang belum lengkap.
Otorisasi Gagal [4019] untuk MaxCompute
Identitas pengguna dalam DDL MaxCompute tidak memiliki izin akses. Lakukan autentikasi menggunakan akun Alibaba Cloud, pengguna RAM, atau peran RAM. Untuk detailnya, lihat Autentikasi pengguna.
Konfigurasi startPartition untuk sumber MaxCompute inkremental
Buat nilai startPartition sebagai berikut:
Langkah | Aksi | Contoh |
1 | Gabungkan setiap kolom kunci partisi dan nilainya dengan |
|
2 | Urutkan berdasarkan level partisi (ascending) dan pisahkan dengan koma. Tanpa spasi. |
|
Tentukan semua level partisi atau hanya beberapa level pertama.
Perbandingan partisi: Sumber membandingkan semua partisi secara alfabetis terhadap startPartition dan membaca partisi dengan nilai yang sama atau lebih besar.
Contoh partisi:
ds=20191201,type=ads=20191201,type=bds=20191202,type=ads=20191202,type=bds=20191202,type=cds=20191203,type=a
| Partisi yang dibaca |
|
|
|
|
Partisi yang ditentukan oleh startPartition tidak perlu ada.Sumber MaxCompute inkremental lambat memulai pembacaan
Terlalu banyak partisi yang cocok dengan kondisi startPartition, atau terlalu banyak file kecil di partisi tersebut, menunda startup karena sumber harus mengurutkan metadata partisi sebelum membaca.
Hindari membaca data historis berlebihan. Gunakan penerapan batch untuk pemrosesan data historis.
Kurangi jumlah file kecil di partisi historis.
Konfigurasi parameter partition untuk baca dan tulis
Baca dari partisi statis
Langkah | Action | Contoh |
1 | Gabungkan kunci partisi dan nilainya dengan |
|
2 | Urutkan berdasarkan level partisi, pisahkan dengan koma. |
|
Untuk pemfilteran yang lebih fleksibel, gunakan klausa WHERE dengan pushdown partisi. Deklarasikan kolom partisi menggunakan PARTITIONED BY untuk mengaktifkan pengoptimal SQL:
CREATE TABLE maxcompute_table (
content VARCHAR,
dt VARCHAR,
hh VARCHAR
) PARTITIONED BY (dt, hh) WITH (
'connector' = 'odps',
...
);
SELECT content, dt, hh FROM maxcompute_table
WHERE dt >= '20220901' AND dt <= '20220903' AND hh >= '09' AND hh <= '17';Baca partisi terbaru
Fungsi | Perilaku |
| Mengembalikan partisi yang berada di urutan pertama secara alfabetis (biasanya yang terbaru). |
| Mengembalikan dua partisi pertama secara alfabetis. |
| Mengembalikan partisi pertama yang memiliki partisi pasangan |
Untuk tabel sumber, max_pt() membaca data dari partisi yang cocok satu kali lalu berhenti. Fungsi ini tidak memantau partisi baru. Gunakan tabel sumber inkremental untuk pembacaan berkelanjutan. Untuk tabel dimensi, setiap refresh memeriksa partisi terbaru.Gunakan max_pt_with_done() ketika partisi terbaru mungkin masih dalam proses pemuatan. Buat partisi .done kosong (misalnya, dt=20220901.done) setelah persiapan data selesai. Tabel dimensi hanya membaca dari partisi yang memiliki pasangan .done.
Tulis ke partisi statis
Konfigurasikan parameter partition dengan cara yang sama seperti untuk membaca, tetapi wildcard (*) tidak didukung di tabel sink.
Tulis ke partisi dinamis
Daftarkan nama kolom kunci partisi dalam urutan ascending berdasarkan level partisi, dipisahkan koma:
'partition' = 'dt,hh,mm'max_pt() vs. max_pt_with_done()
Diberikan partisi-partisi berikut:
ds=20190101ds=20190101.doneds=20190102ds=20190102.doneds=20190103
Fungsi | Mengembalikan |
|
|
|
|
Penerapan MaxCompute tetap dalam status starting atau generasi data lambat
Kemungkinan penyebab:
File kecil: Terlalu banyak file kecil di tabel sumber MaxCompute.
Akses lintas-wilayah: Kluster penyimpanan MaxCompute dan kluster komputasi Flink berada di wilayah berbeda, menyebabkan latensi jaringan. Sebarkan keduanya di wilayah yang sama.
Izin tidak valid: Tabel sumber MaxCompute memerlukan izin Download.
Pilih terowongan data: Batch Tunnel vs. Streaming Tunnel
Pertimbangan | Batch Tunnel | Streaming Tunnel |
Konsistensi | at-least-once. Duplikasi hanya terjadi jika terjadi error checkpoint dan data ditulis ke beberapa partisi secara bersamaan. | at-least-once. Duplikasi mungkin terjadi pada pengecualian apa pun. |
Throughput | Lebih rendah. Data di-commit selama checkpoint, membuat file di server. | Lebih tinggi. Tidak ada commit saat checkpoint. Dengan |
Jika checkpoint Batch Tunnel lambat atau timeout dan penyimpanan hilir mentoleransi duplikasi, beralihlah ke Streaming Tunnel.
Data duplikat ditulis ke MaxCompute
Periksa hal-hal berikut secara berurutan:
Logika SQL: Tabel non-transaksional MaxCompute tidak menegakkan keunikan primary key, bahkan jika primary key dideklarasikan dalam DDL. Jika SQL Anda menghasilkan duplikat, data tersebut ditulis apa adanya.
Beberapa penerapan: Beberapa penerapan yang menulis ke tabel MaxCompute yang sama menghasilkan baris duplikat.
Batch Tunnel + kegagalan checkpoint: Jika penerapan gagal selama checkpointing, data yang telah di-commit mungkin ditulis ulang saat penerapan dilanjutkan dari checkpoint sebelumnya.
Streaming Tunnel + failover: Data antar checkpoint di-commit segera. Setelah failover, data dari checkpoint terakhir hingga titik kegagalan mungkin ditulis ulang. Pertimbangkan beralih ke Batch Tunnel untuk deduplikasi yang lebih ketat.
Batch Tunnel + batalkan/mulai ulang (VVR < vvr-6.0.7-flink-1.15): Data di-commit sebelum konektor dimatikan selama pembatalan (misalnya, oleh optimasi Autopilot). Tingkatkan ke VVR vvr-6.0.7-flink-1.15 atau versi setelahnya untuk memperbaiki masalah ini.
Error "Invalid partition spec" di sink MaxCompute
Nilai kunci partisi dalam data tidak valid. Nilai tidak valid meliputi string kosong, null, dan nilai yang mengandung =, ,, atau /. Periksa data untuk nilai-nilai tidak valid ini.
Error "No more available blockId" di sink MaxCompute
Jumlah blok yang ditulis melebihi batas, artinya setiap flush menulis data terlalu sedikit terlalu sering. Tingkatkan batchSize dan flushIntervalMs untuk mengurangi frekuensi flush.
Gunakan petunjuk SHUFFLE_HASH untuk tabel dimensi
Secara default, setiap subtask menyimpan cache seluruh tabel dimensi. Untuk tabel dimensi besar, gunakan SHUFFLE_HASH untuk mendistribusikan data ke subtask, mengurangi konsumsi memori heap JVM:
-- Tabel sumber dan tabel dimensi
CREATE TABLE source_table (k VARCHAR, v VARCHAR) WITH ( ... );
CREATE TABLE dim_1 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_2 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
CREATE TABLE dim_3 (k VARCHAR, v VARCHAR) WITH ('connector' = 'odps', 'cache' = 'ALL', ... );
-- Terapkan SHUFFLE_HASH ke dim_1 dan dim_3; dim_2 tetap di-cache penuh per subtask
SELECT /*+ SHUFFLE_HASH(dim_1), SHUFFLE_HASH(dim_3) */
k, s.v, d1.v, d2.v, d3.v
FROM source_table AS s
INNER JOIN dim_1 FOR SYSTEM_TIME AS OF PROCTIME() AS d1 ON s.k = d1.k
LEFT JOIN dim_2 FOR SYSTEM_TIME AS OF PROCTIME() AS d2 ON s.k = d2.k
LEFT JOIN dim_3 FOR SYSTEM_TIME AS OF PROCTIME() AS d3 ON s.k = d3.k;Konfigurasi CacheReloadTimeBlackList
Parameter ini memblokir reload cache tabel dimensi selama periode waktu tertentu.
Tipe data: String
Format:
start_time -> end_time, dipisahkan koma untuk beberapa periode.Format waktu:
YYYY-MM-DD HH:mm. Hilangkan tanggal untuk interval harian berulang.
Contoh:
'cacheReloadTimeBlackList' = '14:00 -> 15:00,23:00 -> 01:00'Skenario | Nilai |
Interval harian |
|
Beberapa interval harian |
|
Interval tanggal tertentu |
|
MySQL / ApsaraDB RDS for MySQL
Error "SSL peer shut down incorrectly"
Ini terjadi ketika SSL diaktifkan di server MySQL tetapi klien tidak menggunakan SSL. Tambahkan characterEncoding=utf-8&useSSL=false ke URL JDBC:
'url' = 'jdbc:mysql://<host>:<port>/<database>?characterEncoding=utf-8&useSSL=false'BIGINT UNSIGNED dikonversi ke DECIMAL, lalu ke TEXT di Hologres
Flink tidak mendukung BIGINT UNSIGNED, sehingga mengonversi tipe tersebut ke DECIMAL berdasarkan batasan rentang nilai. Saat menyinkronkan ke Hologres melalui CREATE TABLE AS, primary key selanjutnya dikonversi ke TEXT karena Hologres tidak mendukung DECIMAL sebagai primary key.
Sesuaikan tipe data primary key selama pengembangan. Untuk mempertahankan kolom DECIMAL, buat manual tabel Hologres dan konfigurasikan primary key yang berbeda atau hapus primary key sepenuhnya. Tanpa primary key yang tepat, tangani deduplikasi di tingkat aplikasi.
Perilaku Upsert vs. insert untuk tabel sink ApsaraDB RDS
DDL memiliki primary key? | Perilaku penulisan |
Ya |
|
Tidak |
|
GROUP BY dengan indeks unik di sink ApsaraDB RDS
Deklarasikan indeks unik dalam klausa
GROUP BY.Primary key auto-increment tidak dapat dideklarasikan sebagai primary key dalam SQL.
Pemetaan tipe INT UNSIGNED di Flink SQL
Driver JDBC MySQL mengonversi tipe data sebelum mencapai Flink:
Tipe fisik MySQL | Tipe driver JDBC | Tipe Flink SQL |
|
|
|
|
|
|
Error "Incorrect string value"
Data berisi karakter yang tidak dapat diurai oleh encoding database. Tambahkan characterEncoding=UTF-8 ke URL JDBC:
'url' = 'jdbc:mysql://<host>:<port>/<database>?characterEncoding=UTF-8'Deadlock saat menulis ke MySQL melalui konektor ApsaraDB RDS atau TDDL
Kunci baris InnoDB beroperasi pada indeks, bukan catatan individual. Ketika beberapa transaksi mengakses rentang indeks yang tumpang tindih, deadlock dapat terjadi.
Contoh: Transaksi T1 memegang kunci rentang pada (-inf, 2] dan mencoba mengambil (-inf, 1]. Transaksi T2 sedang menunggu (-inf, 2]. Kedua transaksi saling menghalangi.
RDS/TDDL vs. Tablestore:
Penyimpanan | Granularitas kunci | Dampak |
RDS/TDDL (InnoDB) | Kunci rentang indeks | Konflik memengaruhi seluruh rentang |
Tablestore | Kunci baris tunggal | Tidak berdampak pada baris lain |
Solusi: Untuk skenario QPS tinggi atau konkurensi tinggi, gunakan Tablestore sebagai tabel sink.
Jika database relasional diperlukan:
Isolasi penerapan dari operasi baca/tulis sistem lain.
Gunakan penulisan single-concurrent untuk volume data kecil.
Hindari kunci unik jika memungkinkan. Jika diperlukan, urutkan kunci unik berdasarkan diferensiasi (tertinggi terlebih dahulu).
Shard database dan tabel untuk menghindari bottleneck tabel tunggal.
Perubahan skema tidak disebarkan ke tabel hilir
Sinkronisasi skema dipicu oleh perbedaan skema antara catatan data berturut-turut, bukan hanya oleh pernyataan DDL. Jika tidak ada data yang ditulis setelah perubahan DDL, pembaruan skema hilir tidak dipicu. Untuk detailnya, lihat bagian "Kebijakan sinkronisasi perubahan skema tabel" di CREATE TABLE AS (CTAS).
"finish split response timeout" di sumber CDC MySQL
Pemanfaatan CPU tinggi mencegah sumber merespons permintaan RPC koordinator. Tingkatkan core CPU TaskManager pada tab Resources.
Perubahan skema selama pembacaan data penuh tabel CDC MySQL
Perubahan skema selama pembacaan data penuh dapat menyebabkan error atau mencegah sinkronisasi skema. Batalkan penerapan, hapus tabel hilir, dan mulai ulang tanpa state.
Perubahan skema tidak didukung selama sinkronisasi CTAS/CDAS
Batalkan penerapan, hapus tabel hilir, dan mulai ulang tanpa state. Hindari modifikasi skema yang tidak kompatibel. Untuk perubahan yang didukung, lihat Pernyataan CREATE TABLE AS.
ClickHouse
Retract data yang diperbarui dari tabel sink ClickHouse
Retraksi dimungkinkan ketika primary key dideklarasikan dan ignoreDelete diatur ke false, tetapi performa turun signifikan. ClickHouse adalah sistem OLAP; operasi ALTER TABLE UPDATE dan ALTER TABLE DELETE secara inheren lambat.
Visibilitas data di tabel sink ClickHouse
exactly-once diaktifkan? | Data menjadi terlihat ketika... |
Tidak (default) | Buffer penulisan mencapai |
Ya | Checkpoint selesai |
Lihat output konektor print
Metode 1: Konsol Realtime Compute for Apache Flink
Buka O&M > Deployments dan klik nama penerapan.
Klik tab Logs.
Pilih pekerjaan yang sedang berjalan dari daftar drop-down Job.
Klik tab Running Task Managers, lalu klik nilai di kolom Path, ID.
Klik tab Logs untuk melihat hasil print.
Metode 2: UI Flink
Buka O&M > Deployments dan klik nama penerapan.
Pada tab Status, klik Flink UI di kolom Actions.
Di Apache Flink Dashboard, klik Task Managers.
Klik nilai di kolom Path, ID.
Klik tab Logs untuk melihat hasil print.
Tablestore
Join dimensi tidak mengembalikan data (Tablestore)
Verifikasi bahwa tipe dan nama kolom dalam DDL sesuai persis dengan skema tabel fisik.
ApsaraMQ for RocketMQ
"IllegalArgumentException: timeout value is negative"
Nilai default pullIntervalMs adalah -1. Ketika tidak ada pesan yang tiba untuk sementara waktu, thread konsumen tidur selama durasi ini, menyebabkan error. Atur pullIntervalMs ke nilai non-negatif (misalnya, 0).
Perilaku penemuan partisi
Versi VVR | Interval deteksi | Perilaku | Delay |
Sebelum 6.0.2 | 5--10 menit | Memicu failover setelah 3 deteksi berturut-turut | 10--30 menit |
6.0.2 atau setelahnya | 5 menit | Operator sumber membaca partisi baru secara langsung, tanpa failover | 1--5 menit |
Hologres
Error "BackPressure Exceed reject Limit" (Hologres)
Instans Hologres mengalami beban tulis tinggi. Hubungi dukungan teknis Hologres untuk meningkatkan instans.
Error "remaining connection slots" (Hologres)
Jumlah koneksi melebihi batas instans Hologres.
Periksa
app_nameuntuk setiap node frontend dan hitung koneksi flink-connector.Verifikasi apakah penerapan lain terhubung ke instans yang sama.
Lepaskan koneksi idle. Untuk detailnya, lihat Mengelola koneksi.
"no table is defined in publication" setelah pembuatan ulang tabel (Hologres)
Publikasi yang terkait dengan tabel yang di-drop tidak dibersihkan.
Query publikasi yang terlantar:
SELECT * FROM pg_publication WHERE pubname NOT IN (SELECT pubname FROM pg_publication_tables);Hapus publikasi yang terlantar:
DROP PUBLICATION <publication_name>;Mulai ulang penerapan.
Interval checkpoint dan visibilitas data Hologres
Interval checkpoint tidak secara langsung terkait dengan visibilitas data. Konektor Hologres secara berkala melakukan flush data ke Hologres terlepas dari checkpoint. Jika buffer memenuhi kondisi flush, data menjadi terlihat sebelum checkpoint berikutnya.
Namun, konektor tidak menyediakan konsistensi transaksional. Checkpoint memaksa flush untuk toleransi kesalahan dan pemulihan, tetapi data mungkin sudah sebagian terlihat di antara checkpoint.
"permission denied for database" (Hologres)
Mulai VVR 8.0.4, Flink menggunakan mode JDBC untuk mengonsumsi log biner dari Hologres V2.0 dan setelahnya, yang memerlukan izin khusus untuk akun non-superuser.
Model otorisasi PostgreSQL standar:
GRANT CREATE ON DATABASE <db_name> TO <user_name>;
ALTER ROLE <user_name> REPLICATION;Model Izin Sederhana (SPM):
CALL spm_grant('<db_name>_admin', '<user_name>');
ALTER ROLE <user_name> REPLICATION;Ganti <user_name> dengan ID akun Alibaba Cloud atau nama pengguna RAM Anda. Untuk detailnya, lihat Ikhtisar akun.
"table id parsed from checkpoint is different from the current table id"
Di VVR 8.0.5 hingga 8.0.8, Flink memverifikasi konsistensi ID tabel selama pemulihan checkpoint untuk pekerjaan log biner Hologres. Pengecualian ini menunjukkan tabel Hologres telah dibangun ulang (misalnya, melalui TRUNCATE).
Solusi:
Tingkatkan ke VVR 8.0.9 atau versi setelahnya untuk menonaktifkan verifikasi ID tabel.
Hindari membangun ulang tabel sumber. Membangun ulang menghapus log biner historis, dan membaca dari tabel baru dengan offset lama menyebabkan inkonsistensi data.
Ketidakcocokan presisi desimal dalam mode JDBC log biner Hologres
Di VVR 8.0.10 atau sebelumnya, ketidakcocokan presisi desimal antara tabel sumber Flink dan tabel Hologres menyebabkan error.
Solusi:
Tingkatkan ke VVR 8.0.11 atau versi setelahnya.
Pastikan presisi field desimal konsisten antara DDL Flink dan tabel Hologres untuk mencegah kehilangan data akibat pemotongan presisi.
"no table is defined in publication" atau "The table xxx has no slot named xxx" pada tabel yang dibuat ulang (Hologres)
Publikasi dari tabel yang di-drop tidak dihapus.
Solusi 1:
Query publikasi yang terlantar:
SELECT * FROM pg_publication WHERE pubname NOT IN (SELECT pubname FROM pg_publication_tables);Hapus publikasi:
DROP PUBLICATION <publication_name>;Mulai ulang pekerjaan.
Solusi 2:
Tingkatkan ke VVR 8.0.5 atau versi setelahnya. Konektor secara otomatis membersihkan publikasi yang terlantar.
Simple Log Service
LogSizeTooLargeException (Simple Log Service)
Satu baris log melebihi 8 MB (MAX_BATCH_SIZE_IN_BYTES = 8388608). Ubah offset awal penerapan untuk melewati data yang terlalu besar. Untuk petunjuknya, lihat Memulai penerapan pekerjaan.
OOM saat memulihkan dari sumber Simple Log Service
Konektor Simple Log Service mengambil data dalam batch yang dikontrol oleh batchGetSize (default: 100). Setelah failover, backlog besar terakumulasi. Jika single_log_group_size * 100 melebihi memori heap JVM yang tersedia, terjadi error OOM.
Kurangi nilai batchGetSize.
Paimon
Tentukan offset konsumen untuk tabel sumber Paimon
Konfigurasikan scan.mode untuk mengontrol posisi awal:
Mode pemindaian | Batch Read | Baca streaming |
| Ditentukan oleh parameter lain: | Logika yang sama seperti batch untuk snapshot awal, lalu pembacaan inkremental berkelanjutan. |
| Membaca snapshot terbaru. | Membaca snapshot terbaru saat startup, lalu menghasilkan data inkremental. |
| Membaca snapshot setelah kompaksi penuh terbaru. | Membaca snapshot pasca-kompaksi saat startup, lalu menghasilkan data inkremental. |
| Sama dengan | Hanya menghasilkan data inkremental, tanpa snapshot awal. |
| Membaca snapshot terbaru pada atau sebelum | Menghasilkan data inkremental dari |
| Membaca snapshot yang ditentukan oleh | Menghasilkan data inkremental dari |
| Sama dengan | Membaca snapshot yang ditentukan saat startup, lalu menghasilkan data inkremental. |
Konfigurasi kedaluwarsa partisi otomatis (Paimon)
Paimon dapat secara otomatis menghapus partisi yang kedaluwarsa berdasarkan waktu yang berlalu sejak timestamp partisi.
Parameter:
Parameter | Tujuan | Contoh |
| Mengonversi nilai partisi ke string waktu. Gunakan |
|
| Pola untuk mengurai string waktu menjadi timestamp. Default-nya |
|
| Usia maksimum sebelum partisi dihapus. |
|
Contoh: Untuk partisi year=2023,month=04,day=21,hour=17 dengan pola $year-$month-$day $hour:00:00, timestamp yang diselesaikan adalah 2023-04-21 17:00:00.
Timeout heartbeat TaskManager saat menulis ke Paimon
Ini biasanya menunjukkan memori heap TaskManager tidak mencukupi. Paimon menggunakan memori heap untuk:
Buffer penulisan per tugas konkuren: Setiap operator penulis memiliki buffer berukuran
write-buffer-size(default: 256 MB).Buffer format ORC: Mengonversi data dalam memori ke format kolom. Berukuran
orc.write.batch-size(default: 1024 baris). Catatan besar memperbesar konsumsi memori (misalnya, field JSON 4 MB per baris menggunakan 4 MB x 1024 = 4 GB).Penulis per bucket: Setiap bucket yang dimodifikasi mendapatkan objek penulis khusus.
Solusi berdasarkan penyebab:
Nilai
write-buffer-sizebesar: Kurangi nilainya. Nilai yang terlalu kecil menyebabkan penulisan disk dan kompaksi sering, menurunkan performa tulis.Catatan individual besar: Kurangi
orc.write.batch-size, atau beralih ke format Avro dengan statistik dinonaktifkan: > Catatan:file.formatdanmetadata.stats-modehanya dapat diatur saat pembuatan tabel dan tidak dapat diubah nanti melaluiALTER TABLEatau petunjuk SQL.'file.format' = 'avro', 'metadata.stats-mode' = 'none'Terlalu banyak partisi/bucket: Verifikasi konfigurasi kunci partisi dan jumlah bucket. Targetkan 2–5 GB per bucket. Untuk penyesuaian bucket, lihat Tabel primary key dan tabel append-only.
"Sink materializer must not be used with Paimon sink"
Operator sink materializer (dirancang untuk memperbaiki data out-of-order dari JOIN bertingkat) menyebabkan hasil agregasi salah dan overhead tidak perlu dengan Paimon.
Nonaktifkan:
SET 'table.exec.sink.upsert-materialize' = 'false';Untuk alternatif penanganan out-of-order, lihat Tabel primary key dan tabel append-only.
"File deletion conflicts detected" atau "LSM conflicts detected" (Paimon)
Kemungkinan penyebab:
Beberapa penerapan menulis ke partisi yang sama: Mulai ulang penerapan yang gagal. Kejadian sesekali adalah normal.
Dilanjutkan dari state lama: Error berulang terus-menerus. Lanjutkan dari state terbaru atau mulai ulang tanpa state.
Beberapa pernyataan INSERT dalam satu penerapan: Paimon tidak mendukung ini. Gunakan
UNION ALLuntuk menggabungkan aliran data sebelum menulis.Paralelisme Global Committer atau Compaction Coordinator > 1: Keduanya harus diatur ke 1 untuk konsistensi data.
"File xxx not found" saat membaca dari sumber Paimon
File snapshot telah kedaluwarsa. Efisiensi konsumsi terlalu rendah atau waktu kedaluwarsa snapshot terlalu singkat.
Tentukan ID konsumen untuk pelacakan offset
Untuk memeriksa snapshot yang tersedia, lihat bagian "Tabel Snapshots" di Tabel sistem.
"No space left on device" (Paimon)
Untuk join lookup atau changelog-producer=lookup: Konfigurasikan parameter ini melalui petunjuk SQL:
Parameter | Tujuan | Nilai yang direkomendasikan |
| Batasi penggunaan disk untuk cache lookup | 256 MB, 512 MB, atau 1 GB |
| Periode retensi file cache | 15 menit atau 30 menit |
Untuk penulisan umum: Konfigurasikan write-buffer-spill.max-disk-size melalui petunjuk SQL untuk membatasi ukuran file sementara.
Jumlah besar file Paimon di OSS
Sesuaikan kebijakan retensi: Paimon menyimpan file historis untuk akses time-travel. Kurangi periode retensi. Untuk detailnya, lihat Membersihkan data kedaluwarsa.
Tinjau konfigurasi partisi dan bucket: Targetkan 2–5 GB per bucket. Untuk panduan bucketing, lihat Tabel primary key dan tabel append-only.
Aktifkan kompresi: Tambahkan
'file.compression' = 'zstd'saat pembuatan tabel untuk menggunakan kompresi Zstandard. > Catatan:file.compressionhanya dapat diatur saat pembuatan tabel dan tidak dapat diubah nanti melaluiALTER TABLEatau petunjuk SQL.
Apakah interval checkpoint memengaruhi visibilitas data Paimon?
Ya. Paimon melakukan commit data dan membuatnya terlihat bagi konsumen hilir hanya pada checkpoint. Sebelum checkpoint, data di-flush ke sistem file remote tetapi sistem hilir tidak diberi tahu.
Memory leak di pekerjaan Paimon jangka panjang
Dua kemungkinan penyebab:
Perilaku yang diharapkan: Penggunaan memori meningkat sebanding dengan peningkatan laju permintaan (RPS).
Masalah diketahui: Terjadi memory leak di pekerjaan jangka panjang yang menggunakan katalog Paimon dengan tipe metastore
filesystem. Untuk mengatasi ini saat membaca dari atau menulis ke OSS, konfigurasikan parameter ini:fs.oss.endpointfs.oss.accessKeyIdfs.oss.accessKeySecret
Hudi
Tidak ada data di penyimpanan (Hudi)
Data di-flush ke penyimpanan ketika:
Bucket mencapai 64 MB di memori.
Total buffer mencapai 1 GB.
Checkpoint dipicu.
Untuk penulisan streaming, pastikan checkpointing diaktifkan.
Data duplikat (Hudi)
Dalam partisi (COW): Atur write.insert.drop.duplicates ke true untuk mengaktifkan deduplikasi. Untuk merge-on-read (MOR), deduplikasi otomatis ketika primary key didefinisikan.
Di Hudi 0.10.0 dan setelahnya,write.insert.drop.duplicatesdiganti nama menjadiwrite.precombineddan default-nyatrue.
Lintas partisi: Atur index.global.enabled ke true.
Data lama (melebihi TTL indeks): Tingkatkan index.state.ttl (satuan: hari, default: 1,5). Mengatur ke kurang dari 0 menyimpan indeks secara permanen.
Di Hudi 0.10.0 dan setelahnya,index.state.ttldefault-nya0(permanen).
Hanya file log yang dihasilkan dalam mode MOR (Hudi)
Hudi menghasilkan file Parquet hanya setelah kompaksi. Dalam mode MOR, kompaksi asinkron berjalan setiap 5 commit secara default. Kurangi compaction.delta_commits untuk memicu kompaksi lebih cepat.
AnalyticDB for MySQL 3.0
Error "multi-statement be found"
Masalah kompatibilitas antara MySQL JDBC 8.x dan ALLOW_MULTI_QUERIES=true di AnalyticDB for MySQL.
Solusi:
Hubungi dukungan teknis untuk konektor AnalyticDB for MySQL V3.0 kustom yang dibangun dengan MySQL JDBC 5.1.46. Untuk penggunaannya, lihat Mengelola konektor kustom.
Tambahkan
allowMultiQueries=trueke URL JDBC:jdbc:mysql://<host>.ads.aliyuncs.com:3306/<database>?allowMultiQueries=true
Konektor kustom
"No suitable driver found for..."
Konektor kustom tidak dapat menemukan driver JDBC-nya.
Solusi (pilih salah satu):
Muat driver di kelas pabrik Anda menggunakan
Class.forName("<driver-class>").Tambahkan JAR driver ke Additional Dependencies dan atur parameter: Untuk petunjuknya, lihat Bagaimana cara mengonfigurasi parameter kustom untuk penerapan yang sedang berjalan?
kubernetes.application-mode.classpath.include-user-jar: true