Topik ini menjelaskan cara menggunakan konektor Java Database Connectivity (JDBC).
Ikhtisar
Konektor JDBC disediakan oleh Apache Flink dan dapat digunakan untuk membaca serta menulis data ke database umum seperti MySQL, PostgreSQL, dan Oracle. Tabel berikut menjelaskan kemampuan yang didukung oleh konektor JDBC.
Item | Deskripsi |
Tipe tabel | Tabel sumber, tabel dimensi, dan Tabel sink |
Mode operasi | Mode aliran dan mode batch |
Format data | Tidak tersedia |
Metrik | Tidak tersedia |
Tipe API | SQL API |
Pembaruan atau penghapusan data dalam tabel sink | Didukung |
Prasyarat
Database dan tabel yang akan dihubungkan telah dibuat.
Batasan
Tabel sumber JDBC merupakan sumber terbatas (bounded source). Setelah konektor sumber JDBC membaca seluruh data dari sebuah tabel di database hulu, tugas tersebut selesai. Untuk menangkap data perubahan secara real-time, gunakan konektor Change Data Capture (CDC). Untuk informasi lebih lanjut, lihat Membuat tabel sumber MySQL CDC dan Membuat tabel sumber PostgreSQL CDC (pratinjau publik).
Untuk menulis data ke PostgreSQL, pastikan versi database adalah PostgreSQL 9.5 atau lebih tinggi. Versi tersebut mendukung klausa
ON CONFLICT, yang diperlukan agar penyisipan berhasil.Saat menggunakan konektor JDBC, unggah secara manual paket JAR driver database tujuan sebagai file dependensi. Driver JDBC yang tersedia:
Driver
ID Grup
ID Artefak
MySQL
mysql
Oracle
com.oracle.database.jdbc
PostgreSQL
org.postgresql
Jika Anda menggunakan driver JDBC yang tidak tercantum dalam tabel, uji validitas dan ketersediaannya sebelum digunakan.
Saat konektor JDBC menulis data ke tabel sink MySQL, konektor JDBC menggabungkan setiap catatan data yang diterima menjadi sebuah Pernyataan SQL dan mengeksekusi Pernyataan SQL tersebut. Pada tabel sink MySQL dengan kunci primer, digunakan sintaks berikut:
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;PeringatanMenyisipkan catatan yang memiliki nilai indeks unik duplikat (meskipun dengan kunci primer berbeda) ke dalam tabel fisik yang memiliki batasan indeks unik menyebabkan penimpaan data hilir dan kehilangan data.
Sintaks
CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);Opsi konektor
Umum
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Tipe tabel.
STRING
Ya
Tidak ada nilai default
Atur nilainya menjadi jdbc.
url
URL database.
STRING
Ya
Tidak ada nilai default
Tidak tersedia.
table-name
Nama tabel JDBC.
STRING
Ya
Tidak ada nilai default
Tidak tersedia.
username
Nama pengguna JDBC.
STRING
Tidak
Tidak ada nilai default
usernamedanpasswordharus diatur secara bersamaan.password
Kata sandi pengguna JDBC.
STRING
Tidak
Tidak ada nilai default
Khusus sumber
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
scan.partition.column
Nama kolom yang digunakan untuk mempartisi data masukan.
STRING
Tidak
Tidak ada nilai default
Nilai dalam kolom harus bertipe NUMERIC atau TIMESTAMP dan mendukung perbandingan dengan nilai NUMERIC di database. Untuk informasi lebih lanjut tentang pemindaian terpartisi, lihat Pemindaian Terpartisi.
scan.partition.num
Jumlah partisi.
INTEGER
Tidak
Tidak ada nilai default
Tidak tersedia.
scan.partition.lower-bound
Nilai terkecil dari partisi pertama.
LONG
Tidak
Tidak ada nilai default
Tidak tersedia.
scan.partition.upper-bound
Nilai terbesar dari partisi terakhir.
LONG
Tidak
Tidak ada nilai default
Tidak tersedia.
scan.fetch-size
Jumlah baris data yang diperoleh dari database setiap kali data dibaca dari tabel sumber.
INTEGER
Tidak
0
Jika Anda mengatur opsi ini ke 0, opsi ini diabaikan.
scan.auto-commit
Menentukan apakah akan mengaktifkan auto-commit.
BOOLEAN
Tidak
true
Tidak tersedia.
Khusus sink
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
sink.buffer-flush.max-rows
Jumlah maksimum rekaman data yang dapat di-cache sebelum operasi flush dilakukan.
INTEGER
Tidak
100
Jika Anda mengatur opsi ini ke 0, catatan data tidak di-cache sebelum operasi flush dilakukan.
sink.buffer-flush.interval
Interval flushing data, dalam milidetik. Jika catatan data di-cache melebihi durasi yang ditentukan oleh opsi ini, operasi flush dilakukan dalam thread asinkron.
DURASI
Tidak
1000
Jika Anda mengatur opsi ini ke 0, catatan data tidak di-cache sebelum operasi flush dilakukan.
CatatanJika Anda ingin memproses event flush yang di-cache dalam mode asinkron, Anda dapat mengatur opsi sink.buffer-flush.max-rows ke 0 dan mengonfigurasi opsi sink.buffer-flush.interval sesuai kebutuhan bisnis Anda.
sink.max-retries
Jumlah maksimum percobaan ulang yang diizinkan saat data gagal ditulis ke database.
INTEGER
Tidak
3
Tidak tersedia.
Khusus tabel dimensi
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
lookup.cache.max-rows
Jumlah maksimum baris data yang dapat di-cache. Jika jumlah baris data dalam cache melebihi nilai opsi ini, baris data paling awal kedaluwarsa dan digantikan oleh baris data baru.
INTEGER
Tidak
Tidak ada nilai default
Secara default, caching untuk tabel dimensi dinonaktifkan. Anda dapat mengonfigurasi opsi lookup.cache.max-rows dan lookup.cache.ttl untuk mengaktifkan caching untuk tabel dimensi. Jika caching untuk tabel dimensi diaktifkan, kebijakan cache LRU digunakan.
lookup.cache.ttl
Masa hidup data (TTL) maksimum setiap baris data dalam cache. Jika periode waktu suatu baris data di-cache melebihi nilai opsi ini, baris data tersebut kedaluwarsa.
DURASI
Tidak
Tidak ada nilai default
lookup.cache.caching-missing-key
Menentukan apakah akan menyimpan hasil kueri kosong.
BOOLEAN
Tidak
true
Nilai yang valid:
true: Hasil kueri kosong disimpan. Ini adalah nilai default.
false: Hasil kueri kosong tidak disimpan.
lookup.max-retries
Jumlah maksimum percobaan ulang saat query basis data gagal.
INTEGER
Tidak
3
Tidak tersedia.
Khusus PostgreSQL
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
source.extend-type.enabled
Menentukan apakah data tipe ekstensi JSONB dan UUID dapat dibaca dan dipetakan ke tipe data yang didukung oleh Flink saat tabel PostgreSQL digunakan sebagai tabel sumber atau tabel dimensi.
BOOLEAN
Tidak
false
Nilai yang valid:
true: Data tipe ekstensi JSONB dan UUID dapat dibaca dan dipetakan ke tipe data yang didukung oleh Flink.
false: Data tipe ekstensi JSONB dan UUID tidak dapat dibaca atau dipetakan ke tipe data yang didukung oleh Flink. Ini adalah nilai default.
Pemetaan tipe data
Tipe data MySQL | Tipe data Oracle | Tipe data PostgreSQL | Tipe data SQL Flink |
TINYINT | Tidak tersedia | Tidak tersedia | TINYINT |
| Tidak ada nilai default |
| SMALLINT |
| Tidak ada nilai default |
| INT |
| Tidak ada nilai default |
| BIGINT |
BIGINT UNSIGNED | Tidak tersedia | Tidak tersedia | DECIMAL(20, 0) |
BIGINT | Tidak ada nilai default | BIGINT | BIGINT |
FLOAT | BINARY_FLOAT |
| FLOAT |
| BINARY_DOUBLE |
| DOUBLE |
|
|
| DECIMAL(p, s) |
| Tidak ada nilai default | BOOLEANcan | BOOLEAN |
DATE | DATE | DATE | DATE |
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
|
|
| STRING |
|
| BYTEA | BYTES |
Tidak tersedia | Tidak tersedia | ARRAY | ARRAY |
Kode contoh
Kode Contoh untuk Tabel Sumber
CREATE TEMPORARY TABLE jdbc_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;Kode contoh untuk tabel sink
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); INSERT INTO jdbc_sink SELECT * FROM datagen_source;Kode Contoh untuk Tabel Dimensi
CREATE TEMPORARY TABLE datagen_source( `id` INT, `data` BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_dim ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `data` BIGINT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.`id`,T.`data`, H.`name` FROM datagen_source AS T JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;