Topik ini menjelaskan cara menggunakan Konektor OceanBase.
Informasi latar belakang
OceanBase adalah sistem manajemen database pemrosesan transaksional dan analitik hibrida (HTAP) terdistribusi native. Untuk informasi selengkapnya, lihat situs web resmi OceanBase. OceanBase mendukung mode kompatibilitas Oracle dan MySQL, sehingga mengurangi biaya refaktor sistem bisnis saat Anda melakukan migrasi dari database MySQL atau Oracle. Tipe data, fitur SQL, dan tampilan internal dalam masing-masing mode tersebut konsisten dengan MySQL atau Oracle. Konektor yang direkomendasikan untuk setiap mode adalah sebagai berikut:
Mode Oracle: Anda hanya dapat menggunakan konektor OceanBase.
Mode MySQL: Mode ini sangat kompatibel dengan sintaksis MySQL native. Anda dapat menggunakan konektor OceanBase dan MySQL untuk membaca dan menulis ke OceanBase.
PentingKonektor OceanBase sedang dalam pratinjau publik. Untuk OceanBase 3.2.4.4 dan versi setelahnya, Anda dapat menggunakan konektor MySQL untuk membaca dan menulis ke OceanBase. Fitur ini juga dalam pratinjau publik. Evaluasi fitur ini secara hati-hati sebelum menggunakannya.
Saat Anda menggunakan konektor MySQL untuk membaca data inkremental dari OceanBase, pastikan binary logging (Binlog) OceanBase diaktifkan dan dikonfigurasi dengan benar. Untuk informasi selengkapnya tentang Binlog OceanBase, lihat Ikhtisar atau Operasi terkait binary logging.
Tabel berikut menjelaskan informasi yang didukung oleh konektor OceanBase.
Kategori | Rincian |
Tipe yang didukung | Tabel sumber, tabel dimensi, dan tabel sink |
Mode operasi | Streaming dan batch |
Format data | Tidak berlaku |
Metrik pemantauan spesifik | Tidak ada |
Tipe API | SQL |
Mendukung pembaruan atau penghapusan data di tabel sink | Ya |
Prasyarat
Database dan tabel yang ingin Anda hubungkan telah dibuat.
Daftar putih alamat IP telah dikonfigurasi. Untuk informasi selengkapnya, lihat Konfigurasi grup daftar putih.
Untuk mengumpulkan data inkremental menggunakan change data capture (CDC) dari OceanBase, Anda juga harus mengaktifkan layanan Binlog OceanBase. Untuk informasi selengkapnya, lihat Operasi terkait binary logging.
Batasan
Konektor OceanBase didukung di Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) 8.0.1 atau versi yang lebih baru.
Semantik at-least-once dijamin. Jika tabel sink memiliki kunci primer, idempotensi memastikan kebenaran data.
Sintaksis
CREATE TABLE oceanabse_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = '<yourJdbcUrl>',
'tableName' = '<yourTableName>',
'userName' = '<yourUserName>',
'password' = '<yourPassword>'
);Konektor menulis ke tabel sink dengan membuat dan mengeksekusi pernyataan SQL untuk setiap catatan data yang diterima.
Jika tabel sink tidak memiliki kunci primer, pernyataan INSERT INTO akan dihasilkan.
Jika tabel sink memiliki kunci primer, pernyataan UPSERT akan dihasilkan berdasarkan mode kompatibilitas database.
Parameter WITH
Umum
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
connector
Tipe tabel.
Ya
STRING
Tidak ada
Nilainya harus
oceanbase.password
Kata sandi.
Ya
STRING
Tidak ada
Tidak ada.
Spesifik Sumber
PentingCatatan: Mulai dari Realtime Compute for Apache Flink dengan VVR 11.4.0, konektor CDC OceanBase telah mengalami peningkatan arsitektur besar-besaran dan penyesuaian fitur. Perubahan inti dijelaskan sebagai berikut untuk membantu Anda memahami pembaruan dan melakukan migrasi versi dengan lancar:
Konektor CDC berbasis layanan OceanBase LogProxy yang asli telah secara resmi ditinggalkan dan dihapus dari distribusi. Mulai dari VVR 11.4.0, konektor CDC OceanBase hanya mendukung pengambilan log inkremental dan sinkronisasi data melalui layanan Binlog OceanBase.
Konektor CDC OceanBase menyediakan kompatibilitas protokol yang ditingkatkan dan stabilitas koneksi dengan layanan Binlog OceanBase. Oleh karena itu, kami merekomendasikan agar Anda menggunakan konektor CDC OceanBase.
Layanan Binlog OceanBase sepenuhnya kompatibel dengan protokol replikasi MySQL. Anda juga dapat menggunakan konektor standar MySQL CDC untuk terhubung ke layanan Binlog OceanBase untuk pelacakan perubahan, tetapi hal ini tidak disarankan.
Mulai dari Realtime Compute for Apache Flink dengan VVR 11.4.0, konektor CDC OceanBase tidak lagi mendukung langganan data inkremental dalam mode kompatibilitas Oracle. Untuk langganan data inkremental dalam mode kompatibilitas Oracle, hubungi dukungan teknis enterprise OceanBase.
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Keterangan
hostname
Alamat IP atau hostname database OceanBase.
Ya
STRING
Tidak
Kami menyarankan agar Anda menentukan alamat virtual private cloud (VPC).
CatatanJika OceanBase dan Realtime Compute for Apache Flink tidak berada dalam VPC yang sama, Anda harus terlebih dahulu membuat koneksi jaringan cross-VPC atau menggunakan Internet untuk akses. Untuk informasi selengkapnya, lihat Manajemen dan operasi penyimpanan dan Bagaimana kluster Flink yang sepenuhnya dikelola mengakses Internet?.
username
Username untuk layanan database OceanBase.
Ya
STRING
Tidak
Tidak ada.
database-name
Nama database OceanBase.
Ya
STRING
Tidak ada
Sebagai tabel sumber, nama database mendukung ekspresi reguler untuk membaca data dari beberapa database.
Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir. Untuk alasannya, lihat keterangan parameter table-name.
table-name
Ditunjukkan oleh OceanBase.
Ya
STRING
Tidak ada
Sebagai tabel sumber, nama tabel mendukung ekspresi reguler untuk membaca data dari beberapa tabel.
Saat menggunakan ekspresi reguler, hindari penggunaan simbol ^ dan $ untuk mencocokkan awal dan akhir. Untuk alasannya, lihat catatan berikut.
CatatanSaat tabel sumber OceanBase mencocokkan nama tabel dengan ekspresi reguler, konektor menggabungkan database-name dan table-name yang Anda berikan menjadi ekspresi reguler path lengkap menggunakan string \\. (karakter . digunakan sebelum VVR 8.0.1). Ekspresi reguler gabungan ini kemudian digunakan untuk mencocokkan nama tabel yang memenuhi syarat penuh di database OceanBase.
Contohnya, jika Anda mengatur 'database-name'='db_.*' dan 'table-name'='tb_.+', konektor menggunakan ekspresi reguler db_.*\\.tb_.+ (atau db_.*.tb_.+ sebelum versi 8.0.1) untuk mencocokkan nama tabel yang memenuhi syarat penuh guna menentukan tabel mana yang akan dibaca.
port
Nomor port layanan database OceanBase.
Tidak
INTEGER
3306
Tidak ada.
server-id
ID numerik untuk client database.
Tidak
STRING
Nilai acak antara 5400 dan 6400 dihasilkan secara default.
ID ini harus unik secara global. Kami menyarankan agar Anda menetapkan ID yang berbeda untuk setiap pekerjaan yang terhubung ke database yang sama.
Parameter ini juga mendukung format rentang ID, seperti 5400-5408. Saat pembacaan inkremental diaktifkan, pembacaan konkuren ganda didukung. Dalam kasus ini, kami menyarankan agar Anda menetapkan rentang ID sehingga setiap tugas konkuren menggunakan ID yang berbeda. Untuk informasi selengkapnya, lihat Penggunaan Server ID.
scan.incremental.snapshot.chunk.size
Ukuran setiap chunk dalam jumlah baris.
Tidak
INTEGER
8096
Saat pembacaan snapshot inkremental diaktifkan, tabel dibagi menjadi beberapa chunk untuk dibaca. Data chunk dibuffer di memori sebelum sepenuhnya dibaca.
Ukuran chunk yang lebih kecil menghasilkan jumlah total chunk yang lebih besar untuk tabel tersebut. Hal ini memberikan pemulihan kesalahan yang lebih granular tetapi dapat menyebabkan error kehabisan memori (OOM) dan throughput keseluruhan yang lebih rendah. Oleh karena itu, Anda perlu menyeimbangkan dan menetapkan ukuran chunk yang wajar.
scan.snapshot.fetch.size
Jumlah maksimum catatan yang ditarik dalam setiap batch saat Anda membaca data lengkap tabel.
Tidak
INTEGER
1024
Tidak ada.
scan.startup.mode
Mode startup untuk konsumsi data.
Tidak
STRING
initial
Nilai yang valid:
initial (default): Memindai data historis lengkap saat startup pertama kali, lalu membaca data Binlog terbaru.
latest-offset: Tidak memindai data historis lengkap saat startup pertama kali. Mulai membaca dari akhir Binlog (posisi Binlog terbaru), artinya hanya membaca perubahan terbaru setelah konektor dimulai.
earliest-offset: Tidak memindai data historis lengkap. Mulai membaca dari posisi Binlog paling awal yang tersedia.
specific-offset: Tidak memindai data historis lengkap. Mulai dari offset Binlog tertentu yang Anda tentukan. Anda dapat menentukan offset dengan mengatur parameter scan.startup.specific-offset.file dan scan.startup.specific-offset.pos, atau hanya mengatur parameter scan.startup.specific-offset.gtid-set untuk memulai dari set GTID tertentu.
timestamp: Tidak memindai data historis lengkap. Mulai membaca Binlog dari timestamp tertentu. Timestamp ditentukan oleh scan.startup.timestamp-millis dalam milidetik.
PentingSaat menggunakan mode startup earliest-offset, specific-offset, atau timestamp, pastikan skema tabel yang sesuai tidak berubah antara posisi konsumsi Binlog yang ditentukan dan waktu startup pekerjaan. Hal ini menghindari error yang disebabkan oleh perbedaan skema.
scan.startup.specific-offset.file
Nama file Binlog untuk offset awal saat menggunakan mode offset tertentu.
Tidak
STRING
Tidak ada
Saat menggunakan parameter ini, scan.startup.mode harus diatur ke specific-offset. Contoh format nama file adalah
mysql-bin.000003.scan.startup.specific-offset.pos
Offset dalam file Binlog yang ditentukan untuk offset awal saat menggunakan mode offset tertentu.
Tidak
INTEGER
Tidak ada
Saat menggunakan parameter ini, scan.startup.mode harus diatur ke specific-offset.
scan.startup.specific-offset.gtid-set
Set GTID untuk offset awal saat menggunakan mode offset tertentu.
Tidak
STRING
Tidak ada
Saat menggunakan parameter ini, scan.startup.mode harus diatur ke specific-offset. Contoh format set GTID adalah
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19.scan.startup.timestamp-millis
Timestamp dalam milidetik untuk offset awal saat menggunakan mode waktu tertentu.
Tidak
LONG
Tidak ada
Saat menggunakan parameter ini, scan.startup.mode harus diatur ke timestamp. Satuannya adalah milidetik.
PentingSaat menggunakan waktu tertentu, OceanBase CDC mencoba membaca event awal setiap file Binlog untuk menentukan timestamp-nya, hingga akhirnya menemukan file Binlog yang sesuai dengan waktu yang ditentukan. Pastikan file Binlog untuk timestamp yang ditentukan belum dihapus dari database dan masih dapat dibaca.
server-time-zone
Zona waktu sesi yang digunakan oleh database.
Tidak
STRING
Jika Anda tidak menentukan parameter ini, sistem menggunakan zona waktu lingkungan runtime pekerjaan Flink sebagai zona waktu server database. Ini adalah zona waktu dari zona yang Anda pilih.
Contoh: Asia/Shanghai. Parameter ini mengontrol bagaimana tipe TIMESTAMP dikonversi ke tipe STRING. Untuk informasi selengkapnya, lihat Debezium temporal values.
debezium.min.row.count.to.stream.results
Jika jumlah baris dalam tabel lebih besar dari nilai ini, mode baca batch digunakan.
Tidak
INTEGER
1000
Flink membaca data dari tabel sumber OceanBase dengan salah satu cara berikut:
Baca penuh: Membaca seluruh data tabel langsung ke memori. Cara ini cepat tetapi mengonsumsi memori dalam jumlah yang sesuai. Jika tabel sumber sangat besar, ada risiko error OOM.
Baca batch: Membaca data dalam beberapa batch, dengan jumlah baris tertentu per batch, hingga semua data terbaca. Cara ini menghindari risiko OOM untuk tabel besar tetapi relatif lebih lambat.
connect.timeout
Waktu maksimum menunggu sebelum mencoba koneksi ulang setelah koneksi ke server database OceanBase timeout.
Tidak
DURATION
30s
Tidak ada.
connect.max-retries
Jumlah maksimum percobaan ulang setelah koneksi ke layanan database OceanBase gagal.
Tidak
INTEGER
3
Tidak ada.
connection.pool.size
Ukuran kolam koneksi database.
Tidak
INTEGER
20
Kolam koneksi database digunakan untuk menggunakan kembali koneksi, yang dapat mengurangi jumlah koneksi database.
jdbc.properties.*
Parameter koneksi kustom dalam URL JDBC.
Tidak
STRING
Tidak ada
Anda dapat meneruskan parameter koneksi kustom. Misalnya, untuk tidak menggunakan protokol SSL, atur 'jdbc.properties.useSSL' = 'false'.
Untuk informasi selengkapnya tentang parameter koneksi yang didukung, lihat MySQL Configuration Properties.
debezium.*
Parameter kustom untuk Debezium guna membaca log biner.
Tidak
STRING
Tidak ada
Anda dapat meneruskan parameter Debezium kustom. Misalnya, gunakan 'debezium.event.deserialization.failure.handling.mode'='ignore' untuk menentukan logika penanganan error parsing.
heartbeat.interval
Interval di mana sumber menggunakan event heartbeat untuk memajukan offset Binlog.
Tidak
DURATION
30s
Event heartbeat digunakan untuk memajukan offset Binlog di sumber, yang sangat berguna untuk tabel yang jarang diperbarui di OceanBase. Untuk tabel semacam itu, offset Binlog tidak maju secara otomatis. Event heartbeat dapat mendorong offset Binlog maju, mencegahnya kedaluwarsa. Offset Binlog yang kedaluwarsa dapat menyebabkan pekerjaan gagal dan tidak dapat dipulihkan, sehingga memerlukan restart tanpa status.
scan.incremental.snapshot.chunk.key-column
Kolom yang digunakan untuk membagi chunk selama fase snapshot.
Lihat Keterangan.
STRING
Tidak ada
Wajib untuk tabel tanpa kunci primer. Kolom yang dipilih harus bertipe non-null (NOT NULL).
Opsional untuk tabel dengan kunci primer. Hanya satu kolom dari kunci primer yang dapat dipilih.
scan.incremental.close-idle-reader.enabled
Menentukan apakah akan menutup reader yang idle setelah fase snapshot berakhir.
Tidak
BOOLEAN
false
Hanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.1 atau versi yang lebih baru.
Agar parameter ini berlaku, atur execution.checkpointing.checkpoints-after-tasks-finish.enabled ke true.
scan.read-changelog-as-append-only.enabled
Menentukan apakah akan mengonversi aliran data changelog menjadi aliran data append-only.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Semua jenis pesan (termasuk INSERT, DELETE, UPDATE_BEFORE, dan UPDATE_AFTER) dikonversi menjadi pesan INSERT. Aktifkan ini hanya dalam skenario khusus, seperti saat Anda perlu menyimpan pesan hapus dari tabel leluhur.
false (default): Semua jenis pesan dikirim ke downstream apa adanya.
CatatanHanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau versi yang lebih baru.
scan.only.deserialize.captured.tables.changelog.enabled
Menentukan apakah akan mendeserialisasi event perubahan hanya untuk tabel yang ditentukan selama fase inkremental.
Tidak
BOOLEAN
Nilai default adalah false di versi VVR 8.x.
Nilai default adalah true di VVR 11.1 dan versi yang lebih baru.
Nilai yang valid:
true: Mendeserialisasi data perubahan hanya untuk tabel target, yang mempercepat pembacaan Binlog.
false (default): Mendeserialisasi data perubahan untuk semua tabel.
CatatanHanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.7 atau versi yang lebih baru.
Saat menggunakan parameter ini di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau versi sebelumnya, ubah nama parameternya menjadi debezium.scan.only.deserialize.captured.tables.changelog.enable.
scan.parse.online.schema.changes.enabled
Menentukan apakah akan mengurai event DDL untuk perubahan tanpa lock di RDS selama fase inkremental.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mengurai event DDL untuk perubahan tanpa lock di RDS.
false (default): Tidak mengurai event DDL untuk perubahan tanpa lock di RDS.
Ini adalah fitur eksperimen. Sebelum melakukan perubahan online tanpa lock, kami menyarankan agar Anda mengambil snapshot pekerjaan Flink untuk pemulihan.
CatatanHanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 11.1 atau versi yang lebih baru.
scan.incremental.snapshot.backfill.skip
Menentukan apakah akan melewati backfill selama fase pembacaan snapshot.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Melewati backfill selama fase pembacaan snapshot.
false (default): Tidak melewati backfill selama fase pembacaan snapshot.
Jika Anda melewati backfill, perubahan pada tabel selama fase snapshot dibaca dalam fase inkremental berikutnya, bukan digabungkan ke dalam snapshot.
PentingMelewati backfill dapat menyebabkan ketidakkonsistenan data karena perubahan yang terjadi selama fase snapshot mungkin diputar ulang. Hanya semantik at-least-once yang dijamin.
CatatanHanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 11.1 atau versi yang lebih baru.
scan.incremental.snapshot.unbounded-chunk-first.enabled
Menentukan apakah akan mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
Tidak
BOOLEAN
false
Nilai yang valid:
true: Mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
false (default): Tidak mendistribusikan chunk tak terbatas terlebih dahulu selama fase pembacaan snapshot.
Ini adalah fitur eksperimen. Mengaktifkannya dapat mengurangi risiko error OOM saat Pengelola Tugas menyinkronkan chunk terakhir selama fase snapshot. Kami menyarankan agar Anda menambahkan parameter ini sebelum startup pertama pekerjaan.
CatatanHanya didukung di Realtime Compute for Apache Flink yang menggunakan VVR 11.1 atau versi yang lebih baru.
Spesifik Tabel Dimensi
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
url
URL JDBC.
Ya
STRING
Tidak ada
URL harus berisi nama database MySQL atau nama layanan Oracle.
userName
Username.
Ya
STRING
Tidak ada
Tidak ada.
cache
Kebijakan cache.
Tidak
STRING
ALL
Tiga kebijakan cache berikut didukung:
ALL: Menyimpan cache semua data di tabel dimensi. Sebelum pekerjaan dijalankan, sistem memuat semua data dari tabel dimensi ke cache. Semua pencarian data tabel dimensi selanjutnya dilakukan melalui cache. Jika data tidak ditemukan di cache, kunci tersebut tidak ada. Cache penuh dimuat ulang setelah kedaluwarsa.
Kebijakan ini cocok untuk skenario di mana volume data tabel remote kecil dan banyak kunci yang hilang (tabel sumber dan tabel dimensi tidak dapat di-join berdasarkan kondisi ON).
LRU: Menyimpan cache sebagian data di tabel dimensi. Untuk setiap catatan dari tabel sumber, sistem terlebih dahulu mencari data di cache. Jika tidak ditemukan, sistem melakukan query ke tabel dimensi fisik. Saat menggunakan kebijakan cache ini, Anda harus mengonfigurasi parameter cacheSize.
None: Tidak ada cache.
PentingSaat menggunakan kebijakan cache ALL, pantau ukuran memori node untuk mencegah error OOM.
Karena sistem memuat data tabel dimensi secara asinkron, saat menggunakan kebijakan cache ALL, Anda perlu menambah memori node join tabel dimensi. Ukuran penambahan memori harus dua kali volume data tabel remote.
cacheSize
Jumlah maksimum entri yang dicache.
Tidak
INTEGER
100000
Jika Anda memilih kebijakan cache LRU, Anda harus mengatur ukuran cache.
Jika Anda memilih kebijakan cache ALL, Anda tidak perlu mengatur ukuran cache.
cacheTTLMs
Waktu hidup (TTL) cache.
Tidak
LONG
Long.MAX_VALUE
Konfigurasi cacheTTLMs bergantung pada parameter cache:
Jika cache diatur ke None, Anda tidak perlu mengonfigurasi cacheTTLMs. Artinya cache tidak kedaluwarsa.
Jika cache diatur ke LRU, cacheTTLMs adalah TTL cache. Secara default, cache tidak kedaluwarsa.
Jika cache diatur ke ALL, cacheTTLMs adalah waktu reload cache. Secara default, cache tidak dimuat ulang.
maxRetryTimeout
Waktu retry maksimum.
Tidak
DURATION
60s
Tidak ada.
Spesifik Sink
Parameter
Deskripsi
Wajib
Tipe data
Nilai default
Catatan
userName
Username.
Ya
STRING
Tidak ada
Tidak ada.
compatibleMode
Mode kompatibilitas OceanBase.
Tidak
STRING
mysql
Nilai valid:
mysql
oracle
CatatanIni adalah parameter khusus OceanBase.
url
URL JDBC.
Ya
STRING
Tidak ada
URL harus berisi nama database MySQL atau nama layanan Oracle.
tableName
Nama tabel.
Ya
STRING
Tidak ada
Tidak ada.
maxRetryTimes
Jumlah maksimum percobaan ulang.
Tidak
INTEGER
3
Tidak ada.
poolInitialSize
Ukuran awal kumpulan koneksi basis data.
Tidak
INTEGER
1
Tidak ada.
poolMaxActive
Jumlah maksimum koneksi dalam kumpulan koneksi basis data.
Tidak
INTEGER
8
Tidak ada.
poolMaxWait
Waktu maksimum menunggu koneksi dari kolam koneksi database.
Tidak
INTEGER
2000
Satuan: milidetik.
poolMinIdle
Jumlah minimum koneksi idle dalam kumpulan koneksi basis data.
Tidak
INTEGER
1
Tidak ada.
connectionProperties
Properti koneksi JDBC.
Tidak
STRING
Tidak ada
Formatnya adalah "k1=v1;k2=v2;k3=v3".
ignoreDelete
Menentukan apakah akan mengabaikan operasi DELETE.
Tidak
Boolean
false
Tidak ada.
excludeUpdateColumns
Menentukan nama kolom yang akan dikecualikan. Kolom-kolom ini tidak diperbarui selama operasi update.
Tidak
STRING
Tidak ada
Jika Anda menentukan beberapa kolom untuk diabaikan, pisahkan dengan koma (,). Contoh:
excludeUpdateColumns=column1,column2.CatatanNilai ini selalu mencakup kolom kunci primer. Kolom yang benar-benar berlaku adalah kolom yang Anda tentukan ditambah kolom kunci primer.
partitionKey
Kunci partisi.
Tidak
STRING
Tidak ada
Saat kunci partisi ditetapkan, konektor terlebih dahulu mengelompokkan data berdasarkan kunci partisi. Setiap kelompok kemudian ditulis ke database secara terpisah. Pengelompokan ini diproses sebelum modRule.
modRule
Aturan pengelompokan.
Tidak
STRING
Tidak ada
Aturan pengelompokan harus dalam format "column_name mod number", seperti
user_id mod 8. Kolom harus bertipe numerik.Saat aturan pengelompokan ditetapkan, data terlebih dahulu dipartisi berdasarkan partitionKey. Di dalam setiap partisi, data kemudian dikelompokkan berdasarkan hasil perhitungan modRule.
bufferSize
Ukuran buffer data.
Tidak
INTEGER
1000
Tidak ada.
flushIntervalMs
Interval untuk flushing cache. Jika data dalam cache tidak memenuhi kondisi output setelah waktu tunggu yang ditentukan, sistem secara otomatis mengeluarkan semua data dalam cache.
Tidak
LONG
1000
Tidak ada.
retryIntervalMs
Waktu retry maksimum.
Tidak
INTEGER
5000
Satuan: milidetik.
Pemetaan tipe
Mode kompatibel MySQL
Tipe bidang OceanBase
Tipe bidang Flink
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
Catatanp harus kurang dari atau sama dengan 38.
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
PentingFlink hanya mendukung catatan tipe BLOB yang kurang dari atau sama dengan 2.147.483.647 (2^31 - 1) byte.
BLOB
MEDIUMBLOB
LONGBLOB
Mode kompatibel Oracle
Tipe bidang OceanBase
Tipe bidang Flink
NUMBER(p, s <= 0), p - s < 3
TINYINT
NUMBER(p, s <= 0), p - s < 5
SMALLINT
NUMBER(p, s <= 0), p - s < 10
INT
NUMBER(p, s <= 0), p - s < 19
BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)
DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38
STRING
FLOAT
FLOAT
BINARY_FLOAT
BINARY_DOUBLE
DOUBLE
NUMBER(1)
BOOLEAN
DATE
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
CHAR(n)
STRING
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
BLOB
BYTES
ROWID
Contoh penggunaan
Tabel Sumber dan Tabel Sink
-- Tabel sumber OceanBase CDC CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); -- Tabel sink OceanBase CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; END;Tabel dimensi
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'tableName' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
Referensi
Untuk informasi selengkapnya tentang konektor yang didukung Flink, lihat Konektor yang didukung.