Realtime Compute for Apache Flink:JDBC connector

Last Updated: Mar 26, 2026

The JDBC connector lets Flink SQL jobs read from and write to relational databases such as MySQL, PostgreSQL, and Oracle, using tables declared in standard SQL DDL.

Supported table types: source table · dimension table · sink table

Supported running modes: streaming mode · batch mode

Supported API: SQL

Prerequisites

Before you begin, make sure that:

  • The target database and table already exist

  • The JDBC driver JAR for your database is available to upload

Limitations

  • Bounded reads: A JDBC source table is a bounded source. The task completes after all rows are read. To capture real-time change data, use a Change Data Capture (CDC) connector instead — see Create a MySQL CDC source table and Create a PostgreSQL CDC source table (public preview).

  • PostgreSQL version: Writing to PostgreSQL requires version 9.5 or later, because the sink relies on the ON CONFLICT clause.

  • JDBC driver upload: Upload the JDBC driver JAR as a dependency file before running your job. Common drivers:

    Database   | Group ID                 | Artifact ID
    MySQL      | mysql                    | mysql-connector-java
    Oracle     | com.oracle.database.jdbc | ojdbc8
    PostgreSQL | org.postgresql           | postgresql

    For drivers not listed here, verify compatibility before use.

  • MySQL upsert behavior: When writing to a MySQL sink table with a primary key, the connector issues INSERT INTO ... ON DUPLICATE KEY UPDATE ... statements.

    Warning

    If the physical table also has a unique index, inserting rows whose unique index values collide overwrites the existing rows even when their primary keys differ, causing data loss. The sketch below illustrates this.
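    A minimal MySQL-side sketch of the pitfall, assuming a hypothetical users table with a primary key on id and a separate unique index on name:

    -- Hypothetical physical table: primary key on id, unique index on name.
    CREATE TABLE users (
      id    BIGINT PRIMARY KEY,
      name  VARCHAR(64),
      score INT,
      UNIQUE KEY uk_name (name)
    );

    INSERT INTO users VALUES (1, 'alice', 10);

    -- The sink's upsert collides with row 1 on the unique name index even
    -- though id = 2 is a new primary key, so MySQL rewrites row 1 in place:
    -- the table now holds (2, 'alice', 99) and the original row is gone.
    INSERT INTO users VALUES (2, 'alice', 99)
    ON DUPLICATE KEY UPDATE id = 2, score = 99;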

Create a JDBC table

CREATE TABLE jdbc_table (
  `id`   BIGINT,
  `name` VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:<db-type>://<host>:<port>/<database>',
  'table-name' = '<yourTable>',
  'username'   = '<yourUsername>',
  'password'   = '<yourPassword>'
);

Connector options

General

Option     | Type   | Required | Default | Description
connector  | STRING | Yes      | None    | Set to jdbc.
url        | STRING | Yes      | None    | JDBC URL of the database.
table-name | STRING | Yes      | None    | Name of the table to read from or write to.
username   | STRING | No       | None    | Database username. Set together with password.
password   | STRING | No       | None    | Database password.

Source options

Option                     | Type    | Required | Default | Description
scan.partition.column      | STRING  | No       | None    | Column used to split data into partitions. Must be of NUMERIC or TIMESTAMP type. See Partitioned scan.
scan.partition.num         | INTEGER | No       | None    | Number of partitions.
scan.partition.lower-bound | LONG    | No       | None    | Smallest value of the first partition.
scan.partition.upper-bound | LONG    | No       | None    | Largest value of the last partition.
scan.fetch-size            | INTEGER | No       | 0       | Rows fetched from the database per round trip. If set to 0, the fetch-size hint is ignored.
scan.auto-commit           | BOOLEAN | No       | true    | Enables auto-commit for read transactions.

Sink options

Option                     | Type     | Required | Default | Description
sink.buffer-flush.max-rows | INTEGER  | No       | 100     | Maximum rows to buffer before flushing. Set to 0 to disable flushing by row count.
sink.buffer-flush.interval | DURATION | No       | 1000 ms | Maximum time to hold buffered rows before flushing. Set to 0 to disable flushing by timer.
sink.max-retries           | INTEGER  | No       | 3       | Maximum write retries on failure.
sink.ignore-delete         | BOOLEAN  | No       | false   | Ignores delete messages instead of applying them. Requires VVR 11.4 or later.
sink.ignore-delete-mode    | STRING   | No       | ALL     | Controls which delete messages are ignored when sink.ignore-delete is true. Requires VVR 11.4 or later. Valid values: ALL (ignores -D and -U), REAL_DELETE (ignores -D only), UPDATE_BEFORE (ignores -U only).

Note

To flush buffered rows asynchronously on a timer rather than on row count, set sink.buffer-flush.max-rows to 0 and configure sink.buffer-flush.interval to your desired flush interval.
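For example, a sink declared as follows (connection details are placeholders) buffers rows and flushes them once per second, regardless of how many have accumulated:

CREATE TEMPORARY TABLE jdbc_sink_timer_flush (
  `id`   BIGINT,
  `name` VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = '<yourTable>',
  'username'   = '<yourUsername>',
  'password'   = '<yourPassword>',
  -- Disable flushing by row count; rely on the timer alone.
  'sink.buffer-flush.max-rows' = '0',
  'sink.buffer-flush.interval' = '1s'
);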

Dimension table options

Option                           | Type     | Required | Default | Description
lookup.cache.max-rows            | INTEGER  | No       | None    | Maximum rows in the lookup cache. When the cache is full, the least recently used row is evicted. Caching is disabled unless both lookup.cache.max-rows and lookup.cache.ttl are set.
lookup.cache.ttl                 | DURATION | No       | None    | Maximum time a cached row stays valid before it expires.
lookup.cache.caching-missing-key | BOOLEAN  | No       | true    | Caches empty lookup results to avoid repeated database queries for missing keys.
lookup.max-retries               | INTEGER  | No       | 3       | Maximum retries when a database query fails.

PostgreSQL options

Option                     | Type    | Required | Default | Description
source.extend-type.enabled | BOOLEAN | No       | false   | When true, maps PostgreSQL JSONB and UUID columns to Flink STRING. For lookup joins on UUID columns, also add stringtype=unspecified to the JDBC URL so PostgreSQL queries by the actual column type rather than casting.
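The sketch below (placeholder connection details, hypothetical column names) shows both settings applied to a dimension table whose join key is a PostgreSQL UUID column:

CREATE TEMPORARY TABLE pg_uuid_dim (
  `uuid_col` VARCHAR,  -- UUID column in PostgreSQL, read as STRING
  `name`     VARCHAR
) WITH (
  'connector'  = 'jdbc',
  -- stringtype=unspecified lets PostgreSQL resolve the bound key by the
  -- actual column type instead of treating it as varchar.
  'url'        = 'jdbc:postgresql://localhost:5432/mydb?stringtype=unspecified',
  'table-name' = '<yourTable>',
  'username'   = '<yourUsername>',
  'password'   = '<yourPassword>',
  'source.extend-type.enabled' = 'true'
);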

Key behaviors

Partitioned scan

To enable parallel reads from a source table, configure the scan.partition.column, scan.partition.lower-bound, and scan.partition.upper-bound options together with scan.partition.num. The partition column must be numeric or TIMESTAMP. For details on how splits are calculated, see Partitioned Scan in the Apache Flink documentation.
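A minimal sketch, assuming the source table has a numeric id column; the bounds and partition count are illustrative:

CREATE TEMPORARY TABLE jdbc_parallel_source (
  `id`   BIGINT,
  `name` VARCHAR
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = '<yourTable>',
  'username'   = '<yourUsername>',
  'password'   = '<yourPassword>',
  -- Read id values 0..10000 in 4 parallel range splits.
  'scan.partition.column'      = 'id',
  'scan.partition.num'         = '4',
  'scan.partition.lower-bound' = '0',
  'scan.partition.upper-bound' = '10000'
);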

Lookup cache

By default, every lookup join query hits the database directly. Enable the LRU cache by setting both lookup.cache.max-rows and lookup.cache.ttl to reduce database load at the cost of slightly stale data.
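For instance, a dimension table can be declared with a bounded cache; the size and TTL here are illustrative:

CREATE TEMPORARY TABLE jdbc_cached_dim (
  `id`   INT,
  `name` VARCHAR
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = '<yourTable>',
  'username'   = '<yourUsername>',
  'password'   = '<yourPassword>',
  -- Cache up to 10000 rows; each cached row is re-read after 10 minutes.
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl'      = '10min'
);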

Idempotent writes

When the sink table has a primary key, the connector issues database-specific upsert statements rather than plain inserts: INSERT ... ON DUPLICATE KEY UPDATE ... for MySQL and INSERT ... ON CONFLICT ... DO UPDATE for PostgreSQL (the reason PostgreSQL 9.5 or later is required).
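For a hypothetical table my_table(id, name) with primary key id, the emitted statements take roughly these shapes:

-- MySQL: any unique key collision triggers the update branch.
INSERT INTO my_table (id, name) VALUES (?, ?)
ON DUPLICATE KEY UPDATE name = VALUES(name);

-- PostgreSQL 9.5+: the conflict target is the primary key column.
INSERT INTO my_table (id, name) VALUES (?, ?)
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name;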

Examples

All three examples use a blackhole or datagen connector as a lightweight counterpart so the statements are self-contained.

Read from a database (source table)

CREATE TEMPORARY TABLE jdbc_source (
  `id`   INT,
  `name` VARCHAR
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = '<yourTable>',
  'username'   = '<yourUsername>',
  'password'   = '<yourPassword>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  `id`   INT,
  `name` VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT * FROM jdbc_source;

Write to a database (sink table)

CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE jdbc_sink (
  `name` VARCHAR,
  `age`  INT
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = '<yourTable>',
  'username'   = '<yourUsername>',
  'password'   = '<yourPassword>'
);

INSERT INTO jdbc_sink SELECT * FROM datagen_source;

Enrich a stream with database lookups (dimension table)

CREATE TEMPORARY TABLE datagen_source (
  `id`       INT,
  `data`     BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE jdbc_dim (
  `id`   INT,
  `name` VARCHAR
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = '<yourTable>',
  'username'   = '<yourUsername>',
  'password'   = '<yourPassword>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  `id`   INT,
  `data` BIGINT,
  `name` VARCHAR
) WITH (
  'connector' = 'blackhole'
);

-- Look up the matching name for each stream record at processing time
INSERT INTO blackhole_sink
SELECT T.`id`, T.`data`, H.`name`
FROM datagen_source AS T
JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H
  ON T.id = H.id;

Data type mappings

MySQL | Oracle | PostgreSQL | Flink SQL
TINYINT |  |  | TINYINT
SMALLINT, TINYINT UNSIGNED |  | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED |  | INTEGER, SERIAL | INT
BIGINT, INT UNSIGNED |  | BIGINT, BIGSERIAL | BIGINT
BIGINT UNSIGNED |  |  | DECIMAL(20, 0)
FLOAT | BINARY_FLOAT | REAL, FLOAT4 | FLOAT
DOUBLE, DOUBLE PRECISION | BINARY_DOUBLE | FLOAT8, DOUBLE PRECISION | DOUBLE
NUMERIC(p, s), DECIMAL(p, s) | SMALLINT, FLOAT(s), DOUBLE PRECISION, REAL, NUMBER(p, s) | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s)
BOOLEAN, TINYINT(1) |  | BOOLEAN | BOOLEAN
DATE | DATE | DATE | DATE
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n), VARCHAR(n), TEXT | CHAR(n), VARCHAR(n), CLOB | CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT, JSONB, UUID | STRING
BINARY, VARBINARY, BLOB | RAW(s), BLOB | BYTEA | BYTES
 |  | ARRAY | ARRAY
