このトピックでは、Java Database Connectivity (JDBC) コネクタの使用方法について説明します。
背景情報
このコネクタは、オープンソースの Flink JDBC コネクタです。MySQL、PostgreSQL、Oracle などの一般的なデータベースからデータを読み取ったり、データを書き込んだりすることができます。次の表に、JDBC コネクタの機能を示します。
カテゴリ | 説明 |
サポートされるタイプ | ソーステーブル、ディメンションテーブル、結果テーブル |
実行モード | ストリーミングモードとバッチモード |
データフォーマット | 該当なし |
特定の監視メトリック | なし |
API タイプ | SQL |
結果テーブルでのデータの更新または削除のサポート | はい |
前提条件
接続先のデータベースとテーブルを作成しておく必要があります。
制限事項
JDBC ソーステーブルは有界データソースです。すべてのデータが読み取られると、タスクは自動的に終了します。リアルタイムの変更をキャプチャするには、Change Data Capture (CDC) コネクタを使用します。詳細については、「MySQL CDC ソーステーブル」および「PostgreSQL CDC ソーステーブル (パブリックプレビュー)」をご参照ください。
PostgreSQL の結果テーブルにデータを書き込む場合、データベースのバージョンは 9.5 以降である必要があります。以前のバージョンでは ON CONFLICT 構文がサポートされていないため、書き込み操作は失敗します。
Flink には、組み込みのデータベースドライバーは含まれていません。データベースドライバーの JAR パッケージを追加の依存関係として手動でアップロードする必要があります。サポートされているドライバーを次の表に示します。
ドライバー
グループ ID
アーティファクト ID
MySQL
mysql
Oracle
com.oracle.database.jdbc
PostgreSQL
org.postgresql
表に記載されていない JDBC ドライバーを使用する場合は、使用前にその有効性と可用性をテストする必要があります。
JDBC コネクタが MySQL の結果テーブルにデータを書き込む場合、受信した各レコードを連結して 1 つの SQL 文にし、それを実行します。プライマリキーを含む MySQL の結果テーブルに対しては、次の文が実行されます:
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;。警告テーブルにプライマリキーではない一意なインデックスがある場合、プライマリキーは異なるが、一意なインデックスの値が同じレコードを挿入すると、競合が発生します。この競合によりデータが上書きされ、データ損失につながります。
構文
CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);パラメーター
全般
パラメーター
説明
データの型
必須
デフォルト値
注意
connector
テーブルのタイプ。
String
はい
なし
値は `jdbc` である必要があります。
url
データベースの URL。
String
はい
なし
なし。
table-name
JDBC テーブルの名前。
String
はい
なし
なし。
username
JDBC 接続のユーザー名。
String
いいえ
なし
`username` または `password` パラメーターのいずれかを指定した場合は、両方を指定する必要があります。
password
JDBC 接続のパスワード。
String
いいえ
なし
ソーステーブル固有
パラメーター
説明
データの型
必須
デフォルト値
注意
scan.partition.column
入力のパーティション分割に使用される列の名前。
String
いいえ
なし
この列は、数値型またはタイムスタンプ型である必要があります。また、この型はデータベース内の数値型との比較をサポートしている必要があります。パーティションスキャンの詳細については、「パーティションスキャン」をご参照ください。
scan.partition.num
パーティションの数。
Integer
いいえ
なし
なし。
scan.partition.lower-bound
最初のパーティションの最小値。
Long
いいえ
なし
なし。
scan.partition.upper-bound
最後のパーティションの最大値。
Long
いいえ
なし
なし。
scan.fetch-size
1 回の読み取りループでデータベースからフェッチする行数。
Integer
いいえ
0
このパラメーターを 0 に設定すると、無視されます。
scan.auto-commit
自動コミットを有効にするかどうかを指定します。
Boolean
いいえ
true
なし。
結果テーブル固有
パラメーター
説明
データの型
必須
デフォルト値
注意
sink.buffer-flush.max-rows
フラッシュする前にキャッシュするレコードの最大数。
Integer
いいえ
100
このパラメーターを 0 に設定すると、キャッシュが無効になります。レコードはすぐにフラッシュされます。
sink.buffer-flush.interval
フラッシュ間隔。データがこの間隔を超えて Flink にキャッシュされると、非同期スレッドがデータをデータベースにフラッシュします。
Duration
いいえ
1000
単位:ミリ秒 (ms)。
このパラメーターを 0 に設定すると、キャッシュが無効になります。レコードはすぐにフラッシュされます。
説明キャッシュされたフラッシュイベントを完全に非同期で処理するには、sink.buffer-flush.max-rows を 0 に設定し、適切なフラッシュ間隔を設定します。
sink.max-retries
データベースへのレコードの書き込みが失敗した場合の最大再試行回数。
Integer
いいえ
3
なし。
sink.ignore-delete
削除メッセージを無視するかどうかを指定します。
Boolean
いいえ
false
このパラメーターは V11.4 以降でサポートされています。デフォルトでは、削除メッセージは無視されません。
sink.ignore-delete-mode
無視された削除メッセージを処理するためのポリシー。
String
いいえ
ALL
このパラメーターは、`sink.ignore-delete` が `true` に設定されている場合にのみ有効です。
有効な値:
ALL (デフォルト):-D および -U メッセージを無視します。
REAL_DELETE:-D メッセージのみを無視します。
UPDATE_BEFORE:-U メッセージのみを無視します。
このパラメーターは V11.4 以降でサポートされています。
ディメンションテーブル固有
パラメーター
説明
データの型
必須
デフォルト値
注意
lookup.cache.max-rows
キャッシュする行の最大数。この値を超えると、最も古い行が有効期限切れになり、新しいレコードに置き換えられます。
Integer
いいえ
なし
デフォルトでは、ディメンションテーブルのキャッシュは無効になっています。lookup.cache.max-rows と lookup.cache.ttl パラメーターを設定して、ディメンションテーブルのキャッシュを有効にします。キャッシュが有効な場合、Least Recently Used (LRU) ポリシーが使用されます。
lookup.cache.ttl
キャッシュ内の各レコードの最大存続可能時間 (TTL)。レコードがこの時間を超えると、有効期限切れになります。
Duration
いいえ
なし
lookup.cache.caching-missing-key
空のクエリ結果をキャッシュするかどうかを指定します。
Boolean
いいえ
true
有効な値:
true (デフォルト):空のクエリ結果をキャッシュします。
false:空のクエリ結果をキャッシュしません。
lookup.max-retries
データベースクエリが失敗した場合の最大再試行回数。
Integer
いいえ
3
なし。
PostgreSQL 固有
パラメーター
説明
データの型
必須
デフォルト値
注意
source.extend-type.enabled
ソーステーブルまたはディメンションテーブルとして使用する場合、JSONB や UUID などの拡張型を読み取り、Flink がサポートする型にマッピングすることを許可するかどうかを指定します。
Boolean
いいえ
false
有効な値:
true:拡張型の読み取りとマッピングをサポートします。
false (デフォルト):拡張型の読み取りとマッピングをサポートしません。
説明ディメンションテーブルクエリの外部キーフィールドが UUID 型の場合、URL で stringtype=unspecified も設定する必要があります。これにより、PostgreSQL サーバーは実際のデータ型に基づいて自動的にクエリを実行します。
データ型のマッピング
MySQL 型 | Oracle 型 | PostgreSQL 型 | Flink SQL 型 |
TINYINT | N/A | N/A | TINYINT |
| N/A |
| SMALLINT |
| N/A |
| INT |
| N/A |
| BIGINT |
BIGINT UNSIGNED | N/A | N/A | DECIMAL(20, 0) |
BIGINT | N/A | BIGINT | BIGINT |
FLOAT | BINARY_FLOAT |
| FLOAT |
| BINARY_DOUBLE |
| DOUBLE |
|
|
| DECIMAL(p, s) |
| N/A | BOOLEAN can | BOOLEAN |
DATE | DATE | DATE | DATE |
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
|
|
| STRING |
|
| BYTEA | BYTES |
N/A | N/A | ARRAY | ARRAY |
例
ソーステーブル
CREATE TEMPORARY TABLE jdbc_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;結果テーブル
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); INSERT INTO jdbc_sink SELECT * FROM datagen_source;ディメンションテーブル
CREATE TEMPORARY TABLE datagen_source( `id` INT, `data` BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_dim ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `data` BIGINT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.`id`,T.`data`, H.`name` FROM datagen_source AS T JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;