Konektor ApsaraDB RDS for MySQL tidak akan didukung di masa mendatang. Gunakan konektor MySQL sebagai gantinya.
Konektor ApsaraDB RDS for MySQL memungkinkan Anda menulis output Flink SQL ke tabel sink ApsaraDB RDS for MySQL atau melakukan join aliran terhadap tabel dimensi ApsaraDB RDS for MySQL.
Jenis tabel yang didukung: Tabel sink · Tabel dimensi
Supported running modes: batch mode · streaming mode
Jenis API: SQL
Pembaruan dan penghapusan data pada tabel sink: Didukung
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
-
Database dan tabel ApsaraDB RDS for MySQL. Lihat Membuat database dan akun untuk instans ApsaraDB RDS for MySQL.
-
Daftar putih alamat IP yang dikonfigurasi untuk database. Lihat Menghubungkan ke instans ApsaraDB RDS for MySQL menggunakan klien database atau CLI.
Batasan
-
Memerlukan Realtime Compute for Apache Flink menggunakan Ververica Runtime (VVR) 2.0.0 atau versi lebih baru. Untuk performa dan stabilitas terbaik, gunakan VVR 6.X atau versi lebih baru.
-
Hanya database ApsaraDB RDS for MySQL yang didukung.
-
Konektor menggunakan semantik at-least-once. Jika tabel sink memiliki primary key, idempotensi memastikan keakuratan data.
Cara kerja
Perilaku penulisan sink
Setiap baris output dikonversi menjadi pernyataan SQL sebelum ditulis ke tabel sink:
-
Tanpa primary key — menjalankan
INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...); -
Dengan primary key — menjalankan
INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...) ON DUPLICATE KEY UPDATE col1 = VALUES(col1), col2 = VALUES(col2), ...;
Konflik indeks unik: Jika tabel fisik memiliki batasan indeks unik selain primary key, penyisipan dua baris dengan primary key berbeda tetapi nilai indeks unik yang sama menyebabkan baris sebelumnya ditimpa, sehingga mengakibatkan kehilangan data.
Primary key auto-increment: Jangan deklarasikan kolom auto-increment dalam DDL Flink. Database secara otomatis menetapkan nilai-nilai tersebut. Konektor dapat menulis dan menghapus baris yang memiliki kolom auto-increment, tetapi tidak dapat memperbaruinya.
Kebijakan cache tabel dimensi
Konektor mendukung tiga kebijakan cache untuk pencarian tabel dimensi:
| Kebijakan | Perilaku | Kapan digunakan |
|---|---|---|
NONE |
Tanpa caching — setiap pencarian langsung mengakses database. | Persyaratan latensi rendah, set data kecil. |
LRU |
Menyimpan jumlah tetap baris yang baru saja digunakan per task manager. | Subset tabel besar yang sering diakses. |
ALL |
Memuat seluruh tabel ke memori dan memuat ulang secara berkala. | Tabel referensi statis berukuran kecil. |
Sintaksis
Tabel sink
CREATE TABLE rds_sink (
id INT,
num BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'rds',
'tableName' = '<your-table-name>',
'userName' = '<your-user-name>',
'password' = '<your-password>',
'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
Tambahkan ?rewriteBatchedStatements=true ke nilai url untuk tabel sink guna meningkatkan throughput penulisan.
Tabel dimensi
CREATE TABLE rds_dim (
id1 INT,
id2 VARCHAR
) WITH (
'connector' = 'rds',
'tableName' = '<your-table-name>',
'userName' = '<your-user-name>',
'password' = '<your-password>',
'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>',
'cache' = 'NONE'
);
Parameter dalam klausa WITH
Parameter umum
| Parameter | Tipe | Wajib | Bawaan | Deskripsi |
|---|---|---|---|---|
connector |
STRING | Ya | — | Atur ke rds |
tableName |
STRING | Ya | — | Nama tabel fisik di ApsaraDB RDS for MySQL |
userName |
STRING | Ya | — | Username database |
password |
STRING | Ya | — | Password database |
url |
STRING | Ya | — | Titik akhir virtual private cloud (VPC) database, dalam format jdbc:mysql://<internal-endpoint>:<port>/<database-name>. Untuk tabel sink, tambahkan ?rewriteBatchedStatements=true. Untuk detail titik akhir, lihat Melihat dan mengubah titik akhir internal dan publik serta nomor port instans ApsaraDB RDS for MySQL |
maxRetryTimes |
INTEGER | Tidak | 10 (VVR 4.0.7+), 3 (VVR 4.0.6 dan sebelumnya) | Jumlah maksimum percobaan ulang untuk pencarian tabel dimensi atau penulisan sink yang gagal |
Parameter tabel sink
| Parameter | Tipe | Wajib | Bawaan | Deskripsi |
|---|---|---|---|---|
batchSize |
INTEGER | Tidak | 4096 (VVR 4.0.7+), 5000 (VVR 4.0.0–4.0.6), 100 (VVR 3.x dan sebelumnya) | Jumlah baris yang ditulis per batch |
bufferSize |
INTEGER | Tidak | 10000 | Jumlah maksimum baris yang disimpan dalam memori sebelum pemicu penulisan. Didukung di VVR 4.0.7 dan versi lebih baru. Hanya berlaku jika primary key didefinisikan |
flushIntervalMs |
INTEGER | Tidak | 2000 (VVR 4.0.7+), 0 (VVR 4.0.0–4.0.6), 1000 (VVR 3.x dan sebelumnya) | Interval dalam milidetik saat buffer dikosongkan ke tabel sink, terlepas dari apakah ambang batas batchSize atau bufferSize tercapai. Jika diatur ke 0 (nilai bawaan untuk VVR 4.0.0–4.0.6), data dalam buffer berjumlah kecil mungkin tidak pernah ditulis — lakukan upgrade ke versi VVR yang lebih baru untuk menghindari hal ini |
ignoreDelete |
BOOLEAN | Tidak | false | Atur ke true untuk melewati operasi penghapusan. Berguna ketika beberapa operator memperbarui bidang berbeda dari baris yang sama — tanpa pengaturan ini, penghapusan oleh satu operator diikuti pembaruan parsial oleh operator lain menyebabkan bidang yang tidak diperbarui bernilai null atau nilai bawaannya |
connectionMaxActive |
INTEGER | Tidak | 40 | Ukuran kolam koneksi. Didukung di VVR 4.0.7 dan versi lebih baru. Tingkatkan nilai ini jika terjadi timeout kolam koneksi; kurangi jika database membatasi jumlah koneksi bersamaan |
Parameter tabel dimensi
| Parameter | Tipe | Wajib | Bawaan | Deskripsi |
|---|---|---|---|---|
cache |
STRING | Tidak | NONE (VVR sebelum 4.0.6), ALL (VVR 4.0.6+) | Kebijakan cache. Nilai yang valid: NONE, LRU, ALL. Lihat Kebijakan cache |
cacheSize |
INTEGER | Tidak | 100000 | Jumlah maksimum baris yang disimpan dalam cache. Wajib jika cache diatur ke LRU; diabaikan untuk NONE dan ALL |
cacheTTLMs |
LONG | Tidak | Tidak kedaluwarsa untuk NONE dan LRU; tidak dimuat ulang untuk ALL |
Waktu kedaluwarsa cache dalam milidetik. Untuk LRU, baris kedaluwarsa setelah periode ini. Untuk ALL, seluruh cache dimuat ulang pada interval ini |
maxJoinRows |
INTEGER | Tidak | 1024 | Jumlah maksimum baris tabel dimensi yang cocok per baris input. Atur nilai ini ke jumlah maksimum baris dimensi yang diharapkan per baris tabel utama untuk menghindari pemindaian yang tidak perlu |
Metrik
Tabel sink menampilkan metrik berikut. Tabel dimensi tidak memiliki metrik.
| Metrik | Deskripsi |
|---|---|
numRecordsOut |
Total baris yang ditulis |
numRecordsOutPerSecond |
Baris yang ditulis per detik |
numBytesOut |
Total byte yang ditulis |
numBytesOutPerSecond |
Byte yang ditulis per detik |
currentSendTime |
Latensi penulisan saat ini |
numRecordsOutErrors |
Total error penulisan |
Untuk definisi metrik, lihat Metrik.
Pemetaan tipe data
| Tipe Flink | Tipe ApsaraDB RDS for MySQL |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| TINYINT(1) (hanya untuk tabel dimensi) | BOOLEAN |
| SMALLINT | SMALLINT |
| SMALLINT | TINYINT UNSIGNED |
| INT | INT |
| INT | SMALLINT UNSIGNED |
| BIGINT | BIGINT |
| BIGINT | INT UNSIGNED |
| DECIMAL(20, 0) | BIGINT UNSIGNED |
| FLOAT | FLOAT |
| DECIMAL | DECIMAL |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| VARBINARY | VARBINARY |
Contoh
Contoh tabel sink
Contoh berikut membaca dari sumber DataGen dan menulis ke tabel sink ApsaraDB RDS for MySQL.
CREATE TEMPORARY TABLE datagen_source (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE rds_sink (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'rds',
'tableName' = '<your-table-name>',
'userName' = '<your-user-name>',
'password' = '<your-password>',
'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
INSERT INTO rds_sink
SELECT * FROM datagen_source;
Contoh tabel dimensi
Contoh berikut melakukan join aliran terhadap tabel dimensi ApsaraDB RDS for MySQL menggunakan temporal join.
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE rds_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'rds',
'tableName' = '<your-table-name>',
'userName' = '<your-user-name>',
'password' = '<your-password>',
'url' = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;
FAQ
Lanjutan
-
Konektor MySQL — pengganti yang direkomendasikan untuk konektor ini.
-
ApsaraDB RDS for MySQL — ikhtisar produk dan dokumentasi fitur.
-
Metrik — definisi untuk semua metrik konektor.