全部产品
Search
文档中心

Realtime Compute for Apache Flink:OceanBase (pratinjau publik)

更新时间:Dec 07, 2025

Topik ini menjelaskan cara menggunakan Konektor OceanBase.

Informasi latar belakang

OceanBase adalah sistem manajemen database pemrosesan transaksional dan analitik hibrida (HTAP) terdistribusi native. Untuk informasi selengkapnya, lihat situs web resmi OceanBase. OceanBase mendukung mode kompatibilitas Oracle dan MySQL, sehingga mengurangi biaya refaktor sistem bisnis saat Anda melakukan migrasi dari database MySQL atau Oracle. Tipe data, fitur SQL, dan tampilan internal dalam masing-masing mode tersebut konsisten dengan MySQL atau Oracle. Konektor yang direkomendasikan untuk setiap mode adalah sebagai berikut:

  • Mode Oracle: Anda hanya dapat menggunakan konektor OceanBase.

  • Mode MySQL: Mode ini sangat kompatibel dengan sintaksis MySQL native. Anda dapat menggunakan konektor OceanBase dan MySQL untuk membaca dan menulis ke OceanBase.

    Penting
    • Konektor OceanBase sedang dalam pratinjau publik. Untuk OceanBase 3.2.4.4 dan versi setelahnya, Anda dapat menggunakan konektor MySQL untuk membaca dan menulis ke OceanBase. Fitur ini juga dalam pratinjau publik. Evaluasi fitur ini secara hati-hati sebelum menggunakannya.

    • Saat Anda menggunakan konektor MySQL untuk membaca data inkremental dari OceanBase, pastikan binary logging (Binlog) OceanBase diaktifkan dan dikonfigurasi dengan benar. Untuk informasi selengkapnya tentang Binlog OceanBase, lihat Ikhtisar atau Operasi terkait binary logging.

Tabel berikut menjelaskan informasi yang didukung oleh konektor OceanBase.

Kategori

Rincian

Tipe yang didukung

Tabel sumber, tabel dimensi, dan tabel sink

Mode operasi

Streaming dan batch

Format data

Tidak berlaku

Metrik pemantauan spesifik

Tidak ada

Tipe API

SQL

Mendukung pembaruan atau penghapusan data di tabel sink

Ya

Prasyarat

Batasan

  • Konektor OceanBase didukung di Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) 8.0.1 atau versi yang lebih baru.

  • Semantik at-least-once dijamin. Jika tabel sink memiliki kunci primer, idempotensi memastikan kebenaran data.

Sintaksis

CREATE TABLE oceanabse_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);
Catatan

Konektor menulis ke tabel sink dengan membuat dan mengeksekusi pernyataan SQL untuk setiap catatan data yang diterima.

  • Jika tabel sink tidak memiliki kunci primer, pernyataan INSERT INTO akan dihasilkan.

  • Jika tabel sink memiliki kunci primer, pernyataan UPSERT akan dihasilkan berdasarkan mode kompatibilitas database.

Parameter WITH

  • Umum

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    connector

    Tipe tabel.

    Ya

    STRING

    Tidak ada

    Nilainya harus oceanbase.

    password

    Kata sandi.

    Ya

    STRING

    Tidak ada

    Tidak ada.

  • Spesifik Sumber

    Penting

    Catatan: Mulai dari Realtime Compute for Apache Flink dengan VVR 11.4.0, konektor CDC OceanBase telah mengalami peningkatan arsitektur besar-besaran dan penyesuaian fitur. Perubahan inti dijelaskan sebagai berikut untuk membantu Anda memahami pembaruan dan melakukan migrasi versi dengan lancar:

    • Konektor CDC berbasis layanan OceanBase LogProxy yang asli telah secara resmi ditinggalkan dan dihapus dari distribusi. Mulai dari VVR 11.4.0, konektor CDC OceanBase hanya mendukung pengambilan log inkremental dan sinkronisasi data melalui layanan Binlog OceanBase.

    • Konektor CDC OceanBase menyediakan kompatibilitas protokol yang ditingkatkan dan stabilitas koneksi dengan layanan Binlog OceanBase. Oleh karena itu, kami merekomendasikan agar Anda menggunakan konektor CDC OceanBase.

      Layanan Binlog OceanBase sepenuhnya kompatibel dengan protokol replikasi MySQL. Anda juga dapat menggunakan konektor standar MySQL CDC untuk terhubung ke layanan Binlog OceanBase untuk pelacakan perubahan, tetapi hal ini tidak disarankan.

    • Mulai dari Realtime Compute for Apache Flink dengan VVR 11.4.0, konektor CDC OceanBase tidak lagi mendukung langganan data inkremental dalam mode kompatibilitas Oracle. Untuk langganan data inkremental dalam mode kompatibilitas Oracle, hubungi dukungan teknis enterprise OceanBase.

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    hostname

    Alamat IP atau hostname database OceanBase.

    Ya

    STRING

    Tidak

    Kami menyarankan agar Anda menentukan alamat virtual private cloud (VPC).

    Catatan

    Jika OceanBase dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus terlebih dahulu membuat koneksi jaringan cross-VPC atau menggunakan Internet untuk akses. Untuk informasi selengkapnya, lihat Manajemen dan operasi penyimpanan dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.

    username

    Username untuk layanan database OceanBase.

    Ya

    STRING

    Tidak

    Tidak ada.

    database-name

    Nama database OceanBase.

    Ya

    STRING

    Tidak ada

    • Sebagai tabel sumber, nama database mendukung ekspresi reguler untuk membaca data dari beberapa database.

    • Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir. Untuk alasannya, lihat keterangan parameter table-name.

    table-name

    Ditunjukkan oleh OceanBase.

    Ya

    STRING

    Tidak ada

    • Sebagai tabel sumber, nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.

    • Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir. Untuk alasannya, lihat catatan berikut.

    Catatan

    Saat tabel sumber OceanBase mencocokkan nama tabel dengan ekspresi reguler, konektor menggabungkan database-name dan table-name yang Anda berikan menjadi ekspresi reguler path lengkap menggunakan string \\. (karakter . digunakan sebelum VVR 8.0.1). Ekspresi reguler gabungan ini kemudian digunakan untuk mencocokkan nama tabel yang memenuhi syarat penuh di database OceanBase.

    Contohnya, jika Anda mengatur 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ sebelum versi 8.0.1) untuk mencocokkan nama tabel yang memenuhi syarat penuh guna menentukan tabel mana yang akan dibaca.

    port

    Nomor port layanan database OceanBase.

    Tidak

    INTEGER

    3306

    Tidak ada.

    server-id

    ID numerik untuk client database.

    Tidak

    STRING

    Nilai acak antara 5400 dan 6400 dihasilkan secara default.

    ID ini harus unik secara global. Kami menyarankan agar Anda menetapkan ID yang berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.

    Parameter ini juga mendukung format rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren ganda didukung. Dalam kasus ini, kami menyarankan agar Anda menetapkan rentang ID sehingga setiap tugas konkuren menggunakan ID yang berbeda. Untuk informasi selengkapnya, lihat Penggunaan Server ID.

    scan.incremental.snapshot.chunk.size

    Ukuran setiap chunk dalam jumlah baris.

    Tidak

    INTEGER

    8096

    Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data chunk dibuffer di memori sebelum sepenuhnya dibaca.

    Ukuran chunk yang lebih kecil menghasilkan jumlah total chunk yang lebih besar untuk tabel tersebut. Hal ini memberikan pemulihan kesalahan yang lebih granular tetapi dapat menyebabkan error kehabisan memori (OOM) dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda perlu menyeimbangkan dan menetapkan ukuran chunk yang wajar.

    scan.snapshot.fetch.size

    Jumlah maksimum catatan yang ditarik dalam setiap batch saat Anda membaca data lengkap tabel.

    Tidak

    INTEGER

    1024

    Tidak ada.

    scan.startup.mode

    Mode startup untuk konsumsi data.

    Tidak

    STRING

    initial

    Nilai yang valid:

    • initial (default): Memindai data historis lengkap saat startup pertama kali, lalu membaca data Binlog terbaru.

    • latest-offset: Tidak memindai data historis lengkap saat startup pertama kali. Mulai membaca dari akhir Binlog (posisi Binlog terbaru), artinya hanya membaca perubahan terbaru setelah konektor dimulai.

    • earliest-offset: Tidak memindai data historis lengkap. Mulai membaca dari posisi Binlog paling awal yang tersedia.

    • specific-offset: Tidak memindai data historis lengkap. Mulai dari offset Binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengatur parameter scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengatur parameter scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.

    • timestamp: Tidak memindai data historis lengkap. Mulai membaca Binlog dari timestamp tertentu. Timestamp ditentukan oleh scan.startup.timestamp-millis dalam milidetik.

    Penting

    Saat menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel yang sesuai tidak berubah antara posisi konsumsi Binlog yang ditentukan dan waktu startup pekerjaan. Hal ini menghindari error yang disebabkan oleh perbedaan skema.

    scan.startup.specific-offset.file

    Nama file Binlog untuk offset awal saat menggunakan mode offset tertentu.

    Tidak

    STRING

    Tidak ada

    Saat menggunakan parameter ini, scan.startup.mode harus diatur ke specific-offset. Contoh format nama file adalah mysql-bin.000003.

    scan.startup.specific-offset.pos

    Offset dalam file Binlog yang ditentukan untuk offset awal saat menggunakan mode offset tertentu.

    Tidak

    INTEGER

    Tidak ada

    Saat menggunakan parameter ini, scan.startup.mode harus diatur ke specific-offset.

    scan.startup.specific-offset.gtid-set

    Set GTID untuk offset awal saat menggunakan mode offset tertentu.

    Tidak

    STRING

    Tidak ada

    Saat menggunakan parameter ini, scan.startup.mode harus diatur ke specific-offset. Contoh format set GTID adalah 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.

    scan.startup.timestamp-millis

    Timestamp dalam milidetik untuk offset awal saat menggunakan mode waktu tertentu.

    Tidak

    LONG

    Tidak ada

    Saat menggunakan parameter ini, scan.startup.mode harus diatur ke timestamp. Satuannya adalah milidetik.

    Penting

    Saat menggunakan waktu tertentu, OceanBase CDC mencoba membaca event awal setiap file Binlog untuk menentukan timestamp-nya, hingga akhirnya menemukan file Binlog yang sesuai dengan waktu yang ditentukan. Pastikan file Binlog untuk timestamp yang ditentukan belum dihapus dari database dan masih dapat dibaca.

    server-time-zone

    Zona waktu sesi yang digunakan oleh database.

    Tidak

    STRING

    Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu dari zona yang Anda pilih.

    Contoh: Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.

    debezium.min.row.count.to.stream.results

    Jika jumlah baris dalam tabel lebih besar dari nilai ini, mode baca batch digunakan.

    Tidak

    INTEGER

    1000

    Flink membaca data dari tabel sumber OceanBase dengan salah satu cara berikut:

    • Baca penuh: Membaca seluruh data tabel langsung ke memori. Cara ini cepat tetapi mengonsumsi memori dalam jumlah yang sesuai. Jika tabel sumber sangat besar, ada risiko error OOM.

    • Baca batch: Membaca data dalam beberapa batch, dengan jumlah baris tertentu per batch, hingga semua data terbaca. Cara ini menghindari risiko OOM untuk tabel besar tetapi relatif lebih lambat.

    connect.timeout

    Waktu maksimum menunggu sebelum mencoba koneksi ulang setelah koneksi ke server database OceanBase timeout.

    Tidak

    DURATION

    30s

    Tidak ada.

    connect.max-retries

    Jumlah maksimum percobaan ulang setelah koneksi ke layanan database OceanBase gagal.

    Tidak

    INTEGER

    3

    Tidak ada.

    connection.pool.size

    Ukuran kolam koneksi database.

    Tidak

    INTEGER

    20

    Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi database.

    jdbc.properties.*

    Parameter koneksi kustom dalam URL JDBC.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, atur 'jdbc.properties.useSSL' = 'false'.

    Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties.

    debezium.*

    Parameter kustom untuk Debezium guna membaca log biner.

    Tidak

    STRING

    Tidak ada

    Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan error parsing.

    heartbeat.interval

    Interval di mana sumber menggunakan event heartbeat untuk memajukan offset Binlog.

    Tidak

    DURATION

    30s

    Event heartbeat digunakan untuk memajukan offset Binlog di sumber, yang sangat berguna untuk tabel yang jarang diperbarui di OceanBase. Untuk tabel semacam itu, offset Binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset Binlog maju, mencegahnya kedaluwarsa. Offset Binlog yang kedaluwarsa dapat menyebabkan pekerjaan gagal dan tidak dapat dipulihkan, sehingga memerlukan restart tanpa status.

    scan.incremental.snapshot.chunk.key-column

    Kolom yang digunakan untuk membagi chunk selama fase snapshot.

    Lihat Keterangan.

    STRING

    Tidak ada

    • Wajib untuk tabel tanpa kunci primer. Kolom yang dipilih harus bertipe non-null (NOT NULL).

    • Opsional untuk tabel dengan kunci primer. Hanya satu kolom dari kunci primer yang dapat dipilih.

    scan.incremental.close-idle-reader.enabled

    Menentukan apakah akan menutup reader yang idle setelah fase snapshot berakhir.

    Tidak

    BOOLEAN

    false

    • Hanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.1 atau versi yang lebih baru.

    • Agar parameter ini berlaku, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.

    scan.read-changelog-as-append-only.enabled

    Menentukan apakah akan mengonversi aliran data changelog menjadi aliran data append-only.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Semua jenis pesan (termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER) dikonversi menjadi pesan INSERT. Aktifkan ini hanya dalam skenario khusus, seperti saat Anda perlu menyimpan pesan hapus dari tabel leluhur.

    • false (default): Semua jenis pesan dikirim ke downstream apa adanya.

    Catatan

    Hanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau versi yang lebih baru.

    scan.only.deserialize.captured.tables.changelog.enabled

    Menentukan apakah akan mendeserialisasi event perubahan hanya untuk tabel yang ditentukan selama fase inkremental.

    Tidak

    BOOLEAN

    • Nilai default adalah false di versi VVR 8.x.

    • Nilai default adalah true di VVR 11.1 dan versi yang lebih baru.

    Nilai yang valid:

    • true: Mendeserialisasi data perubahan hanya untuk tabel target, yang mempercepat pembacaan Binlog.

    • false (default): Mendeserialisasi data perubahan untuk semua tabel.

    Catatan
    • Hanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7 atau versi yang lebih baru.

    • Saat menggunakan parameter ini di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau versi sebelumnya, ubah nama parameternya menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.

    scan.parse.online.schema.changes.enabled

    Menentukan apakah akan mengurai event DDL untuk perubahan tanpa lock di RDS selama fase inkremental.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mengurai event DDL untuk perubahan tanpa lock di RDS.

    • false (default): Tidak mengurai event DDL untuk perubahan tanpa lock di RDS.

    Ini adalah fitur eksperimen. Sebelum melakukan perubahan online tanpa lock, kami menyarankan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan.

    Catatan

    Hanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 11.1 atau versi yang lebih baru.

    scan.incremental.snapshot.backfill.skip

    Menentukan apakah akan melewati backfill selama fase pembacaan snapshot.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Melewati backfill selama fase pembacaan snapshot.

    • false (default): Tidak melewati backfill selama fase pembacaan snapshot.

    Jika Anda melewati backfill, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.

    Penting

    Melewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.

    Catatan

    Hanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 11.1 atau versi yang lebih baru.

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    Menentukan apakah akan mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    Tidak

    BOOLEAN

    false

    Nilai yang valid:

    • true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    • false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.

    Ini adalah fitur eksperimen. Mengaktifkannya dapat mengurangi risiko error OOM saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan agar Anda menambahkan parameter ini sebelum startup pertama pekerjaan.

    Catatan

    Hanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 11.1 atau versi yang lebih baru.

  • Spesifik Tabel Dimensi

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    url

    URL JDBC.

    Ya

    STRING

    Tidak ada

    • URL harus berisi nama database MySQL atau nama layanan Oracle.

    userName

    Username.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    cache

    Kebijakan cache.

    Tidak

    STRING

    ALL

    Tiga kebijakan cache berikut didukung:

    • ALL: Menyimpan cache semua data di tabel dimensi. Sebelum pekerjaan dijalankan, sistem memuat semua data dari tabel dimensi ke cache. Semua pencarian data tabel dimensi selanjutnya dilakukan melalui cache. Jika data tidak ditemukan di cache, kunci tersebut tidak ada. Cache penuh dimuat ulang setelah kedaluwarsa.

      Kebijakan ini cocok untuk skenario di mana volume data tabel remote kecil dan banyak kunci yang hilang (tabel sumber dan tabel dimensi tidak dapat di-join berdasarkan kondisi ON).

    • LRU: Menyimpan cache sebagian data di tabel dimensi. Untuk setiap catatan dari tabel sumber, sistem terlebih dahulu mencari data di cache. Jika tidak ditemukan, sistem melakukan query ke tabel dimensi fisik. Saat menggunakan kebijakan cache ini, Anda harus mengonfigurasi parameter cacheSize.

    • None: Tidak ada cache.

    Penting
    • Saat menggunakan kebijakan cache ALL, pantau ukuran memori node untuk mencegah error OOM.

    • Karena sistem memuat data tabel dimensi secara asinkron, saat menggunakan kebijakan cache ALL, Anda perlu menambah memori node join tabel dimensi. Ukuran penambahan memori harus dua kali volume data tabel remote.

    cacheSize

    Jumlah maksimum entri yang dicache.

    Tidak

    INTEGER

    100000

    • Jika Anda memilih kebijakan cache LRU, Anda harus mengatur ukuran cache.

    • Jika Anda memilih kebijakan cache ALL, Anda tidak perlu mengatur ukuran cache.

    cacheTTLMs

    Waktu hidup (TTL) cache.

    Tidak

    LONG

    Long.MAX_VALUE

    Konfigurasi cacheTTLMs bergantung pada parameter cache:

    • Jika cache diatur ke None, Anda tidak perlu mengonfigurasi cacheTTLMs. Artinya cache tidak kedaluwarsa.

    • Jika cache diatur ke LRU, cacheTTLMs adalah TTL cache. Secara default, cache tidak kedaluwarsa.

    • Jika cache diatur ke ALL, cacheTTLMs adalah waktu reload cache. Secara default, cache tidak dimuat ulang.

    maxRetryTimeout

    Waktu retry maksimum.

    Tidak

    DURATION

    60s

    Tidak ada.

  • Spesifik Sink

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Catatan

    userName

    Username.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    compatibleMode

    Mode kompatibilitas OceanBase.

    Tidak

    STRING

    mysql

    Nilai valid:

    • mysql

    • oracle

    Catatan

    Ini adalah parameter khusus OceanBase.

    url

    URL JDBC.

    Ya

    STRING

    Tidak ada

    • URL harus berisi nama database MySQL atau nama layanan Oracle.

    tableName

    Nama tabel.

    Ya

    STRING

    Tidak ada

    Tidak ada.

    maxRetryTimes

    Jumlah maksimum percobaan ulang.

    Tidak

    INTEGER

    3

    Tidak ada.

    poolInitialSize

    Ukuran awal kumpulan koneksi basis data.

    Tidak

    INTEGER

    1

    Tidak ada.

    poolMaxActive

    Jumlah maksimum koneksi dalam kumpulan koneksi basis data.

    Tidak

    INTEGER

    8

    Tidak ada.

    poolMaxWait

    Waktu maksimum menunggu koneksi dari kolam koneksi database.

    Tidak

    INTEGER

    2000

    Satuan: milidetik.

    poolMinIdle

    Jumlah minimum koneksi idle dalam kumpulan koneksi basis data.

    Tidak

    INTEGER

    1

    Tidak ada.

    connectionProperties

    Properti koneksi JDBC.

    Tidak

    STRING

    Tidak ada

    Formatnya adalah "k1=v1;k2=v2;k3=v3".

    ignoreDelete

    Menentukan apakah akan mengabaikan operasi DELETE.

    Tidak

    Boolean

    false

    Tidak ada.

    excludeUpdateColumns

    Menentukan nama kolom yang akan dikecualikan. Kolom-kolom ini tidak diperbarui selama operasi update.

    Tidak

    STRING

    Tidak ada

    Jika Anda menentukan beberapa kolom untuk diabaikan, pisahkan dengan koma (,). Contoh: excludeUpdateColumns=column1,column2.

    Catatan

    Nilai ini selalu mencakup kolom kunci primer. Kolom yang benar-benar berlaku adalah kolom yang Anda tentukan ditambah kolom kunci primer.

    partitionKey

    Kunci partisi.

    Tidak

    STRING

    Tidak ada

    Saat kunci partisi ditetapkan, konektor terlebih dahulu mengelompokkan data berdasarkan kunci partisi. Setiap kelompok kemudian ditulis ke database secara terpisah. Pengelompokan ini diproses sebelum modRule.

    modRule

    Aturan pengelompokan.

    Tidak

    STRING

    Tidak ada

    Aturan pengelompokan harus dalam format "column_name mod number", seperti user_id mod 8. Kolom harus bertipe numerik.

    Saat aturan pengelompokan ditetapkan, data terlebih dahulu dipartisi berdasarkan partitionKey. Di dalam setiap partisi, data kemudian dikelompokkan berdasarkan hasil perhitungan modRule.

    bufferSize

    Ukuran buffer data.

    Tidak

    INTEGER

    1000

    Tidak ada.

    flushIntervalMs

    Interval untuk flushing cache. Jika data dalam cache tidak memenuhi kondisi output setelah waktu tunggu yang ditentukan, sistem secara otomatis mengeluarkan semua data dalam cache.

    Tidak

    LONG

    1000

    Tidak ada.

    retryIntervalMs

    Waktu retry maksimum.

    Tidak

    INTEGER

    5000

    Satuan: milidetik.

Pemetaan tipe

  • Mode kompatibel MySQL

    Tipe bidang OceanBase

    Tipe bidang Flink

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    REAL

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    NUMERIC(p, s)

    DECIMAL(p, s)

    Catatan

    p harus kurang dari atau sama dengan 38.

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    Penting

    Flink hanya mendukung catatan tipe BLOB yang kurang dari atau sama dengan 2.147.483.647 (2^31 - 1) byte.

    BLOB

    MEDIUMBLOB

    LONGBLOB

  • Mode kompatibel Oracle

    Tipe bidang OceanBase

    Tipe bidang Flink

    NUMBER(p, s <= 0), p - s < 3

    TINYINT

    NUMBER(p, s <= 0), p - s < 5

    SMALLINT

    NUMBER(p, s <= 0), p - s < 10

    INT

    NUMBER(p, s <= 0), p - s < 19

    BIGINT

    NUMBER(p, s <= 0), 19 <= p - s <= 38

    DECIMAL(p - s, 0)

    NUMBER(p, s > 0)

    DECIMAL(p, s)

    NUMBER(p, s <= 0), p - s > 38

    STRING

    FLOAT

    FLOAT

    BINARY_FLOAT

    BINARY_DOUBLE

    DOUBLE

    NUMBER(1)

    BOOLEAN

    DATE

    TIMESTAMP [(p)] [WITHOUT TIMEZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    STRING

    NCHAR(n)

    NVARCHAR2(n)

    VARCHAR(n)

    VARCHAR2(n)

    CLOB

    BLOB

    BYTES

    ROWID

Contoh penggunaan

  • Tabel Sumber dan Tabel Sink

    -- Tabel sumber OceanBase CDC
    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    -- Tabel sink OceanBase
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    END; 

  • Tabel dimensi

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

Referensi

Untuk informasi selengkapnya tentang konektor yang didukung Flink, lihat Konektor yang didukung.