すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:OceanBase (パブリックプレビュー)

最終更新日:Dec 07, 2025

このトピックでは、OceanBaseコネクタの使用方法について説明します。

背景情報

OceanBase は、ネイティブの分散型ハイブリッドトランザクション/分析処理 (HTAP) データベース管理システムです。詳細については、OceanBase の公式サイトをご参照ください。OceanBase は、Oracle と MySQL の両方の互換モードをサポートしています。これにより、MySQL または Oracle データベースから移行する際の業務システムのリファクタリングコストを削減できます。これらのモードのデータの型、SQL の特徴、および内部ビューは、MySQL または Oracle のものと一致しています。各モードで推奨されるコネクタは次のとおりです:

  • Oracle モード:OceanBase コネクタのみを使用できます。

  • MySQL モード:このモードはネイティブの MySQL 構文と高い互換性があります。OceanBase と MySQL コネクタの両方を使用して、OceanBase への読み書きができます。

    重要
    • OceanBase コネクタはパブリックプレビュー段階です。OceanBase 3.2.4.4 以降では、MySQL コネクタを使用して OceanBase への読み書きができます。この機能もパブリックプレビュー段階です。使用する前に、この機能を慎重に評価してください。

    • MySQL コネクタを使用して OceanBase から増分データを読み取る場合は、OceanBase のバイナリログ (Binlog) が有効になっており、正しく構成されていることを確認してください。OceanBase Binlog の詳細については、「概要」または「バイナリログ関連の操作」をご参照ください。

次の表に、OceanBase コネクタでサポートされる情報を示します。

カテゴリ

詳細

サポートされているタイプ

ソーステーブル、ディメンションテーブル、およびシンクテーブル

実行モード

ストリーミングとバッチ

データ形式

該当なし

特定の監視メトリック

なし

APIタイプ

SQL

結果テーブル内のデータの更新または削除をサポート

はい

前提条件

制限事項

  • OceanBase コネクタは、Realtime Compute for Apache Flink with Ververica Runtime (VVR) 8.0.1 以降でサポートされています。

  • at-least-once セマンティクスが保証されます。結果テーブルにプライマリキーがある場合、べき等性によってデータの正確性が保証されます。

構文

CREATE TABLE oceanabse_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',  // JDBC URL を指定します
  'tableName' = '<yourTableName>', // テーブル名を指定します
  'userName' = '<yourUserName>', // ユーザー名を指定します
  'password' = '<yourPassword>' // パスワードを指定します
);
説明

コネクタは、受信した各データレコードに対して SQL 文を構築して実行することで、結果テーブルに書き込みます。

  • 結果テーブルにプライマリキーがない場合は、INSERT INTO 文が生成されます。

  • 結果テーブルにプライマリキーがある場合は、データベースの互換モードに基づいて UPSERT 文が生成されます。

WITH パラメーター

  • 全般

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    connector

    テーブルのタイプ。

    はい

    STRING

    なし

    値は oceanbase である必要があります。

    password

    パスワード。

    はい

    STRING

    なし

    なし。

  • ソース固有

    重要

    注:VVR 11.4.0 以降の Realtime Compute for Apache Flink から、OceanBase CDC コネクタは大幅なアーキテクチャのアップグレードと機能調整が行われました。更新内容を理解し、バージョンをスムーズに移行するために、主な変更点を以下に説明します:

    • OceanBase LogProxy サービスに基づく元の CDC コネクタは、正式に非推奨となり、ディストリビューションから削除されました。VVR 11.4.0 以降、OceanBase CDC コネクタは、OceanBase Binlog サービスを介してのみ増分ログのキャプチャとデータ同期をサポートします。

    • OceanBase CDC コネクタは、OceanBase Binlog サービスとのプロトコル互換性と接続安定性が強化されています。そのため、OceanBase CDC コネクタの使用を推奨します。

      OceanBase Binlog サービスは、MySQL レプリケーションプロトコルと完全に互換性があります。標準の MySQL CDC コネクタを使用して OceanBase Binlog サービスに接続して変更追跡を行うこともできますが、これは推奨されません。

    • VVR 11.4.0 以降の Realtime Compute for Apache Flink から、OceanBase CDC コネクタは Oracle 互換モードでの増分データサブスクリプションをサポートしなくなりました。Oracle 互換モードでの増分データサブスクリプションについては、OceanBase のエンタープライズテクニカルサポートにお問い合わせください。

    パラメーター

    説明

    必須

    データの型

    デフォルト値

    備考

    hostname

    OceanBase データベースの IP アドレスまたはホスト名。

    はい

    STRING

    いいえ

    VPC アドレスを指定することを推奨します。

    説明

    OceanBase と Realtime Compute for Apache Flink が同じ VPC にない場合は、まず VPC 間ネットワーク接続を確立するか、インターネットを使用してアクセスする必要があります。詳細については、「ストレージの管理と操作」および「フルマネージド Flink クラスターはどのようにインターネットにアクセスできますか?」をご参照ください。

    username

    OceanBase データベースサービスのユーザー名。

    はい

    STRING

    いいえ

    なし。

    database-name

    OceanBase データベースの名前。

    はい

    STRING

    なし

    • ソーステーブルとして、データベース名は正規表現をサポートしており、複数のデータベースからデータを読み取ることができます。

    • 正規表現を使用する場合、先頭と末尾を一致させるために ^ および $ 記号を使用することは避けてください。その理由については、table-name パラメーターの備考をご参照ください。

    table-name

    OceanBase によって示されます。

    はい

    STRING

    なし

    • ソーステーブルとして、テーブル名は正規表現をサポートしており、複数のテーブルからデータを読み取ることができます。

    • 正規表現を使用する場合、先頭と末尾を一致させるために ^ および $ 記号を使用することは避けてください。その理由については、以下の注記をご参照ください。

    説明

    OceanBase ソーステーブルが正規表現でテーブル名を照合する場合、指定した database-nametable-name を文字列 \\. (VVR 8.0.1 より前は文字 . が使用されていました) を使用して完全パスの正規表現に結合します。この結合された正規表現は、OceanBase データベース内のテーブルの完全修飾名を照合するために使用されます。

    たとえば、'database-name'='db_.*' および 'table-name'='tb_.+' と設定した場合、コネクタは正規表現 db_.*\\.tb_.+ (またはバージョン 8.0.1 より前は db_.*.tb_.+) を使用して完全修飾テーブル名を照合し、どのテーブルを読み取るかを決定します。

    port

    OceanBase データベースサービスのポート番号。

    いいえ

    INTEGER

    3306

    なし。

    server-id

    データベースクライアントの数値 ID。

    いいえ

    STRING

    デフォルトでは 5400 から 6400 の間のランダムな値が生成されます。

    この ID はグローバルに一意である必要があります。同じデータベースに接続する各ジョブには、異なる ID を設定することを推奨します。

    このパラメーターは、5400-5408 のような ID 範囲形式もサポートします。増分読み取りが有効な場合、複数の同時読み取りがサポートされます。この場合、各同時タスクが異なる ID を使用するように ID 範囲を設定することを推奨します。詳細については、「サーバー ID の使用法」をご参照ください。

    scan.incremental.snapshot.chunk.size

    各チャンクのサイズ (行数)。

    いいえ

    INTEGER

    8096

    増分スナップショット読み取りが有効な場合、テーブルは読み取りのために複数のチャンクに分割されます。チャンクのデータは、完全に読み取られるまでメモリにバッファリングされます。

    チャンクサイズが小さいと、テーブルの総チャンク数が多くなります。これにより、よりきめ細かい障害回復が可能になりますが、メモリ不足 (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを見つけて適切なチャンクサイズを設定する必要があります。

    scan.snapshot.fetch.size

    テーブルの完全データを読み取る際に、各バッチでプルするレコードの最大数。

    いいえ

    INTEGER

    1024

    なし。

    scan.startup.mode

    データ消費の起動モード。

    いいえ

    STRING

    initial

    有効な値:

    • initial (デフォルト):最初の起動時に履歴の完全データをスキャンし、その後最新の Binlog データを読み取ります。

    • latest-offset:最初の起動時に履歴の完全データをスキャンしません。Binlog の末尾 (最新の Binlog 位置) から読み取りを開始します。つまり、コネクタの起動後に行われた最新の変更のみを読み取ります。

    • earliest-offset:履歴の完全データをスキャンしません。利用可能な最も古い Binlog 位置から読み取りを開始します。

    • specific-offset:履歴の完全データをスキャンしません。指定した特定の Binlog オフセットから開始します。scan.startup.specific-offset.filescan.startup.specific-offset.pos パラメーターの両方を設定するか、scan.startup.specific-offset.gtid-set パラメーターのみを設定して特定の GTID セットから開始することでオフセットを指定できます。

    • timestamp:履歴の完全データをスキャンしません。指定したタイムスタンプから Binlog の読み取りを開始します。タイムスタンプは scan.startup.timestamp-millis でミリ秒単位で指定します。

    重要

    earliest-offsetspecific-offset、または timestamp 起動モードを使用する場合、指定された Binlog 消費位置とジョブの起動時間の間に対応するテーブルのスキーマが変更されないようにしてください。これにより、スキーマの違いによるエラーを回避できます。

    scan.startup.specific-offset.file

    特定オフセットモードを使用する場合の開始オフセットの Binlog ファイル名。

    いいえ

    STRING

    なし

    このパラメーターを使用する場合、scan.startup.modespecific-offset に設定する必要があります。ファイル名の例は mysql-bin.000003 です。

    scan.startup.specific-offset.pos

    特定オフセットモードを使用する場合の、指定された Binlog ファイル内の開始オフセット。

    いいえ

    INTEGER

    なし

    このパラメーターを使用する場合、scan.startup.modespecific-offset に設定する必要があります。

    scan.startup.specific-offset.gtid-set

    特定オフセットモードを使用する場合の開始オフセットの GTID セット。

    いいえ

    STRING

    なし

    このパラメーターを使用する場合、scan.startup.modespecific-offset に設定する必要があります。GTID セットの形式例は 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19 です。

    scan.startup.timestamp-millis

    特定時間モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。

    いいえ

    LONG

    なし

    このパラメーターを使用する場合、scan.startup.modetimestamp に設定する必要があります。単位はミリ秒です。

    重要

    特定の時間を使用する場合、OceanBase CDC は各 Binlog ファイルの初期イベントを読み取ってそのタイムスタンプを決定し、最終的に指定された時間に対応する Binlog ファイルを特定しようとします。指定されたタイムスタンプの Binlog ファイルがデータベースからクリアされておらず、読み取り可能であることを確認してください。

    server-time-zone

    データベースが使用するセッションタイムゾーン。

    いいえ

    STRING

    このパラメーターを指定しない場合、システムは Flink ジョブのランタイムの環境タイムゾーンをデータベースサーバーのタイムゾーンとして使用します。これは、選択したゾーンのタイムゾーンです。

    例:Asia/Shanghai。このパラメーターは、TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。詳細については、「Debezium の時間値」をご参照ください。

    debezium.min.row.count.to.stream.results

    テーブルの行数がこの値より大きい場合、バッチ読み取りモードが使用されます。

    いいえ

    INTEGER

    1000

    Flink は、次のいずれかの方法で OceanBase ソーステーブルからデータを読み取ります:

    • 完全読み取り:テーブル全体のデータを直接メモリに読み込みます。これは高速ですが、対応する量のメモリを消費します。ソーステーブルが非常に大きい場合、OOM エラーのリスクがあります。

    • バッチ読み取り:すべてのデータが読み取られるまで、1 バッチあたり特定の行数で、複数のバッチでデータを読み取ります。これにより、大きなテーブルでの OOM リスクを回避できますが、比較的低速です。

    connect.timeout

    OceanBase データベースサーバーへの接続がタイムアウトした後、接続リトライまでに待機する最大時間。

    いいえ

    DURATION

    30s

    なし。

    connect.max-retries

    OceanBase データベースサービスへの接続に失敗した後の最大リトライ回数。

    いいえ

    INTEGER

    3

    なし。

    connection.pool.size

    データベース接続プールのサイズ。

    いいえ

    INTEGER

    20

    データベース接続プールは接続を再利用するために使用され、これによりデータベース接続の数を減らすことができます。

    jdbc.properties.*

    JDBC URL のカスタム接続パラメーター。

    いいえ

    STRING

    なし

    カスタム接続パラメーターを渡すことができます。たとえば、SSL プロトコルを使用しないようにするには、'jdbc.properties.useSSL' = 'false' と設定します。

    サポートされている接続パラメーターの詳細については、「MySQL 構成プロパティ」をご参照ください。

    debezium.*

    Debezium がバイナリログを読み取るためのカスタムパラメーター。

    いいえ

    STRING

    なし

    カスタム Debezium パラメーターを渡すことができます。たとえば、'debezium.event.deserialization.failure.handling.mode'='ignore' を使用して、解析エラーの処理ロジックを指定します。

    heartbeat.interval

    ソースがハートビートイベントを使用して Binlog オフセットを進める間隔。

    いいえ

    DURATION

    30s

    ハートビートイベントは、ソースの Binlog オフセットを進めるために使用され、OceanBase の更新が遅いテーブルに非常に役立ちます。このようなテーブルでは、Binlog オフセットは自動的に進みません。ハートビートイベントは Binlog オフセットを前進させ、期限切れになるのを防ぎます。Binlog オフセットが期限切れになると、ジョブが失敗して回復不能になり、ステートレス再起動が必要になる場合があります。

    scan.incremental.snapshot.chunk.key-column

    スナップショットフェーズ中にチャンクを分割するために使用する列。

    備考を参照

    STRING

    なし

    • プライマリキーのないテーブルでは必須です。選択された列は非 NULL 型 (NOT NULL) である必要があります。

    • プライマリキーのあるテーブルではオプションです。プライマリキーから 1 つの列のみを選択できます。

    scan.incremental.close-idle-reader.enabled

    スナップショットフェーズが終了した後にアイドルリーダーを閉じるかどうかを指定します。

    いいえ

    BOOLEAN

    false

    • VVR 8.0.1 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

    • このパラメーターを有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定します。

    scan.read-changelog-as-append-only.enabled

    変更ログデータストリームを追加専用データストリームに変換するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:すべてのタイプのメッセージ (INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含む) が INSERT メッセージに変換されます。これは、先祖テーブルから削除メッセージを保存する必要がある場合など、特別なシナリオでのみ有効にしてください。

    • false (デフォルト):すべてのタイプのメッセージがそのままダウンストリームに送信されます。

    説明

    VVR 8.0.8 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

    scan.only.deserialize.captured.tables.changelog.enabled

    増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。

    いいえ

    BOOLEAN

    • VVR 8.x バージョンではデフォルト値は false です。

    • VVR 11.1 以降のバージョンではデフォルト値は true です。

    有効な値:

    • true:ターゲットテーブルの変更データのみを逆シリアル化し、Binlog の読み取りを高速化します。

    • false (デフォルト):すべてのテーブルの変更データを逆シリアル化します。

    説明
    • VVR 8.0.7 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

    • VVR 8.0.8 以前を使用する Realtime Compute for Apache Flink でこのパラメーターを使用する場合、パラメーター名を debezium.scan.only.deserialize.captured.tables.changelog.enable に変更してください。

    scan.parse.online.schema.changes.enabled

    増分フェーズ中に RDS のロックレス変更の DDL イベントを解析するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:RDS のロックレス変更の DDL イベントを解析します。

    • false (デフォルト):RDS のロックレス変更の DDL イベントを解析しません。

    これは実験的な機能です。オンラインでロックレス変更を実行する前に、復旧のために Flink ジョブのスナップショットを作成することを推奨します。

    説明

    VVR 11.1 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

    scan.incremental.snapshot.backfill.skip

    スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:スナップショット読み取りフェーズ中にバックフィルをスキップします。

    • false (デフォルト):スナップショット読み取りフェーズ中にバックフィルをスキップしません。

    バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。

    重要

    バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの一貫性が損なわれる可能性があります。at-least-once セマンティクスのみが保証されます。

    説明

    VVR 11.1 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    スナップショット読み取りフェーズ中に無制限チャンクを最初に配布するかどうかを指定します。

    いいえ

    BOOLEAN

    false

    有効な値:

    • true:スナップショット読み取りフェーズ中に無制限チャンクを最初に配布します。

    • false (デフォルト):スナップショット読み取りフェーズ中に無制限チャンクを最初に配布しません。

    これは実験的な機能です。これを有効にすると、TaskManager がスナップショットフェーズ中に最後のチャンクを同期する際の OOM エラーのリスクを軽減できます。ジョブの最初の起動前にこのパラメーターを追加することを推奨します。

    説明

    VVR 11.1 以降を使用する Realtime Compute for Apache Flink でのみサポートされます。

  • ディメンションテーブル固有

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    url

    JDBC URL。

    はい

    STRING

    なし

    • URL には、MySQL データベース名または Oracle サービス名を含める必要があります。

    userName

    ユーザー名。

    はい

    STRING

    なし

    なし。

    cache

    キャッシュポリシー。

    いいえ

    STRING

    ALL

    次の 3 つのキャッシュポリシーがサポートされています:

    • ALL:ディメンションテーブルのすべてのデータをキャッシュします。ジョブが実行される前に、システムはディメンションテーブルからすべてのデータをキャッシュにロードします。その後のディメンションテーブルデータのルックアップはすべてキャッシュを介して行われます。キャッシュにデータが見つからない場合、そのキーは存在しません。キャッシュの有効期限が切れると、完全なキャッシュが再読み込みされます。

      このポリシーは、リモートテーブルのデータ量が少なく、欠落しているキーが多い (ソーステーブルとディメンションテーブルが ON 条件で結合できない) シナリオに適しています。

    • LRU:ディメンションテーブルのデータの一部をキャッシュします。ソーステーブルの各レコードについて、システムはまずキャッシュでデータを検索します。見つからない場合は、物理的なディメンションテーブルをクエリします。このキャッシュポリシーを使用する場合、cacheSize パラメーターを構成する必要があります。

    • None:キャッシュなし。

    重要
    • ALL キャッシュポリシーを使用する場合、OOM エラーを防ぐためにノードのメモリサイズを監視してください。

    • システムはディメンションテーブルのデータを非同期でロードするため、ALL キャッシュポリシーを使用する場合、ディメンションテーブル結合ノードのメモリを増やす必要があります。増やすメモリサイズは、リモートテーブルのデータ量の 2 倍にする必要があります。

    cacheSize

    キャッシュされるエントリの最大数。

    いいえ

    INTEGER

    100000

    • LRU キャッシュポリシーを選択した場合は、キャッシュサイズを設定する必要があります。

    • ALL キャッシュポリシーを選択した場合は、キャッシュサイズを設定する必要はありません。

    cacheTTLMs

    キャッシュの生存時間 (TTL)。

    いいえ

    LONG

    Long.MAX_VALUE

    cacheTTLMs の構成は cache パラメーターに依存します:

    • cacheNone に設定されている場合、cacheTTLMs を構成する必要はありません。これは、キャッシュがタイムアウトしないことを意味します。

    • cacheLRU に設定されている場合、cacheTTLMs はキャッシュの TTL です。デフォルトでは、キャッシュは期限切れになりません。

    • cacheALL に設定されている場合、cacheTTLMs はキャッシュの再読み込み時間です。デフォルトでは、キャッシュは再読み込みされません。

    maxRetryTimeout

    最大リトライ時間。

    いいえ

    DURATION

    60s

    なし。

  • シンク固有

    パラメーター

    説明

    必須

    データ型

    デフォルト値

    備考

    userName

    ユーザー名。

    はい

    STRING

    なし

    なし。

    compatibleMode

    OceanBase の互換モード。

    いいえ

    STRING

    mysql

    有効な値:

    • mysql

    • oracle

    説明

    これは OceanBase 固有のパラメーターです。

    url

    JDBC URL。

    はい

    STRING

    なし

    • URL には、MySQL データベース名または Oracle サービス名を含める必要があります。

    tableName

    テーブル名。

    はい

    STRING

    なし

    なし。

    maxRetryTimes

    再試行の最大回数。

    いいえ

    INTEGER

    3

    なし。

    poolInitialSize

    データベース接続プールの初期サイズ。

    いいえ

    INTEGER

    1

    なし。

    poolMaxActive

    データベース接続プール内の最大接続数。

    いいえ

    INTEGER

    8

    なし。

    poolMaxWait

    データベース接続プールから接続を取得するまでの最大待機時間。

    いいえ

    INTEGER

    2000

    単位: ミリ秒。

    poolMinIdle

    データベース接続プール内のアイドル接続の最小数。

    いいえ

    INTEGER

    1

    なし。

    connectionProperties

    JDBC接続プロパティ。

    いいえ

    STRING

    なし

    フォーマットは "k1=v1;k2=v2;k3=v3" です。

    ignoreDelete

    DELETE 操作を無視するかどうかを指定します。

    いいえ

    Boolean

    false

    なし。

    excludeUpdateColumns

    除外する列の名前を指定します。これらの列は更新操作中に更新されません。

    いいえ

    STRING

    なし

    複数の列を無視するように指定する場合は、カンマ (,) で区切ります。例: excludeUpdateColumns=column1,column2

    説明

    この値には常にプライマリキー列が含まれます。実際に有効になる列は、指定した列とプライマリキー列です。

    partitionKey

    パーティションキー。

    いいえ

    STRING

    なし

    パーティションキーが設定されている場合、コネクタはまずパーティションキーでデータをグループ化します。各グループはその後、個別にデータベースに書き込まれます。このグループ化は modRule の前に処理されます。

    modRule

    グループ化ルール。

    いいえ

    STRING

    なし

    グループ化ルールは "column_name mod number" の形式である必要があります。例: user_id mod 8。列は数値型である必要があります。

    グループ化ルールが設定されている場合、データはまず partitionKey でパーティション分割されます。各パーティション内で、データは modRule 計算の結果に基づいてグループ化されます。

    bufferSize

    データバッファーのサイズ。

    いいえ

    INTEGER

    1000

    なし。

    flushIntervalMs

    キャッシュをフラッシュする間隔。指定された待機時間の後、キャッシュ内のデータが出力条件を満たさない場合、システムは自動的にキャッシュ内のすべてのデータを出力します。

    いいえ

    LONG

    1000

    なし。

    retryIntervalMs

    最大リトライ時間。

    いいえ

    INTEGER

    5000

    単位: ミリ秒。

型のマッピング

  • MySQL 互換モード

    OceanBase フィールド型

    Flink フィールド型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT 符号なし

    INT

    INT

    MEDIUMINT

    符号なし SMALLINT

    BIGINT

    BIGINT

    符号なし INT

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    REAL

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    NUMERIC(p, s)

    DECIMAL(p, s)

    説明

    p は 38 以下である必要があります。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    テキスト

    MEDIUMTEXT

    STRING

    TINYBLOB

    BYTES

    重要

    Flink は、2,147,483,647 (2^31 - 1) バイト以下の BLOB 型レコードのみをサポートします。

    BLOB

    MEDIUMBLOB

    LONGBLOB

  • Oracle 互換モード

    OceanBase フィールド型

    Flink フィールド型

    NUMBER(p, s <= 0), p - s < 3

    TINYINT

    NUMBER(p, s <= 0), p - s < 5

    SMALLINT

    NUMBER(p, s <= 0), p - s < 10

    INT

    NUMBER(p, s <= 0), p - s < 19

    BIGINT

    NUMBER(p, s <= 0), 19 <= p - s <= 38

    DECIMAL(p - s, 0)

    NUMBER(p, s > 0)

    DECIMAL(p, s)

    NUMBER(p, s <= 0), p - s > 38

    STRING

    FLOAT

    FLOAT

    BINARY_FLOAT

    BINARY_DOUBLE

    DOUBLE

    NUMBER(1)

    BOOLEAN

    DATE

    TIMESTAMP [(p)] [WITHOUT TIMEZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    STRING

    NCHAR(n)

    NVARCHAR2(n)

    VARCHAR(n)

    VARCHAR2(n)

    CLOB

    BLOB

    BYTES

    ROWID

使用例

  • ソーステーブルとシンクテーブル

    -- OceanBase CDC ソーステーブル
    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    -- OceanBase 結果テーブル
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    END; 

  • ディメンションテーブル

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>', // JDBC URL を指定します
      'userName' = '<yourUserName>', // ユーザー名を指定します
      'password' = '${secret_values.password}', // パスワードを指定します
      'tableName' = '<yourTableName>' // テーブル名を指定します
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

参照

Flink がサポートするコネクタの詳細については、「サポートされるコネクタ」をご参照ください。