このトピックでは、OceanBaseコネクタの使用方法について説明します。
背景情報
OceanBase は、ネイティブの分散型ハイブリッドトランザクション/分析処理 (HTAP) データベース管理システムです。詳細については、OceanBase 公式ウェブサイトをご参照ください。MySQL または Oracle データベースからの移行時に業務システムを修正するコストを削減するため、OceanBase は Oracle と MySQL の両方の互換モードをサポートしています。これらのモードでは、データの型、SQL 機能、および内部ビューが MySQL または Oracle データベースと互換性があります。各モードで推奨されるコネクタは次のとおりです。
Oracle モード: OceanBase コネクタのみを使用します。
MySQL モード: このモードはネイティブの MySQL 構文との互換性が高くなっています。OceanBase コネクタと MySQL コネクタの両方を使用して、OceanBase のデータを読み書きできます。
重要OceanBase コネクタはパブリックプレビュー段階です。OceanBase 3.2.4.4 以降では、MySQL コネクタを使用して OceanBase のデータを読み書きできます。この機能もパブリックプレビュー段階です。この機能を十分に評価し、注意して使用してください。
MySQL コネクタを使用して OceanBase から増分データを読み取る場合、OceanBase のバイナリロギング (Binlog) が有効になっており、正しく構成されていることを確認する必要があります。OceanBase Binlog の詳細については、「概要」または「Binlog 関連の操作」をご参照ください。
OceanBase コネクタは以下をサポートしています。
カテゴリ | 詳細 |
サポートされているタイプ | ソーステーブル、ディメンションテーブル、結果テーブル |
ランタイムモード | ストリーミングモードおよびバッチモード |
データ形式 | 該当なし |
特定の監視メトリック | なし |
APIタイプ | SQL |
結果テーブルのデータの更新または削除をサポート | はい |
前提条件
接続先のデータベースとテーブルが作成されていること。
IP アドレスホワイトリストが構成済みであること。詳細については、「ホワイトリストグループの構成」をご参照ください。
OceanBase から増分変更データキャプチャ (CDC) データを収集するには、OceanBase Binlog サービスも有効にする必要があります。詳細については、「Binlog 関連の操作」をご参照ください。
結果テーブルにバイパスインポートを使用するには、まずバイパスインポートポートを有効にする必要があります。詳細については、「バイパスインポートのドキュメント」をご参照ください。
制限事項
OceanBase コネクタは、Realtime Compute for Apache Flink (VVR) 8.0.1 以降でサポートされています。
セマンティクス保証
CDC ソーステーブルは exactly-once セマンティクスをサポートしています。これにより、完全な履歴データを読み取ってから Binlog データの読み取りに切り替える際に、データが失われたり重複したりすることがなくなります。障害が発生した場合でも、これらのセマンティクスはデータ処理の正確性を保証します。
結果テーブルは at-least-once セマンティクスをサポートしています。結果テーブルにプライマリキーがある場合、べき等性によりデータの正確性が保証されます。
構文
CREATE TABLE oceanbase_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = '<yourJdbcUrl>',
'tableName' = '<yourTableName>',
'userName' = '<yourUserName>',
'password' = '<yourPassword>'
);結果テーブルへの書き込み時、コネクタは受信した各データレコードに対して SQL 文を構築し、実行します。構築される SQL 文の種類は、以下の条件によって決まります。
結果テーブルにプライマリキーがない場合、INSERT INTO 文が構築されます。
結果テーブルにプライマリキーがある場合、データベースの互換モードに基づいて UPSERT 文が構築されます。
WITH パラメーター
全般
パラメーター
説明
必須
データ型
デフォルト値
備考
connector
テーブルのタイプ。
はい
STRING
なし
静的フィールドは
oceanbaseに設定されます。password
パスワード。
はい
STRING
なし
なし。
ソーステーブルにのみ適用されます。
重要Realtime Compute for Apache Flink VVR 11.4.0 以降、OceanBase CDC コネクタのアーキテクチャと機能がアップグレードされました。以下の主要な変更点は、更新内容を理解し、バージョンをスムーズに移行するのに役立ちます。
OceanBase LogProxy サービスをベースにした従来の CDC コネクタは非推奨となり、ディストリビューションから削除されました。VVR 11.4.0 以降、OceanBase CDC コネクタは、OceanBase Binlog サービスを介してのみ増分ログのキャプチャとデータ同期をサポートします。
OceanBase CDC コネクタは、OceanBase Binlog サービスとのプロトコル互換性と接続安定性が強化されています。OceanBase CDC コネクタを優先的に使用することを推奨します。
OceanBase Binlog サービスは、プロトコル層で MySQL レプリケーションプロトコルと完全に互換性があります。標準の MySQL CDC コネクタを OceanBase Binlog サービスに接続して変更追跡を行うこともできますが、これは推奨されません。
Realtime Compute for Apache Flink VVR 11.4.0 以降、OceanBase CDC コネクタは Oracle 互換モードでの増分変更追跡をサポートしなくなりました。Oracle 互換モードでの増分変更追跡については、OceanBase Enterprise Technical Support にお問い合わせください。
パラメーター
説明
必須
データの型
デフォルト値
備考
hostname
OceanBase データベースの IP アドレスまたはホスト名。
はい
STRING
いいえ
VPC アドレスを指定することを推奨します。
説明OceanBase と Realtime Compute for Apache Flink が同じ VPC にない場合は、VPC 間のネットワーク接続を確立するか、パブリックエンドポイントを使用してアクセスする必要があります。詳細については、「ワークスペースの管理と操作」および「フルマネージド Flink クラスターはどのようにインターネットにアクセスしますか?」をご参照ください。
username
OceanBase データベースサービスのユーザー名。
はい
STRING
いいえ
なし。
database-name
OceanBase データベースの名前。
はい
STRING
なし
ソーステーブルとして使用する場合、データベース名は正規表現をサポートし、複数のデータベースからデータを読み取ることができます。
正規表現を使用する場合、先頭と末尾に一致させるための文字 ^ と $ の使用は避けてください。詳細については、table-name パラメーターの備考をご参照ください。
table-name
OceanBase テーブルの名前。
はい
STRING
なし
ソーステーブルとして使用する場合、テーブル名は正規表現をサポートし、複数のテーブルからデータを読み取ることができます。
正規表現を使用する場合、先頭と末尾に一致させるための文字 ^ と $ の使用は避けてください。詳細については、以下の注記をご参照ください。
説明テーブル名を照合する際、OceanBase ソーステーブルコネクタは、指定された database-name と table-name を \\. 文字列 (VVR 8.0.1 より前のバージョンでは . 文字) で連結し、完全なパスの正規表現を形成します。この正規表現は、OceanBase データベース内のテーブルの完全修飾名と照合するために使用されます。
たとえば、'database-name' を 'db_.*' に、'table-name' を 'tb_.+' に設定した場合、コネクタは正規表現 db_.*\\.tb_.+ (または VVR 8.0.1 より前のバージョンでは db_.*.tb_.+) を使用して完全修飾テーブル名を照合し、読み取るテーブルを決定します。
port
OceanBase データベースサービスのポート番号。
いいえ
INTEGER
3306
なし。
server-id
データベースクライアントの数値 ID。
いいえ
STRING
5400 から 6400 の間のランダムな値が生成されます。
この ID はグローバルに一意である必要があります。同じデータベースに接続するジョブごとに異なる ID を設定することを推奨します。
このパラメーターは、5400-5408 のような ID 範囲もサポートします。増分読み取りが有効な場合、各同時リーダーが異なる ID を使用できるように ID 範囲を指定することを推奨します。詳細については、「サーバー ID の使用法」をご参照ください。
scan.incremental.snapshot.chunk.size
各チャンクのサイズ (行数)。
いいえ
INTEGER
8096
増分スナップショット読み取りが有効な場合、テーブルは複数のチャンクに分割されて読み取られます。チャンク内のデータが完全に読み取られるまで、データはメモリにバッファリングされます。
チャンクあたりの行数が少ないと、テーブルの総チャンク数が多くなります。これにより、障害回復の粒度が細かくなりますが、Out-of-Memory (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを取り、適切なチャンクサイズを設定する必要があります。
scan.snapshot.fetch.size
テーブルの完全なデータを読み取る際に一度にプルするレコードの最大数。
いいえ
INTEGER
1024
なし。
scan.startup.mode
データ消費の起動モード。
いいえ
STRING
initial
有効な値:
initial (デフォルト): 最初の起動時にすべての履歴データをスキャンし、その後最新の Binlog データを読み取ります。
latest-offset: 最初の起動時に履歴データをスキャンしません。Binlog の末尾から読み取りを開始します。つまり、コネクタの起動後に行われた最新の変更のみを読み取ります。
earliest-offset: 履歴データをスキャンしません。利用可能な最も古い Binlog から読み取りを開始します。
specific-offset: 履歴データをスキャンしません。指定した特定の Binlog オフセットから開始します。scan.startup.specific-offset.file と scan.startup.specific-offset.pos パラメーターの両方を設定するか、scan.startup.specific-offset.gtid-set パラメーターのみを設定して特定の GTID セットから開始することでオフセットを指定できます。
timestamp: 履歴データをスキャンしません。指定したタイムスタンプから Binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis パラメーターでミリ秒単位で指定します。
重要earliest-offset、specific-offset、または timestamp 起動モードを使用する場合、指定された Binlog 消費位置とジョブの起動時刻の間で対応するテーブルのスキーマが変更されないことを確認してください。これにより、スキーマの違いによるエラーを防ぎます。
scan.startup.specific-offset.file
特定オフセット起動モードを使用する場合の開始オフセットの Binlog ファイル名。
いいえ
STRING
なし
このパラメーターを使用するには、scan.startup.mode を specific-offset に設定する必要があります。ファイル名の例:
mysql-bin.000003。scan.startup.specific-offset.pos
特定オフセット起動モードを使用する場合の、指定された Binlog ファイル内の開始オフセット。
いいえ
INTEGER
なし
このパラメーターを使用するには、scan.startup.mode を specific-offset に設定する必要があります。
scan.startup.specific-offset.gtid-set
特定オフセット起動モードを使用する場合の開始オフセットの GTID セット。
いいえ
STRING
なし
このパラメーターを使用するには、scan.startup.mode を specific-offset に設定する必要があります。GTID セットの例:
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。scan.startup.timestamp-millis
タイムスタンプ起動モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。
いいえ
LONG
なし
このパラメーターを使用するには、scan.startup.mode を timestamp に設定する必要があります。単位はミリ秒です。
重要時間を指定すると、OceanBase CDC は各 Binlog ファイルの初期イベントを読み取ってそのタイムスタンプを特定し、指定された時間に対応する Binlog ファイルを見つけようとします。指定されたタイムスタンプの Binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。
server-time-zone
データベースが使用するセッションタイムゾーン。
いいえ
STRING
このパラメーターを指定しない場合、システムは Flink ジョブのランタイム環境のタイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。
例: Asia/Shanghai。このパラメーターは、TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間型」をご参照ください。
debezium.min.row.count.to.stream.results
テーブルの行数がこの値を超えると、バッチ読み取りモードが使用されます。
いいえ
INTEGER
1000
Flink は、次のいずれかの方法で OceanBase ソーステーブルからデータを読み取ります:
完全読み取り: テーブル全体のデータを直接メモリに読み取ります。この方法は高速ですが、それに応じたメモリを消費します。ソーステーブルが非常に大きい場合、OOM エラーを引き起こす可能性があります。
バッチ読み取り: データを複数のバッチで読み取ります。各バッチには一定数の行が含まれ、すべてのデータが読み取られるまで続きます。この方法は、大きなテーブルを読み取る際の OOM エラーを回避しますが、比較的低速です。
connect.timeout
OceanBase データベースサーバーへの接続がタイムアウトした際に、接続を再試行するまでの最大待機時間。
いいえ
DURATION
30s
なし。
connect.max-retries
OceanBase データベースサービスへの接続に失敗した後の最大リトライ回数。
いいえ
INTEGER
3
なし。
connection.pool.size
データベース接続プールのサイズ。
いいえ
INTEGER
20
データベース接続プールは接続を再利用するために使用され、これによりデータベース接続の数を減らすことができます。
jdbc.properties.*
JDBC URL のカスタム接続パラメーター。
いいえ
STRING
なし
カスタム接続パラメーターを渡すことができます。たとえば、SSL プロトコルを使用しないようにするには、'jdbc.properties.useSSL' = 'false' と設定します。
サポートされている接続パラメーターの詳細については、「MySQL 構成プロパティ」をご参照ください。
debezium.*
Debezium が Binlog データを読み取るためのカスタムパラメーター。
いいえ
STRING
なし
カスタム Debezium パラメーターを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。
heartbeat.interval
ソースがハートビートイベントを使用して Binlog オフセットを進める時間間隔。
いいえ
DURATION
30s
ハートビートイベントは、ソースの Binlog オフセットを進めるために使用されます。これは、更新が頻繁に行われない OceanBase のテーブルに役立ちます。このようなテーブルでは、Binlog オフセットは自動的に進みません。ハートビートイベントは Binlog オフセットを前進させ、期限切れになるのを防ぎます。Binlog オフセットが期限切れになると、ジョブは失敗し、回復不能になり、ステートレス再起動が必要になります。
scan.incremental.snapshot.chunk.key-column
スナップショットフェーズでチャンクを分割するためのチャンクキーとして使用する列を指定します。
「備考」列をご参照ください。
STRING
なし
このパラメーターは、プライマリキーのないテーブルに必須です。選択された列は非 NULL 型 (NOT NULL) である必要があります。
このパラメーターは、プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみを選択できます。
scan.incremental.close-idle-reader.enabled
スナップショット完了後にアイドル状態のリーダーを閉じるかどうかを指定します。
いいえ
BOOLEAN
false
Realtime Compute for Apache Flink VVR 8.0.1 以降でのみサポートされます。
この設定を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定します。
scan.read-changelog-as-append-only.enabled
変更ログストリームを追加専用ストリームに変換するかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true: INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべてのタイプのメッセージが INSERT メッセージに変換されます。これは、先祖テーブルから削除メッセージを保存する必要がある場合など、特別なシナリオでのみ有効にしてください。
false (デフォルト):すべてのタイプのメッセージがそのままダウンストリームに送信されます。
説明Realtime Compute for Apache Flink VVR 8.0.8 以降でのみサポートされます。
scan.only.deserialize.captured.tables.changelog.enabled
増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。
いいえ
BOOLEAN
VVR 8.x バージョンではデフォルト値は false です。
VVR 11.1 以降のバージョンではデフォルト値は true です。
有効な値:
true:ターゲットテーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化します。
false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。
説明Realtime Compute for Apache Flink VVR 8.0.7 以降でのみサポートされます。
Realtime Compute for Apache Flink VVR 8.0.8 以前で使用する場合、パラメーター名を debezium.scan.only.deserialize.captured.tables.changelog.enable に変更する必要があります。
scan.parse.online.schema.changes.enabled
増分フェーズ中に、ApsaraDB RDS のロックレス変更に対する DDL イベントの解析を試みるかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true: ApsaraDB RDS のロックレス変更に対する DDL イベントを解析します。
false (デフォルト): ApsaraDB RDS のロックレス変更に対する DDL イベントを解析しません。
これは実験的な機能です。オンラインのロックレス変更を実行する前に、回復のために Flink ジョブのスナップショットを取得することを推奨します。
説明Realtime Compute for Apache Flink VVR 11.1 以降でのみサポートされます。
scan.incremental.snapshot.backfill.skip
スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true: スナップショット読み取りフェーズ中にバックフィルをスキップします。
false (デフォルト): スナップショット読み取りフェーズ中にバックフィルをスキップしません。
バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。
重要バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの一貫性が損なわれる可能性があります。at-least-once セマンティクスのみが保証されます。
説明Realtime Compute for Apache Flink VVR 11.1 以降でのみサポートされます。
scan.incremental.snapshot.unbounded-chunk-first.enabled
スナップショット読み取りフェーズ中に、無制限チャンクを最初に配布するかどうかを指定します。
いいえ
BOOLEAN
false
有効な値:
true: スナップショット読み取りフェーズ中に無制限チャンクを最初に配布します。
false (デフォルト): スナップショット読み取りフェーズ中に無制限チャンクを最初に配布しません。
これは実験的な機能です。有効にすると、スナップショットフェーズ中に TaskManager が最後のチャンクを同期する際の OOM エラーのリスクを軽減できます。ジョブを初めて開始する前にこのパラメーターを追加することを推奨します。
説明Realtime Compute for Apache Flink VVR 11.1 以降でのみサポートされます。
ディメンションテーブルのみ
パラメーター
説明
必須
データ型
デフォルト値
備考
url
JDBC URL。
はい
STRING
なし
URL には、MySQL データベース名または Oracle サービス名を含める必要があります。
userName
ユーザー名。
はい
STRING
なし
なし。
cache
キャッシュポリシー。
いいえ
STRING
ALL
次の 3 つのキャッシュポリシーがサポートされています:
ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。その後のディメンションテーブルデータのルックアップはすべてキャッシュを介して行われます。キャッシュにデータが見つからない場合、そのキーは存在しません。キャッシュが期限切れになると、完全なキャッシュが再読み込みされます。
このポリシーは、リモートテーブルが小さく、ミスキー (ソーステーブルとディメンションテーブルを結合する際の ON 条件が関連付けられない) が多いシナリオに適しています。
LRU: ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュでデータを検索します。見つからない場合は、物理的なディメンションテーブルを検索します。このキャッシュポリシーを使用する場合、cacheSize パラメーターを設定する必要があります。
None:キャッシュなし。
重要ALL キャッシュポリシーを使用する場合、OOM エラーを防ぐためにノードのメモリサイズに注意してください。
システムはディメンションテーブルのデータを非同期でロードするため、ALL キャッシュポリシーを使用する場合、ディメンションテーブル結合ノードのメモリを増やす必要があります。増やすメモリサイズは、リモートテーブルデータのサイズの 2 倍にする必要があります。
cacheSize
キャッシュされるエントリの最大数。
いいえ
INTEGER
100000
LRU キャッシュポリシーを選択した場合は、キャッシュサイズを設定する必要があります。
ALL キャッシュポリシーを選択した場合は、キャッシュサイズを設定する必要はありません。
cacheTTLMs
キャッシュのタイムアウト期間。
いいえ
LONG
Long.MAX_VALUE
cacheTTLMs の構成は cache パラメーターに依存します:
cache が None に設定されている場合、cacheTTLMs を構成する必要はありません。これは、キャッシュがタイムアウトしないことを意味します。
cache が LRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間です。デフォルトでは、キャッシュは期限切れになりません。
cache が ALL に設定されている場合、cacheTTLMs はキャッシュの再読み込み時間です。デフォルトでは、キャッシュは再読み込みされません。
maxRetryTimeout
最大リトライ時間。
いいえ
DURATION
60s
なし。
結果テーブル: JDBC のみ
パラメーター
説明
必須
データ型
デフォルト値
備考
userName
ユーザー名。
はい
STRING
なし
なし。
compatibleMode
OceanBase の互換モード。
いいえ
STRING
mysql
有効な値:
mysql
oracle
説明これは OceanBase 固有のパラメーターです。
url
JDBC URL。
はい
STRING
なし
URL には、MySQL データベース名または Oracle サービス名を含める必要があります。
tableName
テーブル名。
はい
STRING
なし
なし。
sink.mode
OceanBase 結果テーブルの書き込みモード。
はい
STRING
jdbc
jdbcとdirect-loadをサポートします。maxRetryTimes
再試行の最大回数。
いいえ
INTEGER
3
なし。
poolInitialSize
データベース接続プールの初期サイズ。
いいえ
INTEGER
1
なし。
poolMaxActive
データベース接続プール内の最大接続数。
いいえ
INTEGER
8
なし。
poolMaxWait
データベース接続プールから接続を取得するまでの最大待機時間。
いいえ
INTEGER
2000
単位はミリ秒です。
poolMinIdle
データベース接続プール内のアイドル接続の最小数。
いいえ
INTEGER
1
なし。
connectionProperties
JDBC の接続プロパティ。
いいえ
STRING
なし
フォーマットは "k1=v1;k2=v2;k3=v3" です。
ignoreDelete
データ削除操作を無視するかどうかを指定します。
いいえ
Boolean
false
なし。
excludeUpdateColumns
除外する列の名前を指定します。これらの列は更新操作中に更新されません。
いいえ
STRING
なし
複数の列を指定する場合は、カンマ (,) で区切ります。例:
excludeUpdateColumns=column1,column2。説明この値には常にプライマリキー列が含まれます。有効になる列は、指定した列とプライマリキー列です。
partitionKey
パーティションキー。
いいえ
STRING
なし
パーティションキーが設定されている場合、コネクタはまずパーティションキーでデータをグループ化し、各グループは個別にデータベースに書き込まれます。このグループ化は modRule 処理の前に行われます。
modRule
グループ化ルール。
いいえ
STRING
なし
グループ化ルールのフォーマットは "column_name mod number" である必要があります。例:
user_id mod 8。列の型は数値である必要があります。グループ化ルールが設定されている場合、データはまず partitionKey でパーティション分割されます。各パーティション内で、modRule 計算の結果に基づいてさらにグループ化されます。
bufferSize
データバッファーのサイズ。
いいえ
INTEGER
1000
なし。
flushIntervalMs
キャッシュをフラッシュする時間間隔。指定された時間待機してもキャッシュ内のデータが出力条件を満たさない場合、システムは自動的にキャッシュ内のすべてのデータを出力します。
いいえ
LONG
1000
なし。
retryIntervalMs
最大リトライ時間。
いいえ
INTEGER
5000
単位はミリ秒です。
結果テーブルへのバイパスインポート専用です。
結果テーブルのバイパスインポートは、Ververica Runtime (VVR) 11.5 以降で利用できます。バイパスインポートの詳細については、「こちらのドキュメント」をご参照ください。
有限ストリームのみをサポート:データソースは有限ストリームである必要があります。無限ストリームはサポートされていません。パフォーマンスを向上させるために、Flink バッチモードを使用できます。
高スループット書き込み:このメソッドは、大規模バッチデータインポートシナリオに適しています。
インポート中のテーブルロック:バイパスインポートを実行すると、ターゲットテーブルがロックされます。テーブルがロックされている間は、データ変更の書き込みおよび DDL の変更はブロックされます。データクエリは影響を受けません。
リアルタイム書き込みには非対応:リアルタイムまたはストリーム書き込みシナリオでは、Java Database Connectivity (JDBC) 結果テーブルを使用してください。
パラメーター | 説明 | 必須 | データ型 | デフォルト値 | 備考 |
sink.mode | OceanBase 結果テーブルにデータを書き込むためのメソッド | いいえ | STRING | jdbc | `jdbc` モードと `direct-load` モードをサポートしています。バイパスインポートを使用して OceanBase の結果テーブルにデータを書き込むには、このパラメーターを静的フィールド `direct-load` に設定します。 |
host | OceanBase データベースの IP アドレスまたはホスト名です。 | はい | STRING | なし | なし。 |
port | OceanBase データベースの RPC ポートです。 | いいえ | INTEGER | 2882 | なし。 |
username | ユーザー名です。 | はい | STRING | なし | なし。 |
tenant-name | OceanBase データベースのテナント名です。 | はい | STRING | なし | |
schema-name |
| はい | STRING | なし | なし。 |
table-name | OceanBase テーブルの名前です。 | はい | STRING | なし | なし。 |
parallel | バイパスインポートタスクのサーバー側の同時実行数です。 | いいえ | INTEGER | 8 |
|
buffer-size | バイパスインポートタスクで OceanBase に書き込む際のバッファーサイズです。 | いいえ | INTEGER | 1024 | Flink は |
dup-action | バイパスインポートタスク中に重複するプライマリキーを処理するためのポリシーです。有効な値は | いいえ | STRING | REPLACE |
|
load-method | バイパスインポートモードです。 | full |
| ||
max-error-rows | バイパスインポートタスクが許容できるエラー行の最大数です。 | いいえ | LONG | 0 | 次の場合、行はエラー行と見なされます:
|
timeout | バイパスインポートタスク全体のタイムアウト期間です。 | いいえ | DURATION | 7d | |
heartbeat-timeout | バイパスインポートタスクのクライアント側のハートビートタイムアウトです。 | いいえ | DURATION | 60s | |
heartbeat-interval | バイパスインポートタスクのクライアント側のハートビート間隔です。 | いいえ | DURATION | 10s |
型のマッピング
MySQL 互換モード
OceanBase フィールド型
Flink フィールド型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT 符号なし
INT
INT
MEDIUMINT
符号なし SMALLINT
BIGINT
BIGINT
符号なし INT
BIGINT UNSIGNED
DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
説明p <= 38 の場合。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
テキスト
MEDIUMTEXT
長いテキスト
TINYBLOB
BYTES
重要Flink は、サイズが 2,147,483,647 (2^31 - 1) バイト以下の BLOB レコードをサポートします。
BLOB
MEDIUMBLOB
LONGBLOB
Oracle 互換モード
OceanBase フィールド型
Flink フィールド型
NUMBER(p, s <= 0), p - s < 3
TINYINT
NUMBER(p, s <= 0), p - s < 5
SMALLINT
NUMBER(p, s <= 0), p - s < 10
INT
NUMBER(p, s <= 0), p - s < 19
BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)
DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38
STRING
FLOAT
FLOAT
BINARY_FLOAT
BINARY_DOUBLE
DOUBLE
NUMBER(1)
BOOLEAN
DATE
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
STRING
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
BLOB
BYTES
ROWID
使用例
ソーステーブルとシンクテーブル
-- OceanBase CDC ソーステーブル CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); -- OceanBase JDBC シンクテーブル CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); -- OceanBase ダイレクトロードシンクテーブル CREATE TEMPORARY TABLE oceanbase_directload_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'sink.mode' = 'direct-load', 'host' = '<yourHost>', 'port' = 'yourPort', 'tenant-name' = '<yourTenantName>', 'schema-name' = '<yourSchemaName>', 'table-name' = '<yourTableName>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; END;ディメンションテーブル
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', // JDBC URL を指定します 'userName' = '<yourUserName>', // ユーザー名を指定します 'password' = '${secret_values.password}', // パスワードを指定します 'tableName' = '<yourTableName>' // テーブル名を指定します ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
参照
Flink でサポートされているコネクタのリストについては、「サポートされているコネクタ」をご参照ください。