Topik ini menjawab beberapa pertanyaan umum terkait Change Data Capture (CDC).
Mengapa konektor CDC MySQL tidak membaca data inkremental setelah membaca data penuh?
Bagaimana cara menentukan apakah sinkronisasi data penuh sudah selesai dalam penyebaran CDC MySQL?
Bisakah tabel CDC MySQL hanya digunakan sebagai tabel sumber?
Bagaimana cara konektor CDC MySQL membaca data dari database MySQL yang telah dilakukan sharding?
Mengapa data inkremental dari tabel database tertentu dalam instance gagal disinkronkan?
Apakah konektor CDC MongoDB mendukung langganan hanya pada koleksi tertentu dari database?
Arsitektur database MongoDB apa yang didukung oleh konektor CDC MongoDB?
Apakah konektor CDC MongoDB mendukung parameter terkait Debezium?
Apakah konektor CDC MongoDB mendukung output pesan UPDATE_BEFORE (gambar pra-pembaruan)?
Mengapa log WAL tidak dilepaskan ketika penggunaan disk pada server database PostgreSQL tinggi?
Apa perbedaan antara flink-sql-connector-xxx.jar dan flink-connector-xxx.jar?
Mengapa saya tidak dapat menemukan paket konektor versi 2.X di repositori Maven?
Apa yang harus saya lakukan jika muncul pesan kesalahan "Replication slot 'xxxx' is active"?
Apa yang harus saya lakukan jika muncul pesan kesalahan "sub account not auth permission"?
Bisakah saya membatalkan penyebaran Flink CDC alih-alih memulai ulang penyebaran jika penyebaran gagal?
Anda dapat memodifikasi konfigurasi penyebaran untuk menentukan kebijakan restart. Sebagai contoh, konfigurasi berikut menentukan bahwa maksimal dua upaya restart dapat dilakukan dengan interval 10 detik antar upaya. Jika penyebaran gagal memulai setelah dua upaya, penyebaran akan dibatalkan.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 10 sTabel sumber CDC MySQL dan tabel sumber CDC Hologres tidak mendukung fungsi jendela. Bagaimana cara mengimplementasikan agregasi data tingkat menit pada tabel sumber CDC MySQL atau tabel sumber CDC Hologres?
Untuk mencapai efek serupa dari agregasi jendela pada tabel sumber CDC MySQL atau tabel sumber CDC Hologres, Anda dapat menggunakan metode berikut untuk melakukan agregasi berbasis waktu:
Gunakan fungsi DATE_FORMAT untuk mengonversi bidang waktu menjadi string yang diformat sebagai menit dan gunakan string tersebut sebagai nilai jendela.
Gunakan fungsi GROUP BY untuk mengagregasi nilai jendela.
Kode sampel berikut menunjukkan cara mengumpulkan statistik jumlah pesanan dan penjualan setiap menit untuk setiap toko.
SELECT
shop_id,
DATE_FORMAT(order_ts, 'yyyy-MM-dd HH:mm') AS window,
COUNT(*) AS order_count,
SUM(price) AS amount
FROM order_mysql_cdc
GROUP BY shop_id, windowBisakah tabel CDC MySQL hanya digunakan sebagai tabel sumber?
Ya, tabel CDC MySQL hanya dapat digunakan sebagai tabel sumber. Tabel CDC MySQL dapat digunakan untuk membaca data penuh dan inkremental dari tabel database MySQL. Tabel MySQL dapat digunakan sebagai tabel dimensi atau tabel sink.
Mengapa konektor CDC MySQL tidak membaca data inkremental setelah membaca data penuh?
Deskripsi masalah | Penyebab | Solusi |
Konektor CDC MySQL hanya membaca data penuh tetapi tidak membaca data inkremental. | Konektor CDC MySQL membaca data dari instans sekunder atau instans baca-saja ApsaraDB RDS untuk MySQL V5.6 berdasarkan konfigurasi penyebaran CDC MySQL. Namun, instans sekunder atau instans baca-saja ApsaraDB RDS untuk MySQL V5.6 tidak menulis data ke file log. Akibatnya, alat sinkronisasi hilir tidak dapat membaca informasi perubahan inkremental. | Kami merekomendasikan agar Anda menggunakan instans ApsaraDB RDS untuk MySQL yang dapat memproses permintaan tulis atau meningkatkan instans ApsaraDB RDS untuk MySQL ke versi lebih baru dari V5.6. |
Saat ini, konektor MySQL CDC tidak mendukung kompresi transaksi log biner. Mengaktifkan fitur ini untuk kluster MySQL yang dikelola sendiri dapat menyebabkan kesalahan dalam pembacaan inkremental. | Untuk mengonsumsi data inkremental dari kluster MySQL yang dikelola sendiri dengan konektor MySQL CDC, nonaktifkan kompresi transaksi log biner. | |
Penyebaran CDC MySQL dihentikan setelah data penuh dibaca. | Waktu yang diperlukan untuk membaca data penuh dalam penyebaran CDC MySQL terlalu lama. Dalam hal ini, jumlah data di shard terakhir terlalu besar, menyebabkan kesalahan out of memory (OOM). Akibatnya, penyebaran dihentikan setelah failover. | Tingkatkan paralelisme pada database MySQL untuk mempercepat pembacaan data penuh. |
Setelah sinkronisasi data penuh selesai, konektor CDC MySQL secara otomatis beralih ke fase sinkronisasi data inkremental. Jika konektor CDC MySQL menjalankan beberapa subtugas secara paralel untuk membaca data penuh, konektor CDC MySQL perlu menunggu satu titik pemeriksaan lagi sebelum penyebaran memasuki fase sinkronisasi data inkremental. Ini memastikan bahwa data penuh ditulis ke sink sebelum data inkremental dibaca. Dengan cara ini, akurasi data dijamin. Jika interval titik pemeriksaan yang Anda tentukan terlalu besar, konektor CDC MySQL perlu menunggu waktu yang lama untuk memulai sinkronisasi data inkremental. Sebagai contoh, jika Anda mengatur interval titik pemeriksaan menjadi 20 menit, konektor CDC MySQL menunggu selama 20 menit setelah sinkronisasi data penuh selesai. | Untuk menghindari masalah ini, kami merekomendasikan agar Anda menentukan interval titik pemeriksaan yang sesuai berdasarkan kebutuhan bisnis Anda. |
Apa yang harus saya lakukan jika koma (,) dalam ekspresi reguler di nilai parameter nama-tabel dalam penyebaran CDC MySQL gagal diproses?
Penyebab
Sebagai contoh, jika konfigurasi 'table-name' = 't_process_wi_history_\d{1,2}' digunakan dalam penyebaran CDC MySQL, terjadi kesalahan. Gambar berikut menunjukkan detail kesalahan.

Penyebab
Debezium menggunakan koma (,) sebagai pemisah dan tidak mendukung ekspresi reguler yang mengandung koma (,). Akibatnya, terjadi kesalahan parsing.
Solusi
Kami merekomendasikan agar Anda menggunakan konfigurasi 'table-name' = '(t_process_wi_history_\d{1}|t_process_wi_history_\d{2})'.
Ketika saya memulai ulang penyebaran, apakah konektor untuk tabel sumber CDC MySQL mengonsumsi data dari posisi file log biner saat penyebaran dibatalkan atau dari posisi file log biner saat penyebaran dikonfigurasi untuk mulai?
Ketika Anda memulai ulang penyebaran, Anda dapat mengonfigurasi kebijakan startup berdasarkan kebutuhan bisnis Anda. Jika Anda mengatur parameter Starting Strategy ke NONE dalam kotak dialog Deployment Starting Configuration, konektor untuk tabel sumber CDC MySQL membaca ulang data dari posisi file log biner saat penyebaran dikonfigurasi untuk mulai. Jika Anda mengatur parameter Starting Strategy ke Latest State dalam kotak dialog Deployment Starting Configuration, konektor untuk tabel sumber CDC MySQL mengonsumsi data dari posisi file log biner saat penyebaran dibatalkan.
Sebagai contoh, penyebaran dikonfigurasi untuk mulai dari posisi file log biner {file=mysql-bin.01, position=40} dan penyebaran dibatalkan setelah berjalan selama beberapa waktu. Dalam hal ini, data dikonsumsi pada posisi file log biner {file=mysql-bin.01, position=210}. Jika Anda mengatur parameter Starting Strategy ke NONE dalam kotak dialog Deployment Starting Configuration, konektor untuk tabel sumber CDC MySQL membaca ulang data dari posisi file biner {file=mysql-bin.01, position=40}. Jika Anda mengatur parameter Starting Strategy ke Latest State dalam kotak dialog Deployment Starting Configuration, konektor untuk tabel sumber CDC MySQL mengonsumsi data dari posisi file log biner {file=mysql-bin.01, position=210}.
Pastikan bahwa file log biner yang diperlukan tidak dihapus dari server karena kedaluwarsa ketika Anda memulai ulang penyebaran. Jika tidak, kesalahan akan dikembalikan.
Bagaimana cara kerja konektor untuk tabel sumber CDC MySQL? Bagaimana tabel sumber CDC MySQL mempengaruhi database?
Jika parameter scan.startup.mode dalam klausa WITH untuk tabel sumber CDC MySQL diatur ke initial, konektor untuk tabel sumber CDC MySQL terhubung ke database MySQL menggunakan driver Java Database Connectivity (JDBC), menjalankan pernyataan SELECT untuk membaca data penuh, dan kemudian mencatat posisi file log biner. Nilai default dari parameter scan.startup.mode adalah initial. Setelah pembacaan data penuh selesai, konektor membaca data inkremental dari file log biner pada posisi file log biner yang telah dicatat.
Selama pembacaan data penuh, beban query di database MySQL mungkin meningkat karena pernyataan SELECT dijalankan untuk menanyakan data. Selama pembacaan data inkremental, klien binlog digunakan untuk terhubung ke database MySQL guna membaca data log biner. Jika jumlah tabel data yang digunakan meningkat, koneksi berlebih mungkin ada. Anda dapat menjalankan perintah MySQL berikut untuk menanyakan jumlah maksimum koneksi:
show variables like '%max_connections%';Bagaimana cara mengaktifkan konektor CDC MySQL untuk melewati fase pembacaan data snapshot dan hanya membaca data perubahan?
Anda dapat mengonfigurasi parameter scan.startup.mode dalam klausa WITH untuk menentukan mode startup yang ingin Anda gunakan untuk konsumsi data. Anda dapat menentukan untuk mengonsumsi data log biner yang dapat diakses dari posisi paling awal, data log biner paling baru, data log biner dari timestamp tertentu, atau data log biner dari posisi tertentu. Untuk informasi lebih lanjut tentang parameter scan.startup.mode, lihat bagian "Parameter dalam klausa WITH" dalam topik Buat tabel sumber CDC MySQL.
Bagaimana cara konektor CDC MySQL membaca data dari database MySQL yang telah dilakukan sharding?
Sebagai contoh, sebuah database MySQL memiliki banyak tabel seperti tabel user_00, tabel user_02, dan tabel user_99 setelah sharding, dan skema tabel-tabel tersebut sama. Dalam skenario ini, Anda dapat menggunakan parameter nama-tabel untuk menentukan ekspresi reguler untuk mencocokkan beberapa tabel yang datanya dapat dibaca. Sebagai contoh, Anda dapat mengatur parameter nama-tabel ke user_.* untuk memantau semua tabel yang memiliki awalan user_. Jika skema semua tabel dalam database sama, Anda dapat menggunakan parameter nama-database untuk mencapai efek yang sama.
Bagaimana cara menentukan apakah sinkronisasi data penuh sudah selesai dalam penyebaran CDC MySQL?
Anda dapat menentukan apakah sinkronisasi data penuh sudah selesai dalam penyebaran berdasarkan nilai metrik currentEmitEventTimeLag pada tab Metrik halaman Penyebaran.
Metrik currentEmitEventTimeLag menunjukkan perbedaan antara waktu ketika sumber mengirimkan catatan data ke sink dan waktu ketika catatan data dihasilkan dalam database. Metrik ini digunakan untuk mengukur jeda dari waktu data dihasilkan dalam database hingga waktu data meninggalkan sumber.

Deskripsi nilai metrik currentEmitEventTimeLag:
Jika nilai currentEmitEventTimeLag kurang dari atau sama dengan 0, sinkronisasi data penuh dalam penyebaran CDC MySQL belum selesai.
Jika nilai currentEmitEventTimeLag lebih besar dari 0, pekerjaan CDC menyelesaikan sinkronisasi data penuh dan mulai membaca data log biner.
Periksa apakah log TaskManager tabel sumber CDC MySQL berisi "BinlogSplitReader is created". Jika pesan ini muncul, data penuh telah dibaca. Gambar berikut menunjukkan "BinlogSplitReader is created" dalam log TaskManager.

Apa yang harus saya lakukan jika beban pada database MySQL menjadi tinggi karena banyaknya penyebaran CDC MySQL?
Konektor untuk tabel sumber CDC MySQL perlu terhubung ke database untuk membaca data log biner. Jika jumlah tabel sumber bertambah, beban pada database juga bertambah. Untuk mengurangi beban pada database, Anda dapat menyinkronkan data dari tabel sumber CDC MySQL ke tabel sink ApsaraMQ for Kafka dan mengonsumsi data dalam tabel sink. Dengan cara ini, penyebaran CDC MySQL tidak bergantung pada pembacaan data log biner. Untuk informasi lebih lanjut, lihat Sinkronkan data dari semua tabel dalam database MySQL ke Kafka.
Jika beban pada database menjadi tinggi karena sinkronisasi data menggunakan pernyataan CREATE TABLE AS, Anda dapat menggabungkan beberapa penyebaran yang menggunakan pernyataan CREATE TABLE AS menjadi satu penyebaran untuk dijalankan. Jika konfigurasi penyebaran yang menggunakan pernyataan CREATE TABLE AS sama, Anda dapat mengonfigurasi ID server yang sama untuk setiap tabel sumber CDC MySQL untuk menggunakan kembali sumber data dan mengurangi beban pada database. Untuk informasi lebih lanjut, lihat Contoh 4: pelaksanaan beberapa pernyataan CREATE TABLE AS.
Sejumlah kecil data diperbarui dalam tabel sumber CDC MySQL. Mengapa konektor untuk tabel sumber CDC MySQL mengonsumsi sejumlah besar sumber daya bandwidth untuk membaca data?
Deskripsi masalah
Sejumlah kecil data diperbarui dalam tabel sumber CDC MySQL, tetapi konektor untuk tabel sumber CDC MySQL mengonsumsi sejumlah besar sumber daya bandwidth untuk membaca data.
Penyebab
Data log biner instans MySQL berisi data perubahan di semua database dan tabel dalam instans. Jika database MySQL Anda berisi tiga tabel, data log biner berisi data perubahan di semua tiga tabel meskipun penyebaran hanya melibatkan perubahan dalam satu tabel.
Data log biner dasar berisi semua data perubahan instans MySQL. Dalam hal ini, Anda dapat mengonfigurasi konektor CDC MySQL untuk membaca data perubahan tabel tertentu. Konfigurasi ini berlaku pada konektor Debezium atau Flink CDC alih-alih instans MySQL.
Solusi
Mekanisme penyimpanan data log biner tidak dapat diubah. Namun, Anda dapat menggunakan kembali tabel sumber CDC MySQL untuk mencegah penggunaan sumber daya bandwidth tambahan. Untuk informasi lebih lanjut, lihat bagian "Pengaktifan penggunaan kembali tabel sumber CDC MySQL" dari topik Konektor MySQL.
Ketika saya menggunakan konektor MySQL CDC untuk membaca data inkremental dari sebuah tabel, terdapat selisih waktu delapan jam antara data bidang timestamp yang dibaca dengan zona waktu server MySQL saya. Mengapa?
Nilai parameter
server-time-zoneyang dikonfigurasikan dalam penyebaran CDC tidak sesuai dengan zona waktu server MySQL Anda. Ketika bidangtimestampdalam data log biner diproses, kesalahan ini terjadi.Serializer kustom, seperti
MyDeserializer implements DebeziumDeserializationSchema, digunakan dalam DataStream. Ketika serializer kustom memproses data tipeTIMESTAMP, kesalahan ini terjadi. Anda dapat menentukanserverTimeZonedalam kode berdasarkan informasi parsing tentang data tipeTIMESTAMPdalamRowDataDebeziumDeserializeSchema.private TimestampData convertToTimestamp(Object dbzObj, Schema schema) { if (dbzObj instanceof Long) { switch (schema.name()) { case Timestamp.SCHEMA_NAME: return TimestampData.fromEpochMillis((Long) dbzObj); case MicroTimestamp.SCHEMA_NAME: long micro = (long) dbzObj; return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000)); case NanoTimestamp.SCHEMA_NAME: long nano = (long) dbzObj; return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000)); } } LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); return TimestampData.fromLocalDateTime(localDateTime); }
Dapatkah konektor MySQL CDC mendengarkan database sekunder? Bagaimana cara mengonfigurasi database sekunder?
Ya, konektor MySQL CDC dapat mendengarkan database sekunder. Untuk memungkinkan konektor MySQL CDC mendengarkan database sekunder, Anda harus menambahkan konfigurasi berikut ke kode untuk database sekunder. Setelah konfigurasi selesai, data yang disinkronkan dari database utama ditulis ke file log biner database sekunder.
log-slave-updates = 1Jika mode global transaction identifier (GTID) diaktifkan untuk database utama, Anda juga harus mengaktifkan mode GTID untuk database sekunder. Untuk mengaktifkan mode GTID, tambahkan konfigurasi berikut ke kode untuk database utama dan sekunder:
gtid_mode = on
enforce_gtid_consistency = onBagaimana cara mendapatkan event DDL dari sebuah database?
Ketika Anda menggunakan konektor CDC untuk Apache Flink, Anda dapat memanggil API DataStream untuk menggunakan kelas MySqlSource dan mengonfigurasi parameter includeSchemaChanges(true) untuk mendapatkan event DDL. Setelah Anda mendapatkan event DDL, Anda dapat menulis kode untuk pemrosesan selanjutnya. Contoh pernyataan:
MySqlSource<xxx> mySqlSource =
MySqlSource.<xxx>builder()
.hostname(...)
.port(...)
.databaseList("<databaseName>")
.tableList("<databaseName>.<tableName>")
.username(...)
.password(...)
.serverId(...)
.deserializer(...)
.includeSchemaChanges(true) // Konfigurasikan parameter untuk mendapatkan acara DDL.
.build();
... // Tulis logika pemrosesan lainnya.Apakah konektor MySQL CDC mendukung sinkronisasi data dari semua tabel dalam database MySQL? Bagaimana cara menyinkronkan data dari semua tabel dalam database MySQL?
Ya, Realtime Compute for Apache Flink memungkinkan Anda mengeksekusi pernyataan CREATE TABLE AS atau CREATE DATABASE AS untuk menyinkronkan data dari semua tabel dalam database MySQL. Untuk informasi lebih lanjut, lihat Pernyataan CREATE TABLE AS atau Pernyataan CREATE DATABASE AS.
Instans ApsaraDB RDS untuk MySQL V5.6 tidak menulis data ke file log. Akibatnya, alat sinkronisasi hilir tidak dapat membaca informasi perubahan inkremental.
Mengapa data inkremental dari tabel database tertentu dalam instance gagal disinkronkan?
Filter log biner dikonfigurasikan untuk server MySQL, yang memfilter log biner dari database tertentu. Anda dapat menjalankan perintah show master status untuk menanyakan nilai Binlog_Ignore_DB dan Binlog_Do_DB. Contoh berikut menunjukkan hasilnya.
mysql> show master status;
+------------------+----------+--------------+------------------+----------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+----------------------+
| mysql-bin.000006 | 4594 | | | xxx:1-15 |
+------------------+----------+--------------+------------------+----------------------+ Bagaimana cara mengonfigurasi parameter tableList ketika saya menggunakan API DataStream untuk membuat tabel sumber MySQL CDC?
Nilai parameter tableList harus mencakup nama database dan nama tabel, bukan nama tabel dalam API DataStream. Saat Anda membuat tabel sumber MySQL CDC, Anda dapat mengonfigurasi parameter tableList dalam format tableList("yourDatabaseName.yourTableName").
Dapatkah konektor MongoDB CDC melanjutkan membaca data dari checkpoint penyebaran jika penyebaran gagal selama fase pembacaan data penuh?
Ya. Anda dapat mengonfigurasi 'scan.incremental.snapshot.enabled'= 'true' dalam klausa WITH dari penyebaran untuk memungkinkan konektor MongoDB CDC melanjutkan membaca data dari checkpoint penyebaran jika penyebaran gagal selama fase pembacaan data penuh.
Apakah konektor MongoDB CDC mendukung pembacaan data penuh dan inkremental atau hanya membaca data inkremental?
Ya. Secara default, konektor MongoDB CDC membaca data penuh dan inkremental. Jika Anda ingin memungkinkan konektor MongoDB CDC hanya membaca data inkremental, konfigurasikan 'scan.startup.mode' = 'latest-offset' dalam klausa WITH dari penyebaran Anda.
Apakah konektor MongoDB CDC mendukung langganan hanya untuk koleksi tertentu dari sebuah database?
Tidak, konektor MongoDB CDC tidak mendukung langganan hanya untuk koleksi tertentu dari sebuah database. Anda dapat menggunakan konektor MongoDB CDC untuk berlangganan semua koleksi dari sebuah database. Sebagai contoh, jika Anda mengonfigurasi 'database' = 'mgdb' dan 'collection' = '' dalam klausa WITH, konektor berlangganan ke semua koleksi dari database MongoDB.
Apakah konektor MongoDB CDC mendukung pembacaan bersamaan?
Jika Anda mengatur parameter scan.incremental.snapshot.enabled ke true, pembacaan bersamaan didukung selama fase snapshot awal.
Versi MongoDB mana saja yang didukung oleh konektor MongoDB CDC?
Konektor MongoDB CDC diimplementasikan berdasarkan fitur change stream. Fitur ini diperkenalkan di MongoDB 3.6. Secara teori, konektor MongoDB CDC mendukung MongoDB 3.6 dan versi lebih baru. Kami sarankan Anda menggunakan MongoDB 4.0 atau lebih baru. Jika versi MongoDB lebih awal dari 3.6, pesan kesalahan "Unrecognized pipeline stage name: '$changeStream'" mungkin muncul ketika konektor MongoDB CDC membaca change stream.
Arsitektur database MongoDB apa saja yang didukung oleh konektor MongoDB CDC?
Change stream memerlukan agar database MongoDB berjalan dalam arsitektur replika set atau arsitektur kluster sharded. Untuk menyederhanakan operasi selama pengujian di tempat, Anda dapat menjalankan database MongoDB dalam arsitektur replika set mandiri. Anda dapat menjalankan perintah rs.initiate() untuk menginisialisasi change stream. Jika Anda menggunakan konektor MongoDB CDC untuk membaca data dari database MongoDB yang berjalan dalam arsitektur replika set mandiri, pesan kesalahan "The $changestage is only supported on replica sets" mungkin muncul.
Apakah konektor MongoDB CDC mendukung parameter terkait Debezium?
Tidak, konektor MongoDB CDC tidak mendukung parameter terkait Debezium. Konektor MongoDB CDC dikembangkan secara independen di Flink CDC dan tidak bergantung pada Debezium.
Konektor CDC MongoDB tidak dapat mengakses database MongoDB berdasarkan nama pengguna dan kata sandinya dan muncul pesan kesalahan yang menunjukkan bahwa nama pengguna atau kata sandi tidak valid. Namun, komponen lain dapat mengakses database MongoDB berdasarkan nama pengguna dan kata sandi tersebut. Mengapa?
Kesalahan ini terjadi karena kredensial pengguna dibuat di bawah database tertentu. Jika Anda ingin mengakses database MongoDB, tambahkan 'connection.options' = 'authSource=Database to which the user belongs' ke klausa WITH.
Dapatkah konektor MongoDB CDC membaca data dari checkpoint penyebaran setelah penyebaran dimulai ulang? Apa prinsip kerjanya?
Ya, konektor MongoDB CDC dapat membaca data dari checkpoint penyebaran setelah penyebaran dimulai ulang. Checkpoint merekam token resume untuk change stream. Konektor melanjutkan membaca change stream berdasarkan token resume terkait. Token resume sesuai dengan lokasi koleksi oplog.rs. Koleksi oplog.rs adalah kumpulan log perubahan MongoDB dan memiliki kapasitas tetap.
Jika catatan data dari token resume tidak ada dalam koleksi oplog.rs, token resume mungkin tidak valid. Dalam hal ini, Anda dapat mengatur ukuran koleksi oplog.rs ke nilai yang sesuai untuk mencegah koleksi oplog.rs disimpan selama periode waktu yang terlalu singkat. Untuk informasi lebih lanjut, lihat Ubah Ukuran Oplog Anggota Replika Set yang Dikelola Sendiri.
Token resume dapat diperbarui berdasarkan catatan perubahan yang baru disimpan dan catatan denyut jantung.
Apakah konektor MongoDB CDC mendukung output pesan UPDATE_BEFORE (pre-update images)?
Jika fitur pre-image atau post-image diaktifkan untuk database MongoDB versi 6.0 atau lebih baru, Anda dapat mengonfigurasi
'scan.full-changelog' = 'true'untuk penyebaran SQL Anda. Dengan cara ini, MongoDBSource dapat menghasilkan pesan UPDATE_BEFORE dan operator ChangelogNormalize tidak digunakan.Koleksi oplog.rs asli dari database MongoDB versi lebih lama dari 6.0 mencakup jenis perubahan INSERT, UPDATE, REPLACE, dan DELETE tetapi tidak termasuk jenis perubahan UPDATE_BEFORE. Oleh karena itu, MongoDBSource tidak dapat langsung menghasilkan pesan UPDATE_BEFORE. Flink hanya mendukung semantik berbasis UPDATE. Ketika Anda menggunakan MongoDBTableSource, perencana Flink secara otomatis mengoptimalkan data menggunakan operator ChangelogNormalize, menambahkan pesan UPDATE_BEFORE, dan kemudian menghasilkan pesan INSERT, UPDATE_BEFORE, UPDATE_AFTER, dan DELETE. Namun, operator ChangelogNormalize menyebabkan overhead besar karena operator menyimpan status semua kunci sebelum pembaruan. Jika penyebaran DataStream menggunakan MongoDBSource tanpa optimasi menggunakan perencana Flink, operator ChangelogNormalize tidak digunakan secara otomatis untuk optimasi. Akibatnya, MongoDBSource tidak dapat menghasilkan pesan UPDATE_BEFORE. Jika Anda ingin mendapatkan gambar pra-pembaruan, Anda harus mengelola data status sendiri. Jika Anda tidak ingin mengelola data status sendiri, Anda dapat mengaktifkan MongoDBTableSource untuk mengonversi aliran asli dalam koleksi oplog.rs menjadi ChangelogStream atau RetractStream, dan menggunakan perencana Flink untuk mendapatkan gambar pra-pembaruan. Contoh kode:
tEnv.executeSql("CREATE TABLE orders ( ... ) WITH ( 'connector'='mongodb-cdc',... )"); Table table = tEnv.from("orders") .select($("*")); tEnv.toChangelogStream(table) .print() .setParallelism(1); env.execute();
Bagaimana cara menggunakan konektor CDC PostgreSQL untuk mengonfigurasi parameter guna menyaring nilai tanggal yang tidak valid?
Untuk menyaring nilai tanggal yang tidak valid, Anda dapat menambahkan salah satu konfigurasi berikut ke klausa WITH untuk konektor Postgres CDC berdasarkan kebutuhan bisnis Anda:
'debezium.event.deserialization.failure.handling.mode'='warn': Lewati data kotor dan cetak data kotor ke log WARN.'debezium.event.deserialization.failure.handling.mode'='ignore': Lewati data kotor dan jangan cetak data kotor ke log.
Mengapa muncul pesan kesalahan yang menunjukkan bahwa data TOAST tidak ditransmisikan ketika saya menggunakan konektor PostgreSQL CDC?
Pastikan identitas replika lengkap. Ukuran data TOAST besar. Untuk mengurangi ukuran log WAL, plug-in wal2json tidak menyisipkan data TOAST ke dalam data yang diperbarui jika data TOAST tetap tidak berubah dan konfigurasi 'debezium.schema.refresh.mode'='columns_diff_exclude_unchanged_toast' digunakan.
Mengapa log WAL tidak dilepaskan ketika penggunaan disk pada server database PostgreSQL tinggi?
Konektor PostgreSQL CDC hanya memperbarui nomor urutan log (LSN) dalam slot replikasi database PostgreSQL setelah checkpointing selesai. Jika penggunaan disk tinggi, periksa apakah checkpointing database PostgreSQL diaktifkan dan apakah slot replikasi tidak digunakan atau memiliki latensi sinkronisasi.
Apa yang dikembalikan jika presisi data DECIMAL yang disinkronkan dari PostgreSQL menggunakan konektor PostgreSQL CDC melebihi presisi maksimum?
Jika presisi data DECIMAL yang diterima oleh konektor PostgreSQL CDC lebih besar daripada presisi tipe data yang dinyatakan dalam pernyataan tabel sumber yang menggunakan konektor Postgres CDC, data DECIMAL diproses sebagai null. Dalam hal ini, Anda dapat mengonfigurasi 'debezium.decimal.handling.mode' = 'string' untuk memproses data yang dibaca dari database PostgreSQL sebagai string.
Bagaimana cara mengonfigurasi parameter tableList ketika saya membuat tabel sumber PostgreSQL CDC menggunakan API DataStream?
Nilai parameter tableList harus mencakup nama tabel dan nama skema, bukan nama tabel dalam API DataStream. Saat Anda membuat tabel sumber PostgreSQL CDC, konfigurasikan parameter tableList dalam format my_schema.my_table.
Mengapa saya gagal mengunduh flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar? Mengapa dependensi xxx-SNAPSHOT tidak ada di repositori Maven?
Versi xxx-SNAPSHOT sesuai dengan kode cabang pengembangan berdasarkan mekanisme manajemen versi proyek Maven utama. Jika Anda ingin menggunakan versi ini, Anda harus mengunduh kode sumber dan mengkompilasi paket JAR terkait. Anda dapat menggunakan paket versi stabil, seperti flink-sql-connector-mysql-cdc-2.1.0.jar. Anda dapat memperoleh paket dari repositori pusat Maven.
Apa perbedaan antara flink-sql-connector-xxx.jar dan flink-connector-xxx.jar?
Konvensi penamaan paket konektor Flink CDC konsisten dengan konvensi penamaan paket konektor Flink lainnya.
flink-sql-connector-xx adalah fat JAR. Selain kode konektor, semua paket pihak ketiga yang diperlukan oleh konektor di-shade ke dalam fat JAR. flink-sql-connector-xx disediakan untuk penyebaran SQL. Anda hanya perlu menambahkan fat JAR ke direktori lib.
flink-connector-xx hanya berisi kode konektor dan tidak berisi dependensi yang diperlukan oleh konektor. flink-connector-xx disediakan untuk penyebaran DataStream. Anda harus mengelola dependensi paket pihak ketiga yang diperlukan, dan melakukan operasi exclude dan shade untuk menangani konflik dependensi.
Mengapa saya tidak dapat menemukan paket konektor versi 2.X di repositori Maven?
ID grup diubah dari com.alibaba.ververica menjadi com.ververica di konektor Flink CDC versi 2.0.0. Oleh karena itu, jalur paket versi 2.X di repositori Maven berubah menjadi /com/ververica.
Apa yang harus saya lakukan jika API DataStream menggunakan deserializer JsonDebeziumDeserializationSchema dan data tipe numerik ditampilkan sebagai string?
Metode konversi yang berbeda digunakan ketika Debezium memproses data tipe numerik. Untuk informasi lebih lanjut, lihat Konektor Debezium untuk MySQL. Contoh kode berikut menunjukkan metode konversi yang dikonfigurasi dalam Flink CDC:
Properties properties = new Properties();
....
properties.setProperty("bigint.unsigned.handling.mode","long");
properties.setProperty("decimal.handling.mode","double");
MySqlSource.<String>builder()
.hostname(config.getHostname())
....
.debeziumProperties(properties);Apa yang harus saya lakukan jika pesan kesalahan "Replication slot "xxxx" is active" muncul?
Deskripsi masalah
Setelah penyebaran CDC PostgreSQL Anda berakhir, slot mungkin tidak dilepaskan dengan benar.
Solusi
Gunakan salah satu metode berikut untuk melepaskan slot:
Jalankan perintah berikut dalam penyebaran CDC PostgreSQL Anda untuk melepaskan slot secara manual:
select pg_drop_replication_slot('rep_slot');Jika muncul pesan kesalahan
"ERROR: replication slot "rep_slot" is active for PID 162564", slot sedang digunakan oleh proses yang ID-nya ditentukan dalam pesan tersebut. Anda harus menghentikan proses tersebut sebelum dapat melepaskan slot. Untuk menghentikan proses dan melepaskan slot, jalankan perintah berikut:select pg_terminate_backend(162564); select pg_drop_replication_slot('rep_slot');Aktifkan pembersihan slot otomatis. Untuk mengaktifkan fitur ini, tambahkan konfigurasi
'debezium.slot.drop.on.stop' = 'true'ke sumber Postgres dari penyebaran. Dengan cara ini, slot dapat dihapus secara otomatis ketika penyebaran PostgreSQL CDC dibatalkan.PeringatanJika Anda mengaktifkan pembersihan slot otomatis, log WAL akan diklaim kembali. Ketika penyebaran dimulai ulang, data hilang dan semantik At-Least Once tidak dapat dijamin.
Apa yang harus saya lakukan jika pesan kesalahan "binlog probably contains events generated with statement or mixed based replication format" muncul?
Deskripsi masalah
Caused by: io.debezium.DebeziumException: Received DML 'insert into gd_chat_fetch_log ( id, c_cursor, d_timestamp, msg_cnt, state, ext1, ext2, cost_time ) values ( null, null, '2022-03-23 16:51:00.616', 0, 1, null, null, 0 )' untuk diproses, log biner kemungkinan besar berisi acara yang dihasilkan dengan format replikasi berbasis pernyataan atau campuranPenyebab
Format log biner adalah MIXED. Tabel sumber CDC MySQL hanya mendukung log biner dalam format ROW.
Solusi
Jalankan perintah
show variables like "binlog_format"pada database MySQL untuk menanyakan format log biner saat ini.CatatanAnda dapat menjalankan perintah
show global variables like "binlog_format"untuk melihat format log biner global.Ubah format log biner menjadi ROW pada database MySQL. Untuk informasi lebih lanjut, lihat Mengatur Format Log Biner.
Mulai ulang penyebaran.
Apa yang harus saya lakukan jika pesan kesalahan "Encountered change event for table xxx.xxx whose schema isn't known to this connector" muncul?
Deskripsi masalah

202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader [] - Encountered change event 'Event{header=EventHeaderV4{timestamp=xxx, eventType=TABLE_MAP, serverId=xxx, headerLength=xxx, dataLength=xxx, nextPosition=xxx, flags=xxx}, data=TableMapEventData{tableId=xxx, database='xxx', table='xxx', columnTypes=xxx, xxx..., columnMetadata=xxx,xxx..., columnNullability={xxx,xxx...}, eventMetadata=null}}' at offset {ts_sec=xxx, file=mysql-bin.xxx, pos=xxx, gtids=xxx, server_id=xxx, event=xxx} untuk tabel xxx.xxx yang skemanya tidak dikenal oleh konektor ini. Salah satu penyebabnya adalah topik riwayat database yang tidak lengkap. Ambil snapshot baru dalam kasus ini. Gunakan alat mysqlbinlog untuk melihat acara bermasalah: mysqlbinlog --start-position=30946 --stop-position=31028 --verbose mysql-bin.004419 202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader [] - Terjadi kesalahan selama pemrosesan log biner. Offset terakhir yang disimpan = null, pembaca log biner dekat posisi = mysql-bin.xxx/xxx 202x-xx-xx xx:xx:xx,xxx ERROR io.debezium.connector.mysql.BinlogReader [] - Gagal karena kesalahan: Kesalahan memproses acara log biner org.apache.kafka.connect.errors.ConnectException: Encountered change event for table statistic.apk_info yang skemanya tidak dikenal oleh konektor ini at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:218) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:607) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1104) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:955) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:839) [ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102] Caused by: org.apache.kafka.connect.errors.ConnectException: Encountered change event for table xxx.xxx yang skemanya tidak dikenal oleh konektor ini at io.debezium.connector.mysql.BinlogReader.informAboutUnknownTableIfRequired(BinlogReader.java:875) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] at io.debezium.connector.mysql.BinlogReader.handleUpdateTableMetadata(BinlogReader.java:849) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:590) ~[ververica-connector-mysql-1.12-vvr-3.0.0-SNAPSHOT-jar-with-dependencies.jar:1.12-vvr-3.0.0-SNAPSHOT] ... 5 morePenyebab
Anda tidak memiliki izin yang diperlukan pada database tertentu yang digunakan dalam penyebaran.
Konfigurasi 'debezium.snapshot.mode'='never' digunakan dalam penyebaran. Jika parameter debezium.snapshot.mode diatur ke never, data dibaca dari awal logbiner. Namun, skema tabel yang sesuai dengan acara perubahan di awal log biner tidak cocok dengan skema tabel saat ini. Akibatnya, kesalahan ini terjadi.
Perubahan yang tidak dapat diinterpretasikan oleh Debezium ada. Sebagai contoh, jika `DEFAULT (now())` ada, kesalahan ini mungkin terjadi.
Solusi
Periksa apakah Anda memiliki izin yang diperlukan pada semua database yang digunakan dalam penyebaran. Untuk informasi lebih lanjut, lihat Konfigurasikan database MySQL.
Kami merekomendasikan agar Anda tidak menggunakan konfigurasi 'debezium.snapshot.mode'='never'. Untuk menghindari kesalahan ini, Anda dapat menggunakan konfigurasi 'debezium.inconsistent.schema.handling.mode' = 'warn'.
Tanyakan log WARN io.debezium.connector.mysql.MySqlSchema untuk memeriksa perubahan yang tidak dapat diinterpretasikan oleh Debezium. Sebagai contoh, `DEFAULT (now())` tidak dapat diinterpretasikan.
Apa yang harus saya lakukan jika pesan kesalahan "The connector is trying to read binlog starting at GTIDs ..., but this is no longer available on the server" muncul?
Deskripsi masalah
org.apache.kafka.connect.errors.ConnectException: Konektor mencoba membaca log biner mulai dari GTID xxx dan file log biner 'binlog.000064', pos=89887992, melewati 4 acara plus 1 baris, tetapi ini sudah tidak tersedia di server. Konfigurasikan ulang konektor untuk menggunakan snapshot jika diperlukan at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:133) at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106) at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758) at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) at java.lang.Thread.run(Thread.java:834)Penyebab dan solusi
Penyebab
Solusi
File log biner yang sedang dibaca oleh penyebaran dihapus dari server MySQL. Hal ini karena periode retensi log biner di server MySQL terlalu pendek.
Tingkatkan periode retensi log biner. Sebagai contoh, Anda dapat mengubah periode retensi log biner menjadi tujuh hari. Untuk mengubah periode retensi log biner, jalankan perintah berikut:
show variables like 'expire_logs_days'; set global expire_logs_days=7;Penyebaran CDC MySQL mengonsumsi log biner dengan kecepatan rendah. Sebagai contoh, operator agregat hilir atau operator sink memiliki tekanan balik untuk waktu yang lama dan tekanan balik tersebut ditransfer ke sumber. Akibatnya, sumber tidak dapat mengonsumsi data.
Optimalkan konfigurasi sumber daya penyebaran untuk memungkinkan sumber mengonsumsi data sesuai harapan.
Log ApsaraDB untuk RDS MySQL dapat disimpan hingga maksimal 18 jam dan menempati hingga 30% ruang penyimpanan. Jika periode retensi log melebihi 18 jam atau ruang penyimpanan yang ditempati oleh log melebihi 30%, operasi penghapusan log dipicu. Jika lebih dari 30% ruang penyimpanan ditempati karena banyaknya data yang ditulis, log biner mungkin dihapus dan tidak dapat digunakan.
Modifikasi kebijakan kedaluwarsa log biner ApsaraDB untuk RDS MySQL untuk memastikan log biner dapat dibaca sesuai harapan.
Jika Anda menggunakan instans baca-saja ApsaraDB untuk RDS MySQL untuk mengonsumsi data CDC, ketersediaan log biner tidak dijamin. Log biner yang dikonsumsi oleh instans baca-saja ApsaraDB untuk RDS MySQL dapat disimpan minimal 10 detik di mesin lokal dan kemudian diunggah ke Object Storage Service (OSS). Jika konektor untuk tabel sumber CDC MySQL membaca data dari instans baca-saja ApsaraDB untuk RDS MySQL berdasarkan konfigurasi penyebaran dan penyebaran tidak dapat dilanjutkan dalam 10 detik setelah failover, kesalahan ini terjadi.
Kami merekomendasikan agar Anda mengonfigurasi parameter untuk melarang konektor untuk tabel sumber CDC MySQL membaca data dari instans baca-saja ApsaraDB untuk RDS MySQL.
CatatanNama host instans baca-saja dimulai dengan rr, dan nama host instans biasa dimulai dengan rm.
Migrasi data internal dilakukan pada instans ApsaraDB untuk RDS MySQL.
Mulai ulang penyebaran untuk membaca ulang data.
Apa yang harus saya lakukan jika pesan kesalahan "EventDataDeserializationException: Failed to deserialize data of EventHeaderV4" muncul?
Deskripsi masalah
EventDataDeserializationException: Gagal mendeserialisasi data dari EventHeaderV4 .... Disebabkan oleh: java.net.SocketException: Koneksi direset at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:304) at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:227) at io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:252) at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:934) ... 3 more Disebabkan oleh: java.io.EOFException at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:192) at java.io.InputStream.read(InputStream.java:170) at java.io.InputStream.skip(InputStream.java:224) at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:301) ... 6 morePenyebab
Masalah jaringan terjadi.
Penyebaran memiliki tekanan balik.
Jika penyebaran CDC MySQL memiliki tekanan balik, klien Binlog yang digunakan dalam tabel sumber CDC MySQL tidak dapat melanjutkan membaca data karena tekanan balik. Untuk meminimalkan jumlah koneksi yang tersisa di database, database MySQL secara otomatis membatalkan koneksi ketika koneksi klien Binlog tidak aktif selama periode waktu yang melebihi batas waktu yang ditentukan di database. Akibatnya, penyebaran menjadi abnormal.
Parameter net_write_timeout diatur ke nilai yang terlalu kecil. Nilai default dari parameter net_write_timeout adalah 60, dalam detik. Jika nilai parameter ini terlalu kecil, database MySQL secara otomatis memutuskan koneksi klien.
Solusi
Jika kesalahan ini disebabkan oleh masalah jaringan, tambahkan konfigurasi
'debezium.connect.keep.alive.interval.ms' = '40000'untuk tabel sumber MySQL CDC. Jika konfigurasi database dapat dimodifikasi, Anda juga dapat menyetel parameter net_write_timeout ke nilai yang lebih besar. Untuk informasi lebih lanjut, lihat Optimalkan parameter instans.Jika kesalahan ini disebabkan oleh tekanan balik pada penyebaran, modifikasi konfigurasi parameter penyebaran.
Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7 atau lebih baru mendukung upaya ulang untuk kesalahan yang disebabkan oleh tekanan balik. Anda dapat menjalankan penyebaran yang memiliki tekanan balik di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7 atau lebih baru.
Apa yang harus saya lakukan jika pesan kesalahan "The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1" muncul?
Deskripsi masalah
org.apache.kafka.connect.errors.ConnectException: Slave terhubung menggunakan CHANGE MASTER TO MASTER_AUTO_POSITION = 1, tetapi master telah membersihkan log biner yang berisi GTID yang diperlukan oleh slave. Kode kesalahan: 1236; SQLSTATE: HY000. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[?:?] at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?] at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:962) at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:595) at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ... 1 more Disebabkan oleh: com.github.shyiko.mysql.binlog.network.ServerException: Slave terhubung menggunakan CHANGE MASTER TO MASTER_AUTO_POSITION = 1, tetapi master telah membersihkan log biner yang berisi GTID yang diperlukan oleh slave. at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:926) ~[?:?] ... 3 morePenyebab
Waktu yang diperlukan untuk membaca data penuh terlalu lama. Akibatnya, ketika pembacaan data penuh selesai dan pembacaan log biner dimulai, posisi startup awal yang direkam dalam set GTID telah dihapus dari database MySQL.
Solusi
Tingkatkan waktu yang diperlukan untuk menghapus log biner atau tingkatkan ukuran maksimum file log biner yang diizinkan. Untuk mengubah waktu yang diperlukan untuk menghapus log biner, jalankan perintah berikut:
mysql> show variables like 'expire_logs_days'; mysql> set global expire_logs_days=7;
Apa yang harus saya lakukan jika pesan kesalahan "The "before" field of UPDATE/DELETE message is null" muncul?
Deskripsi masalah
java.lang.IllegalStateException: Bidang "before" dari pesan UPDATE/DELETE adalah null, silakan periksa apakah tabel Postgres telah diatur REPLICA IDENTITY ke level FULL. Anda dapat memperbarui pengaturan dengan menjalankan perintah di Postgres 'ALTER TABLE xxx.xxx REPLICA IDENTITY FULL'. Silakan lihat lebih lanjut di dokumentasi Debezium:https:debezium.io/documentation/reference/1.2/connectors/postresql.html#postgresql-replica-identity at com.alibaba.ververica.cdc.connectors.postgres.table.PostgresValueValidator.validate(PostgresValueValidator.java:46) at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:113) at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:158) at io.debezium.embedded.ConvertingEngineBuilder.lambdaşnotifying$2(ConvertingEngineBuilder.java:82) at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812) at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) at java.util.concurrent.ThreadPoolExecutorSWorker.run(ThreadPoolExecutor.java:622) at java.lang.Thread.run(Thread.java:834)Penyebab
REPLICA IDENTITY tabel PostgreSQL tidak diatur ke FULL.
Solusi
Jalankan pernyataan
ALTER TABLE yourTableName REPLICA IDENTITY FULL;seperti yang diinstruksikan. Jika kesalahan tetap muncul setelah Anda menjalankan pernyataan dan memulai ulang penyebaran, tambahkan konfigurasiALTER TABLE yourTableName REPLICA IDENTITY FULL;ke kode penyebaran.
Apa yang harus saya lakukan jika pesan kesalahan "Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: xxx and table-name: xxxx" muncul?
Penyebab
Nama tabel yang dikonfigurasi tidak dapat ditemukan di database.
Penyebaran berisi tabel dari database yang berbeda. Namun, akun yang Anda gunakan tidak memiliki izin pada database tertentu.
Solusi
Periksa apakah nama tabel yang dikonfigurasi ada di database.
Berikan izin yang diperlukan pada semua database dalam penyebaran kepada akun yang Anda gunakan.
Apa yang harus saya lakukan jika pesan kesalahan "The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'" muncul?
Deskripsi masalah
Ketika pemeriksaan sintaks dilakukan pada tabel sumber CDC MySQL dalam VVR 4.0.X, pesan kesalahan berikut muncul:
Disebabkan oleh: org.apache.flink.table.api.ValidationException: Primary key diperlukan ketika mengaktifkan 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' ke 'true' at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186) at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134) ... 30 morePenyebab
Tidak ada kunci utama yang didefinisikan dalam parameter dalam klausa WITH dari pernyataan DDL yang digunakan untuk membuat tabel sumber CDC MySQL.
Solusi
Tambahkan informasi kunci utama ke pernyataan DDL.
Apa yang harus saya lakukan jika pesan kesalahan "java.io.EOFException: SSL peer shut down incorrectly" muncul?
Deskripsi masalah
Disebabkan oleh: java.io.EOFException: SSL peer shut down incorrectly at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302] at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302] at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?] at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?] at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?] at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?] at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?] at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?] at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?] at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?] at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?] at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?] ... 14 morePenyebab
Dalam MySQL 8.0.27, database MySQL terhubung melalui SSL secara default. Namun, database MySQL yang terhubung menggunakan driver JDBC tidak dapat diakses melalui SSL. Akibatnya, kesalahan ini terjadi.
Solusi
Jika Anda dapat melakukan peningkatan versi VVR dari penerapan Anda ke 6.0.2 atau yang lebih baru, tambahkan konfigurasi
'jdbc.properties.useSSL'='false'ke dalam klausa WITH tabel MySQL CDC.Jika tabel dideklarasikan hanya sebagai tabel dimensi, atur parameter konektor ke rds di dalam klausa WITH tabel MySQL CDC dan tambahkan konfigurasi
characterEncoding=utf-8&useSSL=falseke dalam parameter URL. Contoh:'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'
Apa yang harus saya lakukan jika pesan kesalahan "com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master" muncul?
Deskripsi masalah
Disebabkan oleh: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Pengecualian terjadi dalam produsen acara perubahan. Konektor ini akan dihentikan. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[?:?] at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?] at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?] at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?] at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?] ... 1 more Disebabkan oleh: io.debezium.DebeziumException: Slave dengan server_uuid/server_id yang sama dengan slave ini telah terhubung ke master; acara pertama '' pada 4, acara terakhir dibaca dari '/home/mysql/dataxxx/mysql/mysql-bin.xxx' pada xxx, byte terakhir dibaca dari '/home/mysql/dataxxx/mysql/mysql-bin.xxx' pada xxx. Kode kesalahan: 1236; SQLSTATE: HY000. at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146) ~[?:?] at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) ~[?:?] at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) ~[?:?] at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?] at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?] ... 1 more Disebabkan oleh: com.github.shyiko.mysql.binlog.network.ServerException: Slave dengan server_uuid/server_id yang sama dengan slave ini telah terhubung ke master; acara pertama '' pada 4, acara terakhir dibaca dari '/home/mysql/dataxxx/mysql/mysql-bin.xxx' pada xxx, byte terakhir dibaca dari '/home/mysql/dataxxx/mysql/mysql-bin.xxx' pada xxx. at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937) ~[?:?] at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) ~[?:?] at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ~[?:?] ... 1 morePenyebab
Saat konektor untuk tabel sumber CDC MySQL membaca data, pastikan ID server dikonfigurasi untuk setiap subtugas paralel dan setiap ID server unik. Jika nilai parameter server-id dalam data yang dibaca oleh konektor untuk tabel sumber CDC MySQL bertentangan dengan nilai parameter server-id untuk tabel sumber CDC dari penyebaran yang sama, penyebaran lainnya, atau alat sinkronisasi, kesalahan ini terjadi.
Solusi
Tentukan ID server unik secara global untuk setiap subtugas paralel tabel sumber CDC MySQL. Untuk informasi lebih lanjut, lihat bagian "Perhatian" dalam topik Buat tabel sumber CDC MySQL.
Apa yang harus saya lakukan jika pesan kesalahan "NullPointerException" muncul setelah saya menambahkan kolom ke tabel MySQL selama pembacaan data penuh tabel?
Deskripsi masalah
Disebabkan oleh: org.apache.flink.util.FlinkRuntimeException: Baca split MySqlSnapshotSplit{tableId=iplus.retail_detail, splitId='iplus.retail_detail:68', splitKeyType=[`id` BIGINT NOT NULL], splitStart=[212974500236****], splitEnd=[213118153601****], highWatermark=null} error karena java.lang.NullPointerException. at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:361) at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:293) at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:124) at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:86) at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ... 6 morePenyebab
Selama pembacaan data penuh sebuah tabel dalam penyebaran, skema tabel ditentukan ketika penyebaran dimulai, dan skema dicatat dalam titik pemeriksaan. Jika kolom ditambahkan ke tabel selama pembacaan data penuh, skema tidak dapat cocok. Akibatnya, kesalahan dikembalikan.
Solusi
Batalkan penyebaran dan hapus tabel hilir tempat data disinkronkan. Kemudian, mulai ulang penyebaran tanpa status.
Apa yang harus saya lakukan jika muncul pesan kesalahan "The connector is trying to read binlog starting at GTIDs xxx"?
Deskripsi masalah
File log biner yang coba dibaca oleh penyebaran CDC telah dihapus di server MySQL.
Konektor mencoba membaca log biner mulai dari GTID xxx dan file log biner 'binlog.000064', pos=89887992, melewati 4 acara plus 1 baris, tetapi ini sudah tidak tersedia di server. Konfigurasikan ulang konektor untuk menggunakan snapshot jika diperlukanPenyebab
Periode retensi yang dikonfigurasikan untuk file log biner di server MySQL terlalu pendek. Akibatnya, file dihapus secara otomatis.
Penyebaran CDC memproses data log biner dengan kecepatan terlalu rendah.
Solusi
Tingkatkan periode retensi file log biner. Sebagai contoh, Anda dapat mengatur periode retensi menjadi tujuh hari.
mysql> show variables like 'expire_logs_days'; mysql> set global expire_logs_days=7;Alokasikan lebih banyak sumber daya ke penyebaran Flink untuk membantu mempercepat pemrosesan data log biner.
Apa yang harus saya lakukan jika muncul pesan kesalahan "Mysql8.0 Public Key Retrieval is not allowed"?
Penyebab
Pengguna MySQL yang dikonfigurasi menggunakan autentikasi kata sandi SHA256, dan kata sandi harus ditransmisikan melalui Transport Layer Security (TLS).
Solusi
Izinkan pengguna MySQL mengakses database menggunakan kata sandi asli. Untuk mengubah metode autentikasi, jalankan perintah berikut:
mysql> ALTER USER 'username'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password'; mysql> FLUSH PRIVILEGES;
Apa yang harus saya lakukan jika muncul pesan kesalahan "sub account not auth permission"?
Penyebab
Ketika Anda menggunakan konektor ApsaraDB RDS untuk MySQL sebagai tabel sumber, pengguna RAM tidak memiliki izin untuk mengunduh file log biner dari OSS.
Solusi
Berikan izin kepada pengguna RAM. Untuk informasi lebih lanjut, lihat Otorisasi pengguna RAM dengan izin baca saja untuk mengunduh file cadangan.
Apa yang harus saya lakukan jika pesan kesalahan "DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name'" muncul?
Deskripsi masalah
Disebabkan oleh:java.sql.SQLSyntaxErrorException: Perintah DELETE ditolak untuk pengguna 'userName'@'*.*.*.*' untuk tabel 'table_name' at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120) ...Penyebab
Jika klausa WHERE ditambahkan ke pernyataan SQL yang digunakan untuk memproses aliran data CDC MySQL, Realtime Compute for Apache Flink mengirimkan catatan data BEFORE UPDATE dan AFTER UPDATE untuk data yang dihasilkan ketika operasi UPDATE dilakukan ke hilir. Hilir mengidentifikasi catatan data BEFORE UPDATE sebagai operasi DELETE. Dalam hal ini, pengguna yang ingin melakukan operasi pada tabel hasil CDC MySQL harus memiliki izin DELETE.
Solusi
Periksa apakah ada operasi retract dalam logika SQL. Jika ada operasi retract, berikan izin DELETE kepada pengguna yang ingin melakukan operasi pada tabel hasil CDC MySQL.