全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor JDBC

更新时间:Nov 10, 2025

Topik ini menjelaskan cara menggunakan konektor Java Database Connectivity (JDBC).

Ikhtisar

Konektor JDBC disediakan oleh Apache Flink dan dapat digunakan untuk membaca serta menulis data ke database umum seperti MySQL, PostgreSQL, dan Oracle. Tabel berikut menjelaskan kemampuan yang didukung oleh konektor JDBC.

Item

Deskripsi

Tipe tabel

Tabel sumber, tabel dimensi, dan Tabel sink

Mode operasi

Mode aliran dan mode batch

Format data

Tidak tersedia

Metrik

Tidak tersedia

Tipe API

SQL API

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Prasyarat

Database dan tabel yang akan dihubungkan telah dibuat.

Batasan

  • Tabel sumber JDBC merupakan sumber terbatas (bounded source). Setelah konektor sumber JDBC membaca seluruh data dari sebuah tabel di database hulu, tugas tersebut selesai. Untuk menangkap data perubahan secara real-time, gunakan konektor Change Data Capture (CDC). Untuk informasi lebih lanjut, lihat Membuat tabel sumber MySQL CDC dan Membuat tabel sumber PostgreSQL CDC (pratinjau publik).

  • Untuk menulis data ke PostgreSQL, pastikan versi database adalah PostgreSQL 9.5 atau lebih tinggi. Versi tersebut mendukung klausa ON CONFLICT, yang diperlukan agar penyisipan berhasil.

  • Saat menggunakan konektor JDBC, unggah secara manual paket JAR driver database tujuan sebagai file dependensi. Driver JDBC yang tersedia:

    Driver

    ID Grup

    ID Artefak

    MySQL

    mysql

    mysql-connector-java

    Oracle

    com.oracle.database.jdbc

    ojdbc8

    PostgreSQL

    org.postgresql

    postgresql

    Jika Anda menggunakan driver JDBC yang tidak tercantum dalam tabel, uji validitas dan ketersediaannya sebelum digunakan.
    • Saat konektor JDBC menulis data ke tabel sink MySQL, konektor JDBC menggabungkan setiap catatan data yang diterima menjadi sebuah Pernyataan SQL dan mengeksekusi Pernyataan SQL tersebut. Pada tabel sink MySQL dengan kunci primer, digunakan sintaks berikut: INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;

      Peringatan

      Menyisipkan catatan yang memiliki nilai indeks unik duplikat (meskipun dengan kunci primer berbeda) ke dalam tabel fisik yang memiliki batasan indeks unik menyebabkan penimpaan data hilir dan kehilangan data.

Sintaks

CREATE TABLE jdbc_table (
  `id` BIGINT,
  `name` VARCHAR,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);

Opsi konektor

  • Umum

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Tipe tabel.

    STRING

    Ya

    Tidak ada nilai default

    Atur nilainya menjadi jdbc.

    url

    URL database.

    STRING

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    table-name

    Nama tabel JDBC.

    STRING

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    username

    Nama pengguna JDBC.

    STRING

    Tidak

    Tidak ada nilai default

    username dan password harus diatur secara bersamaan.

    password

    Kata sandi pengguna JDBC.

    STRING

    Tidak

    Tidak ada nilai default

  • Khusus sumber

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    scan.partition.column

    Nama kolom yang digunakan untuk mempartisi data masukan.

    STRING

    Tidak

    Tidak ada nilai default

    Nilai dalam kolom harus bertipe NUMERIC atau TIMESTAMP dan mendukung perbandingan dengan nilai NUMERIC di database. Untuk informasi lebih lanjut tentang pemindaian terpartisi, lihat Pemindaian Terpartisi.

    scan.partition.num

    Jumlah partisi.

    INTEGER

    Tidak

    Tidak ada nilai default

    Tidak tersedia.

    scan.partition.lower-bound

    Nilai terkecil dari partisi pertama.

    LONG

    Tidak

    Tidak ada nilai default

    Tidak tersedia.

    scan.partition.upper-bound

    Nilai terbesar dari partisi terakhir.

    LONG

    Tidak

    Tidak ada nilai default

    Tidak tersedia.

    scan.fetch-size

    Jumlah baris data yang diperoleh dari database setiap kali data dibaca dari tabel sumber.

    INTEGER

    Tidak

    0

    Jika Anda mengatur opsi ini ke 0, opsi ini diabaikan.

    scan.auto-commit

    Menentukan apakah akan mengaktifkan auto-commit.

    BOOLEAN

    Tidak

    true

    Tidak tersedia.

  • Khusus sink

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    sink.buffer-flush.max-rows

    Jumlah maksimum rekaman data yang dapat di-cache sebelum operasi flush dilakukan.

    INTEGER

    Tidak

    100

    Jika Anda mengatur opsi ini ke 0, catatan data tidak di-cache sebelum operasi flush dilakukan.

    sink.buffer-flush.interval

    Interval flushing data, dalam milidetik. Jika catatan data di-cache melebihi durasi yang ditentukan oleh opsi ini, operasi flush dilakukan dalam thread asinkron.

    DURASI

    Tidak

    1000

    Jika Anda mengatur opsi ini ke 0, catatan data tidak di-cache sebelum operasi flush dilakukan.

    Catatan

    Jika Anda ingin memproses event flush yang di-cache dalam mode asinkron, Anda dapat mengatur opsi sink.buffer-flush.max-rows ke 0 dan mengonfigurasi opsi sink.buffer-flush.interval sesuai kebutuhan bisnis Anda.

    sink.max-retries

    Jumlah maksimum percobaan ulang yang diizinkan saat data gagal ditulis ke database.

    INTEGER

    Tidak

    3

    Tidak tersedia.

  • Khusus tabel dimensi

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    lookup.cache.max-rows

    Jumlah maksimum baris data yang dapat di-cache. Jika jumlah baris data dalam cache melebihi nilai opsi ini, baris data paling awal kedaluwarsa dan digantikan oleh baris data baru.

    INTEGER

    Tidak

    Tidak ada nilai default

    Secara default, caching untuk tabel dimensi dinonaktifkan. Anda dapat mengonfigurasi opsi lookup.cache.max-rows dan lookup.cache.ttl untuk mengaktifkan caching untuk tabel dimensi. Jika caching untuk tabel dimensi diaktifkan, kebijakan cache LRU digunakan.

    lookup.cache.ttl

    Masa hidup data (TTL) maksimum setiap baris data dalam cache. Jika periode waktu suatu baris data di-cache melebihi nilai opsi ini, baris data tersebut kedaluwarsa.

    DURASI

    Tidak

    Tidak ada nilai default

    lookup.cache.caching-missing-key

    Menentukan apakah akan menyimpan hasil kueri kosong.

    BOOLEAN

    Tidak

    true

    Nilai yang valid:

    • true: Hasil kueri kosong disimpan. Ini adalah nilai default.

    • false: Hasil kueri kosong tidak disimpan.

    lookup.max-retries

    Jumlah maksimum percobaan ulang saat query basis data gagal.

    INTEGER

    Tidak

    3

    Tidak tersedia.

  • Khusus PostgreSQL

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    source.extend-type.enabled

    Menentukan apakah data tipe ekstensi JSONB dan UUID dapat dibaca dan dipetakan ke tipe data yang didukung oleh Flink saat tabel PostgreSQL digunakan sebagai tabel sumber atau tabel dimensi.

    BOOLEAN

    Tidak

    false

    Nilai yang valid:

    • true: Data tipe ekstensi JSONB dan UUID dapat dibaca dan dipetakan ke tipe data yang didukung oleh Flink.

    • false: Data tipe ekstensi JSONB dan UUID tidak dapat dibaca atau dipetakan ke tipe data yang didukung oleh Flink. Ini adalah nilai default.

Pemetaan tipe data

Tipe data MySQL

Tipe data Oracle

Tipe data PostgreSQL

Tipe data SQL Flink

TINYINT

Tidak tersedia

Tidak tersedia

TINYINT

  • SMALLINT

  • TINYINT UNSIGNED

Tidak ada nilai default

  • SMALLINT

  • INT2

  • SMALLSERIAL

  • SERIAL2

SMALLINT

  • INT

  • MEDIUMINT

  • SMALLINT UNSIGNED

Tidak ada nilai default

  • INTEGER

  • SERIAL

INT

  • BIGINT

  • INT UNSIGNED

Tidak ada nilai default

  • BIGINT

  • BIGSERIAL

BIGINT

BIGINT UNSIGNED

Tidak tersedia

Tidak tersedia

DECIMAL(20, 0)

BIGINT

Tidak ada nilai default

BIGINT

BIGINT

FLOAT

BINARY_FLOAT

  • REAL

  • FLOAT4

FLOAT

  • DOUBLE

  • DOUBLE PRECISION

BINARY_DOUBLE

  • FLOAT8

  • DOUBLE PRECISION

DOUBLE

  • NUMERIC(p, s)

  • DECIMAL(p, s)

  • SMALLINT

  • FLOAT(s)

  • DOUBLE PRECISION

  • REAL

  • NUMBER(p, s)

  • NUMERIC(p, s)

  • DECIMAL(p, s)

DECIMAL(p, s)

  • BOOLEAN

  • TINYINT(1)

Tidak ada nilai default

BOOLEANcan

BOOLEAN

DATE

DATE

DATE

DATE

TIME [(p)]

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

  • CHAR(n)

  • VARCHAR(n)

  • TEXT

  • CHAR(n)

  • VARCHAR(n)

  • CLOB

  • CHAR(n)

  • CHARACTER(n)

  • VARCHAR(n)

  • CHARACTER VARYING(n)

  • TEXT

  • JSONB

  • UUID

STRING

  • BINARY

  • VARBINARY

  • BLOB

  • RAW(s)

  • BLOB

BYTEA

BYTES

Tidak tersedia

Tidak tersedia

ARRAY

ARRAY

Kode contoh

  • Kode Contoh untuk Tabel Sumber

    CREATE TEMPORARY TABLE jdbc_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
  • Kode contoh untuk tabel sink

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    INSERT INTO jdbc_sink
    SELECT * FROM datagen_source;
  • Kode Contoh untuk Tabel Dimensi

    CREATE TEMPORARY TABLE datagen_source(
     `id` INT,
     `data` BIGINT,
     `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_dim (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `data` BIGINT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.`id`,T.`data`, H.`name`
    FROM datagen_source AS T
    JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;