Realtime Compute for Apache Flink: JDBC connector

Last Updated: Jan 22, 2024

This topic describes how to use the Java Database Connectivity (JDBC) connector.

Background information

The JDBC connector is provided by Apache Flink and can be used to read data from and write data to common databases, such as MySQL, PostgreSQL, and Oracle. The following table describes the capabilities supported by the JDBC connector.

| Item | Description |
| --- | --- |
| Table type | Source table, dimension table, and result table |
| Running mode | Streaming mode and batch mode |
| Data format | N/A |
| Metric | N/A |
| API type | SQL API |
| Data update or deletion in a result table | Supported |

Prerequisites

The database and table to which you want to connect are created.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.1 or later supports the JDBC connector.

  • A JDBC source table is a bounded source. After the JDBC source connector has read all data from the table in the upstream database, the task for the JDBC source table is complete. To capture real-time change data, use a Change Data Capture (CDC) connector instead. For more information, see Create a MySQL CDC source table and Create a PostgreSQL CDC source table (public preview).

  • If you use the JDBC sink connector to connect to a PostgreSQL database, make sure that the database version is PostgreSQL 9.5 or later. PostgreSQL uses the ON CONFLICT syntax to insert or update data when a primary key is specified in a DDL statement. The ON CONFLICT syntax is supported only in PostgreSQL 9.5 or later.

  • Fully managed Flink supports only the open source JDBC connector, which does not include a JDBC driver for any specific database. When you use the JDBC connector, you must manually upload the JAR package of the driver for the destination database as a dependency file. For more information, see Step 3: Configure parameters on the Configurations tab. The following table describes the JDBC drivers supported by fully managed Flink.

    | Driver | Group ID | Artifact ID |
    | --- | --- | --- |
    | MySQL | mysql | mysql-connector-java |
    | Oracle | com.oracle.database.jdbc | ojdbc8 |
    | PostgreSQL | org.postgresql | postgresql |

    Note

    If you use a JDBC driver that is not included in the table, you must test the validity and availability of the JDBC driver.
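The PostgreSQL limit above can be illustrated with a short sketch of an ON CONFLICT upsert. The users table and its columns are hypothetical, and the statement that the connector actually generates may differ in detail. The sketch only shows why PostgreSQL 9.5 or later is required when a primary key is declared.

```sql
-- Hypothetical PostgreSQL table with a primary key on id.
CREATE TABLE users (
  id   BIGINT PRIMARY KEY,
  name VARCHAR(255)
);

-- Upsert: insert the row, or update the existing row if the key already exists.
-- ON CONFLICT is available only in PostgreSQL 9.5 or later.
INSERT INTO users (id, name)
VALUES (1, 'alice')
ON CONFLICT (id)
DO UPDATE SET name = EXCLUDED.name;
```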

Syntax

CREATE TABLE jdbc_table (
  `id` BIGINT,
  `name` VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);
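
The jdbc:xxx value in the preceding statement is a placeholder for a database-specific JDBC URL. As a minimal sketch, assuming a MySQL database, the statement might look like the following. The table name jdbc_mysql_table and the <host>, <port>, and <database> placeholders are illustrative and are not taken from this topic.

```sql
-- Sketch only: standard MySQL JDBC URL form with placeholder values.
-- Other common URL forms:
--   PostgreSQL:    jdbc:postgresql://<host>:<port>/<database>
--   Oracle (thin): jdbc:oracle:thin:@<host>:<port>:<SID>
CREATE TABLE jdbc_mysql_table (
  `id` BIGINT,
  `name` VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<host>:<port>/<database>',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);
```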

Parameters in the WITH clause

  • Common parameters

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | connector | The type of the table. | STRING | Yes | No default value | Set the value to jdbc. |
    | url | The URL of the database. | STRING | Yes | No default value | N/A |
    | table-name | The name of the JDBC table. | STRING | Yes | No default value | N/A |
    | username | The name of the JDBC user. | STRING | No | No default value | If you configure one of the username and password parameters, you must also configure the other parameter. |
    | password | The password of the JDBC user. | STRING | No | No default value | If you configure one of the username and password parameters, you must also configure the other parameter. |

  • Parameters only for source tables

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | scan.partition.column | The name of the column that is used to partition the input data. | STRING | No | No default value | The values in the column must be of the NUMERIC, DATE, or TIMESTAMP type. For more information about partitioned scan, see Partitioned Scan. |
    | scan.partition.num | The number of partitions. | INTEGER | No | No default value | N/A |
    | scan.partition.lower-bound | The smallest value of the first partition. | INTEGER | No | No default value | N/A |
    | scan.partition.upper-bound | The largest value of the last partition. | INTEGER | No | No default value | N/A |
    | scan.fetch-size | The number of rows of data that are obtained from the database each time data is read from a source table. | INTEGER | No | 0 | If you set this parameter to 0, this parameter is ignored. |
    | scan.auto-commit | Specifies whether to enable auto-commit. | BOOLEAN | No | true | N/A |
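
    The partitioned scan parameters above can be sketched as follows. This minimal example assumes that the upstream table has a numeric id column whose values range roughly from 1 to 1000000. The table name jdbc_partitioned_source and all parameter values are illustrative. In open source Apache Flink, the four scan.partition.* parameters must be configured together, and the lower and upper bounds only determine the partition stride. They do not filter rows.

    ```sql
    -- Sketch only: read the table in 4 parallel partitions split on the id column.
    CREATE TEMPORARY TABLE jdbc_partitioned_source (
      `id` BIGINT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'scan.partition.column' = 'id',            -- must be NUMERIC, DATE, or TIMESTAMP
      'scan.partition.num' = '4',                -- illustrative value
      'scan.partition.lower-bound' = '1',        -- assumed smallest id
      'scan.partition.upper-bound' = '1000000',  -- assumed largest id
      'scan.fetch-size' = '1000'                 -- rows fetched per round trip
    );
    ```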

  • Parameters only for result tables

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | sink.buffer-flush.max-rows | The maximum number of data records that can be cached before the flush operation is performed. | INTEGER | No | 100 | If you set this parameter to 0, data records are not cached before the flush operation is performed. |
    | sink.buffer-flush.interval | The interval at which data is flushed. If data records are cached for a period that exceeds the duration specified by this parameter, the flush operation is performed in an asynchronous thread. | DURATION | No | 1s | If you set this parameter to 0, data records are not cached before the flush operation is performed. Note: If you want to process cached flush events in asynchronous mode, you can set the sink.buffer-flush.max-rows parameter to 0 and configure the sink.buffer-flush.interval parameter based on your business requirements. |
    | sink.max-retries | The maximum number of retries that are allowed when data fails to be written to the database. | INTEGER | No | 3 | N/A |
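
    As a rough illustration of the result table parameters above, the following sketch tunes the write buffer. The table name jdbc_sink_buffered and all values are illustrative. With these settings, the expectation (based on the behavior of the open source connector) is that cached records are flushed when 500 records are buffered or when 2 seconds have elapsed, whichever comes first.

    ```sql
    -- Sketch only: larger write batches and more retries than the defaults.
    CREATE TEMPORARY TABLE jdbc_sink_buffered (
      `name` VARCHAR,
      `age` INT,
      PRIMARY KEY (`name`) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'sink.buffer-flush.max-rows' = '500',  -- default is 100
      'sink.buffer-flush.interval' = '2s',   -- default is 1s
      'sink.max-retries' = '5'               -- default is 3
    );
    ```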

  • Parameters only for dimension tables

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | lookup.cache.max-rows | The maximum number of rows of data that can be cached. If the number of rows of data in the cache exceeds the value of this parameter, the earliest row of data expires and is replaced by a new row of data. | INTEGER | No | No default value | By default, caching for dimension tables is disabled. You can configure the lookup.cache.max-rows and lookup.cache.ttl parameters to enable caching for dimension tables. If caching for dimension tables is enabled, the LRU cache policy is used. |
    | lookup.cache.ttl | The maximum time to live (TTL) of each row of data in the cache. If the time period for which a row of data is cached exceeds the value of this parameter, the row of data expires. | DURATION | No | No default value | See the remarks for lookup.cache.max-rows. |
    | lookup.cache.caching-missing-key | Specifies whether to cache empty query results. | BOOLEAN | No | true | Valid values: true (empty query results are cached; this is the default value) and false (empty query results are not cached). |
    | lookup.max-retries | The maximum number of retries when the database fails to be queried. | INTEGER | No | 3 | N/A |
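
    The dimension table cache described above is enabled by configuring both lookup.cache.max-rows and lookup.cache.ttl. The following is a minimal sketch. The table name jdbc_dim_cached and all values are illustrative and should be sized to the actual dimension data.

    ```sql
    -- Sketch only: LRU cache of up to 10000 rows, each kept for at most 10 minutes.
    CREATE TEMPORARY TABLE jdbc_dim_cached (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'lookup.cache.max-rows' = '10000',
      'lookup.cache.ttl' = '10min',
      'lookup.cache.caching-missing-key' = 'false',  -- do not cache empty results
      'lookup.max-retries' = '3'
    );
    ```

    Setting lookup.cache.caching-missing-key to false is a design choice for cases in which keys that are missing now are expected to appear in the database soon. The default value true avoids repeated queries for keys that do not exist.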

  • Parameters only for PostgreSQL databases

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | source.extend-type.enabled | Specifies whether data of the JSONB and UUID extension types can be read and mapped to the data types supported by Flink when a PostgreSQL table is used as a source table or a dimension table. | BOOLEAN | No | false | Valid values: true (data of the JSONB and UUID extension types can be read and mapped to the data types supported by Flink) and false (data of the JSONB and UUID extension types cannot be read or mapped to the data types supported by Flink; this is the default value). |
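
    As a sketch of the PostgreSQL parameter above, the following source table reads a UUID column and a JSONB column as STRING, which is the Flink type listed for both in the data type mappings of this topic. The table name pg_extended_source and the column names are hypothetical.

    ```sql
    -- Sketch only: PostgreSQL source table with JSONB and UUID columns.
    CREATE TEMPORARY TABLE pg_extended_source (
      `id` STRING,       -- UUID column in PostgreSQL
      `payload` STRING   -- JSONB column in PostgreSQL
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:postgresql://<host>:<port>/<database>',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'source.extend-type.enabled' = 'true'  -- default is false
    );
    ```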

Data type mappings

| Data type of MySQL | Data type of Oracle | Data type of PostgreSQL | Data type of Flink SQL |
| --- | --- | --- | --- |
| TINYINT | N/A | N/A | TINYINT |
| SMALLINT, TINYINT UNSIGNED | N/A | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
| INT, MEDIUMINT, SMALLINT UNSIGNED | N/A | INTEGER, SERIAL | INT |
| BIGINT, INT UNSIGNED | N/A | BIGINT, BIGSERIAL | BIGINT |
| BIGINT UNSIGNED | N/A | N/A | DECIMAL(20, 0) |
| BIGINT | N/A | BIGINT | BIGINT |
| FLOAT | BINARY_FLOAT | REAL, FLOAT4 | FLOAT |
| DOUBLE, DOUBLE PRECISION | BINARY_DOUBLE | FLOAT8, DOUBLE PRECISION | DOUBLE |
| NUMERIC(p, s), DECIMAL(p, s) | SMALLINT, FLOAT(s), DOUBLE PRECISION, REAL, NUMBER(p, s) | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
| BOOLEAN, TINYINT(1) | N/A | BOOLEAN | BOOLEAN |
| DATE | DATE | DATE | DATE |
| TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
| CHAR(n), VARCHAR(n), TEXT | CHAR(n), VARCHAR(n), CLOB | CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT, JSONB, UUID | STRING |
| BINARY, VARBINARY, BLOB | RAW(s), BLOB | BYTEA | BYTES |
| N/A | N/A | ARRAY | ARRAY |
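
To show how the mappings above translate into a table declaration, the following sketch pairs a hypothetical MySQL table (shown in comments) with a Flink SQL table that follows the listed mappings. The table and column names are illustrative only.

```sql
-- Hypothetical MySQL table:
--   CREATE TABLE orders (
--     id         BIGINT UNSIGNED PRIMARY KEY,
--     is_paid    TINYINT(1),
--     amount     DECIMAL(10, 2),
--     created_at DATETIME(3),
--     note       TEXT,
--     attachment BLOB
--   );

-- Flink SQL declaration that follows the data type mappings in this topic.
CREATE TEMPORARY TABLE mysql_orders (
  `id` DECIMAL(20, 0),        -- BIGINT UNSIGNED
  `is_paid` BOOLEAN,          -- TINYINT(1)
  `amount` DECIMAL(10, 2),    -- DECIMAL(p, s)
  `created_at` TIMESTAMP(3),  -- DATETIME [(p)]
  `note` STRING,              -- TEXT
  `attachment` BYTES          -- BLOB
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<host>:<port>/<database>',
  'table-name' = 'orders',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);
```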

Sample code

  • Sample code for a source table

    -- Bounded JDBC source: the table is read once, and then the source task finishes.
    CREATE TEMPORARY TABLE jdbc_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    -- The blackhole sink discards all rows. It is used here only to run the source.
    CREATE TEMPORARY TABLE blackhole_sink (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM jdbc_source;

  • Sample code for a result table

    -- The datagen source produces random rows for testing.
    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    -- JDBC result table. No primary key is declared, so rows are appended.
    CREATE TEMPORARY TABLE jdbc_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    INSERT INTO jdbc_sink
    SELECT * FROM datagen_source;

  • Sample code for a dimension table

    -- The datagen source includes a processing-time attribute, which the lookup join requires.
    CREATE TEMPORARY TABLE datagen_source (
      `id` INT,
      `data` BIGINT,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    -- JDBC dimension table that is queried by id.
    CREATE TEMPORARY TABLE jdbc_dim (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      `id` INT,
      `data` BIGINT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    -- Lookup join: each source row is enriched with the name that matches its id at processing time.
    INSERT INTO blackhole_sink
    SELECT T.`id`, T.`data`, H.`name`
    FROM datagen_source AS T
    JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;
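
Because data update and deletion are supported in a result table, an aggregation query can also write its changing results to a JDBC table. The following sketch assumes the behavior of the open source connector, in which the sink runs in upsert mode when a primary key is declared in the DDL. The table names word_source and jdbc_upsert_sink and the datagen settings are illustrative.

```sql
-- Sketch only: datagen source with short random names so that keys repeat.
CREATE TEMPORARY TABLE word_source (
  `name` VARCHAR
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.name.length' = '2'
);

-- The primary key lets the sink update the existing row for each name.
CREATE TEMPORARY TABLE jdbc_upsert_sink (
  `name` VARCHAR,
  `cnt` BIGINT,
  PRIMARY KEY (`name`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);

-- Each new row changes the count for its name, which is written as an update.
INSERT INTO jdbc_upsert_sink
SELECT `name`, COUNT(*) AS `cnt`
FROM word_source
GROUP BY `name`;
```

The destination table in the database is expected to define the same primary key so that the upsert statement can detect conflicts.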