Topik ini menjelaskan cara menggunakan konektor AnalyticDB for MySQL V3.0.
Informasi latar belakang
AnalyticDB for MySQL V3.0 adalah layanan gudang data enterprise berbasis cloud yang mengintegrasikan teknologi database dan data besar. Layanan ini mendukung operasi penambahan, penghapusan, dan modifikasi data real-time dengan throughput tinggi, analisis data real-time berlatensi rendah, serta proses ekstrak, transformasi, dan muat (ETL) yang kompleks. AnalyticDB for MySQL kompatibel dengan alat ekosistem hulu dan hilir, sehingga dapat digunakan untuk membangun sistem pelaporan enterprise, gudang data, dan mesin layanan data.
Tabel berikut menjelaskan kemampuan yang didukung oleh konektor AnalyticDB for MySQL V3.0.
Item | Deskripsi |
Jenis tabel | Tabel sumber, tabel dimensi, dan tabel sink Catatan Hanya Ververica Runtime (VVR) 8.0.4 atau yang lebih baru yang mendukung tabel sumber. Untuk informasi selengkapnya mengenai parameter dan konfigurasi tabel sumber, lihat Gunakan Flink untuk berlangganan log biner. Untuk informasi selengkapnya mengenai parameter tabel dimensi dan tabel sink, lihat Parameter dalam klausa WITH. |
Mode jalankan | Mode streaming dan mode batch |
Format data | Tidak tersedia |
Metriks | Tidak tersedia |
Jenis API | API SQL |
Pembaruan atau penghapusan data dalam tabel sink | Didukung |
Prasyarat
Kluster AnalyticDB for MySQL dan tabel AnalyticDB for MySQL telah dibuat. Untuk informasi selengkapnya, lihat Buat kluster dan CREATE TABLE.
Daftar putih telah dikonfigurasi untuk kluster AnalyticDB for MySQL. Untuk informasi selengkapnya, lihat Konfigurasikan daftar putih alamat IP.
Sintaksis
CREATE TEMPORARY TABLE adb_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);Kunci utama yang ditentukan dalam pernyataan DDL Flink harus konsisten dengan kunci utama tabel fisik di database AnalyticDB for MySQL. Kunci utama tersebut harus didefinisikan dalam pernyataan DDL Flink dan juga harus ada di tabel fisik di database AnalyticDB for MySQL secara bersamaan. Nama kunci utama dalam pernyataan DDL Flink harus identik dengan nama kunci utama pada tabel fisik. Jika tidak sesuai, data mungkin menjadi tidak akurat.
Parameter dalam klausa WITH
Parameter umum
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
connector
Jenis tabel sink.
String
Ya
Tidak ada nilai default
Atur nilainya ke adb3.0.
url
URL Java Database Connectivity (JDBC) dari database.
String
Ya
Tidak ada nilai default
URL JDBC dari database AnalyticDB for MySQL. URL tersebut dalam format jdbc:mysql://<endpoint>:<port>/<databaseName>.
endpoint dan port: Anda dapat masuk ke Konsol AnalyticDB for MySQL. Di panel navigasi sebelah kiri, klik nama kluster yang diinginkan di kolom Cluster ID/Cluster Description. Pada halaman yang muncul, peroleh informasi di bagian Network Information.
databaseName: nama database AnalyticDB for MySQL.
userName
Nama pengguna yang digunakan untuk mengakses database.
String
Ya
Tidak ada nilai default
Tidak tersedia.
password
Kata sandi yang digunakan untuk mengakses database.
String
Ya
Tidak ada nilai default
Tidak tersedia.
tableName
Nama tabel di database.
String
Ya
Tidak ada nilai default
Tidak tersedia.
maxRetryTimes
Jumlah maksimum percobaan ulang yang diizinkan jika upaya penulisan atau pembacaan data gagal.
Integer
Tidak
10
Tidak tersedia.
Parameter hanya untuk tabel sink
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
batchSize
Jumlah catatan data yang dapat ditulis sekaligus.
Integer
Tidak
1000
Parameter ini hanya berlaku setelah Anda menentukan kunci utama.
bufferSize
Jumlah maksimum catatan data yang dapat di-cache di memori. Operasi tulis dipicu jika nilai parameter batchSize atau bufferSize mencapai ambang batas yang ditentukan.
Integer
Tidak
1000
Parameter ini hanya berlaku setelah Anda menentukan kunci utama.
flushIntervalMs
Interval pengosongan cache. Nilai ini menunjukkan bahwa jika jumlah catatan data yang di-cache tidak mencapai batas atas dalam periode waktu tertentu, semua data yang di-cache akan ditulis ke tabel sink.
Integer
Tidak
3000
Unit: milidetik.
ignoreDelete
Menentukan apakah operasi penghapusan diabaikan.
Boolean
Tidak
false
Nilai yang valid:
true: Operasi penghapusan diabaikan.
false: Operasi penghapusan tidak diabaikan.
replaceMode
Menentukan apakah menggunakan pernyataan REPLACE INTO untuk memasukkan data ke dalam tabel jika kunci utama ditentukan dalam pernyataan DDL.
Boolean
Tidak
true
Nilai yang valid (VVR 11.2+):
replace: Menggunakan sintaksREPLACE INTO. Jika kunci utama duplikat, baris baru akan menimpa baris yang sudah ada.upsert: Menggunakan sintaksINSERT INTO ... ON DUPLICATE KEY UPDATE. Memasukkan baris baru jika kunci utama tidak ada; memperbarui baris yang sudah ada jika kunci utama ada. Contoh: Untuk tabel dengan bidanga(kunci utama),b,c,d, jika data hanya disediakan untukadanb, hanya bidangbyang diperbarui saat terdeteksi kunci utama duplikat. Bidangcdandtetap tidak berubah.insert: Menggunakan sintaksINSERT IGNORE INTO. Jika kunci utama duplikat, entri data pertama dipertahankan, dan entri berikutnya diabaikan.
Nilai yang valid (versi sebelum VVR 11.2):
true: Perilaku yang sama sepertireplace.false: Perilaku yang sama sepertiupsert.
Catatan: VVR 11.2 dan versi yang lebih baru kompatibel dengan nilai
truedanfalsedari versi sebelumnya.CatatanHanya AnalyticDB for MySQL V3.1.3.5 atau yang lebih baru yang mendukung parameter ini.
Opsi ini hanya berlaku ketika kunci utama didefinisikan dalam DDL tabel sink. Jika tidak ada kunci utama yang didefinisikan dalam DDL tabel sink, sintaks insert ignore into selalu digunakan untuk memasukkan data.
excludeUpdateColumns
Bidang-bidang yang tidak diperbarui ketika data dengan kunci utama yang sama diperbarui.
String
Tidak
String kosong
Pisahkan beberapa bidang dengan koma (,). Contoh:
excludeUpdateColumns='column1,column2'.Pertimbangkan tabel sink dengan bidang
a(kunci utama),b,c, dand, danexcludeUpdateColumns='c,d'ditetapkan. Ketika data yang dimasukkan memiliki nilai kunci utama unik, semua bidang dimasukkan. Ketika data yang dimasukkan memiliki nilai kunci utama duplikat, hanya bidangbyang diperbarui, dan bidangcdandmempertahankan nilai aslinya.CatatanOpsi ini hanya berlaku ketika
replaceModediatur keupsertataufalse.Pastikan kolom yang akan diabaikan ditulis dalam satu baris dan tidak dilipat.
connectionMaxActive
Ukuran maksimum kolam thread.
Integer
Tidak
40
Tidak tersedia.
Parameter hanya untuk tabel dimensi
Parameter
Deskripsi
Tipe data
Wajib
Nilai default
Keterangan
cache
Kebijakan cache.
String
Tidak
ALL
Nilai yang valid:
None: Tidak ada data yang di-cache.
LRU: Hanya data tertentu di tabel dimensi yang di-cache. Setiap kali sistem menerima catatan data, sistem mencari cache. Jika sistem tidak menemukan catatan tersebut di cache, sistem mencari catatan data di tabel dimensi fisik.
ALL: Semua data di tabel dimensi di-cache. Ini adalah nilai default. Sebelum penerapan dijalankan, sistem memuat semua data di tabel dimensi ke cache. Dengan demikian, cache dicari untuk semua kueri selanjutnya di tabel dimensi. Jika sistem tidak menemukan catatan data di cache, kunci gabungan tidak ada. Sistem memuat ulang semua data di cache setelah entri cache kedaluwarsa.
Jika jumlah data di tabel jarak jauh kecil dan terdapat banyak kunci yang hilang, kami menyarankan Anda mengatur parameter ini ke ALL. Tabel sumber dan tabel dimensi tidak dapat diasosiasikan berdasarkan klausa ON.
CatatanJika Anda mengatur parameter cache ke ALL, Anda harus memantau penggunaan memori node untuk mencegah kesalahan kehabisan memori (OOM).
Jika Anda mengatur parameter cache ke ALL, Anda harus menambah memori node untuk penggabungan tabel karena sistem memuat data dari tabel dimensi secara asinkron. Ukuran memori yang ditambahkan adalah dua kali lipat ukuran tabel jarak jauh.
cacheSize
Jumlah maksimum catatan data yang dapat di-cache.
Integer
Tidak
100000
Anda harus mengonfigurasi parameter cacheSize ketika parameter cache diatur ke LRU.
cacheTTLMs
Periode waktu kedaluwarsa cache. Satuan: milidetik.
Integer
Tidak
Long.MAX_VALUE
Anda harus mengonfigurasi parameter cacheTTLMs ketika parameter cache diatur ke LRU atau ALL.
Jika parameter cache diatur ke LRU, parameter cacheTTLMs menentukan periode waktu kedaluwarsa cache. Nilai default:
Long.MAX_VALUE. Nilai default menunjukkan bahwa entri cache tidak kedaluwarsa.Jika parameter cache diatur ke ALL, parameter cacheTTLMs menentukan interval pemuatan ulang data di tabel fisik. Nilai default:
Long.MAX_VALUE. Nilai default menunjukkan bahwa data di tabel fisik tidak dimuat ulang.
CatatanJika parameter cache diatur ke None, Anda tidak perlu mengonfigurasi parameter cacheTTLMs. Jika parameter cache diatur ke None, data tidak di-cache. Oleh karena itu, Anda tidak perlu mengonfigurasi parameter cacheTTLMs.
maxJoinRows
Jumlah maksimum hasil yang dikembalikan setelah setiap catatan data di tabel utama dipetakan ke data di tabel dimensi.
Integer
Tidak
1024
Jika Anda dapat memperkirakan bahwa setiap catatan data di tabel utama dipetakan ke maksimal n catatan data di tabel dimensi, Anda dapat mengonfigurasi maxJoinRows='n' untuk memastikan pencocokan yang efisien di Realtime Compute for Apache Flink.
CatatanSaat Anda menggabungkan tabel utama dengan tabel dimensi, jumlah hasil yang dikembalikan setelah catatan data masukan di tabel utama dipetakan ke catatan data di tabel dimensi dibatasi oleh parameter ini.
Pemetaan tipe data
Tipe data AnalyticDB for MySQL V3.0 | Tipe data Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) atau NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
Kode contoh
Kode contoh untuk tabel sink
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO adb_sink SELECT * FROM datagen_source;Kode contoh untuk tabel dimensi
CREATE TEMPORARY TABLE datagen_source( `a` INT, `b` VARCHAR, `c` STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_dim ( `a` INT, `b` VARCHAR, `c` VARCHAR ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); CREATE TEMPORARY TABLE blackhole_sink( `a` INT, `b` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;