全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor ApsaraDB RDS untuk MySQL

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menggunakan Konektor ApsaraDB RDS untuk MySQL.

ApsaraDB RDS untuk MySQL dikembangkan berdasarkan cabang MySQL dan menawarkan performa yang sangat baik. Solusi ini telah teruji mampu menangani lalu lintas bersamaan dalam volume tinggi, seperti selama acara Double 11. ApsaraDB RDS untuk MySQL menyediakan fitur dasar seperti konfigurasi Daftar putih alamat IP, pencadangan dan pemulihan, Enkripsi Data Transparan (TDE), migrasi data, serta manajemen instance, akun, dan database. Untuk informasi lebih lanjut tentang ApsaraDB RDS untuk MySQL, lihat Basis data ApsaraDB RDS untuk MySQL.

Penting

Konektor ApsaraDB RDS untuk MySQL tidak akan didukung di masa mendatang. Kami merekomendasikan Anda menggunakan Konektor MySQL sebagai pengganti Konektor ApsaraDB RDS untuk MySQL. Untuk informasi lebih lanjut tentang cara menggunakan Konektor MySQL, lihat Konektor MySQL.

Tabel berikut menguraikan kemampuan yang didukung oleh Konektor ApsaraDB RDS untuk MySQL.

Item

Deskripsi

Jenis tabel

Tabel sink dan tabel dimensi

Mode operasi

Mode batch dan mode streaming

Format data

Tidak tersedia

Metrik

  • Data deret waktu untuk tabel dimensi: tidak ada

  • Data deret waktu untuk tabel sink:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

    • numRecordsOutErrors

Catatan

Untuk informasi lebih lanjut tentang metrik, lihat Metrics.

Jenis API

SQL

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Prasyarat

Batasan

  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 2.0.0 atau versi lebih baru yang mendukung Konektor ApsaraDB RDS untuk MySQL.

  • Konektor ApsaraDB RDS untuk MySQL hanya mendukung basis data ApsaraDB RDS untuk MySQL.

  • Semantik setidaknya sekali dapat digunakan. Jika tabel sink ApsaraDB RDS untuk MySQL berisi primary key, idempotensi dapat digunakan untuk memastikan keakuratan data.

  • Kami merekomendasikan Anda menggunakan versi terbaru dari Realtime Compute for Apache Flink untuk memastikan performa dan stabilitas tinggi. Sebagai contoh, Anda dapat menggunakan Realtime Compute for Apache Flink yang menggunakan VVR 6.X atau versi lebih baru.

Peringatan

Konektor ApsaraDB RDS untuk MySQL akan dihapus secara bertahap di masa depan. Kami merekomendasikan Anda menggunakan Konektor MySQL jika Konektor MySQL dapat memenuhi kebutuhan bisnis Anda. Untuk informasi lebih lanjut, lihat Konektor MySQL.

Sintaksis

  • Pernyataan untuk membuat tabel sink ApsaraDB RDS untuk MySQL

    CREATE TABLE rds_sink(
      id INT,
      num BIGINT,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector'='rds',
      'tableName'='your-table-name',
      'userName'='your-user-name',
      'password'='your-password',
      'url'='your-url'
    );
    Catatan
    • Konektor ApsaraDB RDS untuk MySQL mengonversi setiap baris data keluaran menjadi pernyataan SQL dan kemudian mengeksekusinya untuk menulis data ke dalam tabel sink. Jika tabel sink tidak berisi primary key, Konektor ApsaraDB RDS untuk MySQL mengeksekusi pernyataan INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);. Jika tabel sink berisi primary key, Konektor ApsaraDB RDS untuk MySQL mengeksekusi pernyataan INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;. Jika tabel fisik memiliki batasan indeks unik selain batasan primary key dan dua rekaman dengan primary key berbeda tetapi indeks unik yang sama dimasukkan ke dalam tabel fisik, data hilir akan ditimpa karena konflik antara indeks unik. Ini menyebabkan kehilangan data.

    • Jika primary key auto-increment ditentukan dalam basis data ApsaraDB RDS untuk MySQL, Anda tidak dapat mendeklarasikan bidang auto-increment dalam pernyataan DDL Flink. Selama penulisan data, basis data secara otomatis mengonfigurasi bidang auto-increment. Konektor ApsaraDB RDS untuk MySQL hanya dapat digunakan untuk menulis atau menghapus data yang berisi bidang auto-increment tetapi tidak dapat digunakan untuk memperbarui data.

  • Pernyataan untuk membuat tabel dimensi ApsaraDB RDS untuk MySQL

    CREATE TABLE rds_dim(
     id1 INT,
     id2 VARCHAR
    ) WITH (
      'connector'='rds',
      'tableName'='your-table-name',
      'userName'='your-user-name',
      'password'='your-password',
      'url'='your-url'
      'cache'='NONE'
    );

Parameter dalam klausa WITH

  • Parameter umum

    Parameter

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Jenis tabel.

    STRING

    Ya

    Tidak ada nilai default

    Atur nilainya menjadi rds.

    tableName

    Nama metatable.

    STRING

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    userName

    Nama pengguna yang digunakan untuk mengakses basis data.

    STRING

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    password

    Kata sandi yang digunakan untuk mengakses basis data.

    STRING

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    url

    URL yang digunakan untuk mengakses tabel.

    STRING

    Ya

    Tidak ada nilai default

    Titik akhir virtual private cloud (VPC) dari basis data ApsaraDB RDS untuk MySQL. Nilainya adalah titik akhir internal. Untuk informasi lebih lanjut, lihat Lihat dan ubah titik akhir internal dan publik serta nomor port dari instance ApsaraDB RDS untuk MySQL.

    URL berada dalam format jdbc:mysql://<Titik akhir internal>:<Nomor port>/<Nama basis data>.

    Catatan

    Jika Anda membuat tabel sink, Anda harus menambahkan ?rewriteBatchedStatements=true di akhir URL untuk meningkatkan performa sistem.

    maxRetryTimes

    Jumlah maksimum percobaan ulang yang dapat dilakukan ketika gagal menanyakan data dalam tabel dimensi atau menulis data ke tabel sink.

    INTEGER

    Tidak

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru, nilai default parameter ini adalah 10.

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.6 atau lebih lama, nilai default parameter ini adalah 3.

    Tidak tersedia.

  • Parameter hanya untuk tabel sink

    Parameter

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    batchSize

    Jumlah rekaman data yang dapat ditulis sekaligus.

    INTEGER

    Tidak

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru, nilai default parameter ini adalah 4096.

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi 4.0.0 hingga 4.0.6, nilai default parameter ini adalah 5000.

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR 3.X atau lebih lama, nilai default parameter ini adalah 100.

    Tidak tersedia.

    bufferSize

    Jumlah maksimum rekaman data yang dapat disimpan dalam memori. Operasi penulisan dipicu jika ambang batas yang ditentukan oleh parameter batchSize atau bufferSize tercapai.

    INTEGER

    Tidak

    10000

    • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru yang mendukung parameter ini.

    • Parameter ini hanya berlaku setelah Anda menentukan primary key.

    flushIntervalMs

    Interval saat Anda ingin membersihkan buffer memori. Jika jumlah rekaman data yang disimpan tidak mencapai batas atas yang ditentukan oleh parameter batchSize atau bufferSize dalam periode waktu tertentu, sistem secara otomatis menulis semua data yang disimpan ke tabel sink.

    INTEGER

    Tidak

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru, nilai defaultnya adalah 2000.

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi 4.0.0 hingga 4.0.6, nilai defaultnya adalah 0.

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR 3.X atau lebih lama, nilai defaultnya adalah 1000.

    Jika Anda tidak mengonfigurasi parameter ini dalam versi di mana nilai default parameter ini adalah 0, sejumlah kecil data mungkin tidak pernah ditulis ke tabel sink. Untuk menyelesaikan masalah ini, kami merekomendasikan Anda menggunakan versi terbaru dari Realtime Compute for Apache Flink.

    ignoreDelete

    Menentukan apakah akan mengabaikan operasi penghapusan.

    BOOLEAN

    Tidak

    false

    Operasi penghapusan mungkin terjadi ketika Anda menggunakan Flink SQL. Jika beberapa operator keluaran memperbarui bidang berbeda dalam tabel sink yang sama berdasarkan primary key, hasil data mungkin salah.

    Sebagai contoh, rekaman data dihapus dalam satu tugas dan kemudian hanya beberapa bidang dari rekaman data diperbarui dalam tugas lain. Dalam kasus ini, nilai bidang yang tidak diperbarui menjadi null atau nilai default karena bidang tersebut dihapus. Untuk menghindari operasi penghapusan, Anda dapat mengatur parameter ignoreDelete menjadi true.

    connectionMaxActive

    Ukuran kolam koneksi basis data.

    INTEGER

    Tidak

    40

    • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0.7 atau lebih baru yang mendukung parameter ini.

    • Jika akses ke kolam koneksi basis data habis waktu, jumlah koneksi basis data dalam kolam mungkin tidak cukup. Anda dapat meningkatkan ukuran kolam koneksi basis data.

    • Jika jumlah maksimum koneksi paralel yang didukung oleh basis data kecil, Anda dapat mengurangi ukuran kolam koneksi atau mengurangi paralelisme operator.

  • Parameter hanya untuk tabel dimensi

    Parameter

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    cache

    Kebijakan cache untuk tabel dimensi.

    STRING

    Tidak

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR versi lebih lama dari 4.0.6, nilai default parameter ini adalah NONE.

    • Untuk Realtime Compute for Apache Flink yang menggunakan VVR 4.0.6 atau lebih baru, nilai default parameter ini adalah ALL.

    Konektor ApsaraDB RDS untuk MySQL mendukung kebijakan cache berikut untuk tabel dimensi: None, LRU, dan ALL. Untuk informasi lebih lanjut tentang kebijakan cache, lihat Informasi latar belakang.

    cacheSize

    Jumlah maksimum baris rekaman data yang dapat disimpan dalam cache.

    INTEGER

    Tidak

    100000

    • Jika Anda mengatur parameter cache ke LRU, Anda harus mengonfigurasi parameter cacheSize.

    • Jika Anda mengatur parameter cache ke NONE atau ALL, Anda tidak perlu mengonfigurasi parameter cacheSize.

    cacheTTLMs

    Periode timeout cache.

    LONG

    Tidak

    • Jika Anda mengatur parameter cache ke NONE, Anda tidak perlu mengonfigurasi parameter cacheTTLMs. Ini menunjukkan bahwa entri cache tidak kedaluwarsa.

    • Jika Anda mengatur parameter cache ke LRU, parameter cacheTTLMs menentukan periode timeout cache. Secara default, entri cache tidak kedaluwarsa.

    • Jika Anda mengatur parameter cache ke ALL, parameter cacheTTLMs menentukan interval di mana sistem memuat ulang cache. Secara default, cache tidak dimuat ulang.

    Satuan: milidetik.

    maxJoinRows

    Jumlah maksimum hasil yang dikembalikan setelah setiap rekaman data dalam tabel utama dipetakan ke data dalam tabel dimensi.

    INTEGER

    Tidak

    1024

    Ketika Anda menggabungkan tabel utama dan tabel dimensi, jumlah hasil yang dikembalikan setelah rekaman data masukan dalam tabel utama dipetakan ke rekaman data dalam tabel dimensi dibatasi oleh parameter ini.

    Jika Anda dapat memperkirakan bahwa rekaman data dalam tabel utama sesuai dengan maksimum n rekaman data dalam tabel dimensi, Anda dapat mengatur parameter maxJoinRows menjadi n untuk memastikan pencocokan yang efisien dari Realtime Compute for Apache Flink.

Pemetaan tipe data

Tipe data Flink

Tipe data ApsaraDB RDS untuk MySQL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

TINYINT(1)

Catatan

Hanya tabel dimensi yang mendukung pemetaan ini.

BOOLEAN

SMALLINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

INT

INT

INT

SMALLINT UNSIGNED

BIGINT

BIGINT

BIGINT

INT UNSIGNED

DECIMAL(20,0)

BIGINT UNSIGNED

FLOAT

FLOAT

DECIMAL

DECIMAL

DOUBLE

DOUBLE

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

VARBINARY

VARBINARY

Contoh kode

  • Contoh kode untuk tabel sink

    CREATE TEMPORARY TABLE datagen_source(
     `name` VARCHAR,
     `age` INT
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE rds_sink(
     `name` VARCHAR,
     `age` INT
    ) WITH (
      'connector'='rds',
      'password'='your-password',
      'tableName'='your-tablename',
      'url'='your-url',
      'userName'='your-username'
    );
    
    INSERT INTO rds_sink
    SELECT * FROM datagen_source;
  • Contoh kode untuk tabel dimensi

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE rds_dim(
     a INT,
     b VARCHAR,
     c VARCHAR
    ) WITH (
     'connector'='rds',
     'password'='<yourPassword>',
     'tableName'='<yourTablename>',
     'url'='jdbc:mysql://xxx',
     'userName'='<yourUsername>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    ) WITH (
     'connector'='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a=H.a;

FAQ