全部產品
Search
文件中心

Realtime Compute for Apache Flink:OceanBase(公測中)

更新時間:Jan 10, 2026

本文為您介紹如何使用OceanBase連接器。

背景資訊

OceanBase資料庫是一款原生分布式的HTAP資料庫管理系統,詳情請參見OceanBase官網。為了降低您從MySQL資料庫或Oracle資料庫遷移到OceanBase資料庫時引發的業務系統改造成本,OceanBase資料庫支援Oracle和MySQL兩種相容模式,兩種模式下的資料類型、SQL功能、內部視圖等與MySQL資料庫或Oracle資料庫保持一致。兩種模式下建議使用的連接器如下:

  • Oracle模式:只能使用OceanBase連接器。

  • MySQL模式:與原生MySQL文法保持高度相容,支援使用OceanBase和MySQL兩種連接器讀寫OceanBase。

    重要
    • OceanBase連接器目前處於公測階段。在OceanBase 3.2.4.4及以上版本,您可以使用MySQL連接器讀寫OceanBase,該功能也屬於公測範圍,請在使用前充分評估並謹慎使用。

    • 在使用MySQL連接器讀取OceanBase增量資料時,請確保OceanBase Binlog已開啟且被正確設定。有關OceanBase Binlog的更多資訊,請參見概述Binlog 相關操作

OceanBase連接器支援的資訊如下。

類別

詳情

支援類型

源表、維表和結果表

運行模式

流模式和批模式

資料格式

暫不適用

特有監控指標

暫無

API種類

SQL

是否支援更新或刪除結果表資料

前提條件

使用限制

  • Flink計算引擎VVR 8.0.1及以上版本支援OceanBase連接器。

  • 語義保證

    • CDC 源表在語義上支援 Exactly Once。在讀取全量歷史資料並切換至 Binlog 讀取時,能夠確保資料不重不漏,即使發生故障,也能通過精準的語義保證資料處理的正確性。

    • 結果表語義上可以保證At-Least-Once,在結果表有主鍵的情況下,等冪可以保證資料的正確性。

文法結構

CREATE TABLE oceanabse_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);
說明

連接器寫入結果表原理:寫入結果表時,會將接收到的每條資料拼接成一條SQL去執行。具體執行的SQL情況如下:

  • 對於沒有主鍵的結果表,會拼接成INSERT INTO語句。

  • 對於包含主鍵的結果表,會根據資料庫的相容模式拼接成UPSERT語句。

WITH參數

  • 通用

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    connector

    表類型。

    STRING

    固定值為oceanbase

    password

    密碼。

    STRING

    無。

  • 源表專屬

    重要

    注意事項:自Flink計算引擎VVR 11.4.0版本起,OceanBase CDC連接器進行了架構升級與功能調整。為確保使用者準確理解變更內容並順利完成版本遷移,現將核心變更說明如下:

    • 原基於OceanBase LogProxy服務實現的 CDC 連接器已正式下線並從發行版本中移除。自 VVR-11.4.0 版本起,OceanBase CDC連接器僅支援通過OceanBase Binlog服務進行增量日誌的捕獲與資料同步。

    • OceanBase CDC連接器增強了對 OceanBase Binlog 服務的協議相容性、串連穩定性,建議使用者優先使用 OceanBase CDC 連接器。

      OceanBase Binlog服務在協議層完全相容 MySQL 複製協議,也可使用標準MySQL CDC連接器串連至OceanBase Binlog服務以實現資料訂閱,但不作推薦。

    • 自Flink計算引擎VVR 11.4.0版本起,OceanBase CDC 連接器不再支援在 Oracle 相容模式下進行增量資料訂閱。Oracle 相容模式下的增量資料訂閱請聯絡 OceanBase 企業支援人員。

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    hostname

    OceanBase資料庫的IP地址或者Hostname。

    STRING

    建議填寫Virtual Private Cloud地址。

    說明

    如果OceanBase與Realtime ComputeFlink版不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見空間管理與操作Flink全託管叢集如何訪問公網?

    username

    OceanBase資料庫服務的使用者名稱。

    STRING

    無。

    database-name

    OceanBase資料庫名稱。

    STRING

    • 作為源表時,資料庫名稱支援Regex以讀取多個資料庫的資料。

    • 使用Regex時,盡量不要使用^$符號匹配開頭和結尾。具體原因詳見table-name備忘的說明。

    table-name

    OceanBase表名。

    STIRNG

    • 作為源表時,表名支援Regex以讀取多個表的資料。

    • 使用Regex時,盡量不要使用^$符號匹配開頭和結尾。具體原因詳見以下說明。

    說明

    OceanBase 源表在正則匹配表名時,會將您填寫的 database-nametable-name 通過字串 \\.(VVR 8.0.1前使用字元.)串連成為一個全路徑的Regex,然後使用該Regex和OceanBase資料庫中表的全限定名進行正則匹配。

    例如:當配置'database-name'='db_.*'且'table-name'='tb_.+'時,連接器將會使用Regexdb_.*\\.tb_.+(8.0.1版本前為db_.*.tb_.+)去匹配表的全限定名來確定需要讀取的表。

    port

    OceanBase資料庫服務的連接埠號碼。

    INTEGER

    3306

    無。

    server-id

    資料庫用戶端的一個數字ID。

    STRING

    預設會隨機產生一個5400~6400的值。

    該ID必須是全域唯一的。建議針對同一個資料庫的每個作業都設定一個不同的ID。

    該參數也支援ID範圍的格式,例如5400-5408。在開啟增量讀模數式時支援多並發讀取,此時推薦設定為ID範圍,使得每個並發使用不同的ID。詳情請參見Server ID使用

    scan.incremental.snapshot.chunk.size

    每個chunk的大小(包含的行數)。

    INTEGER

    8096

    當開啟增量快照讀取時,表會被切分成多個chunk讀取。在讀完chunk的資料之前,chunk的資料會先緩衝在記憶體中。

    每個chunk包含的行數越少,則表中的chunk的總數量越大,儘管這會降低故障恢複的粒度,但可能導致記憶體OOM和整體的輸送量降低。因此,您需要進行權衡,並設定合理的chunk大小。

    scan.snapshot.fetch.size

    當讀取表的全量資料時,每次最多拉取的記錄數。

    INTEGER

    1024

    無。

    scan.startup.mode

    消費資料時的啟動模式。

    STRING

    initial

    參數取值如下:

    • initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。

    • latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從Binlog的末尾(最新的Binlog處)開始讀取,即唯讀取該連接器啟動以後的最新變更。

    • earliest-offset:不掃描歷史全量資料,直接從可讀取的最早Binlog開始讀取。

    • specific-offset:不掃描歷史全量資料,從您指定的Binlog位點啟動,位點可通過同時配置scan.startup.specific-offset.filescan.startup.specific-offset.pos參數來指定從特定Binlog檔案名稱和位移量啟動,也可以只配置scan.startup.specific-offset.gtid-set來指定從某個GTID集合啟動。

    • timestamp:不掃描歷史全量資料,從指定的時間戳記開始讀取Binlog。時間戳記通過scan.startup.timestamp-millis指定,單位為毫秒。

    重要

    使用earliest-offsetspecific-offsettimestamp啟動模式時,確保在指定的Binlog消費位置到作業啟動的時間之間,對應表的結構不發生變化,避免因表結構不同而報錯。

    scan.startup.specific-offset.file

    使用指錨點模式啟動時,啟動位點的Binlog檔案名稱。

    STRING

    使用該配置時,scan.startup.mode必須配置為specific-offset。檔案名稱格式例如mysql-bin.000003

    scan.startup.specific-offset.pos

    使用指錨點模式啟動時,啟動位點在指定Binlog檔案中的位移量。

    INTEGER

    使用該配置時,scan.startup.mode必須配置為specific-offset

    scan.startup.specific-offset.gtid-set

    使用指錨點模式啟動時,啟動位點的GTID集合。

    STRING

    使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

    scan.startup.timestamp-millis

    使用指定時間模式啟動時,啟動位點的毫秒時間戳記。

    LONG

    使用該配置時,scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒。

    重要

    在使用指定時間時,OceanBase CDC會嘗試讀取每個Binlog檔案的初始事件以確定其時間戳記,最終定位至指定時間對應的Binlog檔案。請保證指定的時間戳記對應的Binlog檔案在資料庫上沒有被清理且可以被讀取到。

    server-time-zone

    資料庫在使用的會話時區。

    STRING

    如果您沒有指定該參數,則系統預設使用Flink作業運行時的環境時區作為資料庫伺服器時區,即您選擇的可用性區域所在的時區。

    例如Asia/Shanghai,該參數控制了TIMESTAMP類型如何轉成STRING類型。更多資訊請參見Debezium時間類型

    debezium.min.row.count.to.stream.results

    當表的條數大於該值時,會使用分批讀模數式。

    INTEGER

    1000

    Flink採用以下方式讀取OceanBase源表資料:

    • 全量讀取:直接將整個表的資料讀取到記憶體裡。優點是速度快,缺點是會消耗對應大小的記憶體,如果源表資料量非常大,可能會有OOM風險。

    • 分批讀取:分多次讀取,每次讀取一定數量的行數,直到讀取完所有資料。優點是讀取資料量比較大的表沒有OOM風險,缺點是讀取速度相對較慢。

    connect.timeout

    串連OceanBase資料庫伺服器逾時時,重試串連之前等待逾時的最長時間。

    DURATION

    30s

    無。

    connect.max-retries

    串連OceanBase資料庫服務時,串連失敗後重試的最大次數。

    INTEGER

    3

    無。

    connection.pool.size

    資料庫連接池大小。

    INTEGER

    20

    資料庫連接池用於複用串連,可以降低資料庫連接數量。

    jdbc.properties.*

    JDBC URL中的自訂串連參數。

    STRING

    您可以傳遞自訂的串連參數,例如不使用SSL協議,則可配置為'jdbc.properties.useSSL' = 'false'

    支援的串連參數請參見MySQL Configuration Properties

    debezium.*

    Debezium讀取Binlog的自訂參數。

    STRING

    您可以傳遞自訂的Debezium參數,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'來指定解析錯誤時的處理邏輯。

    heartbeat.interval

    Source通過心跳事件推動Binlog位點前進的時間間隔。

    DURATION

    30s

    心跳事件用於推動Source中的Binlog位點前進,這對OceanBase中更新緩慢的表非常有用。對於更新緩慢的表,Binlog位點無法自動前進,通過夠心跳事件可以推到Binlog位點前進,可以避免Binlog位點不前進引起Binlog位點到期問題,Binlog位點到期會導致作業失敗無法恢複,只能無狀態重啟。

    scan.incremental.snapshot.chunk.key-column

    可以指定某一列作為快照階段切分分區的切分列。

    見備忘列。

    STRING

    • 無主鍵表必填,選擇的列必須是非空類型(NOT NULL)。

    • 有主鍵的表為選填,僅支援從主鍵中選擇一列。

    scan.incremental.close-idle-reader.enabled

    是否在快照結束後關閉閒置 Reader。

    BOOLEAN

    false

    • 僅Flink計算引擎VVR 8.0.1及以上版本支援。

    • 該配置生效需要設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。

    scan.read-changelog-as-append-only.enabled

    是否將changelog資料流轉換為append-only資料流。

    BOOLEAN

    false

    參數取值如下:

    • true:所有類型的訊息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都會轉換成INSERT類型的訊息。僅在需要儲存上遊表刪除訊息等特殊情境下開啟使用。

    • false(預設):所有類型的訊息都保持原樣下發。

    說明

    僅Flink計算引擎VVR 8.0.8及以上版本支援。

    scan.only.deserialize.captured.tables.changelog.enabled

    在增量階段,是否僅對指定表的變更事件進行還原序列化。

    BOOLEAN

    • VVR 8.x版本中預設值為false。

    • VVR 11.1及以上版本預設值為true。

    參數取值如下:

    • true:僅對目標表的變更資料進行還原序列化,加快Binlog讀取速度。

    • false(預設):對所有表的變更資料進行還原序列化。

    說明
    • 僅Flink計算引擎VVR 8.0.7及以上版本支援。

    • 在Flink計算引擎VVR 8.0.8及以下版本使用時,參數名需要修改為debezium.scan.only.deserialize.captured.tables.changelog.enable

    scan.parse.online.schema.changes.enabled

    在增量階段,是否嘗試解析 RDS 無鎖變更 DDL 事件。

    BOOLEAN

    false

    參數取值如下:

    • true:解析 RDS 無鎖變更 DDL 事件。

    • false(預設):不解析 RDS 無鎖變更 DDL 事件。

    實驗性功能。建議在執行線上無鎖變更前,先對Flink作業執行一次快照以便恢複。

    說明

    僅Flink計算引擎VVR 11.1及以上版本支援。

    scan.incremental.snapshot.backfill.skip

    是否在快照讀取階段跳過backfill。

    BOOLEAN

    false

    參數取值如下:

    • true:快照讀取階段跳過backfill。

    • false(預設):快照讀取階段不跳過backfill。

    如果跳過backfill,快照階段表的更改將在稍後的增量階段讀取,而不是合并到快照中。

    重要

    跳過backfill可能導致資料不一致,因為快照階段發生的變更可能會被重放,僅保證at-least-once語義。

    說明

    僅Flink計算引擎VVR 11.1及以上版本支援。

    scan.incremental.snapshot.unbounded-chunk-first.enabled

    快照讀取階段是否先分發無界的分區。

    BOOELEAN

    false

    參數取值如下:

    • true:快照讀取階段優先分發無界的分區。

    • false(預設):快照讀取階段不優先分發無界的分區。

    實驗性功能。開啟後能夠降低TaskManager在快照階段同步最後一個分區時遇到記憶體溢出 (OOM) 的風險,建議在作業第一次啟動前添加。

    說明

    僅Flink計算引擎VVR 11.1及以上版本支援。

  • 維表專屬

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    url

    JDBC url。

    STRING

    • url中需要包含MySQL database名或Oracle service名。

    userName

    使用者名稱。

    STRING

    無。

    cache

    緩衝策略。

    STRING

    ALL

    支援以下三種緩衝策略:

    • ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

      適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。

    • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料。如果沒有找到,則去物理維表中尋找。使用該緩衝策略時,必須配置cacheSize參數。

    • None:無緩衝。

    重要
    • 使用ALL緩衝策略時,請注意節點記憶體大小,防止出現OOM。

    • 因為系統會非同步載入維表資料,所以在使用ALL緩衝策略時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。

    cacheSize

    最大緩衝條數。

    INTEGER

    100000

    • 當選擇LRU緩衝策略後,必須設定緩衝大小。

    • 當選擇ALL緩衝策略後,可以不設定緩衝大小。

    cacheTTLMs

    緩衝逾時時間。

    LONG

    Long.MAX_VALUE

    cacheTTLMs的配置和cache有關,詳情如下:

    • 如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。

    • 如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。

    • 如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。

    maxRetryTimeout

    最大重試時間。

    DURATION

    60s

    無。

  • 結果表-JDBC方式專屬

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    userName

    使用者名稱。

    STRING

    無。

    compatibleMode

    OceanBase的相容模式。

    STRING

    mysql

    參數取值如下:

    • mysql

    • oracle

    說明

    oceanbase專屬參數。

    url

    JDBC url。

    STRING

    • url中需要包含MySQL database名或Oracle service名。

    tableName

    表名。

    STRING

    無。

    sink.mode

    OceanBase結果表寫入方式。

    STRING

    jdbc

    支援jdbcdirect-load兩種。

    maxRetryTimes

    最大重試次數。

    INTEGER

    3

    無。

    poolInitialSize

    資料庫連接池初始大小。

    INTEGER

    1

    無。

    poolMaxActive

    資料庫連接池最大串連數。

    INTEGER

    8

    無。

    poolMaxWait

    從資料庫連接池中擷取串連的最大等待時間。

    INTEGER

    2000

    單位毫秒。

    poolMinIdle

    資料庫連接池中最小空閑串連數。

    INTEGER

    1

    無。

    connectionProperties

    jdbc的串連屬性。

    STRING

    格式為"k1=v1;k2=v2;k3=v3"。

    ignoreDelete

    是否忽略資料Delete操作。

    Boolean

    false

    無。

    excludeUpdateColumns

    指定要排除的列名。在執行更新操作時,這些列將不會被更新。

    STRING

    如果忽略指定的欄位為多個時,則需要使用英文逗號(,)分隔。例如excludeUpdateColumns=column1,column2

    說明

    該值始終會包含主鍵列,也就是實際生效的列名為您指定的列名和主鍵列。

    partitionKey

    分區鍵。

    STRING

    當設定分區鍵時,連接器會先將資料按照分區鍵進行分組,各個分組將分別寫入資料庫。這裡的分組處理早於modRule的處理。

    modRule

    分組規則。

    STRING

    分組規則格式需要為"列名mod數字",如user_id mod 8,列類型必須為數字類型。

    當設定分組規則時,資料先按partitionKey分區;在每個分區內,再根據 modRule 計算結果分組;

    bufferSize

    資料緩衝區大小。

    INTEGER

    1000

    無。

    flushIntervalMs

    清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。

    LONG

    1000

    無。

    retryIntervalMs

    最大重試時間。

    INTEGER

    5000

    單位毫秒。

  • 結果表-旁路匯入方式專屬

重要
  • 旁路匯入結果表從VVR 11.5版本開始支援。關於旁路匯入見文檔

  • 僅支援有界流(Bounded Stream):資料來源必須是有界的,不支援無界流。推薦使用 Flink Batch 模式以獲得更好的效能。

  • 高輸送量寫入:適合大批量資料匯入情境。

  • 匯入期間鎖表:旁路匯入執行期間會對目標表加鎖,鎖表期間不能寫入變更資料,不能進行DDL變更,但可以進行資料查詢。

  • 不適合即時寫入:如果您有即時/流式寫入的情境,請使用前面的JDBC方式結果表。

參數

說明

是否必填

資料類型

預設值

備忘

sink.mode

OceanBase結果表寫入方式。

STRING

jdbc

支援 jdbc 和 direct-load 模式。若通過旁路匯入寫入 OceanBase 結果表,必須配置固定值 direct-load。

host

OceanBase資料庫的IP地址或者Hostname。

STRING

無。

port

OceanBase資料庫的RPC連接埠。

INTEGER

2882

無。

username

使用者名稱。

STRING

無。

tenant-name

OceanBase資料庫的租戶名稱。

STRING

schema-name

  • 對於MySQL租戶,填寫資料庫名稱。

  • 串連到Oracle租戶時,填寫Owner名稱。

STRING

無。

table-name

OceanBase表名。

STRING

無。

parallel

旁路匯入任務服務端的並發度。

INTEGER

8

  • 此參數定義服務端處理匯入任務的 CPU 資源。該設定獨立於用戶端並發。服務端依據租戶 CPU 規格限制最大並行度,且不會返回錯誤。租戶CPU規格和表分區分布決定實際並行度。

  • 例如,當租戶配置為 2 核且並行度設為 10 時,系統取 4 (MIN(2核 * 2,10))作為實際並行度。

  • 若表分區分布在 2 個節點,則總實際並行度為 MIN(2核*2, 10) * 2 = 8 並行度。

buffer-size

旁路匯入任務寫入 OceanBase 的緩衝區大小。

INTEGER

1024

Flink 側每緩衝 buffer-size 條資料寫入一次OceanBase。

dup-action

旁路匯入任務中主鍵重複時的處理策略。可以是 STOP_ON_DUP(本次匯入失敗),REPLACE(替換)或 IGNORE(忽略)。

STRING

REPLACE

  • STOP_ON_DUP:匯入失敗

  • REPLACE:匯入的行替換已有的行

  • IGNORE:丟棄匯入的行,保留已有的行

load-method

旁路匯入模式。

full

  • full:全量旁路匯入,預設值。

  • inc:普通增量旁路匯入,會進行主鍵衝突檢查,observer-4.3.2及以上支援,暫時不支援direct-load.dup-action為REPLACE。

  • inc_replace: 特殊replace模式的增量旁路匯入,不會進行主鍵衝突檢查,直接覆蓋舊資料(相當於replace的效果),direct-load.dup-action參數會被忽略,observer-4.3.2及以上版本支援。

max-error-rows

旁路匯入任務最大可容忍的錯誤行數目。

LONG

0

以下情形會被認定為錯誤行:

  • dupAction=STOP_ON_DUP時主鍵重複的行。

  • 列數目不匹配的行,多列或少列。

  • 類型轉換失敗的行。

timeout

旁路匯入任務整體逾時時間。

DURATION

7d

heartbeat-timeout

旁路匯入任務用戶端的心跳逾時時間。

DURATION

60s

heartbeat-interval

旁路匯入任務用戶端的心跳間隔時間。

Duration

10s

類型映射

  • MySQL相容模式

    OceanBase欄位類型

    Flink欄位類型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    REAL

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    NUMERIC(p, s)

    DECIMAL(p, s)

    說明

    其中p <= 38。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink僅支援小於等於2,147,483,647(2^31 - 1)的BLOB類型的記錄。

    BLOB

    MEDIUMBLOB

    LONGBLOB

  • Oracle相容模式

    OceanBase欄位類型

    Flink欄位類型

    NUMBER(p, s <= 0), p - s < 3

    TINYINT

    NUMBER(p, s <= 0), p - s < 5

    SMALLINT

    NUMBER(p, s <= 0), p - s < 10

    INT

    NUMBER(p, s <= 0), p - s < 19

    BIGINT

    NUMBER(p, s <= 0), 19 <= p - s <= 38

    DECIMAL(p - s, 0)

    NUMBER(p, s > 0)

    DECIMAL(p, s)

    NUMBER(p, s <= 0), p - s > 38

    STRING

    FLOAT

    FLOAT

    BINARY_FLOAT

    BINARY_DOUBLE

    DOUBLE

    NUMBER(1)

    BOOLEAN

    DATE

    TIMESTAMP [(p)] [WITHOUT TIMEZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    STRING

    NCHAR(n)

    NVARCHAR2(n)

    VARCHAR(n)

    VARCHAR2(n)

    CLOB

    BLOB

    BYTES

    ROWID

使用樣本

  • 源表&結果表

    -- oceanbase cdc 源表
    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    -- oceanbase jdbc結果表
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    -- oceanbase 旁路匯入結果表
    CREATE TEMPORARY TABLE oceanbase_dircetload_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'sink.mode' = 'direct-load',
      'host' = '<yourHost>',
      'port' = 'yourPort',
      'tenant-name' = '<yourTenantName>',
      'schema-name' = '<yourSchemaName>',
      'table-name' = '<yourTableName>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    END; 

  • 維表

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

相關文檔

Flink支援的連接器,請參見支援的連接器