すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:JDBC

最終更新日:Dec 17, 2025

このトピックでは、Java Database Connectivity (JDBC) コネクタの使用方法について説明します。

背景情報

このコネクタは、オープンソースの Flink JDBC コネクタです。MySQL、PostgreSQL、Oracle などの一般的なデータベースからデータを読み取ったり、データを書き込んだりすることができます。次の表に、JDBC コネクタの機能を示します。

カテゴリ

説明

サポートされるタイプ

ソーステーブル、ディメンションテーブル、結果テーブル

実行モード

ストリーミングモードとバッチモード

データフォーマット

該当なし

特定の監視メトリック

なし

API タイプ

SQL

結果テーブルでのデータの更新または削除のサポート

はい

前提条件

接続先のデータベースとテーブルを作成しておく必要があります。

制限事項

  • JDBC ソーステーブルは有界データソースです。すべてのデータが読み取られると、タスクは自動的に終了します。リアルタイムの変更をキャプチャするには、Change Data Capture (CDC) コネクタを使用します。詳細については、「MySQL CDC ソーステーブル」および「PostgreSQL CDC ソーステーブル (パブリックプレビュー)」をご参照ください。

  • PostgreSQL の結果テーブルにデータを書き込む場合、データベースのバージョンは 9.5 以降である必要があります。以前のバージョンでは ON CONFLICT 構文がサポートされていないため、書き込み操作は失敗します。

  • Flink には、組み込みのデータベースドライバーは含まれていません。データベースドライバーの JAR パッケージを追加の依存関係として手動でアップロードする必要があります。サポートされているドライバーを次の表に示します。

    ドライバー

    グループ ID

    アーティファクト ID

    MySQL

    mysql

    mysql-connector-java

    Oracle

    com.oracle.database.jdbc

    ojdbc8

    PostgreSQL

    org.postgresql

    postgresql

    表に記載されていない JDBC ドライバーを使用する場合は、使用前にその有効性と可用性をテストする必要があります。
  • JDBC コネクタが MySQL の結果テーブルにデータを書き込む場合、受信した各レコードを連結して 1 つの SQL 文にし、それを実行します。プライマリキーを含む MySQL の結果テーブルに対しては、次の文が実行されます:INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;

    警告

    テーブルにプライマリキーではない一意なインデックスがある場合、プライマリキーは異なるが、一意なインデックスの値が同じレコードを挿入すると、競合が発生します。この競合によりデータが上書きされ、データ損失につながります。

構文

CREATE TABLE jdbc_table (
  `id` BIGINT,
  `name` VARCHAR,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);

パラメーター

  • 全般

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    注意

    connector

    テーブルのタイプ。

    String

    はい

    なし

    値は `jdbc` である必要があります。

    url

    データベースの URL。

    String

    はい

    なし

    なし。

    table-name

    JDBC テーブルの名前。

    String

    はい

    なし

    なし。

    username

    JDBC 接続のユーザー名。

    String

    いいえ

    なし

    `username` または `password` パラメーターのいずれかを指定した場合は、両方を指定する必要があります。

    password

    JDBC 接続のパスワード。

    String

    いいえ

    なし

  • ソーステーブル固有

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    注意

    scan.partition.column

    入力のパーティション分割に使用される列の名前。

    String

    いいえ

    なし

    この列は、数値型またはタイムスタンプ型である必要があります。また、この型はデータベース内の数値型との比較をサポートしている必要があります。パーティションスキャンの詳細については、「パーティションスキャン」をご参照ください。

    scan.partition.num

    パーティションの数。

    Integer

    いいえ

    なし

    なし。

    scan.partition.lower-bound

    最初のパーティションの最小値。

    Long

    いいえ

    なし

    なし。

    scan.partition.upper-bound

    最後のパーティションの最大値。

    Long

    いいえ

    なし

    なし。

    scan.fetch-size

    1 回の読み取りループでデータベースからフェッチする行数。

    Integer

    いいえ

    0

    このパラメーターを 0 に設定すると、無視されます。

    scan.auto-commit

    自動コミットを有効にするかどうかを指定します。

    Boolean

    いいえ

    true

    なし。

  • 結果テーブル固有

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    注意

    sink.buffer-flush.max-rows

    フラッシュする前にキャッシュするレコードの最大数。

    Integer

    いいえ

    100

    このパラメーターを 0 に設定すると、キャッシュが無効になります。レコードはすぐにフラッシュされます。

    sink.buffer-flush.interval

    フラッシュ間隔。データがこの間隔を超えて Flink にキャッシュされると、非同期スレッドがデータをデータベースにフラッシュします。

    Duration

    いいえ

    1000

    単位:ミリ秒 (ms)。

    このパラメーターを 0 に設定すると、キャッシュが無効になります。レコードはすぐにフラッシュされます。

    説明

    キャッシュされたフラッシュイベントを完全に非同期で処理するには、sink.buffer-flush.max-rows を 0 に設定し、適切なフラッシュ間隔を設定します。

    sink.max-retries

    データベースへのレコードの書き込みが失敗した場合の最大再試行回数。

    Integer

    いいえ

    3

    なし。

    sink.ignore-delete

    削除メッセージを無視するかどうかを指定します。

    Boolean

    いいえ

    false

    このパラメーターは V11.4 以降でサポートされています。デフォルトでは、削除メッセージは無視されません。

    sink.ignore-delete-mode

    無視された削除メッセージを処理するためのポリシー。

    String

    いいえ

    ALL

    このパラメーターは、`sink.ignore-delete` が `true` に設定されている場合にのみ有効です。

    有効な値:

    • ALL (デフォルト):-D および -U メッセージを無視します。

    • REAL_DELETE:-D メッセージのみを無視します。

    • UPDATE_BEFORE:-U メッセージのみを無視します。

    このパラメーターは V11.4 以降でサポートされています。

  • ディメンションテーブル固有

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    注意

    lookup.cache.max-rows

    キャッシュする行の最大数。この値を超えると、最も古い行が有効期限切れになり、新しいレコードに置き換えられます。

    Integer

    いいえ

    なし

    デフォルトでは、ディメンションテーブルのキャッシュは無効になっています。lookup.cache.max-rowslookup.cache.ttl パラメーターを設定して、ディメンションテーブルのキャッシュを有効にします。キャッシュが有効な場合、Least Recently Used (LRU) ポリシーが使用されます。

    lookup.cache.ttl

    キャッシュ内の各レコードの最大存続可能時間 (TTL)。レコードがこの時間を超えると、有効期限切れになります。

    Duration

    いいえ

    なし

    lookup.cache.caching-missing-key

    空のクエリ結果をキャッシュするかどうかを指定します。

    Boolean

    いいえ

    true

    有効な値:

    • true (デフォルト):空のクエリ結果をキャッシュします。

    • false:空のクエリ結果をキャッシュしません。

    lookup.max-retries

    データベースクエリが失敗した場合の最大再試行回数。

    Integer

    いいえ

    3

    なし。

  • PostgreSQL 固有

    パラメーター

    説明

    データの型

    必須

    デフォルト値

    注意

    source.extend-type.enabled

    ソーステーブルまたはディメンションテーブルとして使用する場合、JSONB や UUID などの拡張型を読み取り、Flink がサポートする型にマッピングすることを許可するかどうかを指定します。

    Boolean

    いいえ

    false

    有効な値:

    • true:拡張型の読み取りとマッピングをサポートします。

    • false (デフォルト):拡張型の読み取りとマッピングをサポートしません。

    説明

    ディメンションテーブルクエリの外部キーフィールドが UUID 型の場合、URL で stringtype=unspecified も設定する必要があります。これにより、PostgreSQL サーバーは実際のデータ型に基づいて自動的にクエリを実行します。

データ型のマッピング

MySQL 型

Oracle 型

PostgreSQL 型

Flink SQL 型

TINYINT

N/A

N/A

TINYINT

  • SMALLINT

  • TINYINT UNSIGNED

N/A

  • SMALLINT

  • INT2

  • SMALLSERIAL

  • SERIAL2

SMALLINT

  • INT

  • MEDIUMINT

  • SMALLINT UNSIGNED

N/A

  • INTEGER

  • SERIAL

INT

  • BIGINT

  • INT UNSIGNED

N/A

  • BIGINT

  • BIGSERIAL

BIGINT

BIGINT UNSIGNED

N/A

N/A

DECIMAL(20, 0)

BIGINT

N/A

BIGINT

BIGINT

FLOAT

BINARY_FLOAT

  • REAL

  • FLOAT4

FLOAT

  • DOUBLE

  • DOUBLE PRECISION

BINARY_DOUBLE

  • FLOAT8

  • DOUBLE PRECISION

DOUBLE

  • NUMERIC(p, s)

  • DECIMAL(p, s)

  • SMALLINT

  • FLOAT(s)

  • DOUBLE PRECISION

  • REAL

  • NUMBER(p, s)

  • NUMERIC(p, s)

  • DECIMAL(p, s)

DECIMAL(p, s)

  • BOOLEAN

  • TINYINT(1)

N/A

BOOLEAN can

BOOLEAN

DATE

DATE

DATE

DATE

TIME [(p)]

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

  • CHAR(n)

  • VARCHAR(n)

  • TEXT

  • CHAR(n)

  • VARCHAR(n)

  • CLOB

  • CHAR(n)

  • CHARACTER(n)

  • VARCHAR(n)

  • CHARACTER VARYING(n)

  • TEXT

  • JSONB

  • UUID

STRING

  • BINARY

  • VARBINARY

  • BLOB

  • RAW(s)

  • BLOB

BYTEA

BYTES

N/A

N/A

ARRAY

ARRAY

  • ソーステーブル

    CREATE TEMPORARY TABLE jdbc_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
  • 結果テーブル

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    INSERT INTO jdbc_sink
    SELECT * FROM datagen_source;
  • ディメンションテーブル

    CREATE TEMPORARY TABLE datagen_source(
     `id` INT,
     `data` BIGINT,
     `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_dim (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `data` BIGINT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.`id`,T.`data`, H.`name`
    FROM datagen_source AS T
    JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;