All Products
Search
Document Center

Realtime Compute for Apache Flink:ApsaraDB RDS for MySQL

Last Updated:Mar 27, 2026
Penting

Konektor ApsaraDB RDS for MySQL tidak akan didukung di masa mendatang. Gunakan konektor MySQL sebagai gantinya.

Konektor ApsaraDB RDS for MySQL memungkinkan Anda menulis output Flink SQL ke tabel sink ApsaraDB RDS for MySQL atau melakukan join aliran terhadap tabel dimensi ApsaraDB RDS for MySQL.

Jenis tabel yang didukung: Tabel sink · Tabel dimensi

Supported running modes: batch mode · streaming mode

Jenis API: SQL

Pembaruan dan penghapusan data pada tabel sink: Didukung

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

Batasan

  • Memerlukan Realtime Compute for Apache Flink menggunakan Ververica Runtime (VVR) 2.0.0 atau versi lebih baru. Untuk performa dan stabilitas terbaik, gunakan VVR 6.X atau versi lebih baru.

  • Hanya database ApsaraDB RDS for MySQL yang didukung.

  • Konektor menggunakan semantik at-least-once. Jika tabel sink memiliki primary key, idempotensi memastikan keakuratan data.

Cara kerja

Perilaku penulisan sink

Setiap baris output dikonversi menjadi pernyataan SQL sebelum ditulis ke tabel sink:

  • Tanpa primary key — menjalankan INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...);

  • Dengan primary key — menjalankan INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...) ON DUPLICATE KEY UPDATE col1 = VALUES(col1), col2 = VALUES(col2), ...;

Konflik indeks unik: Jika tabel fisik memiliki batasan indeks unik selain primary key, penyisipan dua baris dengan primary key berbeda tetapi nilai indeks unik yang sama menyebabkan baris sebelumnya ditimpa, sehingga mengakibatkan kehilangan data.

Primary key auto-increment: Jangan deklarasikan kolom auto-increment dalam DDL Flink. Database secara otomatis menetapkan nilai-nilai tersebut. Konektor dapat menulis dan menghapus baris yang memiliki kolom auto-increment, tetapi tidak dapat memperbaruinya.

Kebijakan cache tabel dimensi

Konektor mendukung tiga kebijakan cache untuk pencarian tabel dimensi:

Kebijakan Perilaku Kapan digunakan
NONE Tanpa caching — setiap pencarian langsung mengakses database. Persyaratan latensi rendah, set data kecil.
LRU Menyimpan jumlah tetap baris yang baru saja digunakan per task manager. Subset tabel besar yang sering diakses.
ALL Memuat seluruh tabel ke memori dan memuat ulang secara berkala. Tabel referensi statis berukuran kecil.

Sintaksis

Tabel sink

CREATE TABLE rds_sink (
  id  INT,
  num BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'  = 'rds',
  'tableName'  = '<your-table-name>',
  'userName'   = '<your-user-name>',
  'password'   = '<your-password>',
  'url'        = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);
Catatan

Tambahkan ?rewriteBatchedStatements=true ke nilai url untuk tabel sink guna meningkatkan throughput penulisan.

Tabel dimensi

CREATE TABLE rds_dim (
  id1 INT,
  id2 VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>',
  'cache'     = 'NONE'
);

Parameter dalam klausa WITH

Parameter umum

Parameter Tipe Wajib Bawaan Deskripsi
connector STRING Ya Atur ke rds
tableName STRING Ya Nama tabel fisik di ApsaraDB RDS for MySQL
userName STRING Ya Username database
password STRING Ya Password database
url STRING Ya Titik akhir virtual private cloud (VPC) database, dalam format jdbc:mysql://<internal-endpoint>:<port>/<database-name>. Untuk tabel sink, tambahkan ?rewriteBatchedStatements=true. Untuk detail titik akhir, lihat Melihat dan mengubah titik akhir internal dan publik serta nomor port instans ApsaraDB RDS for MySQL
maxRetryTimes INTEGER Tidak 10 (VVR 4.0.7+), 3 (VVR 4.0.6 dan sebelumnya) Jumlah maksimum percobaan ulang untuk pencarian tabel dimensi atau penulisan sink yang gagal

Parameter tabel sink

Parameter Tipe Wajib Bawaan Deskripsi
batchSize INTEGER Tidak 4096 (VVR 4.0.7+), 5000 (VVR 4.0.0–4.0.6), 100 (VVR 3.x dan sebelumnya) Jumlah baris yang ditulis per batch
bufferSize INTEGER Tidak 10000 Jumlah maksimum baris yang disimpan dalam memori sebelum pemicu penulisan. Didukung di VVR 4.0.7 dan versi lebih baru. Hanya berlaku jika primary key didefinisikan
flushIntervalMs INTEGER Tidak 2000 (VVR 4.0.7+), 0 (VVR 4.0.0–4.0.6), 1000 (VVR 3.x dan sebelumnya) Interval dalam milidetik saat buffer dikosongkan ke tabel sink, terlepas dari apakah ambang batas batchSize atau bufferSize tercapai. Jika diatur ke 0 (nilai bawaan untuk VVR 4.0.0–4.0.6), data dalam buffer berjumlah kecil mungkin tidak pernah ditulis — lakukan upgrade ke versi VVR yang lebih baru untuk menghindari hal ini
ignoreDelete BOOLEAN Tidak false Atur ke true untuk melewati operasi penghapusan. Berguna ketika beberapa operator memperbarui bidang berbeda dari baris yang sama — tanpa pengaturan ini, penghapusan oleh satu operator diikuti pembaruan parsial oleh operator lain menyebabkan bidang yang tidak diperbarui bernilai null atau nilai bawaannya
connectionMaxActive INTEGER Tidak 40 Ukuran kolam koneksi. Didukung di VVR 4.0.7 dan versi lebih baru. Tingkatkan nilai ini jika terjadi timeout kolam koneksi; kurangi jika database membatasi jumlah koneksi bersamaan

Parameter tabel dimensi

Parameter Tipe Wajib Bawaan Deskripsi
cache STRING Tidak NONE (VVR sebelum 4.0.6), ALL (VVR 4.0.6+) Kebijakan cache. Nilai yang valid: NONE, LRU, ALL. Lihat Kebijakan cache
cacheSize INTEGER Tidak 100000 Jumlah maksimum baris yang disimpan dalam cache. Wajib jika cache diatur ke LRU; diabaikan untuk NONE dan ALL
cacheTTLMs LONG Tidak Tidak kedaluwarsa untuk NONE dan LRU; tidak dimuat ulang untuk ALL Waktu kedaluwarsa cache dalam milidetik. Untuk LRU, baris kedaluwarsa setelah periode ini. Untuk ALL, seluruh cache dimuat ulang pada interval ini
maxJoinRows INTEGER Tidak 1024 Jumlah maksimum baris tabel dimensi yang cocok per baris input. Atur nilai ini ke jumlah maksimum baris dimensi yang diharapkan per baris tabel utama untuk menghindari pemindaian yang tidak perlu

Metrik

Tabel sink menampilkan metrik berikut. Tabel dimensi tidak memiliki metrik.

Metrik Deskripsi
numRecordsOut Total baris yang ditulis
numRecordsOutPerSecond Baris yang ditulis per detik
numBytesOut Total byte yang ditulis
numBytesOutPerSecond Byte yang ditulis per detik
currentSendTime Latensi penulisan saat ini
numRecordsOutErrors Total error penulisan

Untuk definisi metrik, lihat Metrik.

Pemetaan tipe data

Tipe Flink Tipe ApsaraDB RDS for MySQL
BOOLEAN BOOLEAN
TINYINT TINYINT
TINYINT(1) (hanya untuk tabel dimensi) BOOLEAN
SMALLINT SMALLINT
SMALLINT TINYINT UNSIGNED
INT INT
INT SMALLINT UNSIGNED
BIGINT BIGINT
BIGINT INT UNSIGNED
DECIMAL(20, 0) BIGINT UNSIGNED
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
VARBINARY VARBINARY

Contoh

Contoh tabel sink

Contoh berikut membaca dari sumber DataGen dan menulis ke tabel sink ApsaraDB RDS for MySQL.

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_sink (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>?rewriteBatchedStatements=true'
);

INSERT INTO rds_sink
SELECT * FROM datagen_source;

Contoh tabel dimensi

Contoh berikut melakukan join aliran terhadap tabel dimensi ApsaraDB RDS for MySQL menggunakan temporal join.

CREATE TEMPORARY TABLE datagen_source (
  a          INT,
  b          BIGINT,
  c          STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE rds_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<your-table-name>',
  'userName'  = '<your-user-name>',
  'password'  = '<your-password>',
  'url'       = 'jdbc:mysql://<internal-endpoint>:<port>/<database-name>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;

FAQ

Lanjutan