本文為您介紹如何使用OceanBase連接器。
背景資訊
OceanBase資料庫是一款原生分布式的HTAP資料庫管理系統,詳情請參見OceanBase官網。為了降低您從MySQL資料庫或Oracle資料庫遷移到OceanBase資料庫時引發的業務系統改造成本,OceanBase資料庫支援Oracle和MySQL兩種相容模式,兩種模式下的資料類型、SQL功能、內部視圖等與MySQL資料庫或Oracle資料庫保持一致。兩種模式下建議使用的連接器如下:
Oracle模式:只能使用OceanBase連接器。
MySQL模式:與原生MySQL文法保持高度相容,支援使用OceanBase和MySQL兩種連接器讀寫OceanBase。
重要OceanBase連接器目前處於公測階段。在OceanBase 3.2.4.4及以上版本,您可以使用MySQL連接器讀寫OceanBase,該功能也屬於公測範圍,請在使用前充分評估並謹慎使用。
在使用MySQL連接器讀取OceanBase增量資料時,請確保OceanBase Binlog已開啟且被正確設定。有關OceanBase Binlog的更多資訊,請參見概述或Binlog 相關操作。
OceanBase連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
串連的資料庫和表都已被建立。
已設定IP白名單,詳情請參見設定白名單分組。
如需採集OceanBase增量(CDC)資料,還需開啟 OceanBase Binlog服務,參考Binlog 相關操作。
如需使用旁路匯入結果表,需要先開啟旁路匯入連接埠。參考旁路匯入文檔。
使用限制
Flink計算引擎VVR 8.0.1及以上版本支援OceanBase連接器。
語義保證
CDC 源表在語義上支援 Exactly Once。在讀取全量歷史資料並切換至 Binlog 讀取時,能夠確保資料不重不漏,即使發生故障,也能通過精準的語義保證資料處理的正確性。
結果表語義上可以保證At-Least-Once,在結果表有主鍵的情況下,等冪可以保證資料的正確性。
文法結構
CREATE TABLE oceanabse_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = '<yourJdbcUrl>',
'tableName' = '<yourTableName>',
'userName' = '<yourUserName>',
'password' = '<yourPassword>'
);連接器寫入結果表原理:寫入結果表時,會將接收到的每條資料拼接成一條SQL去執行。具體執行的SQL情況如下:
對於沒有主鍵的結果表,會拼接成INSERT INTO語句。
對於包含主鍵的結果表,會根據資料庫的相容模式拼接成UPSERT語句。
WITH參數
通用
參數
說明
是否必填
資料類型
預設值
備忘
connector
表類型。
是
STRING
無
固定值為
oceanbase。password
密碼。
是
STRING
無
無。
源表專屬
重要注意事項:自Flink計算引擎VVR 11.4.0版本起,OceanBase CDC連接器進行了架構升級與功能調整。為確保使用者準確理解變更內容並順利完成版本遷移,現將核心變更說明如下:
原基於OceanBase LogProxy服務實現的 CDC 連接器已正式下線並從發行版本中移除。自 VVR-11.4.0 版本起,OceanBase CDC連接器僅支援通過OceanBase Binlog服務進行增量日誌的捕獲與資料同步。
OceanBase CDC連接器增強了對 OceanBase Binlog 服務的協議相容性、串連穩定性,建議使用者優先使用 OceanBase CDC 連接器。
OceanBase Binlog服務在協議層完全相容 MySQL 複製協議,也可使用標準MySQL CDC連接器串連至OceanBase Binlog服務以實現資料訂閱,但不作推薦。
自Flink計算引擎VVR 11.4.0版本起,OceanBase CDC 連接器不再支援在 Oracle 相容模式下進行增量資料訂閱。Oracle 相容模式下的增量資料訂閱請聯絡 OceanBase 企業支援人員。
參數
說明
是否必填
資料類型
預設值
備忘
hostname
OceanBase資料庫的IP地址或者Hostname。
是
STRING
否
建議填寫Virtual Private Cloud地址。
說明如果OceanBase與Realtime ComputeFlink版不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見空間管理與操作和Flink全託管叢集如何訪問公網?。
username
OceanBase資料庫服務的使用者名稱。
是
STRING
否
無。
database-name
OceanBase資料庫名稱。
是
STRING
無
作為源表時,資料庫名稱支援Regex以讀取多個資料庫的資料。
使用Regex時,盡量不要使用^和$符號匹配開頭和結尾。具體原因詳見table-name備忘的說明。
table-name
OceanBase表名。
是
STIRNG
無
作為源表時,表名支援Regex以讀取多個表的資料。
使用Regex時,盡量不要使用^和$符號匹配開頭和結尾。具體原因詳見以下說明。
說明OceanBase 源表在正則匹配表名時,會將您填寫的 database-name,table-name 通過字串 \\.(VVR 8.0.1前使用字元.)串連成為一個全路徑的Regex,然後使用該Regex和OceanBase資料庫中表的全限定名進行正則匹配。
例如:當配置'database-name'='db_.*'且'table-name'='tb_.+'時,連接器將會使用Regexdb_.*\\.tb_.+(8.0.1版本前為db_.*.tb_.+)去匹配表的全限定名來確定需要讀取的表。
port
OceanBase資料庫服務的連接埠號碼。
否
INTEGER
3306
無。
server-id
資料庫用戶端的一個數字ID。
否
STRING
預設會隨機產生一個5400~6400的值。
該ID必須是全域唯一的。建議針對同一個資料庫的每個作業都設定一個不同的ID。
該參數也支援ID範圍的格式,例如5400-5408。在開啟增量讀模數式時支援多並發讀取,此時推薦設定為ID範圍,使得每個並發使用不同的ID。詳情請參見Server ID使用。
scan.incremental.snapshot.chunk.size
每個chunk的大小(包含的行數)。
否
INTEGER
8096
當開啟增量快照讀取時,表會被切分成多個chunk讀取。在讀完chunk的資料之前,chunk的資料會先緩衝在記憶體中。
每個chunk包含的行數越少,則表中的chunk的總數量越大,儘管這會降低故障恢複的粒度,但可能導致記憶體OOM和整體的輸送量降低。因此,您需要進行權衡,並設定合理的chunk大小。
scan.snapshot.fetch.size
當讀取表的全量資料時,每次最多拉取的記錄數。
否
INTEGER
1024
無。
scan.startup.mode
消費資料時的啟動模式。
否
STRING
initial
參數取值如下:
initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。
latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從Binlog的末尾(最新的Binlog處)開始讀取,即唯讀取該連接器啟動以後的最新變更。
earliest-offset:不掃描歷史全量資料,直接從可讀取的最早Binlog開始讀取。
specific-offset:不掃描歷史全量資料,從您指定的Binlog位點啟動,位點可通過同時配置scan.startup.specific-offset.file和scan.startup.specific-offset.pos參數來指定從特定Binlog檔案名稱和位移量啟動,也可以只配置scan.startup.specific-offset.gtid-set來指定從某個GTID集合啟動。
timestamp:不掃描歷史全量資料,從指定的時間戳記開始讀取Binlog。時間戳記通過scan.startup.timestamp-millis指定,單位為毫秒。
重要使用earliest-offset,specific-offset和timestamp啟動模式時,確保在指定的Binlog消費位置到作業啟動的時間之間,對應表的結構不發生變化,避免因表結構不同而報錯。
scan.startup.specific-offset.file
使用指錨點模式啟動時,啟動位點的Binlog檔案名稱。
否
STRING
無
使用該配置時,scan.startup.mode必須配置為specific-offset。檔案名稱格式例如
mysql-bin.000003。scan.startup.specific-offset.pos
使用指錨點模式啟動時,啟動位點在指定Binlog檔案中的位移量。
否
INTEGER
無
使用該配置時,scan.startup.mode必須配置為specific-offset。
scan.startup.specific-offset.gtid-set
使用指錨點模式啟動時,啟動位點的GTID集合。
否
STRING
無
使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如
24DA167-0C0C-11E8-8442-00059A3C7B00:1-19。scan.startup.timestamp-millis
使用指定時間模式啟動時,啟動位點的毫秒時間戳記。
否
LONG
無
使用該配置時,scan.startup.mode必須配置為timestamp。時間戳記單位為毫秒。
重要在使用指定時間時,OceanBase CDC會嘗試讀取每個Binlog檔案的初始事件以確定其時間戳記,最終定位至指定時間對應的Binlog檔案。請保證指定的時間戳記對應的Binlog檔案在資料庫上沒有被清理且可以被讀取到。
server-time-zone
資料庫在使用的會話時區。
否
STRING
如果您沒有指定該參數,則系統預設使用Flink作業運行時的環境時區作為資料庫伺服器時區,即您選擇的可用性區域所在的時區。
例如Asia/Shanghai,該參數控制了TIMESTAMP類型如何轉成STRING類型。更多資訊請參見Debezium時間類型。
debezium.min.row.count.to.stream.results
當表的條數大於該值時,會使用分批讀模數式。
否
INTEGER
1000
Flink採用以下方式讀取OceanBase源表資料:
全量讀取:直接將整個表的資料讀取到記憶體裡。優點是速度快,缺點是會消耗對應大小的記憶體,如果源表資料量非常大,可能會有OOM風險。
分批讀取:分多次讀取,每次讀取一定數量的行數,直到讀取完所有資料。優點是讀取資料量比較大的表沒有OOM風險,缺點是讀取速度相對較慢。
connect.timeout
串連OceanBase資料庫伺服器逾時時,重試串連之前等待逾時的最長時間。
否
DURATION
30s
無。
connect.max-retries
串連OceanBase資料庫服務時,串連失敗後重試的最大次數。
否
INTEGER
3
無。
connection.pool.size
資料庫連接池大小。
否
INTEGER
20
資料庫連接池用於複用串連,可以降低資料庫連接數量。
jdbc.properties.*
JDBC URL中的自訂串連參數。
否
STRING
無
您可以傳遞自訂的串連參數,例如不使用SSL協議,則可配置為'jdbc.properties.useSSL' = 'false'。
支援的串連參數請參見MySQL Configuration Properties。
debezium.*
Debezium讀取Binlog的自訂參數。
否
STRING
無
您可以傳遞自訂的Debezium參數,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'來指定解析錯誤時的處理邏輯。
heartbeat.interval
Source通過心跳事件推動Binlog位點前進的時間間隔。
否
DURATION
30s
心跳事件用於推動Source中的Binlog位點前進,這對OceanBase中更新緩慢的表非常有用。對於更新緩慢的表,Binlog位點無法自動前進,通過夠心跳事件可以推到Binlog位點前進,可以避免Binlog位點不前進引起Binlog位點到期問題,Binlog位點到期會導致作業失敗無法恢複,只能無狀態重啟。
scan.incremental.snapshot.chunk.key-column
可以指定某一列作為快照階段切分分區的切分列。
見備忘列。
STRING
無
無主鍵表必填,選擇的列必須是非空類型(NOT NULL)。
有主鍵的表為選填,僅支援從主鍵中選擇一列。
scan.incremental.close-idle-reader.enabled
是否在快照結束後關閉閒置 Reader。
否
BOOLEAN
false
僅Flink計算引擎VVR 8.0.1及以上版本支援。
該配置生效需要設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。
scan.read-changelog-as-append-only.enabled
是否將changelog資料流轉換為append-only資料流。
否
BOOLEAN
false
參數取值如下:
true:所有類型的訊息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都會轉換成INSERT類型的訊息。僅在需要儲存上遊表刪除訊息等特殊情境下開啟使用。
false(預設):所有類型的訊息都保持原樣下發。
說明僅Flink計算引擎VVR 8.0.8及以上版本支援。
scan.only.deserialize.captured.tables.changelog.enabled
在增量階段,是否僅對指定表的變更事件進行還原序列化。
否
BOOLEAN
VVR 8.x版本中預設值為false。
VVR 11.1及以上版本預設值為true。
參數取值如下:
true:僅對目標表的變更資料進行還原序列化,加快Binlog讀取速度。
false(預設):對所有表的變更資料進行還原序列化。
說明僅Flink計算引擎VVR 8.0.7及以上版本支援。
在Flink計算引擎VVR 8.0.8及以下版本使用時,參數名需要修改為debezium.scan.only.deserialize.captured.tables.changelog.enable。
scan.parse.online.schema.changes.enabled
在增量階段,是否嘗試解析 RDS 無鎖變更 DDL 事件。
否
BOOLEAN
false
參數取值如下:
true:解析 RDS 無鎖變更 DDL 事件。
false(預設):不解析 RDS 無鎖變更 DDL 事件。
實驗性功能。建議在執行線上無鎖變更前,先對Flink作業執行一次快照以便恢複。
說明僅Flink計算引擎VVR 11.1及以上版本支援。
scan.incremental.snapshot.backfill.skip
是否在快照讀取階段跳過backfill。
否
BOOLEAN
false
參數取值如下:
true:快照讀取階段跳過backfill。
false(預設):快照讀取階段不跳過backfill。
如果跳過backfill,快照階段表的更改將在稍後的增量階段讀取,而不是合并到快照中。
重要跳過backfill可能導致資料不一致,因為快照階段發生的變更可能會被重放,僅保證at-least-once語義。
說明僅Flink計算引擎VVR 11.1及以上版本支援。
scan.incremental.snapshot.unbounded-chunk-first.enabled
快照讀取階段是否先分發無界的分區。
否
BOOELEAN
false
參數取值如下:
true:快照讀取階段優先分發無界的分區。
false(預設):快照讀取階段不優先分發無界的分區。
實驗性功能。開啟後能夠降低TaskManager在快照階段同步最後一個分區時遇到記憶體溢出 (OOM) 的風險,建議在作業第一次啟動前添加。
說明僅Flink計算引擎VVR 11.1及以上版本支援。
維表專屬
參數
說明
是否必填
資料類型
預設值
備忘
url
JDBC url。
是
STRING
無
url中需要包含MySQL database名或Oracle service名。
userName
使用者名稱。
是
STRING
無
無。
cache
緩衝策略。
否
STRING
ALL
支援以下三種緩衝策略:
ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。
適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。
LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料。如果沒有找到,則去物理維表中尋找。使用該緩衝策略時,必須配置cacheSize參數。
None:無緩衝。
重要使用ALL緩衝策略時,請注意節點記憶體大小,防止出現OOM。
因為系統會非同步載入維表資料,所以在使用ALL緩衝策略時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。
cacheSize
最大緩衝條數。
否
INTEGER
100000
當選擇LRU緩衝策略後,必須設定緩衝大小。
當選擇ALL緩衝策略後,可以不設定緩衝大小。
cacheTTLMs
緩衝逾時時間。
否
LONG
Long.MAX_VALUE
cacheTTLMs的配置和cache有關,詳情如下:
如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。
如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。
如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。
maxRetryTimeout
最大重試時間。
否
DURATION
60s
無。
結果表-JDBC方式專屬
參數
說明
是否必填
資料類型
預設值
備忘
userName
使用者名稱。
是
STRING
無
無。
compatibleMode
OceanBase的相容模式。
否
STRING
mysql
參數取值如下:
mysql
oracle
說明oceanbase專屬參數。
url
JDBC url。
是
STRING
無
url中需要包含MySQL database名或Oracle service名。
tableName
表名。
是
STRING
無
無。
sink.mode
OceanBase結果表寫入方式。
是
STRING
jdbc
支援
jdbc、direct-load兩種。maxRetryTimes
最大重試次數。
否
INTEGER
3
無。
poolInitialSize
資料庫連接池初始大小。
否
INTEGER
1
無。
poolMaxActive
資料庫連接池最大串連數。
否
INTEGER
8
無。
poolMaxWait
從資料庫連接池中擷取串連的最大等待時間。
否
INTEGER
2000
單位毫秒。
poolMinIdle
資料庫連接池中最小空閑串連數。
否
INTEGER
1
無。
connectionProperties
jdbc的串連屬性。
否
STRING
無
格式為"k1=v1;k2=v2;k3=v3"。
ignoreDelete
是否忽略資料Delete操作。
否
Boolean
false
無。
excludeUpdateColumns
指定要排除的列名。在執行更新操作時,這些列將不會被更新。
否
STRING
無
如果忽略指定的欄位為多個時,則需要使用英文逗號(,)分隔。例如
excludeUpdateColumns=column1,column2。說明該值始終會包含主鍵列,也就是實際生效的列名為您指定的列名和主鍵列。
partitionKey
分區鍵。
否
STRING
無
當設定分區鍵時,連接器會先將資料按照分區鍵進行分組,各個分組將分別寫入資料庫。這裡的分組處理早於modRule的處理。
modRule
分組規則。
否
STRING
無
分組規則格式需要為"列名mod數字",如
user_id mod 8,列類型必須為數字類型。當設定分組規則時,資料先按partitionKey分區;在每個分區內,再根據 modRule 計算結果分組;
bufferSize
資料緩衝區大小。
否
INTEGER
1000
無。
flushIntervalMs
清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。
否
LONG
1000
無。
retryIntervalMs
最大重試時間。
否
INTEGER
5000
單位毫秒。
結果表-旁路匯入方式專屬
旁路匯入結果表從VVR 11.5版本開始支援。關於旁路匯入見文檔。
僅支援有界流(Bounded Stream):資料來源必須是有界的,不支援無界流。推薦使用 Flink Batch 模式以獲得更好的效能。
高輸送量寫入:適合大批量資料匯入情境。
匯入期間鎖表:旁路匯入執行期間會對目標表加鎖,鎖表期間不能寫入變更資料,不能進行DDL變更,但可以進行資料查詢。
不適合即時寫入:如果您有即時/流式寫入的情境,請使用前面的JDBC方式結果表。
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
sink.mode | OceanBase結果表寫入方式。 | 否 | STRING | jdbc | 支援 jdbc 和 direct-load 模式。若通過旁路匯入寫入 OceanBase 結果表,必須配置固定值 direct-load。 |
host | OceanBase資料庫的IP地址或者Hostname。 | 是 | STRING | 無 | 無。 |
port | OceanBase資料庫的RPC連接埠。 | 否 | INTEGER | 2882 | 無。 |
username | 使用者名稱。 | 是 | STRING | 無 | 無。 |
tenant-name | OceanBase資料庫的租戶名稱。 | 是 | STRING | 無 | |
schema-name |
| 是 | STRING | 無 | 無。 |
table-name | OceanBase表名。 | 是 | STRING | 無 | 無。 |
parallel | 旁路匯入任務服務端的並發度。 | 否 | INTEGER | 8 |
|
buffer-size | 旁路匯入任務寫入 OceanBase 的緩衝區大小。 | 否 | INTEGER | 1024 | Flink 側每緩衝 |
dup-action | 旁路匯入任務中主鍵重複時的處理策略。可以是 | 否 | STRING | REPLACE |
|
load-method | 旁路匯入模式。 | full |
| ||
max-error-rows | 旁路匯入任務最大可容忍的錯誤行數目。 | 否 | LONG | 0 | 以下情形會被認定為錯誤行:
|
timeout | 旁路匯入任務整體逾時時間。 | 否 | DURATION | 7d | |
heartbeat-timeout | 旁路匯入任務用戶端的心跳逾時時間。 | 否 | DURATION | 60s | |
heartbeat-interval | 旁路匯入任務用戶端的心跳間隔時間。 | 否 | Duration | 10s |
類型映射
MySQL相容模式
OceanBase欄位類型
Flink欄位類型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
說明其中p <= 38。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink僅支援小於等於2,147,483,647(2^31 - 1)的BLOB類型的記錄。
BLOB
MEDIUMBLOB
LONGBLOB
Oracle相容模式
OceanBase欄位類型
Flink欄位類型
NUMBER(p, s <= 0), p - s < 3
TINYINT
NUMBER(p, s <= 0), p - s < 5
SMALLINT
NUMBER(p, s <= 0), p - s < 10
INT
NUMBER(p, s <= 0), p - s < 19
BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)
DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38
STRING
FLOAT
FLOAT
BINARY_FLOAT
BINARY_DOUBLE
DOUBLE
NUMBER(1)
BOOLEAN
DATE
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
CHAR(n)
STRING
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
BLOB
BYTES
ROWID
使用樣本
源表&結果表
-- oceanbase cdc 源表 CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); -- oceanbase jdbc結果表 CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); -- oceanbase 旁路匯入結果表 CREATE TEMPORARY TABLE oceanbase_dircetload_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'sink.mode' = 'direct-load', 'host' = '<yourHost>', 'port' = 'yourPort', 'tenant-name' = '<yourTenantName>', 'schema-name' = '<yourSchemaName>', 'table-name' = '<yourTableName>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; END;維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'tableName' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
相關文檔
Flink支援的連接器,請參見支援的連接器。