Topik ini menjelaskan cara menggunakan Konektor ApsaraDB RDS untuk MySQL.
ApsaraDB RDS untuk MySQL dikembangkan berdasarkan cabang MySQL dan menawarkan performa yang sangat baik. Solusi ini telah teruji mampu menangani lalu lintas bersamaan dalam volume tinggi, seperti selama acara Double 11. ApsaraDB RDS untuk MySQL menyediakan fitur dasar seperti konfigurasi Daftar putih alamat IP, pencadangan dan pemulihan, Enkripsi Data Transparan (TDE), migrasi data, serta manajemen instance, akun, dan database. Untuk informasi lebih lanjut tentang ApsaraDB RDS untuk MySQL, lihat Basis data ApsaraDB RDS untuk MySQL.
Konektor ApsaraDB RDS untuk MySQL tidak akan didukung di masa mendatang. Kami merekomendasikan Anda menggunakan Konektor MySQL sebagai pengganti Konektor ApsaraDB RDS untuk MySQL. Untuk informasi lebih lanjut tentang cara menggunakan Konektor MySQL, lihat Konektor MySQL.
Tabel berikut menguraikan kemampuan yang didukung oleh Konektor ApsaraDB RDS untuk MySQL.
Item | Deskripsi |
Jenis tabel | Tabel sink dan tabel dimensi |
Mode operasi | Mode batch dan mode streaming |
Format data | Tidak tersedia |
Metrik |
Catatan Untuk informasi lebih lanjut tentang metrik, lihat Metrics. |
Jenis API | SQL |
Pembaruan atau penghapusan data dalam tabel sink | Didukung |
Prasyarat
Basis data ApsaraDB RDS untuk MySQL dan tabel ApsaraDB RDS untuk MySQL telah dibuat. Untuk informasi lebih lanjut, lihat Membuat basis data dan akun untuk ApsaraDB RDS untuk MySQL.
Daftar putih alamat IP telah dikonfigurasi untuk basis data ApsaraDB RDS untuk MySQL. Untuk informasi lebih lanjut, lihat Gunakan klien basis data atau CLI untuk terhubung ke instance ApsaraDB RDS untuk MySQL.
Batasan
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 2.0.0 atau versi lebih baru yang mendukung Konektor ApsaraDB RDS untuk MySQL.
Konektor ApsaraDB RDS untuk MySQL hanya mendukung basis data ApsaraDB RDS untuk MySQL.
Semantik setidaknya sekali dapat digunakan. Jika tabel sink ApsaraDB RDS untuk MySQL berisi primary key, idempotensi dapat digunakan untuk memastikan keakuratan data.
Kami merekomendasikan Anda menggunakan versi terbaru dari Realtime Compute for Apache Flink untuk memastikan performa dan stabilitas tinggi. Sebagai contoh, Anda dapat menggunakan Realtime Compute for Apache Flink yang menggunakan VVR 6.X atau versi lebih baru.
Peringatan
Konektor ApsaraDB RDS untuk MySQL akan dihapus secara bertahap di masa depan. Kami merekomendasikan Anda menggunakan Konektor MySQL jika Konektor MySQL dapat memenuhi kebutuhan bisnis Anda. Untuk informasi lebih lanjut, lihat Konektor MySQL.
Sintaksis
Pernyataan untuk membuat tabel sink ApsaraDB RDS untuk MySQL
CREATE TABLE rds_sink( id INT, num BIGINT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='rds', 'tableName'='your-table-name', 'userName'='your-user-name', 'password'='your-password', 'url'='your-url' );CatatanKonektor ApsaraDB RDS untuk MySQL mengonversi setiap baris data keluaran menjadi pernyataan SQL dan kemudian mengeksekusinya untuk menulis data ke dalam tabel sink. Jika tabel sink tidak berisi primary key, Konektor ApsaraDB RDS untuk MySQL mengeksekusi pernyataan
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);. Jika tabel sink berisi primary key, Konektor ApsaraDB RDS untuk MySQL mengeksekusi pernyataanINSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;. Jika tabel fisik memiliki batasan indeks unik selain batasan primary key dan dua rekaman dengan primary key berbeda tetapi indeks unik yang sama dimasukkan ke dalam tabel fisik, data hilir akan ditimpa karena konflik antara indeks unik. Ini menyebabkan kehilangan data.Jika primary key auto-increment ditentukan dalam basis data ApsaraDB RDS untuk MySQL, Anda tidak dapat mendeklarasikan bidang auto-increment dalam pernyataan DDL Flink. Selama penulisan data, basis data secara otomatis mengonfigurasi bidang auto-increment. Konektor ApsaraDB RDS untuk MySQL hanya dapat digunakan untuk menulis atau menghapus data yang berisi bidang auto-increment tetapi tidak dapat digunakan untuk memperbarui data.
Pernyataan untuk membuat tabel dimensi ApsaraDB RDS untuk MySQL
CREATE TABLE rds_dim( id1 INT, id2 VARCHAR ) WITH ( 'connector'='rds', 'tableName'='your-table-name', 'userName'='your-user-name', 'password'='your-password', 'url'='your-url' 'cache'='NONE' );
Parameter dalam klausa WITH
Parameter umum
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Jenis tabel.
STRING
Ya
Tidak ada nilai default
Atur nilainya menjadi rds.
tableName
Nama metatable.
STRING
Ya
Tidak ada nilai default
Tidak tersedia.
userName
Nama pengguna yang digunakan untuk mengakses basis data.
STRING
Ya
Tidak ada nilai default
Tidak tersedia.
password
Kata sandi yang digunakan untuk mengakses basis data.
STRING
Ya
Tidak ada nilai default
Tidak tersedia.
url
URL yang digunakan untuk mengakses tabel.
STRING
Ya
Tidak ada nilai default
Titik akhir virtual private cloud (VPC) dari basis data ApsaraDB RDS untuk MySQL. Nilainya adalah titik akhir internal. Untuk informasi lebih lanjut, lihat Lihat dan ubah titik akhir internal dan publik serta nomor port dari instance ApsaraDB RDS untuk MySQL.
URL berada dalam format
jdbc:mysql://<Titik akhir internal>:<Nomor port>/<Nama basis data>.CatatanJika Anda membuat tabel sink, Anda harus menambahkan ?rewriteBatchedStatements=true di akhir URL untuk meningkatkan performa sistem.
maxRetryTimes
Jumlah maksimum percobaan ulang yang dapat dilakukan ketika gagal menanyakan data dalam tabel dimensi atau menulis data ke tabel sink.
INTEGER
Tidak
Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru, nilai default parameter ini adalah 10.
Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.6 atau lebih lama, nilai default parameter ini adalah 3.
Tidak tersedia.
Parameter hanya untuk tabel sink
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
batchSize
Jumlah rekaman data yang dapat ditulis sekaligus.
INTEGER
Tidak
Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru, nilai default parameter ini adalah 4096.
Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi 4.0.0 hingga 4.0.6, nilai default parameter ini adalah 5000.
Untuk Realtime Compute for Apache Flink yang menggunakan VVR 3.X atau lebih lama, nilai default parameter ini adalah 100.
Tidak tersedia.
bufferSize
Jumlah maksimum rekaman data yang dapat disimpan dalam memori. Operasi penulisan dipicu jika ambang batas yang ditentukan oleh parameter batchSize atau bufferSize tercapai.
INTEGER
Tidak
10000
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru yang mendukung parameter ini.
Parameter ini hanya berlaku setelah Anda menentukan primary key.
flushIntervalMs
Interval saat Anda ingin membersihkan buffer memori. Jika jumlah rekaman data yang disimpan tidak mencapai batas atas yang ditentukan oleh parameter batchSize atau bufferSize dalam periode waktu tertentu, sistem secara otomatis menulis semua data yang disimpan ke tabel sink.
INTEGER
Tidak
Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru, nilai defaultnya adalah 2000.
Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi 4.0.0 hingga 4.0.6, nilai defaultnya adalah 0.
Untuk Realtime Compute for Apache Flink yang menggunakan VVR 3.X atau lebih lama, nilai defaultnya adalah 1000.
Jika Anda tidak mengonfigurasi parameter ini dalam versi di mana nilai default parameter ini adalah 0, sejumlah kecil data mungkin tidak pernah ditulis ke tabel sink. Untuk menyelesaikan masalah ini, kami merekomendasikan Anda menggunakan versi terbaru dari Realtime Compute for Apache Flink.
ignoreDelete
Menentukan apakah akan mengabaikan operasi penghapusan.
BOOLEAN
Tidak
false
Operasi penghapusan mungkin terjadi ketika Anda menggunakan Flink SQL. Jika beberapa operator keluaran memperbarui bidang berbeda dalam tabel sink yang sama berdasarkan primary key, hasil data mungkin salah.
Sebagai contoh, rekaman data dihapus dalam satu tugas dan kemudian hanya beberapa bidang dari rekaman data diperbarui dalam tugas lain. Dalam kasus ini, nilai bidang yang tidak diperbarui menjadi null atau nilai default karena bidang tersebut dihapus. Untuk menghindari operasi penghapusan, Anda dapat mengatur parameter ignoreDelete menjadi true.
connectionMaxActive
Ukuran kolam koneksi basis data.
INTEGER
Tidak
40
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru yang mendukung parameter ini.
Jika akses ke kolam koneksi basis data habis waktu, jumlah koneksi basis data dalam kolam mungkin tidak cukup. Anda dapat meningkatkan ukuran kolam koneksi basis data.
Jika jumlah maksimum koneksi paralel yang didukung oleh basis data kecil, Anda dapat mengurangi ukuran kolam koneksi atau mengurangi paralelisme operator.
Parameter hanya untuk tabel dimensi
Parameter
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
cache
Kebijakan cache untuk tabel dimensi.
STRING
Tidak
Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi lebih lama dari 4.0.6, nilai default parameter ini adalah NONE.
Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.6 atau lebih baru, nilai default parameter ini adalah ALL.
Konektor ApsaraDB RDS untuk MySQL mendukung kebijakan cache berikut untuk tabel dimensi: None, LRU, dan ALL. Untuk informasi lebih lanjut tentang kebijakan cache, lihat Informasi latar belakang.
cacheSize
Jumlah maksimum baris rekaman data yang dapat disimpan dalam cache.
INTEGER
Tidak
100000
Jika Anda mengatur parameter cache ke LRU, Anda harus mengonfigurasi parameter cacheSize.
Jika Anda mengatur parameter cache ke NONE atau ALL, Anda tidak perlu mengonfigurasi parameter cacheSize.
cacheTTLMs
Periode timeout cache.
LONG
Tidak
Jika Anda mengatur parameter cache ke NONE, Anda tidak perlu mengonfigurasi parameter cacheTTLMs. Ini menunjukkan bahwa entri cache tidak kedaluwarsa.
Jika Anda mengatur parameter cache ke LRU, parameter cacheTTLMs menentukan periode timeout cache. Secara default, entri cache tidak kedaluwarsa.
Jika Anda mengatur parameter cache ke ALL, parameter cacheTTLMs menentukan interval di mana sistem memuat ulang cache. Secara default, cache tidak dimuat ulang.
Satuan: milidetik.
maxJoinRows
Jumlah maksimum hasil yang dikembalikan setelah setiap rekaman data dalam tabel utama dipetakan ke data dalam tabel dimensi.
INTEGER
Tidak
1024
Ketika Anda menggabungkan tabel utama dan tabel dimensi, jumlah hasil yang dikembalikan setelah rekaman data masukan dalam tabel utama dipetakan ke rekaman data dalam tabel dimensi dibatasi oleh parameter ini.
Jika Anda dapat memperkirakan bahwa rekaman data dalam tabel utama sesuai dengan maksimum n rekaman data dalam tabel dimensi, Anda dapat mengatur parameter
maxJoinRowsmenjadi n untuk memastikan pencocokan yang efisien dari Realtime Compute for Apache Flink.
Pemetaan tipe data
Tipe data Flink | Tipe data ApsaraDB RDS untuk MySQL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
TINYINT(1) Catatan Hanya tabel dimensi yang mendukung pemetaan ini. | BOOLEAN |
SMALLINT | SMALLINT |
SMALLINT | TINYINT UNSIGNED |
INT | INT |
INT | SMALLINT UNSIGNED |
BIGINT | BIGINT |
BIGINT | INT UNSIGNED |
DECIMAL(20,0) | BIGINT UNSIGNED |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
VARBINARY | VARBINARY |
Contoh kode
Contoh kode untuk tabel sink
CREATE TEMPORARY TABLE datagen_source( `name` VARCHAR, `age` INT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_sink( `name` VARCHAR, `age` INT ) WITH ( 'connector'='rds', 'password'='your-password', 'tableName'='your-tablename', 'url'='your-url', 'userName'='your-username' ); INSERT INTO rds_sink SELECT * FROM datagen_source;Contoh kode untuk tabel dimensi
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_dim( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='rds', 'password'='<yourPassword>', 'tableName'='<yourTablename>', 'url'='jdbc:mysql://xxx', 'userName'='<yourUsername>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector'='blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a=H.a;