本文將會為您介紹如何通過Flink和Blink即時消費Hologres Binlog。
注意事項
消費Hologres Binlog需要注意如下事項:
僅Hologres V0.9及以上版本支援消費Hologres Binlog;僅Hologres V1.3.21及以上版本支援配置引擎白名單,HologresV1.3.21以下版本當前暫不支援配置引擎白名單,開啟白名單後,會造成Binlog消費失敗。如果您的執行個體版本低於所要求執行個體版本,請您加入HologresDingTalk群進行反饋,詳情可參見線上支援。
Hologres支援單表層級的Binlog功能,支援行存表和列存表,以及從Hologres V1.1版本開始支援行列共存表。開啟Binlog後,理論上列存表的開銷要大於行存表的開銷。因此對於資料更新頻繁的情境,建議為使用行存儲存格式的表開啟Binlog。
Hologres Binlog的支援情況以及開啟、配置Hologres Binlog,請參見訂閱Hologres Binlog。
僅阿里雲Flink支援消費Hologres Binlog。Holohub模式下Flink消費Hologres Binlog只支援單一資料型別,從Flink 6.0.3版本開始,支援通過JDBC模式消費Hologres Binlog,相比Holohub,JDBC支援更多的資料類型,詳情請參見Blink/Flink與Hologres的資料類型映射。同時增加了部分許可權限制,詳情請參見許可權說明。
目前不支援消費分區父表的Binlog。
Hologres V2.0版本起有限支援Holohub模式;V2.1版本起下線Holohub模式,全面轉為JDBC模式。在您升級Hologres版本前,請參考Holohub模式切換到JDBC模式,查看您當前正在使用Holohub模式的Flink任務,並按步驟升級Flink VVR作業版本,然後升級Hologres執行個體。
許可權說明
Flink通過JDBC模式消費Hologres Binlog支援使用Hologres自訂帳號,通過Holohub模式不支援使用Hologres自訂帳號。
Flink通過Holohub模式消費Hologres Binlog需要表的讀寫權限。
Flink通過JDBC模式消費Hologres Binlog需要如下前提條件,詳情請參見通過JDBC消費Hologres Binlog。
已建立
hg_binlogExtension(Hologres V2.0版本起預設建立)。使用者為執行個體的Superuser或使用者同時擁有目標表的Owner許可權和執行個體的Replication Role許可權。
Flink即時消費Binlog
VVP-2.4及以上版本支援Hologres Connector即時消費Binlog,使用方法如下。
源表DDL(非CDC模式)
該模式下Source消費的Binlog資料是作為普通的Flink資料傳遞給下遊節點的,即所有資料都是作為Insert類型的資料,可以根據業務情況選擇如何處理特定hg_binlog_event_type類型的資料。Hologres表開啟Binlog後,在Flink中源表(非CDC模式)使用如下DDL可以即時消費Binlog。
create table test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);三個
binlogxxx參數表示Binlog系統欄位,命名和類型是固定的不能修改。其他欄位是跟使用者欄位一一對應,必須是全小寫。
源表DDL(CDC模式)
該模式下Source消費的Binlog資料,將根據hg_binlog_event_type自動為每行資料設定準確的Flink RowKind類型(INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),這樣就能完成表的資料的鏡像同步,類似MySQL和Postgres的CDC功能。
Hologres Binlog源表(CDC模式)暫不支援定義Watermark。如果您需要進行視窗彙總,您可以採用非視窗彙總的方式,詳情請參見MySQL/Hologres CDC源表不支援視窗函數,如何?類似每分鐘彙總統計的需求?。
Hologres表開啟Binlog後,在Flink中源表(CDC模式)使用如下DDL可以即時消費Binlog。
create table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',//Hologres的DB名
'tablename'='<yourTablename>',//Hologres的表名
'username'='<yourAccessID>',//當前帳號的access id
'password'='<yourAccessSecret>',//當前帳號的access key
'endpoint'='<yourEndpoint>',//Hologres的vpc網路地址
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);全增量一體源表
VVR引擎1.13-vvr-4.0.13版本,Hologres執行個體0.10及以上版本開始,Hologres Binlog CDC源表支援全增量一體的消費,這種方式會先讀取資料庫的歷史全量資料,並平滑切換到Binlog讀取增量資料,詳情請參見即時數倉Hologres。
JDBC模式Binlog源表
從Flink 6.0.3版本開始,支援通過JDBC模式消費Hologres Binlog,JDBC模式相比Holohub支援更多的資料類型和支援自訂帳號,詳情使用請參見即時數倉Hologres。
Holohub模式切換到JDBC模式
Hologres從V2.0版本起逐步下線Holohub模式。如果您需要升級Hologres版本,需要將Holohub模式的作業切換到JDBC模式。請參考如下方式進行。
Hologres執行個體升級為V2.1版本
您在升級Hologres執行個體版本到V2.1前,請選擇如下兩個方案之一,檢查Flink任務與Hologres執行個體,以保障Flink任務正常運行。
(方案一)(推薦)將Flink VVR版本升級到8.0.7及以上版本,Flink會自動將Holohub模式切換為JDBC模式。
(方案二)將Flink VVR升級到6.0.7~8.0.5版本,在源表中添加參數
'sdkMode'='jdbc'之後重新啟動作業,同時需要授予使用者如下許可權選項中的其中之一,確認作業正常運行之後再對Hologres執行個體進行升級。(選項一)執行個體的Superuser許可權。
(選項二)目標表的Owner許可權,CREATE DATABASE許可權及執行個體的Replication Role許可權。
(方案三)(不推薦)將Flink VVR版本升級至8.0.6,Flink會自動將Holohub模式切換為JDBC模式。但VVR 8.0.6版本存在已知缺陷,當維表欄位過多時可能導致VVR上線逾時,詳情請參見Hologres Connector Release Note。
(可選)如果您的Flink VVR作業數量較多,擷取需要升級版本的作業和表資訊請參見如下內容。
Hologres執行個體升級為V2.0版本
(方案一)(推薦)將Flink VVR版本升級到8.0.6及以上版本,Flink會自動將Holohub模式切換為JDBC模式,其中VVR 8.0.6版本存在已知缺陷,當維表欄位過多時可能導致VVR作業上線逾時,詳情請參見Hologres Connector Release Note。建議選擇VVR 8.0.7版本。
(方案二)將Flink VVR版本升級到8.0.4或8.0.5版本,並重啟Flink作業,同時授予使用者如下許可權選項中的其中之一,確認作業正常運行之後再對Hologres執行個體進行升級。
(選項一)執行個體的Superuser許可權。
(選項二)目標表的Owner許可權,CREATE DATABASE許可權,及執行個體的Replication Role許可權。
(方案三)將Flink VVR版本升級到6.0.7到8.0.3版本,Flink會繼續使用Holohub模式消費Binlog。
如果您的Flink VVR消費Hologres Binlog的作業過多,可以使用如下方式擷取需要升級版本的作業和表資訊。
該工具僅支援擷取如下作業資訊:
通過DDL方式進行表定義的SQL作業。
通過Hints方式指定參數的Catalog作業。
不支援擷取JAR作業資訊,不支援擷取沒有Hints參數的Catalog表資訊。
下載開源工具find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar。
使用本地命令列進入開源工具目錄,然後運行如下命令,即可查看全部需要升級版本的作業和表資訊。
說明運行如下命令需要安裝Java環境,使用JDK 8及以上版本。
java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <url> <AccessKeyID> <AccessKeySecret> <binlog/rpc> # 使用樣本 java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs 北京 https://vvp.console.aliyun.com/web/xxxxxx/zh/#/workspaces/xxxx/namespaces/xxxx/operations/stream/xxxx my-access-key-id my-access-key-secret binlog參數說明如下:
參數
說明
region
目標Realtime ComputeFlink版專案空間所在地區的中文簡稱,取值請參見region取值對應表。
url
目標Realtime ComputeFlink版專案任意一個作業的串連地址。
AccessKeyID
能訪問Realtime ComputeFlink版專案空間的帳號AccessKey ID。
AccessKeySecret
能訪問Realtime ComputeFlink版專案空間的帳號AccessKey Secret。
binlog/rpc
需要檢查的作業內容,取值如下:
binlog:表示檢查整個專案中所有作業的Hologres Binlog源表。rpc:表示檢查整個專案中所有作業使用了rpc模式的維表或結果表。
樣本返回結果如下。
