The JDBC connector reads from and writes to relational databases — MySQL, PostgreSQL, and Oracle — using standard SQL DDL in Flink SQL jobs.
Supported table types: source table · dimension table · sink table
Supported running modes: streaming mode · batch mode · SQL API
Prerequisites
Before you begin, make sure that:
- The target database and table already exist.
- The JDBC driver JAR for your database is available to upload.
Limitations
- Bounded reads: A JDBC source table is a bounded source. The task completes after all rows are read. To capture real-time change data, use a Change Data Capture (CDC) connector instead — see Create a MySQL CDC source table and Create a PostgreSQL CDC source table (public preview).
- PostgreSQL version: Writing to PostgreSQL requires version 9.5 or later, because the sink relies on the ON CONFLICT clause.
- JDBC driver upload: Upload the JDBC driver JAR as a dependency file before running your job. Common drivers:

  | Database | Group ID | Artifact ID |
  |---|---|---|
  | MySQL | mysql | mysql-connector-java |
  | Oracle | com.oracle.database.jdbc | ojdbc8 |
  | PostgreSQL | org.postgresql | postgresql |

  For drivers not listed here, verify compatibility before use.
- MySQL upsert behavior: When writing to a MySQL sink table with a primary key, the connector issues INSERT INTO ... ON DUPLICATE KEY UPDATE ... statements.
  Warning: Inserting rows with duplicate unique index values — even when the primary keys differ — overwrites existing rows in any physical table with a unique index constraint, causing data loss.
Create a JDBC table
CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:<db-type>://<host>:<port>/<database>',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
Connector options
General
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | STRING | Yes | — | Set to jdbc. |
| url | STRING | Yes | — | JDBC URL of the database. |
| table-name | STRING | Yes | — | Name of the table to read from or write to. |
| username | STRING | No | — | Database username. Set together with password. |
| password | STRING | No | — | Database password. |
Source options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| scan.partition.column | STRING | No | — | Column used to split data into partitions. Must be NUMERIC or TIMESTAMP type. See Partitioned scan. |
| scan.partition.num | INTEGER | No | — | Number of partitions. |
| scan.partition.lower-bound | LONG | No | — | Smallest value of the first partition. |
| scan.partition.upper-bound | LONG | No | — | Largest value of the last partition. |
| scan.fetch-size | INTEGER | No | 0 | Rows fetched per database round trip. If set to 0, this option is ignored. |
| scan.auto-commit | BOOLEAN | No | true | Enables auto-commit for read transactions. |
Sink options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| sink.buffer-flush.max-rows | INTEGER | No | 100 | Maximum rows to buffer before flushing. Set to 0 to flush every row immediately. |
| sink.buffer-flush.interval | DURATION | No | 1000 ms | Maximum time to hold buffered rows before flushing. Set to 0 to flush every row immediately. |
| sink.max-retries | INTEGER | No | 3 | Maximum write retries on failure. |
| sink.ignore-delete | BOOLEAN | No | false | Ignores delete messages instead of forwarding them. Requires VVR 11.4+. |
| sink.ignore-delete-mode | STRING | No | ALL | Controls which delete messages are ignored when sink.ignore-delete is true. Requires VVR 11.4+. Valid values: ALL (ignores -D and -U), REAL_DELETE (ignores -D only), UPDATE_BEFORE (ignores -U only). |
To flush buffered rows asynchronously on a timer rather than on row count, set sink.buffer-flush.max-rows to 0 and configure sink.buffer-flush.interval to your desired flush interval.
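For example, the following sink definition flushes on a one-second timer only (a minimal sketch; the URL, credentials, and table name are placeholders):
CREATE TEMPORARY TABLE jdbc_sink (
  `id` BIGINT,
  `name` VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- Disable row-count flushing; rely on the timer alone
  'sink.buffer-flush.max-rows' = '0',
  'sink.buffer-flush.interval' = '1s'
);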
Dimension table options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| lookup.cache.max-rows | INTEGER | No | — | Maximum rows in the lookup cache. When the cache is full, the least recently used row expires. Caching is disabled unless both lookup.cache.max-rows and lookup.cache.ttl are set. |
| lookup.cache.ttl | DURATION | No | — | Maximum time a cached row is valid before it expires. |
| lookup.cache.caching-missing-key | BOOLEAN | No | true | Caches empty lookup results to avoid repeated database queries for missing keys. |
| lookup.max-retries | INTEGER | No | 3 | Maximum retries when a database query fails. |
PostgreSQL options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| source.extend-type.enabled | BOOLEAN | No | false | When true, maps PostgreSQL JSONB and UUID columns to Flink STRING. For lookup joins on UUID columns, also add stringtype=unspecified to the JDBC URL so PostgreSQL queries by actual type rather than casting. |
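For example, a PostgreSQL dimension table whose join key is a UUID column might be declared as follows (a sketch; the host, database, table name, and credentials are placeholders):
CREATE TEMPORARY TABLE pg_dim (
  `id` VARCHAR,  -- UUID column surfaced as STRING via source.extend-type.enabled
  `name` VARCHAR
) WITH (
  'connector' = 'jdbc',
  -- stringtype=unspecified lets PostgreSQL resolve the parameter type itself
  'url' = 'jdbc:postgresql://localhost:5432/mydb?stringtype=unspecified',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'source.extend-type.enabled' = 'true'
);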
Key behaviors
Partitioned scan
To enable parallel reads from a source table, configure the scan.partition.column, scan.partition.lower-bound, and scan.partition.upper-bound options together with scan.partition.num. The partition column must be numeric or TIMESTAMP. For details on how splits are calculated, see Partitioned Scan in the Apache Flink documentation.
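For example, the following source definition splits the read into four parallel partitions on the id column (a sketch; the bounds and connection details are placeholders):
CREATE TEMPORARY TABLE jdbc_source (
  `id` BIGINT,
  `name` VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- Split the id range [0, 1000) into 4 partitions read in parallel
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '0',
  'scan.partition.upper-bound' = '1000'
);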
Lookup cache
By default, every lookup join query hits the database directly. Enable the LRU cache by setting both lookup.cache.max-rows and lookup.cache.ttl to reduce database load at the cost of slightly stale data.
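For example, the dimension table below caches up to 10,000 rows for at most 10 minutes each (a sketch; the cache sizing is illustrative, not a recommendation):
CREATE TEMPORARY TABLE jdbc_dim (
  `id` INT,
  `name` VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  -- Both options must be set for the LRU cache to take effect
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
);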
Idempotent writes
When the sink table has a primary key, the connector issues database-specific upsert statements. For MySQL, the connector uses INSERT ... ON DUPLICATE KEY UPDATE ....
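As an illustration, for a sink table keyed on id with columns id and name, the generated statements take roughly the following forms (a sketch of the two dialects, not the connector's literal output):
-- MySQL dialect: approximate form of the generated upsert
INSERT INTO `<yourTable>` (`id`, `name`) VALUES (?, ?)
ON DUPLICATE KEY UPDATE `id` = VALUES(`id`), `name` = VALUES(`name`);

-- PostgreSQL dialect: approximate form, requires PostgreSQL 9.5+
INSERT INTO "<yourTable>" ("id", "name") VALUES (?, ?)
ON CONFLICT ("id") DO UPDATE SET "name" = EXCLUDED."name";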
Examples
All three examples use a blackhole or datagen connector as a lightweight counterpart so the statements are self-contained.
Read from a database (source table)
CREATE TEMPORARY TABLE jdbc_source (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
CREATE TEMPORARY TABLE blackhole_sink (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT * FROM jdbc_source;
Write to a database (sink table)
CREATE TEMPORARY TABLE datagen_source (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE jdbc_sink (
`name` VARCHAR,
`age` INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
INSERT INTO jdbc_sink SELECT * FROM datagen_source;
Enrich a stream with database lookups (dimension table)
CREATE TEMPORARY TABLE datagen_source (
`id` INT,
`data` BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE jdbc_dim (
`id` INT,
`name` VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
CREATE TEMPORARY TABLE blackhole_sink (
`id` INT,
`data` BIGINT,
`name` VARCHAR
) WITH (
'connector' = 'blackhole'
);
-- Look up the matching name for each stream record at processing time
INSERT INTO blackhole_sink
SELECT T.`id`, T.`data`, H.`name`
FROM datagen_source AS T
JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.id = H.id;
Data type mappings
| MySQL | Oracle | PostgreSQL | Flink SQL |
|---|---|---|---|
| TINYINT | — | — | TINYINT |
| SMALLINT, TINYINT UNSIGNED | — | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INT, MEDIUMINT, SMALLINT UNSIGNED | — | INTEGER, SERIAL | INT |
| BIGINT, INT UNSIGNED | — | BIGINT, BIGSERIAL | BIGINT |
| BIGINT UNSIGNED | — | — | DECIMAL(20, 0) |
| FLOAT | BINARY_FLOAT | REAL, FLOAT4 | FLOAT |
| DOUBLE, DOUBLE PRECISION | BINARY_DOUBLE | FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | SMALLINT, FLOAT(s), DOUBLE PRECISION, REAL, NUMBER(p, s) | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN, TINYINT(1) | — | BOOLEAN | BOOLEAN |
| DATE | DATE | DATE | DATE |
| TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| CHAR(n), VARCHAR(n), TEXT | CHAR(n), VARCHAR(n), CLOB | CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT, JSONB, UUID | STRING |
| BINARY, VARBINARY, BLOB | RAW(s), BLOB | BYTEA | BYTES |
| — | — | ARRAY | ARRAY |
What's next
- Create a MySQL CDC source table — capture real-time change data from MySQL
- Create a PostgreSQL CDC source table (public preview) — capture real-time change data from PostgreSQL