すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:OceanBase (パブリックプレビュー)

最終更新日:Jan 10, 2026

このトピックでは、OceanBaseコネクタの使用方法について説明します。

背景情報

OceanBase は、ネイティブの分散型ハイブリッドトランザクション/分析処理 (HTAP) データベース管理システムです。詳細については、OceanBase 公式ウェブサイトをご参照ください。MySQL または Oracle データベースからの移行時に業務システムを修正するコストを削減するため、OceanBase は Oracle と MySQL の両方の互換モードをサポートしています。これらのモードでは、データの型、SQL 機能、および内部ビューが MySQL または Oracle データベースと互換性があります。各モードで推奨されるコネクタは次のとおりです。

  • Oracle モード: OceanBase コネクタのみを使用します。

  • MySQL モード: このモードはネイティブの MySQL 構文との互換性が高くなっています。OceanBase コネクタと MySQL コネクタの両方を使用して、OceanBase のデータを読み書きできます。

    重要
    • OceanBase コネクタはパブリックプレビュー段階です。OceanBase 3.2.4.4 以降では、MySQL コネクタを使用して OceanBase のデータを読み書きできます。この機能もパブリックプレビュー段階です。この機能を十分に評価し、注意して使用してください。

    • MySQL コネクタを使用して OceanBase から増分データを読み取る場合、OceanBase のバイナリロギング (Binlog) が有効になっており、正しく構成されていることを確認する必要があります。OceanBase Binlog の詳細については、「概要」または「Binlog 関連の操作」をご参照ください。

OceanBase コネクタは以下をサポートしています。

カテゴリ

詳細

サポートされているタイプ

ソーステーブル、ディメンションテーブル、結果テーブル

ランタイムモード

ストリーミングモードおよびバッチモード

データ形式

該当なし

特定の監視メトリック

なし

APIタイプ

SQL

結果テーブルのデータの更新または削除をサポート

はい

前提条件

  • 接続先のデータベースとテーブルが作成されていること。

  • IP アドレスホワイトリストが構成済みであること。詳細については、「ホワイトリストグループの構成」をご参照ください。

  • OceanBase から増分変更データキャプチャ (CDC) データを収集するには、OceanBase Binlog サービスも有効にする必要があります。詳細については、「Binlog 関連の操作」をご参照ください。

  • 結果テーブルにバイパスインポートを使用するには、まずバイパスインポートポートを有効にする必要があります。詳細については、「バイパスインポートのドキュメント」をご参照ください。

制限事項

  • OceanBase コネクタは、Realtime Compute for Apache Flink (VVR) 8.0.1 以降でサポートされています。

  • セマンティクス保証

    • CDC ソーステーブルは exactly-once セマンティクスをサポートしています。これにより、完全な履歴データを読み取ってから Binlog データの読み取りに切り替える際に、データが失われたり重複したりすることがなくなります。障害が発生した場合でも、これらのセマンティクスはデータ処理の正確性を保証します。

    • 結果テーブルは at-least-once セマンティクスをサポートしています。結果テーブルにプライマリキーがある場合、べき等性によりデータの正確性が保証されます。

構文

CREATE TABLE oceanbase_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);
説明

結果テーブルへの書き込み時、コネクタは受信した各データレコードに対して SQL 文を構築し、実行します。構築される SQL 文の種類は、以下の条件によって決まります。

  • 結果テーブルにプライマリキーがない場合、INSERT INTO 文が構築されます。

  • 結果テーブルにプライマリキーがある場合、データベースの互換モードに基づいて UPSERT 文が構築されます。

WITH パラメーター

  • 全般

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    はい

    STRING

    なし

    静的フィールドは oceanbase に設定されます。

    password

    パスワード。

    はい

    STRING

    なし

    なし。

  • ソーステーブルにのみ適用されます。

    重要

    Realtime Compute for Apache Flink VVR 11.4.0 以降、OceanBase CDC コネクタのアーキテクチャと機能がアップグレードされました。以下の主要な変更点は、更新内容を理解し、バージョンをスムーズに移行するのに役立ちます。

    • OceanBase LogProxy サービスをベースにした従来の CDC コネクタは非推奨となり、ディストリビューションから削除されました。VVR 11.4.0 以降、OceanBase CDC コネクタは、OceanBase Binlog サービスを介してのみ増分ログのキャプチャとデータ同期をサポートします。

    • OceanBase CDC コネクタは、OceanBase Binlog サービスとのプロトコル互換性と接続安定性が強化されています。OceanBase CDC コネクタを優先的に使用することを推奨します。

      OceanBase Binlog サービスは、プロトコル層で MySQL レプリケーションプロトコルと完全に互換性があります。標準の MySQL CDC コネクタを OceanBase Binlog サービスに接続して変更追跡を行うこともできますが、これは推奨されません。

    • Realtime Compute for Apache Flink VVR 11.4.0 以降、OceanBase CDC コネクタは Oracle 互換モードでの増分変更追跡をサポートしなくなりました。Oracle 互換モードでの増分変更追跡については、OceanBase Enterprise Technical Support にお問い合わせください。

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    hostname

    OceanBase データベースの IP アドレスまたはホスト名。

    はい

    STRING

    いいえ

    VPC アドレスを指定することを推奨します。

    説明

    OceanBase と Realtime Compute for Apache Flink が同じ VPC にない場合は、VPC 間のネットワーク接続を確立するか、パブリックエンドポイントを使用してアクセスする必要があります。詳細については、「ワークスペースの管理と操作」および「フルマネージド Flink クラスターはどのようにインターネットにアクセスしますか?」をご参照ください。

    username

    OceanBase データベースサービスのユーザー名。

    はい

    STRING

    いいえ

    なし。

    database-name

    OceanBase データベースの名前。

    はい

    STRING

    なし

    • ソーステーブルとして使用する場合、データベース名は正規表現をサポートし、複数のデータベースからデータを読み取ることができます。

    • 正規表現を使用する場合、先頭と末尾に一致させるための文字 ^$ の使用は避けてください。詳細については、table-name パラメーターの備考をご参照ください。

    table-name

    OceanBase テーブルの名前。

    はい

    STRING

    なし

    • ソーステーブルとして使用する場合、テーブル名は正規表現をサポートし、複数のテーブルからデータを読み取ることができます。

    • 正規表現を使用する場合、先頭と末尾に一致させるための文字 ^$ の使用は避けてください。詳細については、以下の注記をご参照ください。

    説明

    テーブル名を照合する際、OceanBase ソーステーブルコネクタは、指定された database-nametable-name\\. 文字列 (VVR 8.0.1 より前のバージョンでは . 文字) で連結し、完全なパスの正規表現を形成します。この正規表現は、OceanBase データベース内のテーブルの完全修飾名と照合するために使用されます。

    たとえば、'database-name' を 'db_.*' に、'table-name' を 'tb_.+' に設定した場合、コネクタは正規表現 db_.*\\.tb_.+ (または VVR 8.0.1 より前のバージョンでは db_.*.tb_.+) を使用して完全修飾テーブル名を照合し、読み取るテーブルを決定します。

    port

    OceanBase データベースサービスのポート番号。

    いいえ

    INTEGER

    3306

    なし。

    server-id

    データベースクライアントの数値 ID。

    いいえ

    STRING

    5400 から 6400 の間のランダムな値が生成されます。

    この ID はグローバルに一意である必要があります。同じデータベースに接続するジョブごとに異なる ID を設定することを推奨します。

    このパラメーターは、5400-5408 のような ID 範囲もサポートします。増分読み取りが有効な場合、各同時リーダーが異なる ID を使用できるように ID 範囲を指定することを推奨します。詳細については、「サーバー ID の使用法」をご参照ください。

    scan.incremental.snapshot.chunk.size

    各チャンクのサイズ (行数)。

    いいえ

    INTEGER

    8096

    増分スナップショット読み取りが有効な場合、テーブルは複数のチャンクに分割されて読み取られます。チャンク内のデータが完全に読み取られるまで、データはメモリにバッファリングされます。

    チャンクあたりの行数が少ないと、テーブルの総チャンク数が多くなります。これにより、障害回復の粒度が細かくなりますが、Out-of-Memory (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを取り、適切なチャンクサイズを設定する必要があります。

    scan.snapshot.fetch.size

    テーブルの完全なデータを読み取る際に一度にプルするレコードの最大数。

    いいえ

    INTEGER

    1024

    なし。

    scan.startup.mode

    データ消費の起動モード。

    いいえ

    STRING

    initial

    有効な値:

    • initial (デフォルト): 最初の起動時にすべての履歴データをスキャンし、その後最新の Binlog データを読み取ります。

    • latest-offset: 最初の起動時に履歴データをスキャンしません。Binlog の末尾から読み取りを開始します。つまり、コネクタの起動後に行われた最新の変更のみを読み取ります。

    • earliest-offset: 履歴データをスキャンしません。利用可能な最も古い Binlog から読み取りを開始します。

    • specific-offset: 履歴データをスキャンしません。指定した特定の Binlog オフセットから開始します。scan.startup.specific-offset.filescan.startup.specific-offset.pos パラメーターの両方を設定するか、scan.startup.specific-offset.gtid-set パラメーターのみを設定して特定の GTID セットから開始することでオフセットを指定できます。

    • timestamp: 履歴データをスキャンしません。指定したタイムスタンプから Binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis パラメーターでミリ秒単位で指定します。

    重要

    earliest-offsetspecific-offset、または timestamp 起動モードを使用する場合、指定された Binlog 消費位置とジョブの起動時刻の間で対応するテーブルのスキーマが変更されないことを確認してください。これにより、スキーマの違いによるエラーを防ぎます。

    scan.startup.specific-offset.file

    特定オフセット起動モードを使用する場合の開始オフセットの Binlog ファイル名。

    いいえ

    STRING

    なし

    このパラメーターを使用するには、scan.startup.modespecific-offset に設定する必要があります。ファイル名の例: mysql-bin.000003

    scan.startup.specific-offset.pos

    特定オフセット起動モードを使用する場合の、指定された Binlog ファイル内の開始オフセット。

    いいえ

    INTEGER

    なし

    このパラメーターを使用するには、scan.startup.modespecific-offset に設定する必要があります。

    scan.startup.specific-offset.gtid-set

    特定オフセット起動モードを使用する場合の開始オフセットの GTID セット。

    いいえ

    STRING

    なし

    このパラメーターを使用するには、scan.startup.modespecific-offset に設定する必要があります。GTID セットの例: 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

    scan.startup.timestamp-millis

    タイムスタンプ起動モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。

    いいえ

    LONG

    なし

    このパラメーターを使用するには、scan.startup.modetimestamp に設定する必要があります。単位はミリ秒です。

    重要

    時間を指定すると、OceanBase CDC は各 Binlog ファイルの初期イベントを読み取ってそのタイムスタンプを特定し、指定された時間に対応する Binlog ファイルを見つけようとします。指定されたタイムスタンプの Binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。

    server-time-zone

    データベースが使用するセッションタイムゾーン。

    いいえ

    STRING

    このパラメーターを指定しない場合、システムは Flink ジョブのランタイム環境のタイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。

    例: Asia/Shanghai。このパラメーターは、TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間型」をご参照ください。

    debezium.min.row.count.to.stream.results

    テーブルの行数がこの値を超えると、バッチ読み取りモードが使用されます。

    いいえ

    INTEGER

    1000

    Flink は、次のいずれかの方法で OceanBase ソーステーブルからデータを読み取ります:

    • 完全読み取り: テーブル全体のデータを直接メモリに読み取ります。この方法は高速ですが、それに応じたメモリを消費します。ソーステーブルが非常に大きい場合、OOM エラーを引き起こす可能性があります。

    • バッチ読み取り: データを複数のバッチで読み取ります。各バッチには一定数の行が含まれ、すべてのデータが読み取られるまで続きます。この方法は、大きなテーブルを読み取る際の OOM エラーを回避しますが、比較的低速です。

    connect.timeout

    OceanBase データベースサーバーへの接続がタイムアウトした際に、接続を再試行するまでの最大待機時間。

    いいえ

    DURATION

    30s

    なし。

    connect.max-retries

    OceanBase データベースサービスへの接続に失敗した後の最大リトライ回数。

    いいえ

    INTEGER

    3

    なし。

    connection.pool.size

    データベース接続プールのサイズ。

    いいえ

    INTEGER

    20

    データベース接続プールは接続を再利用するために使用され、これによりデータベース接続の数を減らすことができます。

    jdbc.properties.*

    JDBC URL のカスタム接続パラメーター。

    いいえ

    STRING

    なし

    カスタム接続パラメーターを渡すことができます。たとえば、SSL プロトコルを使用しないようにするには、'jdbc.properties.useSSL' = 'false' と設定します。

    サポートされている接続パラメーターの詳細については、「MySQL 構成プロパティ」をご参照ください。

    debezium.*

    Debezium が Binlog データを読み取るためのカスタムパラメーター。

    いいえ

    STRING

    なし

    カスタム Debezium パラメーターを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。

    heartbeat.interval

    ソースがハートビートイベントを使用して Binlog オフセットを進める時間間隔。

    いいえ

    DURATION

    30s

    ハートビートイベントは、ソースの Binlog オフセットを進めるために使用されます。これは、更新が頻繁に行われない OceanBase のテーブルに役立ちます。このようなテーブルでは、Binlog オフセットは自動的に進みません。ハートビートイベントは Binlog オフセットを前進させ、期限切れになるのを防ぎます。Binlog オフセットが期限切れになると、ジョブは失敗し、回復不能になり、ステートレス再起動が必要になります。

    scan.incremental.snapshot.chunk.key-column

    スナップショットフェーズでチャンクを分割するためのチャンクキーとして使用する列を指定します。

    「備考」列をご参照ください。

    STRING

    なし

    • このパラメーターは、プライマリキーのないテーブルに必須です。選択された列は非 NULL 型 (NOT NULL) である必要があります。

    • このパラメーターは、プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみを選択できます。

    scan.incremental.close-idle-reader.enabled

    スナップショット完了後にアイドル状態のリーダーを閉じるかどうかを指定します。

    いいえ

    BOOLEAN

    false

    • Realtime Compute for Apache Flink VVR 8.0.1 以降でのみサポートされます。

    • この設定を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定します。

    scan.read-changelog-as-append-only.enabled

    変更ログストリームを追加専用ストリームに変換するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true: INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべてのタイプのメッセージが INSERT メッセージに変換されます。これは、先祖テーブルから削除メッセージを保存する必要がある場合など、特別なシナリオでのみ有効にしてください。

    • false (デフォルト):すべてのタイプのメッセージがそのままダウンストリームに送信されます。

    説明

    Realtime Compute for Apache Flink VVR 8.0.8 以降でのみサポートされます。

    scan.only.deserialize.captured.tables.changelog.enabled

    増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

    いいえ

    BOOLEAN

    • VVR 8.x バージョンではデフォルト値は false です。

    • VVR 11.1 以降のバージョンではデフォルト値は true です。

    有効な値:

    • true:ターゲットテーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化します。

    • false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。

    説明
    • Realtime Compute for Apache Flink VVR 8.0.7 以降でのみサポートされます。

    • Realtime Compute for Apache Flink VVR 8.0.8 以前で使用する場合、パラメーター名を debezium.scan.only.deserialize.captured.tables.changelog.enable に変更する必要があります。

    scan.parse.online.schema.changes.enabled

    増分フェーズ中に、ApsaraDB RDS のロックレス変更に対する DDL イベントの解析を試みるかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true: ApsaraDB RDS のロックレス変更に対する DDL イベントを解析します。

    • false (デフォルト): ApsaraDB RDS のロックレス変更に対する DDL イベントを解析しません。

    これは実験的な機能です。オンラインのロックレス変更を実行する前に、回復のために Flink ジョブのスナップショットを取得することを推奨します。

    説明

    Realtime Compute for Apache Flink VVR 11.1 以降でのみサポートされます。

    scan.incremental.snapshot.backfill.skip

    スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true: スナップショット読み取りフェーズ中にバックフィルをスキップします。

    • false (デフォルト): スナップショット読み取りフェーズ中にバックフィルをスキップしません。

    バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。

    重要

    バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの一貫性が損なわれる可能性があります。at-least-once セマンティクスのみが保証されます。

    説明

    Realtime Compute for Apache Flink VVR 11.1 以降でのみサポートされます。

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    スナップショット読み取りフェーズ中に、無制限チャンクを最初に配布するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true: スナップショット読み取りフェーズ中に無制限チャンクを最初に配布します。

    • false (デフォルト): スナップショット読み取りフェーズ中に無制限チャンクを最初に配布しません。

    これは実験的な機能です。有効にすると、スナップショットフェーズ中に TaskManager が最後のチャンクを同期する際の OOM エラーのリスクを軽減できます。ジョブを初めて開始する前にこのパラメーターを追加することを推奨します。

    説明

    Realtime Compute for Apache Flink VVR 11.1 以降でのみサポートされます。

  • ディメンションテーブルのみ

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    url

    JDBC URL。

    はい

    STRING

    なし

    • URL には、MySQL データベース名または Oracle サービス名を含める必要があります。

    userName

    ユーザー名。

    はい

    STRING

    なし

    なし。

    cache

    キャッシュポリシー。

    いいえ

    STRING

    ALL

    次の 3 つのキャッシュポリシーがサポートされています:

    • ALL: ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルのすべてのデータをキャッシュにロードします。その後のディメンションテーブルデータのルックアップはすべてキャッシュを介して行われます。キャッシュにデータが見つからない場合、そのキーは存在しません。キャッシュが期限切れになると、完全なキャッシュが再読み込みされます。

      このポリシーは、リモートテーブルが小さく、ミスキー (ソーステーブルとディメンションテーブルを結合する際の ON 条件が関連付けられない) が多いシナリオに適しています。

    • LRU: ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュでデータを検索します。見つからない場合は、物理的なディメンションテーブルを検索します。このキャッシュポリシーを使用する場合、cacheSize パラメーターを設定する必要があります。

    • None:キャッシュなし。

    重要
    • ALL キャッシュポリシーを使用する場合、OOM エラーを防ぐためにノードのメモリサイズに注意してください。

    • システムはディメンションテーブルのデータを非同期でロードするため、ALL キャッシュポリシーを使用する場合、ディメンションテーブル結合ノードのメモリを増やす必要があります。増やすメモリサイズは、リモートテーブルデータのサイズの 2 倍にする必要があります。

    cacheSize

    キャッシュされるエントリの最大数。

    いいえ

    INTEGER

    100000

    • LRU キャッシュポリシーを選択した場合は、キャッシュサイズを設定する必要があります。

    • ALL キャッシュポリシーを選択した場合は、キャッシュサイズを設定する必要はありません。

    cacheTTLMs

    キャッシュのタイムアウト期間。

    いいえ

    LONG

    Long.MAX_VALUE

    cacheTTLMs の構成は cache パラメーターに依存します:

    • cacheNone に設定されている場合、cacheTTLMs を構成する必要はありません。これは、キャッシュがタイムアウトしないことを意味します。

    • cacheLRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間です。デフォルトでは、キャッシュは期限切れになりません。

    • cacheALL に設定されている場合、cacheTTLMs はキャッシュの再読み込み時間です。デフォルトでは、キャッシュは再読み込みされません。

    maxRetryTimeout

    最大リトライ時間。

    いいえ

    DURATION

    60s

    なし。

  • 結果テーブル: JDBC のみ

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    userName

    ユーザー名。

    はい

    STRING

    なし

    なし。

    compatibleMode

    OceanBase の互換モード。

    いいえ

    STRING

    mysql

    有効な値:

    • mysql

    • oracle

    説明

    これは OceanBase 固有のパラメーターです。

    url

    JDBC URL。

    はい

    STRING

    なし

    • URL には、MySQL データベース名または Oracle サービス名を含める必要があります。

    tableName

    テーブル名。

    はい

    STRING

    なし

    なし。

    sink.mode

    OceanBase 結果テーブルの書き込みモード。

    はい

    STRING

    jdbc

    jdbcdirect-load をサポートします。

    maxRetryTimes

    再試行の最大回数。

    いいえ

    INTEGER

    3

    なし。

    poolInitialSize

    データベース接続プールの初期サイズ。

    いいえ

    INTEGER

    1

    なし。

    poolMaxActive

    データベース接続プール内の最大接続数。

    いいえ

    INTEGER

    8

    なし。

    poolMaxWait

    データベース接続プールから接続を取得するまでの最大待機時間。

    いいえ

    INTEGER

    2000

    単位はミリ秒です。

    poolMinIdle

    データベース接続プール内のアイドル接続の最小数。

    いいえ

    INTEGER

    1

    なし。

    connectionProperties

    JDBC の接続プロパティ。

    いいえ

    STRING

    なし

    フォーマットは "k1=v1;k2=v2;k3=v3" です。

    ignoreDelete

    データ削除操作を無視するかどうかを指定します。

    いいえ

    Boolean

    false

    なし。

    excludeUpdateColumns

    除外する列の名前を指定します。これらの列は更新操作中に更新されません。

    いいえ

    STRING

    なし

    複数の列を指定する場合は、カンマ (,) で区切ります。例: excludeUpdateColumns=column1,column2

    説明

    この値には常にプライマリキー列が含まれます。有効になる列は、指定した列とプライマリキー列です。

    partitionKey

    パーティションキー。

    いいえ

    STRING

    なし

    パーティションキーが設定されている場合、コネクタはまずパーティションキーでデータをグループ化し、各グループは個別にデータベースに書き込まれます。このグループ化は modRule 処理の前に行われます。

    modRule

    グループ化ルール。

    いいえ

    STRING

    なし

    グループ化ルールのフォーマットは "column_name mod number" である必要があります。例: user_id mod 8。列の型は数値である必要があります。

    グループ化ルールが設定されている場合、データはまず partitionKey でパーティション分割されます。各パーティション内で、modRule 計算の結果に基づいてさらにグループ化されます。

    bufferSize

    データバッファーのサイズ。

    いいえ

    INTEGER

    1000

    なし。

    flushIntervalMs

    キャッシュをフラッシュする時間間隔。指定された時間待機してもキャッシュ内のデータが出力条件を満たさない場合、システムは自動的にキャッシュ内のすべてのデータを出力します。

    いいえ

    LONG

    1000

    なし。

    retryIntervalMs

    最大リトライ時間。

    いいえ

    INTEGER

    5000

    単位はミリ秒です。

  • 結果テーブルへのバイパスインポート専用です。

重要
  • 結果テーブルのバイパスインポートは、Ververica Runtime (VVR) 11.5 以降で利用できます。バイパスインポートの詳細については、「こちらのドキュメント」をご参照ください。

  • 有限ストリームのみをサポート:データソースは有限ストリームである必要があります。無限ストリームはサポートされていません。パフォーマンスを向上させるために、Flink バッチモードを使用できます。

  • 高スループット書き込み:このメソッドは、大規模バッチデータインポートシナリオに適しています。

  • インポート中のテーブルロック:バイパスインポートを実行すると、ターゲットテーブルがロックされます。テーブルがロックされている間は、データ変更の書き込みおよび DDL の変更はブロックされます。データクエリは影響を受けません。

  • リアルタイム書き込みには非対応:リアルタイムまたはストリーム書き込みシナリオでは、Java Database Connectivity (JDBC) 結果テーブルを使用してください。

パラメーター

説明

必須

データ型

デフォルト値

備考

sink.mode

OceanBase 結果テーブルにデータを書き込むためのメソッド

いいえ

STRING

jdbc

`jdbc` モードと `direct-load` モードをサポートしています。バイパスインポートを使用して OceanBase の結果テーブルにデータを書き込むには、このパラメーターを静的フィールド `direct-load` に設定します。

host

OceanBase データベースの IP アドレスまたはホスト名です。

はい

STRING

なし

なし。

port

OceanBase データベースの RPC ポートです。

いいえ

INTEGER

2882

なし。

username

ユーザー名です。

はい

STRING

なし

なし。

tenant-name

OceanBase データベースのテナント名です。

はい

STRING

なし

schema-name

  • MySQL テナントの場合は、データベース名を入力します。

  • Oracle テナントの場合は、オーナー名を入力します。

はい

STRING

なし

なし。

table-name

OceanBase テーブルの名前です。

はい

STRING

なし

なし。

parallel

バイパスインポートタスクのサーバー側の同時実行数です。

いいえ

INTEGER

8

  • このパラメーターは、インポートタスクのサーバー側の CPU リソースを定義し、クライアントの同時実行数とは独立しています。サーバーは、エラーを返すことなく、テナントの CPU 仕様に基づいて最大並列度を制限します。実際の並列度は、テナントの CPU 仕様とテーブルのパーティション分散によって決まります。

  • たとえば、テナントに 2 つの CPU コアがあり、並列度が 10 に設定されている場合、実際の並列度は MIN(2 コア * 2, 10) で計算され、4 になります。

  • テーブルパーティションが 2 つのノードに分散している場合、実際の合計並列度は MIN(2 コア * 2, 10) * 2 = 8 になります。

buffer-size

バイパスインポートタスクで OceanBase に書き込む際のバッファーサイズです。

いいえ

INTEGER

1024

Flink は buffer-size で指定された数のデータレコードをキャッシュし、1 回の操作で OceanBase に書き込みます。

dup-action

バイパスインポートタスク中に重複するプライマリキーを処理するためのポリシーです。有効な値は STOP_ON_DUP (インポートが失敗)、REPLACE (既存の行が置き換えられる)、または IGNORE (新しい行が無視される) です。

いいえ

STRING

REPLACE

  • STOP_ON_DUP:インポートは失敗します。

  • REPLACE:インポートされた行が既存の行を置き換えます。

  • IGNORE:インポートされた行は破棄され、既存の行が保持されます。

load-method

バイパスインポートモードです。

full

  • full:完全なバイパスインポート。これがデフォルト値です。

  • inc:増分バイパスインポート。このモードはプライマリキーの競合をチェックします。observer 4.3.2 以降でサポートされています。`direct-load.dup-action` を `REPLACE` に設定することはサポートされていません。

  • inc_replace:置換モードでの増分バイパスインポート。このモードはプライマリキーの競合をチェックせず、古いデータを直接上書きします。これは `REPLACE` と同じ効果があります。`direct-load.dup-action` パラメーターは無視されます。observer 4.3.2 以降でサポートされています。

max-error-rows

バイパスインポートタスクが許容できるエラー行の最大数です。

いいえ

LONG

0

次の場合、行はエラー行と見なされます:

  • `dupAction` が `STOP_ON_DUP` に設定されている場合の、プライマリキーが重複する行。

  • 列数が一致しない (多すぎる、または少なすぎる) 行。

  • データ型の変換に失敗した行。

timeout

バイパスインポートタスク全体のタイムアウト期間です。

いいえ

DURATION

7d

heartbeat-timeout

バイパスインポートタスクのクライアント側のハートビートタイムアウトです。

いいえ

DURATION

60s

heartbeat-interval

バイパスインポートタスクのクライアント側のハートビート間隔です。

いいえ

DURATION

10s

型のマッピング

  • MySQL 互換モード

    OceanBase フィールド型

    Flink フィールド型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT 符号なし

    INT

    INT

    MEDIUMINT

    符号なし SMALLINT

    BIGINT

    BIGINT

    符号なし INT

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    REAL

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    NUMERIC(p, s)

    DECIMAL(p, s)

    説明

    p <= 38 の場合。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    テキスト

    MEDIUMTEXT

    長いテキスト

    TINYBLOB

    BYTES

    重要

    Flink は、サイズが 2,147,483,647 (2^31 - 1) バイト以下の BLOB レコードをサポートします。

    BLOB

    MEDIUMBLOB

    LONGBLOB

  • Oracle 互換モード

    OceanBase フィールド型

    Flink フィールド型

    NUMBER(p, s <= 0), p - s < 3

    TINYINT

    NUMBER(p, s <= 0), p - s < 5

    SMALLINT

    NUMBER(p, s <= 0), p - s < 10

    INT

    NUMBER(p, s <= 0), p - s < 19

    BIGINT

    NUMBER(p, s <= 0), 19 <= p - s <= 38

    DECIMAL(p - s, 0)

    NUMBER(p, s > 0)

    DECIMAL(p, s)

    NUMBER(p, s <= 0), p - s > 38

    STRING

    FLOAT

    FLOAT

    BINARY_FLOAT

    BINARY_DOUBLE

    DOUBLE

    NUMBER(1)

    BOOLEAN

    DATE

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    STRING

    NCHAR(n)

    NVARCHAR2(n)

    VARCHAR(n)

    VARCHAR2(n)

    CLOB

    BLOB

    BYTES

    ROWID

使用例

  • ソーステーブルとシンクテーブル

    -- OceanBase CDC ソーステーブル
    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    -- OceanBase JDBC シンクテーブル
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    -- OceanBase ダイレクトロードシンクテーブル
    CREATE TEMPORARY TABLE oceanbase_directload_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'sink.mode' = 'direct-load',
      'host' = '<yourHost>',
      'port' = 'yourPort',
      'tenant-name' = '<yourTenantName>',
      'schema-name' = '<yourSchemaName>',
      'table-name' = '<yourTableName>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    END; 

  • ディメンションテーブル

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>', // JDBC URL を指定します
      'userName' = '<yourUserName>', // ユーザー名を指定します
      'password' = '${secret_values.password}', // パスワードを指定します
      'tableName' = '<yourTableName>' // テーブル名を指定します
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

参照

Flink でサポートされているコネクタのリストについては、「サポートされているコネクタ」をご参照ください。