Topik ini menjelaskan cara menggunakan Konektor OceanBase.
Informasi latar belakang
OceanBase adalah sistem manajemen database pemrosesan transaksional dan analitik hibrida (HTAP) terdistribusi yang bersifat native. Untuk informasi lebih lanjut, lihat situs web resmi OceanBase. Guna mengurangi biaya modifikasi sistem bisnis selama migrasi dari database MySQL atau Oracle, OceanBase mendukung mode kompatibilitas Oracle dan MySQL. Dalam mode tersebut, tipe data, fitur SQL, dan tampilan internal kompatibel dengan database MySQL atau Oracle. Konektor yang direkomendasikan untuk masing-masing mode adalah sebagai berikut:
Mode Oracle: Gunakan hanya konektor OceanBase.
Mode MySQL: Mode ini sangat kompatibel dengan sintaks MySQL native. Anda dapat menggunakan konektor OceanBase dan MySQL untuk membaca dan menulis data di OceanBase.
PentingKonektor OceanBase sedang dalam pratinjau publik. Untuk OceanBase 3.2.4.4 dan versi setelahnya, Anda dapat menggunakan konektor MySQL untuk membaca dan menulis data di OceanBase. Fitur ini juga dalam pratinjau publik. Evaluasi fitur ini secara menyeluruh dan gunakan dengan hati-hati.
Saat menggunakan konektor MySQL untuk membaca data inkremental dari OceanBase, pastikan binary logging (Binlog) OceanBase telah diaktifkan dan dikonfigurasi dengan benar. Untuk informasi lebih lanjut tentang Binlog OceanBase, lihat Ikhtisar atau Operasi terkait Binlog.
Konektor OceanBase mendukung hal-hal berikut.
Kategori | Rincian |
Tipe yang didukung | Tabel Sumber, Dimensi, dan Sink |
Mode runtime | Mode streaming dan mode batch |
Format data | Tidak berlaku |
Metrik pemantauan spesifik | Tidak ada |
Tipe API | SQL |
Mendukung pembaruan atau penghapusan data di tabel sink | Ya |
Prasyarat
Database dan tabel yang ingin Anda hubungkan telah dibuat.
Daftar putih alamat IP telah dikonfigurasi. Untuk informasi selengkapnya, lihat Konfigurasi grup daftar putih.
Untuk mengumpulkan data change data capture (CDC) inkremental dari OceanBase, Anda juga harus mengaktifkan layanan Binlog OceanBase. Untuk informasi lebih lanjut, lihat Operasi terkait Binlog.
Untuk menggunakan impor bypass pada tabel sink, Anda harus terlebih dahulu mengaktifkan port impor bypass. Untuk informasi lebih lanjut, lihat dokumen impor bypass.
Batasan
Konektor OceanBase didukung di Ververica Runtime (VVR) 8.0.1 dan versi setelahnya.
Jaminan semantik
Tabel sumber CDC mendukung semantik tepat-sekali (exactly-once). Ini memastikan bahwa data tidak hilang atau diduplikasi saat Anda membaca seluruh data historis lalu beralih ke pembacaan data Binlog. Bahkan jika terjadi kesalahan, semantik ini menjamin kebenaran pemrosesan data.
Tabel sink mendukung semantik paling sedikit sekali (at-least-once). Jika tabel sink memiliki kunci primer, idempotensi menjamin kebenaran data.
Sintaksis
CREATE TABLE oceanbase_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = '<yourJdbcUrl>',
'tableName' = '<yourTableName>',
'userName' = '<yourUserName>',
'password' = '<yourPassword>'
);Saat menulis ke tabel sink, konektor membuat dan mengeksekusi pernyataan SQL untuk setiap catatan data yang diterima. Jenis pernyataan SQL yang dibuat bergantung pada kondisi berikut:
Jika tabel sink tidak memiliki kunci primer, pernyataan INSERT INTO dibuat.
Jika tabel sink memiliki kunci primer, pernyataan UPSERT dibuat berdasarkan mode kompatibilitas database.
Parameter WITH
Umum
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
connector
Tipe tabel.
Ya
STRING
Tidak ada
Bidang statis diatur ke
oceanbase.password
Kata sandi.
Ya
STRING
Tidak ada
Tidak ada.
Hanya berlaku untuk tabel sumber.
PentingMulai dari Realtime Compute for Apache Flink VVR 11.4.0, arsitektur dan fitur konektor CDC OceanBase telah ditingkatkan. Perubahan utama berikut dapat membantu Anda memahami pembaruan dan melakukan migrasi versi dengan lancar:
Konektor CDC asli yang berbasis layanan OceanBase LogProxy telah ditinggalkan dan dihapus dari distribusi. Mulai dari VVR 11.4.0, konektor CDC OceanBase hanya mendukung pengambilan log inkremental dan sinkronisasi data melalui layanan Binlog OceanBase.
Konektor CDC OceanBase menyediakan kompatibilitas protokol dan stabilitas koneksi yang lebih baik dengan layanan Binlog OceanBase. Kami merekomendasikan agar Anda memprioritaskan penggunaan konektor CDC OceanBase.
Layanan Binlog OceanBase sepenuhnya kompatibel dengan protokol replikasi MySQL pada lapisan protokol. Anda juga dapat menghubungkan konektor MySQL CDC standar ke layanan Binlog OceanBase untuk pelacakan perubahan, tetapi hal ini tidak disarankan.
Mulai dari Realtime Compute for Apache Flink VVR 11.4.0, konektor CDC OceanBase tidak lagi mendukung pelacakan perubahan inkremental dalam mode kompatibilitas Oracle. Untuk pelacakan perubahan inkremental dalam mode kompatibilitas Oracle, Anda dapat menghubungi Dukungan Teknis Enterprise OceanBase.
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
hostname
Alamat IP atau hostname database OceanBase.
Ya
STRING
Tidak
Kami menyarankan agar Anda menentukan alamat virtual private cloud (VPC).
CatatanJika OceanBase dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan titik akhir publik untuk akses. Untuk informasi lebih lanjut, lihat Manajemen dan operasi workspace dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.
username
Username untuk layanan database OceanBase.
Ya
STRING
Tidak
Tidak ada.
database-name
Nama database OceanBase.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, nama database mendukung ekspresi reguler untuk membaca data dari beberapa database.
Saat menggunakan ekspresi reguler, hindari penggunaan karakter ^ dan $ untuk mencocokkan awal dan akhir. Untuk informasi lebih lanjut, lihat keterangan untuk parameter table-name.
table-name
Nama tabel OceanBase.
Ya
STRING
Tidak ada
Saat digunakan sebagai tabel sumber, nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.
Saat menggunakan ekspresi reguler, hindari penggunaan karakter ^ dan $ untuk mencocokkan awal dan akhir. Untuk informasi lebih lanjut, lihat catatan berikut.
CatatanSaat mencocokkan nama tabel, konektor tabel sumber OceanBase menggabungkan database-name dan table-name yang Anda tentukan dengan string \\. (karakter . digunakan untuk versi VVR sebelum 8.0.1) untuk membentuk ekspresi reguler jalur lengkap. Ekspresi reguler ini kemudian digunakan untuk mencocokkan nama tabel lengkap di database OceanBase.
Sebagai contoh, jika Anda mengatur 'database-name' ke 'db_.*' dan 'table-name' ke 'tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ untuk versi sebelum 8.0.1) untuk mencocokkan nama tabel lengkap dan menentukan tabel mana yang akan dibaca.
port
Nomor port layanan database OceanBase.
Tidak
INTEGER
3306
Tidak ada.
server-id
ID numerik untuk client database.
Tidak
STRING
Nilai acak antara 5400 dan 6400 dihasilkan.
ID ini harus unik secara global. Kami merekomendasikan agar Anda menetapkan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.
Parameter ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, kami merekomendasikan agar Anda menentukan rentang ID untuk memungkinkan setiap pembaca konkuren menggunakan ID berbeda. Untuk informasi lebih lanjut, lihat Penggunaan Server ID.
scan.incremental.snapshot.chunk.size
Ukuran setiap chunk dalam jumlah baris.
Tidak
INTEGER
8096
Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Sebelum data dalam sebuah chunk sepenuhnya dibaca, data tersebut disimpan sementara di memori.
Jumlah baris per chunk yang lebih kecil menghasilkan jumlah total chunk yang lebih besar untuk tabel tersebut. Hal ini memberikan pemulihan kesalahan yang lebih detail tetapi dapat menyebabkan error kehabisan memori (OOM) dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda harus menyeimbangkan dan menetapkan ukuran chunk yang wajar.
scan.snapshot.fetch.size
Jumlah maksimum catatan yang ditarik sekaligus saat membaca data lengkap suatu tabel.
Tidak
INTEGER
1024
Tidak ada.
scan.startup.mode
Mode startup untuk konsumsi data.
Tidak
STRING
initial
Nilai yang valid:
initial (default): Memindai semua data historis lalu membaca data Binlog terbaru saat startup pertama kali.
latest-offset: Tidak memindai data historis saat startup pertama kali. Mulai membaca dari akhir Binlog, artinya hanya membaca perubahan terbaru setelah konektor dimulai.
earliest-offset: Tidak memindai data historis. Mulai membaca dari Binlog paling awal yang tersedia.
specific-offset: Tidak memindai data historis. Mulai dari offset Binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengatur kedua parameter scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengatur parameter scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.
timestamp: Tidak memindai data historis. Mulai membaca Binlog dari timestamp tertentu. Timestamp ditentukan oleh parameter scan.startup.timestamp-millis dalam milidetik.
PentingJika Anda menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel yang sesuai tidak berubah antara posisi konsumsi Binlog yang ditentukan dan waktu startup pekerjaan. Hal ini mencegah error akibat perbedaan skema.
scan.startup.specific-offset.file
Nama file Binlog untuk offset awal saat menggunakan mode startup offset tertentu.
Tidak
STRING
Tidak ada
Untuk menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. Contoh format nama file:
mysql-bin.000003.scan.startup.specific-offset.pos
Offset dalam file Binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset tertentu.
Tidak
INTEGER
Tidak ada
Untuk menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset.
scan.startup.specific-offset.gtid-set
Set GTID untuk offset awal saat menggunakan mode startup offset tertentu.
Tidak
STRING
Tidak ada
Untuk menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. Contoh format set GTID:
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.scan.startup.timestamp-millis
Timestamp dalam milidetik untuk offset awal saat menggunakan mode startup timestamp.
Tidak
LONG
Tidak ada
Untuk menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke timestamp. Satuannya adalah milidetik.
PentingSaat Anda menentukan waktu, CDC OceanBase mencoba membaca event awal setiap file Binlog untuk menentukan timestamp-nya dan menemukan file Binlog yang sesuai dengan waktu yang ditentukan. Pastikan file Binlog untuk timestamp yang ditentukan belum dihapus dari database dan dapat dibaca.
server-time-zone
Zona waktu sesi yang digunakan oleh database.
Tidak
STRING
Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu dari zona yang Anda pilih.
Contoh: Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP dikonversi ke tipe STRING. Untuk informasi lebih lanjut, lihat Tipe temporal Debezium.
debezium.min.row.count.to.stream.results
Saat jumlah baris dalam tabel melebihi nilai ini, mode pembacaan batch digunakan.
Tidak
INTEGER
1000
Flink membaca data dari tabel sumber OceanBase dengan salah satu cara berikut:
Pembacaan penuh: Membaca seluruh data tabel langsung ke memori. Metode ini cepat tetapi mengonsumsi memori dalam jumlah yang sesuai. Jika tabel sumber sangat besar, hal ini dapat menyebabkan error OOM.
Pembacaan batch: Membaca data dalam beberapa batch, dengan jumlah baris tertentu per batch, hingga semua data terbaca. Metode ini menghindari error OOM saat membaca tabel besar tetapi relatif lebih lambat.
connect.timeout
Waktu maksimum menunggu sebelum mencoba kembali koneksi saat koneksi ke server database OceanBase mengalami timeout.
Tidak
DURATION
30s
Tidak ada.
connect.max-retries
Jumlah maksimum percobaan ulang setelah koneksi ke layanan database OceanBase gagal.
Tidak
INTEGER
3
Tidak ada.
connection.pool.size
Ukuran kolam koneksi database.
Tidak
INTEGER
20
Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi database.
jdbc.properties.*
Parameter koneksi kustom dalam URL JDBC.
Tidak
STRING
Tidak ada
Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengonfigurasi 'jdbc.properties.useSSL' = 'false'.
Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties.
debezium.*
Parameter kustom untuk Debezium guna membaca data Binlog.
Tidak
STRING
Tidak ada
Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan error parsing.
heartbeat.interval
Interval waktu di mana sumber menggunakan event heartbeat untuk memajukan offset Binlog.
Tidak
DURATION
30s
Event heartbeat digunakan untuk memajukan offset Binlog di sumber, yang berguna untuk tabel di OceanBase yang jarang diperbarui. Untuk tabel tersebut, offset Binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset Binlog maju, mencegahnya kedaluwarsa. Offset Binlog yang kedaluwarsa menyebabkan pekerjaan gagal dan tidak dapat dipulihkan, sehingga memerlukan restart tanpa status.
scan.incremental.snapshot.chunk.key-column
Menentukan kolom yang akan digunakan sebagai kunci chunk untuk membagi chunk selama fase snapshot.
Lihat kolom Keterangan.
STRING
Tidak ada
Parameter ini wajib untuk tabel tanpa kunci primer. Kolom yang dipilih harus bertipe non-null (NOT NULL).
Parameter ini opsional untuk tabel dengan kunci primer. Anda hanya dapat memilih satu kolom dari kunci primer.
scan.incremental.close-idle-reader.enabled
Menentukan apakah pembaca idle ditutup setelah snapshot selesai.
Tidak
BOOLEAN
false
Hanya didukung di Realtime Compute for Apache Flink VVR 8.0.1 dan versi setelahnya.
Agar konfigurasi ini berlaku, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.
scan.read-changelog-as-append-only.enabled
Menentukan apakah aliran changelog diubah menjadi aliran append-only.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Semua jenis pesan, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER, diubah menjadi pesan INSERT. Aktifkan ini hanya dalam skenario khusus, seperti saat Anda perlu menyimpan pesan hapus dari tabel leluhur.
false (default): Semua jenis pesan dikirim ke downstream apa adanya.
CatatanHanya didukung di Realtime Compute for Apache Flink VVR 8.0.8 dan versi setelahnya.
scan.only.deserialize.captured.tables.changelog.enabled
Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.
Tidak
BOOLEAN
Nilai default adalah false di versi VVR 8.x.
Nilai default adalah true di VVR 11.1 dan versi yang lebih baru.
Nilai yang valid:
true: Mendeserialisasi data perubahan hanya untuk tabel target, yang mempercepat pembacaan Binlog.
false (default): Mendeserialisasi data perubahan untuk semua tabel.
CatatanHanya didukung di Realtime Compute for Apache Flink VVR 8.0.7 dan versi setelahnya.
Saat digunakan di Realtime Compute for Apache Flink VVR 8.0.8 dan versi sebelumnya, nama parameter harus diubah menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
Selama fase inkremental, menentukan apakah mencoba mengurai event DDL untuk perubahan tanpa lock ApsaraDB RDS.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengurai event DDL untuk perubahan tanpa lock ApsaraDB RDS.
false (default): Tidak mengurai event DDL untuk perubahan tanpa lock ApsaraDB RDS.
Ini adalah fitur eksperimen. Kami merekomendasikan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan tanpa lock online.
CatatanHanya didukung di Realtime Compute for Apache Flink VVR 11.1 dan versi setelahnya.
scan.incremental.snapshot.backfill.skip
Menentukan apakah melewati backfill selama fase pembacaan snapshot.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Melewati backfill selama fase pembacaan snapshot.
false (default): Tidak melewati backfill selama fase pembacaan snapshot.
Jika Anda melewati backfill, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.
PentingMelewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.
CatatanHanya didukung di Realtime Compute for Apache Flink VVR 11.1 dan versi setelahnya.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Selama fase pembacaan snapshot, menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
Ini adalah fitur eksperimen. Mengaktifkannya dapat mengurangi risiko error OOM saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami merekomendasikan agar Anda menambahkan parameter ini sebelum pekerjaan dimulai untuk pertama kalinya.
CatatanHanya didukung di Realtime Compute for Apache Flink VVR 11.1 dan versi setelahnya.
Hanya untuk tabel dimensi
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
url
URL JDBC.
Ya
STRING
Tidak ada
URL harus berisi nama database MySQL atau nama layanan Oracle.
userName
Username.
Ya
STRING
Tidak ada
Tidak ada.
cache
Kebijakan cache.
Tidak
STRING
ALL
Tiga kebijakan cache berikut didukung:
ALL: Menyimpan cache semua data dari tabel dimensi. Sebelum pekerjaan dijalankan, sistem memuat semua data dari tabel dimensi ke dalam cache. Semua pencarian data tabel dimensi selanjutnya dilakukan melalui cache. Jika data tidak ditemukan di cache, kunci tersebut tidak ada. Cache penuh dimuat ulang setelah kedaluwarsa.
Kebijakan ini cocok untuk skenario di mana tabel remote kecil dan terdapat banyak kunci yang tidak ditemukan (kondisi ON tidak dapat diasosiasikan saat menggabungkan tabel sumber dan tabel dimensi).
LRU: Menyimpan cache sebagian data dari tabel dimensi. Untuk setiap catatan dari tabel sumber, sistem terlebih dahulu mencari data di cache. Jika tidak ditemukan, sistem mencari di tabel dimensi fisik. Jika Anda menggunakan kebijakan cache ini, Anda harus mengonfigurasi parameter cacheSize.
None: Tidak ada cache.
PentingJika Anda menggunakan kebijakan cache ALL, perhatikan ukuran memori node untuk mencegah error OOM.
Karena sistem memuat data tabel dimensi secara asinkron, jika Anda menggunakan kebijakan cache ALL, Anda harus menambah memori node penggabungan tabel dimensi. Ukuran penambahan memori harus dua kali ukuran data tabel remote.
cacheSize
Jumlah maksimum entri yang dicache.
Tidak
INTEGER
100000
Jika Anda memilih kebijakan cache LRU, Anda harus mengatur ukuran cache.
Jika Anda memilih kebijakan cache ALL, Anda tidak perlu mengatur ukuran cache.
cacheTTLMs
Periode timeout cache.
Tidak
LONG
Long.MAX_VALUE
Konfigurasi cacheTTLMs bergantung pada parameter cache:
Jika cache diatur ke None, Anda tidak perlu mengonfigurasi cacheTTLMs. Artinya cache tidak kedaluwarsa.
Jika cache diatur ke LRU, cacheTTLMs adalah periode timeout cache. Secara default, cache tidak kedaluwarsa.
Jika cache diatur ke ALL, cacheTTLMs adalah waktu reload cache. Secara default, cache tidak dimuat ulang.
maxRetryTimeout
Waktu retry maksimum.
Tidak
DURATION
60s
Tidak ada.
Tabel sink: hanya JDBC
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
userName
Username.
Ya
STRING
Tidak ada
Tidak ada.
compatibleMode
Mode kompatibilitas OceanBase.
Tidak
STRING
mysql
Nilai valid:
mysql
oracle
CatatanIni adalah parameter khusus OceanBase.
url
URL JDBC.
Ya
STRING
Tidak ada
URL harus berisi nama database MySQL atau nama layanan Oracle.
tableName
Nama tabel.
Ya
STRING
Tidak ada
Tidak ada.
sink.mode
Mode penulisan untuk tabel sink OceanBase.
Ya
STRING
jdbc
Mendukung
jdbcdandirect-load.maxRetryTimes
Jumlah maksimum percobaan ulang.
Tidak
INTEGER
3
Tidak ada.
poolInitialSize
Ukuran awal kumpulan koneksi basis data.
Tidak
INTEGER
1
Tidak ada.
poolMaxActive
Jumlah maksimum koneksi dalam kumpulan koneksi basis data.
Tidak
INTEGER
8
Tidak ada.
poolMaxWait
Waktu maksimum menunggu koneksi dari kolam koneksi database.
Tidak
INTEGER
2000
Satuannya adalah milidetik.
poolMinIdle
Jumlah minimum koneksi idle dalam kumpulan koneksi basis data.
Tidak
INTEGER
1
Tidak ada.
connectionProperties
Properti koneksi untuk JDBC.
Tidak
STRING
Tidak ada
Formatnya adalah "k1=v1;k2=v2;k3=v3".
ignoreDelete
Menentukan apakah mengabaikan operasi penghapusan data.
Tidak
Boolean
false
Tidak ada.
excludeUpdateColumns
Menentukan nama kolom yang akan dikecualikan. Kolom-kolom ini tidak diperbarui selama operasi update.
Tidak
STRING
Tidak ada
Jika Anda menentukan beberapa kolom, pisahkan dengan koma (,), misalnya,
excludeUpdateColumns=column1,column2.CatatanNilai ini selalu mencakup kolom kunci primer. Kolom yang berlaku adalah kolom yang Anda tentukan ditambah kolom kunci primer.
partitionKey
Kunci partisi.
Tidak
STRING
Tidak ada
Saat kunci partisi diatur, konektor terlebih dahulu mengelompokkan data berdasarkan kunci partisi, dan setiap kelompok ditulis ke database secara terpisah. Pengelompokan ini terjadi sebelum pemrosesan modRule.
modRule
Aturan pengelompokan.
Tidak
STRING
Tidak ada
Format aturan pengelompokan harus berupa "nama_kolom mod angka", seperti
user_id mod 8. Tipe kolom harus numerik.Saat aturan pengelompokan diatur, data terlebih dahulu dipartisi berdasarkan partitionKey. Di dalam setiap partisi, data kemudian dikelompokkan berdasarkan hasil perhitungan modRule.
bufferSize
Ukuran buffer data.
Tidak
INTEGER
1000
Tidak ada.
flushIntervalMs
Interval waktu untuk membersihkan cache. Jika data dalam cache tidak memenuhi kondisi output setelah menunggu waktu yang ditentukan, sistem secara otomatis mengeluarkan semua data dalam cache.
Tidak
LONG
1000
Tidak ada.
retryIntervalMs
Waktu retry maksimum.
Tidak
INTEGER
5000
Satuannya adalah milidetik.
Hanya untuk impor bypass ke tabel sink
Impor bypass untuk tabel sink tersedia di Ververica Runtime (VVR) 11.5 dan versi setelahnya. Untuk informasi lebih lanjut tentang impor bypass, lihat dokumen.
Hanya mendukung aliran data terbatas: Sumber data harus berupa aliran data terbatas. Aliran data tak terbatas tidak didukung. Anda dapat menggunakan mode Batch Flink untuk kinerja yang lebih baik.
Penulisan throughput tinggi: Metode ini cocok untuk skenario impor data batch besar.
Penguncian tabel selama impor: Impor bypass mengunci tabel target. Selama tabel dikunci, penulisan data perubahan dan perubahan DDL diblokir. Kueri data tidak terpengaruh.
Tidak untuk penulisan real-time: Untuk skenario penulisan real-time atau streaming, gunakan tabel sink Java Database Connectivity (JDBC).
Parameter | Deskripsi | Wajib | Tipe data | Nilai default | Keterangan |
sink.mode | Metode untuk menulis data ke tabel sink OceanBase. | Tidak | STRING | jdbc | Mendukung mode `jdbc` dan `direct-load`. Untuk menulis data ke tabel sink OceanBase menggunakan impor bypass, atur parameter ini ke bidang statis `direct-load`. |
host | Alamat IP atau hostname database OceanBase. | Ya | STRING | Tidak ada | Tidak ada. |
port | Port RPC database OceanBase. | Tidak | INTEGER | 2882 | Tidak ada. |
username | Username. | Ya | STRING | Tidak ada | Tidak ada. |
tenant-name | Nama penyewa database OceanBase. | Ya | STRING | Tidak ada | |
schema-name |
| Ya | STRING | Tidak ada | Tidak ada. |
table-name | Nama tabel OceanBase. | Ya | STRING | Tidak ada | Tidak ada. |
parallel | Konkurensi sisi server untuk tugas impor bypass. | Tidak | INTEGER | 8 |
|
buffer-size | Ukuran buffer untuk menulis ke OceanBase dalam tugas impor bypass. | Tidak | INTEGER | 1024 | Flink menyimpan cache jumlah catatan data yang ditentukan oleh |
dup-action | Kebijakan untuk menangani kunci primer duplikat selama tugas impor bypass. Nilai yang valid adalah | Tidak | STRING | REPLACE |
|
load-method | Mode impor bypass. | full |
| ||
max-error-rows | Jumlah maksimum baris error yang dapat ditoleransi oleh tugas impor bypass. | Tidak | LONG | 0 | Baris dianggap sebagai baris error dalam kasus berikut:
|
timeout | Durasi timeout keseluruhan untuk tugas impor bypass. | Tidak | DURATION | 7d | |
heartbeat-timeout | Timeout heartbeat sisi klien untuk tugas impor bypass. | Tidak | DURATION | 60s | |
heartbeat-interval | Interval heartbeat sisi klien untuk tugas impor bypass. | Tidak | DURATION | 10s |
Pemetaan tipe
Mode kompatibel MySQL
Tipe bidang OceanBase
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
Catatandengan p <= 38.
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
PentingFlink mendukung catatan BLOB dengan ukuran 2.147.483.647 (2^31 - 1) byte atau kurang.
BLOB
MEDIUMBLOB
LONGBLOB
Mode kompatibel Oracle
Tipe bidang OceanBase
Tipe bidang Flink
NUMBER(p, s <= 0), p - s < 3
TINYINT
NUMBER(p, s <= 0), p - s < 5
SMALLINT
NUMBER(p, s <= 0), p - s < 10
INT
NUMBER(p, s <= 0), p - s < 19
BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)
DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38
STRING
FLOAT
FLOAT
BINARY_FLOAT
BINARY_DOUBLE
DOUBLE
NUMBER(1)
BOOLEAN
DATE
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
STRING
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
BLOB
BYTES
ROWID
Contoh penggunaan
Tabel sumber dan tabel sink
-- Tabel sumber CDC OceanBase CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); -- Tabel sink JDBC OceanBase CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); -- Tabel sink direct load OceanBase CREATE TEMPORARY TABLE oceanbase_directload_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'sink.mode' = 'direct-load', 'host' = '<yourHost>', 'port' = 'yourPort', 'tenant-name' = '<yourTenantName>', 'schema-name' = '<yourSchemaName>', 'table-name' = '<yourTableName>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; END;Tabel dimensi
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'tableName' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
Referensi
Untuk daftar konektor yang didukung oleh Flink, lihat Konektor yang didukung.