全部产品
Search
文档中心

Realtime Compute for Apache Flink:OceanBase (Pratinjau publik)

更新时间:Jan 10, 2026

Topik ini menjelaskan cara menggunakan Konektor OceanBase.

Informasi latar belakang

OceanBase adalah sistem manajemen database pemrosesan transaksional dan analitik hibrida (HTAP) terdistribusi yang bersifat native. Untuk informasi lebih lanjut, lihat situs web resmi OceanBase. Guna mengurangi biaya modifikasi sistem bisnis selama migrasi dari database MySQL atau Oracle, OceanBase mendukung mode kompatibilitas Oracle dan MySQL. Dalam mode tersebut, tipe data, fitur SQL, dan tampilan internal kompatibel dengan database MySQL atau Oracle. Konektor yang direkomendasikan untuk masing-masing mode adalah sebagai berikut:

  • Mode Oracle: Gunakan hanya konektor OceanBase.

  • Mode MySQL: Mode ini sangat kompatibel dengan sintaks MySQL native. Anda dapat menggunakan konektor OceanBase dan MySQL untuk membaca dan menulis data di OceanBase.

    Penting
    • Konektor OceanBase sedang dalam pratinjau publik. Untuk OceanBase 3.2.4.4 dan versi setelahnya, Anda dapat menggunakan konektor MySQL untuk membaca dan menulis data di OceanBase. Fitur ini juga dalam pratinjau publik. Evaluasi fitur ini secara menyeluruh dan gunakan dengan hati-hati.

    • Saat menggunakan konektor MySQL untuk membaca data inkremental dari OceanBase, pastikan binary logging (Binlog) OceanBase telah diaktifkan dan dikonfigurasi dengan benar. Untuk informasi lebih lanjut tentang Binlog OceanBase, lihat Ikhtisar atau Operasi terkait Binlog.

Konektor OceanBase mendukung hal-hal berikut.

Kategori

Rincian

Tipe yang didukung

Tabel Sumber, Dimensi, dan Sink

Mode runtime

Mode streaming dan mode batch

Format data

Tidak berlaku

Metrik pemantauan spesifik

Tidak ada

Tipe API

SQL

Mendukung pembaruan atau penghapusan data di tabel sink

Ya

Prasyarat

  • Database dan tabel yang ingin Anda hubungkan telah dibuat.

  • Daftar putih alamat IP telah dikonfigurasi. Untuk informasi selengkapnya, lihat Konfigurasi grup daftar putih.

  • Untuk mengumpulkan data change data capture (CDC) inkremental dari OceanBase, Anda juga harus mengaktifkan layanan Binlog OceanBase. Untuk informasi lebih lanjut, lihat Operasi terkait Binlog.

  • Untuk menggunakan impor bypass pada tabel sink, Anda harus terlebih dahulu mengaktifkan port impor bypass. Untuk informasi lebih lanjut, lihat dokumen impor bypass.

Batasan

  • Konektor OceanBase didukung di Ververica Runtime (VVR) 8.0.1 dan versi setelahnya.

  • Jaminan semantik

    • Tabel sumber CDC mendukung semantik tepat-sekali (exactly-once). Ini memastikan bahwa data tidak hilang atau diduplikasi saat Anda membaca seluruh data historis lalu beralih ke pembacaan data Binlog. Bahkan jika terjadi kesalahan, semantik ini menjamin kebenaran pemrosesan data.

    • Tabel sink mendukung semantik paling sedikit sekali (at-least-once). Jika tabel sink memiliki kunci primer, idempotensi menjamin kebenaran data.

Sintaksis

CREATE TABLE oceanbase_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);
Catatan

Saat menulis ke tabel sink, konektor membuat dan mengeksekusi pernyataan SQL untuk setiap catatan data yang diterima. Jenis pernyataan SQL yang dibuat bergantung pada kondisi berikut:

  • Jika tabel sink tidak memiliki kunci primer, pernyataan INSERT INTO dibuat.

  • Jika tabel sink memiliki kunci primer, pernyataan UPSERT dibuat berdasarkan mode kompatibilitas database.

Parameter WITH

  • Umum

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    connector

    Tipe tabel.

    Ya

    STRING

    Tidak ada

    Bidang statis diatur ke oceanbase.

    password

    Kata sandi.

    Ya

    STRING

    Tidak ada

    Tidak ada.

  • Hanya berlaku untuk tabel sumber.

    Penting

    Mulai dari Realtime Compute for Apache Flink VVR 11.4.0, arsitektur dan fitur konektor CDC OceanBase telah ditingkatkan. Perubahan utama berikut dapat membantu Anda memahami pembaruan dan melakukan migrasi versi dengan lancar:

    • Konektor CDC asli yang berbasis layanan OceanBase LogProxy telah ditinggalkan dan dihapus dari distribusi. Mulai dari VVR 11.4.0, konektor CDC OceanBase hanya mendukung pengambilan log inkremental dan sinkronisasi data melalui layanan Binlog OceanBase.

    • Konektor CDC OceanBase menyediakan kompatibilitas protokol dan stabilitas koneksi yang lebih baik dengan layanan Binlog OceanBase. Kami merekomendasikan agar Anda memprioritaskan penggunaan konektor CDC OceanBase.

      Layanan Binlog OceanBase sepenuhnya kompatibel dengan protokol replikasi MySQL pada lapisan protokol. Anda juga dapat menghubungkan konektor MySQL CDC standar ke layanan Binlog OceanBase untuk pelacakan perubahan, tetapi hal ini tidak disarankan.

    • Mulai dari Realtime Compute for Apache Flink VVR 11.4.0, konektor CDC OceanBase tidak lagi mendukung pelacakan perubahan inkremental dalam mode kompatibilitas Oracle. Untuk pelacakan perubahan inkremental dalam mode kompatibilitas Oracle, Anda dapat menghubungi Dukungan Teknis Enterprise OceanBase.

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    hostname

    Alamat IP atau hostname database OceanBase.

    Ya

    STRING

    Tidak

    Kami menyarankan agar Anda menentukan alamat virtual private cloud (VPC).

    Catatan

    Jika OceanBase dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus membuat koneksi jaringan cross-VPC atau menggunakan titik akhir publik untuk akses. Untuk informasi lebih lanjut, lihat Manajemen dan operasi workspace dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.

    username

    Username untuk layanan database OceanBase.

    Ya

    STRING

    Tidak

    Tidak ada.

    database-name

    Nama database OceanBase.

    Ya

    STRING

    Tidak ada

    • Saat digunakan sebagai tabel sumber, nama database mendukung ekspresi reguler untuk membaca data dari beberapa database.

    • Saat menggunakan ekspresi reguler, hindari penggunaan karakter ^ dan $ untuk mencocokkan awal dan akhir. Untuk informasi lebih lanjut, lihat keterangan untuk parameter table-name.

    table-name

    Nama tabel OceanBase.

    Ya

    STRING

    Tidak ada

    • Saat digunakan sebagai tabel sumber, nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

    • Saat menggunakan ekspresi reguler, hindari penggunaan karakter ^ dan $ untuk mencocokkan awal dan akhir. Untuk informasi lebih lanjut, lihat catatan berikut.

    Catatan

    Saat mencocokkan nama tabel, konektor tabel sumber OceanBase menggabungkan database-name dan table-name yang Anda tentukan dengan string \\. (karakter . digunakan untuk versi VVR sebelum 8.0.1) untuk membentuk ekspresi reguler jalur lengkap. Ekspresi reguler ini kemudian digunakan untuk mencocokkan nama tabel lengkap di database OceanBase.

    Sebagai contoh, jika Anda mengatur 'database-name' ke 'db_.*' dan 'table-name' ke 'tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ untuk versi sebelum 8.0.1) untuk mencocokkan nama tabel lengkap dan menentukan tabel mana yang akan dibaca.

    port

    Nomor port layanan database OceanBase.

    Tidak

    INTEGER

    3306

    Tidak ada.

    server-id

    ID numerik untuk client database.

    Tidak

    STRING

    Nilai acak antara 5400 dan 6400 dihasilkan.

    ID ini harus unik secara global. Kami merekomendasikan agar Anda menetapkan ID berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.

    Parameter ini juga mendukung rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, kami merekomendasikan agar Anda menentukan rentang ID untuk memungkinkan setiap pembaca konkuren menggunakan ID berbeda. Untuk informasi lebih lanjut, lihat Penggunaan Server ID.

    scan.incremental.snapshot.chunk.size

    Ukuran setiap chunk dalam jumlah baris.

    Tidak

    INTEGER

    8096

    Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Sebelum data dalam sebuah chunk sepenuhnya dibaca, data tersebut disimpan sementara di memori.

    Jumlah baris per chunk yang lebih kecil menghasilkan jumlah total chunk yang lebih besar untuk tabel tersebut. Hal ini memberikan pemulihan kesalahan yang lebih detail tetapi dapat menyebabkan error kehabisan memori (OOM) dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda harus menyeimbangkan dan menetapkan ukuran chunk yang wajar.

    scan.snapshot.fetch.size

    Jumlah maksimum catatan yang ditarik sekaligus saat membaca data lengkap suatu tabel.

    Tidak

    INTEGER

    1024

    Tidak ada.

    scan.startup.mode

    Mode startup untuk konsumsi data.

    Tidak

    STRING

    initial

    Nilai yang valid:

    • initial (default): Memindai semua data historis lalu membaca data Binlog terbaru saat startup pertama kali.

    • latest-offset: Tidak memindai data historis saat startup pertama kali. Mulai membaca dari akhir Binlog, artinya hanya membaca perubahan terbaru setelah konektor dimulai.

    • earliest-offset: Tidak memindai data historis. Mulai membaca dari Binlog paling awal yang tersedia.

    • specific-offset: Tidak memindai data historis. Mulai dari offset Binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengatur kedua parameter scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengatur parameter scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.

    • timestamp: Tidak memindai data historis. Mulai membaca Binlog dari timestamp tertentu. Timestamp ditentukan oleh parameter scan.startup.timestamp-millis dalam milidetik.

    Penting

    Jika Anda menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel yang sesuai tidak berubah antara posisi konsumsi Binlog yang ditentukan dan waktu startup pekerjaan. Hal ini mencegah error akibat perbedaan skema.

    scan.startup.specific-offset.file

    Nama file Binlog untuk offset awal saat menggunakan mode startup offset tertentu.

    Tidak

    STRING

    Tidak ada

    Untuk menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. Contoh format nama file: mysql-bin.000003.

    scan.startup.specific-offset.pos

    Offset dalam file Binlog yang ditentukan untuk offset awal saat menggunakan mode startup offset tertentu.

    Tidak

    INTEGER

    Tidak ada

    Untuk menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset.

    scan.startup.specific-offset.gtid-set

    Set GTID untuk offset awal saat menggunakan mode startup offset tertentu.

    Tidak

    STRING

    Tidak ada

    Untuk menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke specific-offset. Contoh format set GTID: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    Timestamp dalam milidetik untuk offset awal saat menggunakan mode startup timestamp.

    Tidak

    LONG

    Tidak ada

    Untuk menggunakan parameter ini, Anda harus mengatur scan.startup.mode ke timestamp. Satuannya adalah milidetik.

    Penting

    Saat Anda menentukan waktu, CDC OceanBase mencoba membaca event awal setiap file Binlog untuk menentukan timestamp-nya dan menemukan file Binlog yang sesuai dengan waktu yang ditentukan. Pastikan file Binlog untuk timestamp yang ditentukan belum dihapus dari database dan dapat dibaca.

    server-time-zone

    Zona waktu sesi yang digunakan oleh database.

    Tidak

    STRING

    Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu dari zona yang Anda pilih.

    Contoh: Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP dikonversi ke tipe STRING. Untuk informasi lebih lanjut, lihat Tipe temporal Debezium.

    debezium.min.row.count.to.stream.results

    Saat jumlah baris dalam tabel melebihi nilai ini, mode pembacaan batch digunakan.

    Tidak

    INTEGER

    1000

    Flink membaca data dari tabel sumber OceanBase dengan salah satu cara berikut:

    • Pembacaan penuh: Membaca seluruh data tabel langsung ke memori. Metode ini cepat tetapi mengonsumsi memori dalam jumlah yang sesuai. Jika tabel sumber sangat besar, hal ini dapat menyebabkan error OOM.

    • Pembacaan batch: Membaca data dalam beberapa batch, dengan jumlah baris tertentu per batch, hingga semua data terbaca. Metode ini menghindari error OOM saat membaca tabel besar tetapi relatif lebih lambat.

    connect.timeout

    Waktu maksimum menunggu sebelum mencoba kembali koneksi saat koneksi ke server database OceanBase mengalami timeout.

    Tidak

    DURATION

    30s

    Tidak ada.

    connect.max-retries

    Jumlah maksimum percobaan ulang setelah koneksi ke layanan database OceanBase gagal.

    Tidak

    INTEGER

    3

    Tidak ada.

    connection.pool.size

    Ukuran kolam koneksi database.

    Tidak

    INTEGER

    20

    Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi database.

    jdbc.properties.*

    Parameter koneksi kustom dalam URL JDBC.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, Anda dapat mengonfigurasi 'jdbc.properties.useSSL' = 'false'.

    Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties.

    debezium.*

    Parameter kustom untuk Debezium guna membaca data Binlog.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan error parsing.

    heartbeat.interval

    Interval waktu di mana sumber menggunakan event heartbeat untuk memajukan offset Binlog.

    Tidak

    DURATION

    30s

    Event heartbeat digunakan untuk memajukan offset Binlog di sumber, yang berguna untuk tabel di OceanBase yang jarang diperbarui. Untuk tabel tersebut, offset Binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset Binlog maju, mencegahnya kedaluwarsa. Offset Binlog yang kedaluwarsa menyebabkan pekerjaan gagal dan tidak dapat dipulihkan, sehingga memerlukan restart tanpa status.

    scan.incremental.snapshot.chunk.key-column

    Menentukan kolom yang akan digunakan sebagai kunci chunk untuk membagi chunk selama fase snapshot.

    Lihat kolom Keterangan.

    STRING

    Tidak ada

    • Parameter ini wajib untuk tabel tanpa kunci primer. Kolom yang dipilih harus bertipe non-null (NOT NULL).

    • Parameter ini opsional untuk tabel dengan kunci primer. Anda hanya dapat memilih satu kolom dari kunci primer.

    scan.incremental.close-idle-reader.enabled

    Menentukan apakah pembaca idle ditutup setelah snapshot selesai.

    Tidak

    BOOLEAN

    false

    • Hanya didukung di Realtime Compute for Apache Flink VVR 8.0.1 dan versi setelahnya.

    • Agar konfigurasi ini berlaku, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

    scan.read-changelog-as-append-only.enabled

    Menentukan apakah aliran changelog diubah menjadi aliran append-only.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Semua jenis pesan, termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER, diubah menjadi pesan INSERT. Aktifkan ini hanya dalam skenario khusus, seperti saat Anda perlu menyimpan pesan hapus dari tabel leluhur.

    • false (default): Semua jenis pesan dikirim ke downstream apa adanya.

    Catatan

    Hanya didukung di Realtime Compute for Apache Flink VVR 8.0.8 dan versi setelahnya.

    scan.only.deserialize.captured.tables.changelog.enabled

    Selama fase inkremental, menentukan apakah hanya mendeserialisasi event perubahan untuk tabel yang ditentukan.

    Tidak

    BOOLEAN

    • Nilai default adalah false di versi VVR 8.x.

    • Nilai default adalah true di VVR 11.1 dan versi yang lebih baru.

    Nilai yang valid:

    • true: Mendeserialisasi data perubahan hanya untuk tabel target, yang mempercepat pembacaan Binlog.

    • false (default): Mendeserialisasi data perubahan untuk semua tabel.

    Catatan
    • Hanya didukung di Realtime Compute for Apache Flink VVR 8.0.7 dan versi setelahnya.

    • Saat digunakan di Realtime Compute for Apache Flink VVR 8.0.8 dan versi sebelumnya, nama parameter harus diubah menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parse.online.schema.changes.enabled

    Selama fase inkremental, menentukan apakah mencoba mengurai event DDL untuk perubahan tanpa lock ApsaraDB RDS.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengurai event DDL untuk perubahan tanpa lock ApsaraDB RDS.

    • false (default): Tidak mengurai event DDL untuk perubahan tanpa lock ApsaraDB RDS.

    Ini adalah fitur eksperimen. Kami merekomendasikan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan sebelum melakukan perubahan tanpa lock online.

    Catatan

    Hanya didukung di Realtime Compute for Apache Flink VVR 11.1 dan versi setelahnya.

    scan.incremental.snapshot.backfill.skip

    Menentukan apakah melewati backfill selama fase pembacaan snapshot.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Melewati backfill selama fase pembacaan snapshot.

    • false (default): Tidak melewati backfill selama fase pembacaan snapshot.

    Jika Anda melewati backfill, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.

    Penting

    Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.

    Catatan

    Hanya didukung di Realtime Compute for Apache Flink VVR 11.1 dan versi setelahnya.

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    Selama fase pembacaan snapshot, menentukan apakah mendistribusikan chunk tak terbatas terlebih dahulu.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    • false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    Ini adalah fitur eksperimen. Mengaktifkannya dapat mengurangi risiko error OOM saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami merekomendasikan agar Anda menambahkan parameter ini sebelum pekerjaan dimulai untuk pertama kalinya.

    Catatan

    Hanya didukung di Realtime Compute for Apache Flink VVR 11.1 dan versi setelahnya.

  • Hanya untuk tabel dimensi

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    url

    URL JDBC.

    Ya

    STRING

    Tidak ada

    • URL harus berisi nama database MySQL atau nama layanan Oracle.

    userName

    Username.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    cache

    Kebijakan cache.

    Tidak

    STRING

    ALL

    Tiga kebijakan cache berikut didukung:

    • ALL: Menyimpan cache semua data dari tabel dimensi. Sebelum pekerjaan dijalankan, sistem memuat semua data dari tabel dimensi ke dalam cache. Semua pencarian data tabel dimensi selanjutnya dilakukan melalui cache. Jika data tidak ditemukan di cache, kunci tersebut tidak ada. Cache penuh dimuat ulang setelah kedaluwarsa.

      Kebijakan ini cocok untuk skenario di mana tabel remote kecil dan terdapat banyak kunci yang tidak ditemukan (kondisi ON tidak dapat diasosiasikan saat menggabungkan tabel sumber dan tabel dimensi).

    • LRU: Menyimpan cache sebagian data dari tabel dimensi. Untuk setiap catatan dari tabel sumber, sistem terlebih dahulu mencari data di cache. Jika tidak ditemukan, sistem mencari di tabel dimensi fisik. Jika Anda menggunakan kebijakan cache ini, Anda harus mengonfigurasi parameter cacheSize.

    • None: Tidak ada cache.

    Penting
    • Jika Anda menggunakan kebijakan cache ALL, perhatikan ukuran memori node untuk mencegah error OOM.

    • Karena sistem memuat data tabel dimensi secara asinkron, jika Anda menggunakan kebijakan cache ALL, Anda harus menambah memori node penggabungan tabel dimensi. Ukuran penambahan memori harus dua kali ukuran data tabel remote.

    cacheSize

    Jumlah maksimum entri yang dicache.

    Tidak

    INTEGER

    100000

    • Jika Anda memilih kebijakan cache LRU, Anda harus mengatur ukuran cache.

    • Jika Anda memilih kebijakan cache ALL, Anda tidak perlu mengatur ukuran cache.

    cacheTTLMs

    Periode timeout cache.

    Tidak

    LONG

    Long.MAX_VALUE

    Konfigurasi cacheTTLMs bergantung pada parameter cache:

    • Jika cache diatur ke None, Anda tidak perlu mengonfigurasi cacheTTLMs. Artinya cache tidak kedaluwarsa.

    • Jika cache diatur ke LRU, cacheTTLMs adalah periode timeout cache. Secara default, cache tidak kedaluwarsa.

    • Jika cache diatur ke ALL, cacheTTLMs adalah waktu reload cache. Secara default, cache tidak dimuat ulang.

    maxRetryTimeout

    Waktu retry maksimum.

    Tidak

    DURATION

    60s

    Tidak ada.

  • Tabel sink: hanya JDBC

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    userName

    Username.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    compatibleMode

    Mode kompatibilitas OceanBase.

    Tidak

    STRING

    mysql

    Nilai valid:

    • mysql

    • oracle

    Catatan

    Ini adalah parameter khusus OceanBase.

    url

    URL JDBC.

    Ya

    STRING

    Tidak ada

    • URL harus berisi nama database MySQL atau nama layanan Oracle.

    tableName

    Nama tabel.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    sink.mode

    Mode penulisan untuk tabel sink OceanBase.

    Ya

    STRING

    jdbc

    Mendukung jdbc dan direct-load.

    maxRetryTimes

    Jumlah maksimum percobaan ulang.

    Tidak

    INTEGER

    3

    Tidak ada.

    poolInitialSize

    Ukuran awal kumpulan koneksi basis data.

    Tidak

    INTEGER

    1

    Tidak ada.

    poolMaxActive

    Jumlah maksimum koneksi dalam kumpulan koneksi basis data.

    Tidak

    INTEGER

    8

    Tidak ada.

    poolMaxWait

    Waktu maksimum menunggu koneksi dari kolam koneksi database.

    Tidak

    INTEGER

    2000

    Satuannya adalah milidetik.

    poolMinIdle

    Jumlah minimum koneksi idle dalam kumpulan koneksi basis data.

    Tidak

    INTEGER

    1

    Tidak ada.

    connectionProperties

    Properti koneksi untuk JDBC.

    Tidak

    STRING

    Tidak ada

    Formatnya adalah "k1=v1;k2=v2;k3=v3".

    ignoreDelete

    Menentukan apakah mengabaikan operasi penghapusan data.

    Tidak

    Boolean

    false

    Tidak ada.

    excludeUpdateColumns

    Menentukan nama kolom yang akan dikecualikan. Kolom-kolom ini tidak diperbarui selama operasi update.

    Tidak

    STRING

    Tidak ada

    Jika Anda menentukan beberapa kolom, pisahkan dengan koma (,), misalnya, excludeUpdateColumns=column1,column2.

    Catatan

    Nilai ini selalu mencakup kolom kunci primer. Kolom yang berlaku adalah kolom yang Anda tentukan ditambah kolom kunci primer.

    partitionKey

    Kunci partisi.

    Tidak

    STRING

    Tidak ada

    Saat kunci partisi diatur, konektor terlebih dahulu mengelompokkan data berdasarkan kunci partisi, dan setiap kelompok ditulis ke database secara terpisah. Pengelompokan ini terjadi sebelum pemrosesan modRule.

    modRule

    Aturan pengelompokan.

    Tidak

    STRING

    Tidak ada

    Format aturan pengelompokan harus berupa "nama_kolom mod angka", seperti user_id mod 8. Tipe kolom harus numerik.

    Saat aturan pengelompokan diatur, data terlebih dahulu dipartisi berdasarkan partitionKey. Di dalam setiap partisi, data kemudian dikelompokkan berdasarkan hasil perhitungan modRule.

    bufferSize

    Ukuran buffer data.

    Tidak

    INTEGER

    1000

    Tidak ada.

    flushIntervalMs

    Interval waktu untuk membersihkan cache. Jika data dalam cache tidak memenuhi kondisi output setelah menunggu waktu yang ditentukan, sistem secara otomatis mengeluarkan semua data dalam cache.

    Tidak

    LONG

    1000

    Tidak ada.

    retryIntervalMs

    Waktu retry maksimum.

    Tidak

    INTEGER

    5000

    Satuannya adalah milidetik.

  • Hanya untuk impor bypass ke tabel sink

Penting
  • Impor bypass untuk tabel sink tersedia di Ververica Runtime (VVR) 11.5 dan versi setelahnya. Untuk informasi lebih lanjut tentang impor bypass, lihat dokumen.

  • Hanya mendukung aliran data terbatas: Sumber data harus berupa aliran data terbatas. Aliran data tak terbatas tidak didukung. Anda dapat menggunakan mode Batch Flink untuk kinerja yang lebih baik.

  • Penulisan throughput tinggi: Metode ini cocok untuk skenario impor data batch besar.

  • Penguncian tabel selama impor: Impor bypass mengunci tabel target. Selama tabel dikunci, penulisan data perubahan dan perubahan DDL diblokir. Kueri data tidak terpengaruh.

  • Tidak untuk penulisan real-time: Untuk skenario penulisan real-time atau streaming, gunakan tabel sink Java Database Connectivity (JDBC).

Parameter

Deskripsi

Wajib

Tipe data

Nilai default

Keterangan

sink.mode

Metode untuk menulis data ke tabel sink OceanBase.

Tidak

STRING

jdbc

Mendukung mode `jdbc` dan `direct-load`. Untuk menulis data ke tabel sink OceanBase menggunakan impor bypass, atur parameter ini ke bidang statis `direct-load`.

host

Alamat IP atau hostname database OceanBase.

Ya

STRING

Tidak ada

Tidak ada.

port

Port RPC database OceanBase.

Tidak

INTEGER

2882

Tidak ada.

username

Username.

Ya

STRING

Tidak ada

Tidak ada.

tenant-name

Nama penyewa database OceanBase.

Ya

STRING

Tidak ada

schema-name

  • Untuk penyewa MySQL, masukkan nama database.

  • Untuk penyewa Oracle, masukkan nama pemilik.

Ya

STRING

Tidak ada

Tidak ada.

table-name

Nama tabel OceanBase.

Ya

STRING

Tidak ada

Tidak ada.

parallel

Konkurensi sisi server untuk tugas impor bypass.

Tidak

INTEGER

8

  • Parameter ini menentukan sumber daya CPU sisi server untuk tugas impor dan tidak bergantung pada konkurensi klien. Server membatasi tingkat paralelisme maksimum berdasarkan spesifikasi CPU penyewa tanpa mengembalikan error. Tingkat paralelisme aktual ditentukan oleh spesifikasi CPU penyewa dan distribusi partisi tabel.

  • Sebagai contoh, jika penyewa memiliki 2 core CPU dan tingkat paralelisme diatur ke 10, tingkat paralelisme aktual adalah 4, dihitung sebagai MIN(2 core * 2, 10).

  • Jika partisi tabel didistribusikan di 2 node, total tingkat paralelisme aktual adalah MIN(2 core * 2, 10) * 2 = 8.

buffer-size

Ukuran buffer untuk menulis ke OceanBase dalam tugas impor bypass.

Tidak

INTEGER

1024

Flink menyimpan cache jumlah catatan data yang ditentukan oleh buffer-size lalu menuliskannya ke OceanBase dalam satu operasi.

dup-action

Kebijakan untuk menangani kunci primer duplikat selama tugas impor bypass. Nilai yang valid adalah STOP_ON_DUP (impor gagal), REPLACE (baris yang ada diganti), atau IGNORE (baris baru diabaikan).

Tidak

STRING

REPLACE

  • STOP_ON_DUP: Impor gagal.

  • REPLACE: Baris yang diimpor menggantikan baris yang ada.

  • IGNORE: Baris yang diimpor dibuang, dan baris yang ada dipertahankan.

load-method

Mode impor bypass.

full

  • full: Impor bypass penuh. Ini adalah nilai default.

  • inc: Impor bypass inkremental. Mode ini memeriksa konflik kunci primer. Didukung di observer 4.3.2 dan versi setelahnya. Mengatur `direct-load.dup-action` ke `REPLACE` tidak didukung.

  • inc_replace: Impor bypass inkremental dalam mode replace. Mode ini tidak memeriksa konflik kunci primer dan langsung menimpa data lama, yang memiliki efek yang sama dengan `REPLACE`. Parameter `direct-load.dup-action` diabaikan. Didukung di observer 4.3.2 dan versi setelahnya.

max-error-rows

Jumlah maksimum baris error yang dapat ditoleransi oleh tugas impor bypass.

Tidak

LONG

0

Baris dianggap sebagai baris error dalam kasus berikut:

  • Baris dengan kunci primer duplikat saat `dupAction` diatur ke `STOP_ON_DUP`.

  • Baris dengan jumlah kolom tidak sesuai (terlalu banyak atau terlalu sedikit).

  • Baris yang gagal dalam konversi tipe data.

timeout

Durasi timeout keseluruhan untuk tugas impor bypass.

Tidak

DURATION

7d

heartbeat-timeout

Timeout heartbeat sisi klien untuk tugas impor bypass.

Tidak

DURATION

60s

heartbeat-interval

Interval heartbeat sisi klien untuk tugas impor bypass.

Tidak

DURATION

10s

Pemetaan tipe

  • Mode kompatibel MySQL

    Tipe bidang OceanBase

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    REAL

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    NUMERIC(p, s)

    DECIMAL(p, s)

    Catatan

    dengan p <= 38.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Penting

    Flink mendukung catatan BLOB dengan ukuran 2.147.483.647 (2^31 - 1) byte atau kurang.

    BLOB

    MEDIUMBLOB

    LONGBLOB

  • Mode kompatibel Oracle

    Tipe bidang OceanBase

    Tipe bidang Flink

    NUMBER(p, s <= 0), p - s < 3

    TINYINT

    NUMBER(p, s <= 0), p - s < 5

    SMALLINT

    NUMBER(p, s <= 0), p - s < 10

    INT

    NUMBER(p, s <= 0), p - s < 19

    BIGINT

    NUMBER(p, s <= 0), 19 <= p - s <= 38

    DECIMAL(p - s, 0)

    NUMBER(p, s > 0)

    DECIMAL(p, s)

    NUMBER(p, s <= 0), p - s > 38

    STRING

    FLOAT

    FLOAT

    BINARY_FLOAT

    BINARY_DOUBLE

    DOUBLE

    NUMBER(1)

    BOOLEAN

    DATE

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    STRING

    NCHAR(n)

    NVARCHAR2(n)

    VARCHAR(n)

    VARCHAR2(n)

    CLOB

    BLOB

    BYTES

    ROWID

Contoh penggunaan

  • Tabel sumber dan tabel sink

    -- Tabel sumber CDC OceanBase
    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    -- Tabel sink JDBC OceanBase
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    -- Tabel sink direct load OceanBase
    CREATE TEMPORARY TABLE oceanbase_directload_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'sink.mode' = 'direct-load',
      'host' = '<yourHost>',
      'port' = 'yourPort',
      'tenant-name' = '<yourTenantName>',
      'schema-name' = '<yourSchemaName>',
      'table-name' = '<yourTableName>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    END; 

  • Tabel dimensi

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

Referensi

Untuk daftar konektor yang didukung oleh Flink, lihat Konektor yang didukung.