このトピックでは、OceanBaseコネクタの使用方法について説明します。
背景情報
OceanBase は、ネイティブの分散型ハイブリッドトランザクション/分析処理 (HTAP) データベース管理システムです。詳細については、OceanBase の公式サイトをご参照ください。OceanBase は、Oracle と MySQL の両方の互換モードをサポートしています。これにより、MySQL または Oracle データベースから移行する際の業務システムのリファクタリングコストを削減できます。これらのモードのデータの型、SQL の特徴、および内部ビューは、MySQL または Oracle のものと一致しています。各モードで推奨されるコネクタは次のとおりです:
Oracle モード:OceanBase コネクタのみを使用できます。
MySQL モード:このモードはネイティブの MySQL 構文と高い互換性があります。OceanBase と MySQL コネクタの両方を使用して、OceanBase への読み書きができます。
重要OceanBase コネクタはパブリックプレビュー段階です。OceanBase 3.2.4.4 以降では、MySQL コネクタを使用して OceanBase への読み書きができます。この機能もパブリックプレビュー段階です。使用する前に、この機能を慎重に評価してください。
MySQL コネクタを使用して OceanBase から増分データを読み取る場合は、OceanBase のバイナリログ (Binlog) が有効になっており、正しく構成されていることを確認してください。OceanBase Binlog の詳細については、「概要」または「バイナリログ関連の操作」をご参照ください。
次の表に、OceanBase コネクタでサポートされる情報を示します。
カテゴリ | 詳細 |
サポートされているタイプ | ソーステーブル、ディメンションテーブル、およびシンクテーブル |
実行モード | ストリーミングとバッチ |
データ形式 | 該当なし |
特定の監視メトリック | なし |
APIタイプ | SQL |
結果テーブル内のデータの更新または削除をサポート | はい |
前提条件
接続先のデータベースとテーブルが作成済みであること。
IP アドレスホワイトリストが構成済みであること。詳細については、「ホワイトリストグループの構成」をご参照ください。
OceanBase から変更データキャプチャ (CDC) を使用して増分データを収集するには、OceanBase Binlog サービスも有効にする必要があります。詳細については、「バイナリログ関連の操作」をご参照ください。
制限事項
OceanBase コネクタは、Realtime Compute for Apache Flink with Ververica Runtime (VVR) 8.0.1 以降でサポートされています。
at-least-once セマンティクスが保証されます。結果テーブルにプライマリキーがある場合、べき等性によってデータの正確性が保証されます。
構文
CREATE TABLE oceanabse_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = '<yourJdbcUrl>', // JDBC URL を指定します
'tableName' = '<yourTableName>', // テーブル名を指定します
'userName' = '<yourUserName>', // ユーザー名を指定します
'password' = '<yourPassword>' // パスワードを指定します
);コネクタは、受信した各データレコードに対して SQL 文を構築して実行することで、結果テーブルに書き込みます。
結果テーブルにプライマリキーがない場合は、INSERT INTO 文が生成されます。
結果テーブルにプライマリキーがある場合は、データベースの互換モードに基づいて UPSERT 文が生成されます。
WITH パラメーター
全般
パラメーター
説明
必須
データ型
デフォルト値
備考
connector
テーブルのタイプ。
はい
STRING
なし
値は
oceanbaseである必要があります。password
パスワード。
はい
STRING
なし
なし。
ソース固有
重要注:VVR 11.4.0 以降の Realtime Compute for Apache Flink から、OceanBase CDC コネクタは大幅なアーキテクチャのアップグレードと機能調整が行われました。更新内容を理解し、バージョンをスムーズに移行するために、主な変更点を以下に説明します:
OceanBase LogProxy サービスに基づく元の CDC コネクタは、正式に非推奨となり、ディストリビューションから削除されました。VVR 11.4.0 以降、OceanBase CDC コネクタは、OceanBase Binlog サービスを介してのみ増分ログのキャプチャとデータ同期をサポートします。
OceanBase CDC コネクタは、OceanBase Binlog サービスとのプロトコル互換性と接続安定性が強化されています。そのため、OceanBase CDC コネクタの使用を推奨します。
OceanBase Binlog サービスは、MySQL レプリケーションプロトコルと完全に互換性があります。標準の MySQL CDC コネクタを使用して OceanBase Binlog サービスに接続して変更追跡を行うこともできますが、これは推奨されません。
VVR 11.4.0 以降の Realtime Compute for Apache Flink から、OceanBase CDC コネクタは Oracle 互換モードでの増分データサブスクリプションをサポートしなくなりました。Oracle 互換モードでの増分データサブスクリプションについては、OceanBase のエンタープライズテクニカルサポートにお問い合わせください。
パラメーター
説明
必須
データの型
デフォルト値
備考
hostname
OceanBase データベースの IP アドレスまたはホスト名。
はい
STRING
いいえ
VPC アドレスを指定することを推奨します。
説明OceanBase と Realtime Compute for Apache Flink が同じ VPC にない場合は、まず VPC 間ネットワーク接続を確立するか、インターネットを使用してアクセスする必要があります。詳細については、「ストレージの管理と操作」および「フルマネージド Flink クラスターはどのようにインターネットにアクセスできますか?」をご参照ください。
username
OceanBase データベースサービスのユーザー名。
はい
STRING
いいえ
なし。
database-name
OceanBase データベースの名前。
はい
STRING
なし
ソーステーブルとして、データベース名は正規表現をサポートしており、複数のデータベースからデータを読み取ることができます。
正規表現を使用する場合、先頭と末尾を一致させるために ^ および $ 記号を使用することは避けてください。その理由については、table-name パラメーターの備考をご参照ください。
table-name
OceanBase によって示されます。
はい
STRING
なし
ソーステーブルとして、テーブル名は正規表現をサポートしており、複数のテーブルからデータを読み取ることができます。
正規表現を使用する場合、先頭と末尾を一致させるために ^ および $ 記号を使用することは避けてください。その理由については、以下の注記をご参照ください。
説明OceanBase ソーステーブルが正規表現でテーブル名を照合する場合、指定した database-name と table-name を文字列 \\. (VVR 8.0.1 より前は文字 . が使用されていました) を使用して完全パスの正規表現に結合します。この結合された正規表現は、OceanBase データベース内のテーブルの完全修飾名を照合するために使用されます。
たとえば、'database-name'='db_.*' および 'table-name'='tb_.+' と設定した場合、コネクタは正規表現 db_.*\\.tb_.+ (またはバージョン 8.0.1 より前は db_.*.tb_.+) を使用して完全修飾テーブル名を照合し、どのテーブルを読み取るかを決定します。
port
OceanBase データベースサービスのポート番号。
いいえ
INTEGER
3306
なし。
server-id
データベースクライアントの数値 ID。
いいえ
STRING
デフォルトでは 5400 から 6400 の間のランダムな値が生成されます。
この ID はグローバルに一意である必要があります。同じデータベースに接続する各ジョブには、異なる ID を設定することを推奨します。
このパラメーターは、5400-5408 のような ID 範囲形式もサポートします。増分読み取りが有効な場合、複数の同時読み取りがサポートされます。この場合、各同時タスクが異なる ID を使用するように ID 範囲を設定することを推奨します。詳細については、「サーバー ID の使用法」をご参照ください。
scan.incremental.snapshot.chunk.size
各チャンクのサイズ (行数)。
いいえ
INTEGER
8096
増分スナップショット読み取りが有効な場合、テーブルは読み取りのために複数のチャンクに分割されます。チャンクのデータは、完全に読み取られるまでメモリにバッファリングされます。
チャンクサイズが小さいと、テーブルの総チャンク数が多くなります。これにより、よりきめ細かい障害回復が可能になりますが、メモリ不足 (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを見つけて適切なチャンクサイズを設定する必要があります。
scan.snapshot.fetch.size
テーブルの完全データを読み取る際に、各バッチでプルするレコードの最大数。
いいえ
INTEGER
1024
なし。
scan.startup.mode
データ消費の起動モード。
いいえ
STRING
initial
有効な値:
initial (デフォルト):最初の起動時に履歴の完全データをスキャンし、その後最新の Binlog データを読み取ります。
latest-offset:最初の起動時に履歴の完全データをスキャンしません。Binlog の末尾 (最新の Binlog 位置) から読み取りを開始します。つまり、コネクタの起動後に行われた最新の変更のみを読み取ります。
earliest-offset:履歴の完全データをスキャンしません。利用可能な最も古い Binlog 位置から読み取りを開始します。
specific-offset:履歴の完全データをスキャンしません。指定した特定の Binlog オフセットから開始します。scan.startup.specific-offset.file と scan.startup.specific-offset.pos パラメーターの両方を設定するか、scan.startup.specific-offset.gtid-set パラメーターのみを設定して特定の GTID セットから開始することでオフセットを指定できます。
timestamp:履歴の完全データをスキャンしません。指定したタイムスタンプから Binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis でミリ秒単位で指定します。
重要earliest-offset、specific-offset、または timestamp 起動モードを使用する場合、指定された Binlog 消費位置とジョブの起動時間の間に対応するテーブルのスキーマが変更されないようにしてください。これにより、スキーマの違いによるエラーを回避できます。
scan.startup.specific-offset.file
特定オフセットモードを使用する場合の開始オフセットの Binlog ファイル名。
いいえ
STRING
なし
このパラメーターを使用する場合、scan.startup.mode は specific-offset に設定する必要があります。ファイル名の例は
mysql-bin.000003です。scan.startup.specific-offset.pos
特定オフセットモードを使用する場合の、指定された Binlog ファイル内の開始オフセット。
いいえ
INTEGER
なし
このパラメーターを使用する場合、scan.startup.mode は specific-offset に設定する必要があります。
scan.startup.specific-offset.gtid-set
特定オフセットモードを使用する場合の開始オフセットの GTID セット。
いいえ
STRING
なし
このパラメーターを使用する場合、scan.startup.mode は specific-offset に設定する必要があります。GTID セットの形式例は
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19です。scan.startup.timestamp-millis
特定時間モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。
いいえ
LONG
なし
このパラメーターを使用する場合、scan.startup.mode は timestamp に設定する必要があります。単位はミリ秒です。
重要特定の時間を使用する場合、OceanBase CDC は各 Binlog ファイルの初期イベントを読み取ってそのタイムスタンプを決定し、最終的に指定された時間に対応する Binlog ファイルを特定しようとします。指定されたタイムスタンプの Binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。
server-time-zone
データベースが使用するセッションタイムゾーン。
いいえ
STRING
このパラメーターを指定しない場合、システムは Flink ジョブのランタイムの環境タイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。
例:Asia/Shanghai。このパラメーターは、TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間値」をご参照ください。
debezium.min.row.count.to.stream.results
テーブルの行数がこの値より大きい場合、バッチ読み取りモードが使用されます。
いいえ
INTEGER
1000
Flink は、次のいずれかの方法で OceanBase ソーステーブルからデータを読み取ります:
完全読み取り:テーブル全体のデータを直接メモリに読み込みます。これは高速ですが、対応する量のメモリを消費します。ソーステーブルが非常に大きい場合、OOM エラーのリスクがあります。
バッチ読み取り:すべてのデータが読み取られるまで、1 バッチあたり特定の行数で、複数のバッチでデータを読み取ります。これにより、大きなテーブルでの OOM リスクを回避できますが、比較的低速です。
connect.timeout
OceanBase データベースサーバーへの接続がタイムアウトした後、接続リトライまでに待機する最大時間。
いいえ
DURATION
30s
なし。
connect.max-retries
OceanBase データベースサービスへの接続に失敗した後の最大リトライ回数。
いいえ
INTEGER
3
なし。
connection.pool.size
データベース接続プールのサイズ。
いいえ
INTEGER
20
データベース接続プールは接続を再利用するために使用され、これによりデータベース接続の数を減らすことができます。
jdbc.properties.*
JDBC URL のカスタム接続パラメーター。
いいえ
STRING
なし
カスタム接続パラメーターを渡すことができます。たとえば、SSL プロトコルを使用しないようにするには、'jdbc.properties.useSSL' = 'false' と設定します。
サポートされている接続パラメーターの詳細については、「MySQL 構成プロパティ」をご参照ください。
debezium.*
Debezium がバイナリログを読み取るためのカスタムパラメーター。
いいえ
STRING
なし
カスタム Debezium パラメーターを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。
heartbeat.interval
ソースがハートビートイベントを使用して Binlog オフセットを進める間隔。
いいえ
DURATION
30s
ハートビートイベントは、ソースの Binlog オフセットを進めるために使用され、OceanBase の更新が遅いテーブルに非常に役立ちます。このようなテーブルでは、Binlog オフセットは自動的に進みません。ハートビートイベントは Binlog オフセットを前進させ、期限切れになるのを防ぎます。Binlog オフセットが期限切れになると、ジョブが失敗して回復不能になり、ステートレス再起動が必要になる場合があります。
scan.incremental.snapshot.chunk.key-column
スナップショットフェーズ中にチャンクを分割するために使用する列。
備考を参照
STRING
なし
プライマリキーのないテーブルでは必須です。選択された列は非 NULL 型 (NOT NULL) である必要があります。
プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみを選択できます。
scan.incremental.close-idle-reader.enabled
スナップショットフェーズが終了した後にアイドルリーダーを閉じるかどうかを指定します。
いいえ
BOOLEAN
false
VVR 8.0.1 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
このパラメーターを有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定します。
scan.read-changelog-as-append-only.enabled
変更ログデータストリームを追加専用データストリームに変換するかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:すべてのタイプのメッセージ (INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含む) が INSERT メッセージに変換されます。これは、先祖テーブルから削除メッセージを保存する必要がある場合など、特別なシナリオでのみ有効にしてください。
false (デフォルト):すべてのタイプのメッセージがそのままダウンストリームに送信されます。
説明VVR 8.0.8 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
scan.only.deserialize.captured.tables.changelog.enabled
増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。
いいえ
BOOLEAN
VVR 8.x バージョンではデフォルト値は false です。
VVR 11.1 以降のバージョンではデフォルト値は true です。
有効な値:
true:ターゲットテーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化します。
false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。
説明VVR 8.0.7 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
VVR 8.0.8 以前を使用する Realtime Compute for Apache Flink でこのパラメーターを使用する場合、パラメーター名を debezium.scan.only.deserialize.captured.tables.changelog.enable に変更してください。
scan.parse.online.schema.changes.enabled
増分フェーズ中に RDS のロックレス変更の DDL イベントを解析するかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:RDS のロックレス変更の DDL イベントを解析します。
false (デフォルト):RDS のロックレス変更の DDL イベントを解析しません。
これは実験的な機能です。オンラインでロックレス変更を実行する前に、復旧のために Flink ジョブのスナップショットを作成することを推奨します。
説明VVR 11.1 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
scan.incremental.snapshot.backfill.skip
スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:スナップショット読み取りフェーズ中にバックフィルをスキップします。
false (デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。
バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。
重要バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの一貫性が損なわれる可能性があります。at-least-once セマンティクスのみが保証されます。
説明VVR 11.1 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
scan.incremental.snapshot.unbounded-chunk-first.enabled
スナップショット読み取りフェーズ中に無制限チャンクを最初に配布するかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true:スナップショット読み取りフェーズ中に無制限チャンクを最初に配布します。
false (デフォルト):スナップショット読み取りフェーズ中に無制限チャンクを最初に配布しません。
これは実験的な機能です。これを有効にすると、TaskManager がスナップショットフェーズ中に最後のチャンクを同期する際の OOM エラーのリスクを軽減できます。ジョブの最初の起動前にこのパラメーターを追加することを推奨します。
説明VVR 11.1 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。
ディメンションテーブル固有
パラメーター
説明
必須
データ型
デフォルト値
備考
url
JDBC URL。
はい
STRING
なし
URL には、MySQL データベース名または Oracle サービス名を含める必要があります。
userName
ユーザー名。
はい
STRING
なし
なし。
cache
キャッシュポリシー。
いいえ
STRING
ALL
次の 3 つのキャッシュポリシーがサポートされています:
ALL:ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルからすべてのデータをキャッシュにロードします。その後のディメンションテーブルデータのルックアップはすべてキャッシュを介して行われます。キャッシュにデータが見つからない場合、そのキーは存在しません。キャッシュの有効期限が切れると、完全なキャッシュが再読み込みされます。
このポリシーは、リモートテーブルのデータ量が少なく、欠落しているキーが多い (ソーステーブルとディメンションテーブルが ON 条件で結合できない) シナリオに適しています。
LRU:ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュでデータを検索します。見つからない場合は、物理的なディメンションテーブルをクエリします。このキャッシュポリシーを使用する場合、cacheSize パラメーターを構成する必要があります。
None:キャッシュなし。
重要ALL キャッシュポリシーを使用する場合、OOM エラーを防ぐためにノードのメモリサイズを監視してください。
システムはディメンションテーブルのデータを非同期でロードするため、ALL キャッシュポリシーを使用する場合、ディメンションテーブル結合ノードのメモリを増やす必要があります。増やすメモリサイズは、リモートテーブルのデータ量の 2 倍にする必要があります。
cacheSize
キャッシュされるエントリの最大数。
いいえ
INTEGER
100000
LRU キャッシュポリシーを選択した場合は、キャッシュサイズを設定する必要があります。
ALL キャッシュポリシーを選択した場合は、キャッシュサイズを設定する必要はありません。
cacheTTLMs
キャッシュの生存時間 (TTL)。
いいえ
LONG
Long.MAX_VALUE
cacheTTLMs の構成は cache パラメーターに依存します:
cache が None に設定されている場合、cacheTTLMs を構成する必要はありません。これは、キャッシュがタイムアウトしないことを意味します。
cache が LRU に設定されている場合、cacheTTLMs はキャッシュの TTL です。デフォルトでは、キャッシュは期限切れになりません。
cache が ALL に設定されている場合、cacheTTLMs はキャッシュの再読み込み時間です。デフォルトでは、キャッシュは再読み込みされません。
maxRetryTimeout
最大リトライ時間。
いいえ
DURATION
60s
なし。
シンク固有
パラメーター
説明
必須
データ型
デフォルト値
備考
userName
ユーザー名。
はい
STRING
なし
なし。
compatibleMode
OceanBase の互換モード。
いいえ
STRING
mysql
有効な値:
mysql
oracle
説明これは OceanBase 固有のパラメーターです。
url
JDBC URL。
はい
STRING
なし
URL には、MySQL データベース名または Oracle サービス名を含める必要があります。
tableName
テーブル名。
はい
STRING
なし
なし。
maxRetryTimes
再試行の最大回数。
いいえ
INTEGER
3
なし。
poolInitialSize
データベース接続プールの初期サイズ。
いいえ
INTEGER
1
なし。
poolMaxActive
データベース接続プール内の最大接続数。
いいえ
INTEGER
8
なし。
poolMaxWait
データベース接続プールから接続を取得するまでの最大待機時間。
いいえ
INTEGER
2000
単位: ミリ秒。
poolMinIdle
データベース接続プール内のアイドル接続の最小数。
いいえ
INTEGER
1
なし。
connectionProperties
JDBC接続プロパティ。
いいえ
STRING
なし
フォーマットは "k1=v1;k2=v2;k3=v3" です。
ignoreDelete
DELETE 操作を無視するかどうかを指定します。
いいえ
Boolean
false
なし。
excludeUpdateColumns
除外する列の名前を指定します。これらの列は更新操作中に更新されません。
いいえ
STRING
なし
複数の列を無視するように指定する場合は、カンマ (,) で区切ります。例:
excludeUpdateColumns=column1,column2。説明この値には常にプライマリキー列が含まれます。実際に有効になる列は、指定した列とプライマリキー列です。
partitionKey
パーティションキー。
いいえ
STRING
なし
パーティションキーが設定されている場合、コネクタはまずパーティションキーでデータをグループ化します。各グループはその後、個別にデータベースに書き込まれます。このグループ化は modRule の前に処理されます。
modRule
グループ化ルール。
いいえ
STRING
なし
グループ化ルールは "column_name mod number" の形式である必要があります。例:
user_id mod 8。列は数値型である必要があります。グループ化ルールが設定されている場合、データはまず partitionKey でパーティション分割されます。各パーティション内で、データは modRule 計算の結果に基づいてグループ化されます。
bufferSize
データバッファーのサイズ。
いいえ
INTEGER
1000
なし。
flushIntervalMs
キャッシュをフラッシュする間隔。指定された待機時間の後、キャッシュ内のデータが出力条件を満たさない場合、システムは自動的にキャッシュ内のすべてのデータを出力します。
いいえ
LONG
1000
なし。
retryIntervalMs
最大リトライ時間。
いいえ
INTEGER
5000
単位: ミリ秒。
型のマッピング
MySQL 互換モード
OceanBase フィールド型
Flink フィールド型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT 符号なし
INT
INT
MEDIUMINT
符号なし SMALLINT
BIGINT
BIGINT
符号なし INT
BIGINT UNSIGNED
DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
説明p は 38 以下である必要があります。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
テキスト
MEDIUMTEXT
STRING
TINYBLOB
BYTES
重要Flink は、2,147,483,647 (2^31 - 1) バイト以下の BLOB 型レコードのみをサポートします。
BLOB
MEDIUMBLOB
LONGBLOB
Oracle 互換モード
OceanBase フィールド型
Flink フィールド型
NUMBER(p, s <= 0), p - s < 3
TINYINT
NUMBER(p, s <= 0), p - s < 5
SMALLINT
NUMBER(p, s <= 0), p - s < 10
INT
NUMBER(p, s <= 0), p - s < 19
BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)
DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38
STRING
FLOAT
FLOAT
BINARY_FLOAT
BINARY_DOUBLE
DOUBLE
NUMBER(1)
BOOLEAN
DATE
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
CHAR(n)
STRING
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
BLOB
BYTES
ROWID
使用例
ソーステーブルとシンクテーブル
-- OceanBase CDC ソーステーブル CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); -- OceanBase 結果テーブル CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; END;ディメンションテーブル
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', // JDBC URL を指定します 'userName' = '<yourUserName>', // ユーザー名を指定します 'password' = '${secret_values.password}', // パスワードを指定します 'tableName' = '<yourTableName>' // テーブル名を指定します ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
参照
Flink がサポートするコネクタの詳細については、「サポートされるコネクタ」をご参照ください。