全部产品
Search
文档中心

Realtime Compute for Apache Flink:AnalyticDB for MySQL V3.0

更新时间:Nov 10, 2025

Topik ini menjelaskan cara menggunakan konektor AnalyticDB for MySQL V3.0.

Informasi latar belakang

AnalyticDB for MySQL V3.0 adalah layanan gudang data enterprise berbasis cloud yang mengintegrasikan teknologi database dan data besar. Layanan ini mendukung operasi penambahan, penghapusan, dan modifikasi data real-time dengan throughput tinggi, analisis data real-time berlatensi rendah, serta proses ekstrak, transformasi, dan muat (ETL) yang kompleks. AnalyticDB for MySQL kompatibel dengan alat ekosistem hulu dan hilir, sehingga dapat digunakan untuk membangun sistem pelaporan enterprise, gudang data, dan mesin layanan data.

Tabel berikut menjelaskan kemampuan yang didukung oleh konektor AnalyticDB for MySQL V3.0.

Item

Deskripsi

Jenis tabel

Tabel sumber, tabel dimensi, dan tabel sink

Catatan

Hanya Ververica Runtime (VVR) 8.0.4 atau yang lebih baru yang mendukung tabel sumber. Untuk informasi selengkapnya mengenai parameter dan konfigurasi tabel sumber, lihat Gunakan Flink untuk berlangganan log biner. Untuk informasi selengkapnya mengenai parameter tabel dimensi dan tabel sink, lihat Parameter dalam klausa WITH.

Mode jalankan

Mode streaming dan mode batch

Format data

Tidak tersedia

Metriks

Tidak tersedia

Jenis API

API SQL

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Prasyarat

Sintaksis

CREATE TEMPORARY TABLE adb_table (
  `id` INT,
  `num` BIGINT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb3.0',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>'
);
Penting

Kunci utama yang ditentukan dalam pernyataan DDL Flink harus konsisten dengan kunci utama tabel fisik di database AnalyticDB for MySQL. Kunci utama tersebut harus didefinisikan dalam pernyataan DDL Flink dan juga harus ada di tabel fisik di database AnalyticDB for MySQL secara bersamaan. Nama kunci utama dalam pernyataan DDL Flink harus identik dengan nama kunci utama pada tabel fisik. Jika tidak sesuai, data mungkin menjadi tidak akurat.

Parameter dalam klausa WITH

  • Parameter umum

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    connector

    Jenis tabel sink.

    String

    Ya

    Tidak ada nilai default

    Atur nilainya ke adb3.0.

    url

    URL Java Database Connectivity (JDBC) dari database.

    String

    Ya

    Tidak ada nilai default

    URL JDBC dari database AnalyticDB for MySQL. URL tersebut dalam format jdbc:mysql://<endpoint>:<port>/<databaseName>.

    • endpoint dan port: Anda dapat masuk ke Konsol AnalyticDB for MySQL. Di panel navigasi sebelah kiri, klik nama kluster yang diinginkan di kolom Cluster ID/Cluster Description. Pada halaman yang muncul, peroleh informasi di bagian Network Information.

    • databaseName: nama database AnalyticDB for MySQL.

    userName

    Nama pengguna yang digunakan untuk mengakses database.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    password

    Kata sandi yang digunakan untuk mengakses database.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    tableName

    Nama tabel di database.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    maxRetryTimes

    Jumlah maksimum percobaan ulang yang diizinkan jika upaya penulisan atau pembacaan data gagal.

    Integer

    Tidak

    10

    Tidak tersedia.

  • Parameter hanya untuk tabel sink

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    batchSize

    Jumlah catatan data yang dapat ditulis sekaligus.

    Integer

    Tidak

    1000

    Parameter ini hanya berlaku setelah Anda menentukan kunci utama.

    bufferSize

    Jumlah maksimum catatan data yang dapat di-cache di memori. Operasi tulis dipicu jika nilai parameter batchSize atau bufferSize mencapai ambang batas yang ditentukan.

    Integer

    Tidak

    1000

    Parameter ini hanya berlaku setelah Anda menentukan kunci utama.

    flushIntervalMs

    Interval pengosongan cache. Nilai ini menunjukkan bahwa jika jumlah catatan data yang di-cache tidak mencapai batas atas dalam periode waktu tertentu, semua data yang di-cache akan ditulis ke tabel sink.

    Integer

    Tidak

    3000

    Unit: milidetik.

    ignoreDelete

    Menentukan apakah operasi penghapusan diabaikan.

    Boolean

    Tidak

    false

    Nilai yang valid:

    • true: Operasi penghapusan diabaikan.

    • false: Operasi penghapusan tidak diabaikan.

    replaceMode

    Menentukan apakah menggunakan pernyataan REPLACE INTO untuk memasukkan data ke dalam tabel jika kunci utama ditentukan dalam pernyataan DDL.

    Boolean

    Tidak

    true

    Nilai yang valid (VVR 11.2+):

    • replace: Menggunakan sintaks REPLACE INTO. Jika kunci utama duplikat, baris baru akan menimpa baris yang sudah ada.

    • upsert: Menggunakan sintaks INSERT INTO ... ON DUPLICATE KEY UPDATE. Memasukkan baris baru jika kunci utama tidak ada; memperbarui baris yang sudah ada jika kunci utama ada. Contoh: Untuk tabel dengan bidang a (kunci utama), b, c, d, jika data hanya disediakan untuk a dan b, hanya bidang b yang diperbarui saat terdeteksi kunci utama duplikat. Bidang c dan d tetap tidak berubah.

    • insert: Menggunakan sintaks INSERT IGNORE INTO. Jika kunci utama duplikat, entri data pertama dipertahankan, dan entri berikutnya diabaikan.

    Nilai yang valid (versi sebelum VVR 11.2):

    • true: Perilaku yang sama seperti replace.

    • false: Perilaku yang sama seperti upsert.

    Catatan: VVR 11.2 dan versi yang lebih baru kompatibel dengan nilai true dan false dari versi sebelumnya.

    Catatan
    • Hanya AnalyticDB for MySQL V3.1.3.5 atau yang lebih baru yang mendukung parameter ini.

    • Opsi ini hanya berlaku ketika kunci utama didefinisikan dalam DDL tabel sink. Jika tidak ada kunci utama yang didefinisikan dalam DDL tabel sink, sintaks insert ignore into selalu digunakan untuk memasukkan data.

    excludeUpdateColumns

    Bidang-bidang yang tidak diperbarui ketika data dengan kunci utama yang sama diperbarui.

    String

    Tidak

    String kosong

    Pisahkan beberapa bidang dengan koma (,). Contoh: excludeUpdateColumns='column1,column2'.

    Pertimbangkan tabel sink dengan bidang a (kunci utama), b, c, dan d, dan excludeUpdateColumns='c,d' ditetapkan. Ketika data yang dimasukkan memiliki nilai kunci utama unik, semua bidang dimasukkan. Ketika data yang dimasukkan memiliki nilai kunci utama duplikat, hanya bidang b yang diperbarui, dan bidang c dan d mempertahankan nilai aslinya.

    Catatan
    • Opsi ini hanya berlaku ketika replaceMode diatur ke upsert atau false.

    • Pastikan kolom yang akan diabaikan ditulis dalam satu baris dan tidak dilipat.

    connectionMaxActive

    Ukuran maksimum kolam thread.

    Integer

    Tidak

    40

    Tidak tersedia.

  • Parameter hanya untuk tabel dimensi

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    cache

    Kebijakan cache.

    String

    Tidak

    ALL

    Nilai yang valid:

    • None: Tidak ada data yang di-cache.

    • LRU: Hanya data tertentu di tabel dimensi yang di-cache. Setiap kali sistem menerima catatan data, sistem mencari cache. Jika sistem tidak menemukan catatan tersebut di cache, sistem mencari catatan data di tabel dimensi fisik.

    • ALL: Semua data di tabel dimensi di-cache. Ini adalah nilai default. Sebelum penerapan dijalankan, sistem memuat semua data di tabel dimensi ke cache. Dengan demikian, cache dicari untuk semua kueri selanjutnya di tabel dimensi. Jika sistem tidak menemukan catatan data di cache, kunci gabungan tidak ada. Sistem memuat ulang semua data di cache setelah entri cache kedaluwarsa.

    Jika jumlah data di tabel jarak jauh kecil dan terdapat banyak kunci yang hilang, kami menyarankan Anda mengatur parameter ini ke ALL. Tabel sumber dan tabel dimensi tidak dapat diasosiasikan berdasarkan klausa ON.

    Catatan
    • Jika Anda mengatur parameter cache ke ALL, Anda harus memantau penggunaan memori node untuk mencegah kesalahan kehabisan memori (OOM).

    • Jika Anda mengatur parameter cache ke ALL, Anda harus menambah memori node untuk penggabungan tabel karena sistem memuat data dari tabel dimensi secara asinkron. Ukuran memori yang ditambahkan adalah dua kali lipat ukuran tabel jarak jauh.

    cacheSize

    Jumlah maksimum catatan data yang dapat di-cache.

    Integer

    Tidak

    100000

    Anda harus mengonfigurasi parameter cacheSize ketika parameter cache diatur ke LRU.

    cacheTTLMs

    Periode waktu kedaluwarsa cache. Satuan: milidetik.

    Integer

    Tidak

    Long.MAX_VALUE

    Anda harus mengonfigurasi parameter cacheTTLMs ketika parameter cache diatur ke LRU atau ALL.

    • Jika parameter cache diatur ke LRU, parameter cacheTTLMs menentukan periode waktu kedaluwarsa cache. Nilai default: Long.MAX_VALUE. Nilai default menunjukkan bahwa entri cache tidak kedaluwarsa.

    • Jika parameter cache diatur ke ALL, parameter cacheTTLMs menentukan interval pemuatan ulang data di tabel fisik. Nilai default: Long.MAX_VALUE. Nilai default menunjukkan bahwa data di tabel fisik tidak dimuat ulang.

    Catatan

    Jika parameter cache diatur ke None, Anda tidak perlu mengonfigurasi parameter cacheTTLMs. Jika parameter cache diatur ke None, data tidak di-cache. Oleh karena itu, Anda tidak perlu mengonfigurasi parameter cacheTTLMs.

    maxJoinRows

    Jumlah maksimum hasil yang dikembalikan setelah setiap catatan data di tabel utama dipetakan ke data di tabel dimensi.

    Integer

    Tidak

    1024

    Jika Anda dapat memperkirakan bahwa setiap catatan data di tabel utama dipetakan ke maksimal n catatan data di tabel dimensi, Anda dapat mengonfigurasi maxJoinRows='n' untuk memastikan pencocokan yang efisien di Realtime Compute for Apache Flink.

    Catatan

    Saat Anda menggabungkan tabel utama dengan tabel dimensi, jumlah hasil yang dikembalikan setelah catatan data masukan di tabel utama dipetakan ke catatan data di tabel dimensi dibatasi oleh parameter ini.

Pemetaan tipe data

Tipe data AnalyticDB for MySQL V3.0

Tipe data Realtime Compute for Apache Flink

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s) atau NUMERIC(p, s)

DECIMAL(p, s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

Kode contoh

  • Kode contoh untuk tabel sink

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO adb_sink
    SELECT * FROM datagen_source;
  • Kode contoh untuk tabel dimensi

    CREATE TEMPORARY TABLE datagen_source(
      `a` INT,
      `b` VARCHAR,
      `c` STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adb_dim (
      `a` INT,
      `b` VARCHAR,
      `c` VARCHAR
    ) WITH (
      'connector' = 'adb3.0',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `a` INT,
      `b` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Referensi

Error: multi-statement be found