全部產品
Search
文件中心

Hologres:Flink/Blink即時消費Hologres Binlog

更新時間:Mar 13, 2025

本文將會為您介紹如何通過Flink和Blink即時消費Hologres Binlog。

注意事項

消費Hologres Binlog需要注意如下事項:

  • 僅Hologres V0.9及以上版本支援消費Hologres Binlog;僅Hologres V1.3.21及以上版本支援配置引擎白名單,HologresV1.3.21以下版本當前暫不支援配置引擎白名單,開啟白名單後,會造成Binlog消費失敗。如果您的執行個體版本低於所要求執行個體版本,請您加入HologresDingTalk群進行反饋,詳情可參見線上支援

  • Hologres支援單表層級的Binlog功能,支援行存表和列存表,以及從Hologres V1.1版本開始支援行列共存表。開啟Binlog後,理論上列存表的開銷要大於行存表的開銷。因此對於資料更新頻繁的情境,建議為使用行存儲存格式的表開啟Binlog。

  • Hologres Binlog的支援情況以及開啟、配置Hologres Binlog,請參見訂閱Hologres Binlog

  • 僅阿里雲Flink支援消費Hologres Binlog。Holohub模式下Flink消費Hologres Binlog只支援單一資料型別,從Flink 6.0.3版本開始,支援通過JDBC模式消費Hologres Binlog,相比Holohub,JDBC支援更多的資料類型,詳情請參見Blink/Flink與Hologres的資料類型映射。同時增加了部分許可權限制,詳情請參見許可權說明

  • 目前不支援消費分區父表的Binlog。

  • Hologres V2.0版本起有限支援Holohub模式;V2.1版本起下線Holohub模式,全面轉為JDBC模式。在您升級Hologres版本前,請參考Holohub模式切換到JDBC模式,查看您當前正在使用Holohub模式的Flink任務,並按步驟升級Flink VVR作業版本,然後升級Hologres執行個體。

許可權說明

  • Flink通過JDBC模式消費Hologres Binlog支援使用Hologres自訂帳號,通過Holohub模式不支援使用Hologres自訂帳號。

  • Flink通過Holohub模式消費Hologres Binlog需要表的讀寫權限。

  • Flink通過JDBC模式消費Hologres Binlog需要如下前提條件,詳情請參見通過JDBC消費Hologres Binlog

    1. 已建立hg_binlogExtension(Hologres V2.0版本起預設建立)。

    2. 使用者為執行個體的Superuser或使用者同時擁有目標表的Owner許可權和執行個體的Replication Role許可權。

Flink即時消費Binlog

VVP-2.4及以上版本支援Hologres Connector即時消費Binlog,使用方法如下。

源表DDL(非CDC模式)

該模式下Source消費的Binlog資料是作為普通的Flink資料傳遞給下遊節點的,即所有資料都是作為Insert類型的資料,可以根據業務情況選擇如何處理特定hg_binlog_event_type類型的資料。Hologres表開啟Binlog後,在Flink中源表(非CDC模式)使用如下DDL可以即時消費Binlog。

create table test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
  • 三個binlogxxx 參數表示Binlog系統欄位,命名和類型是固定的不能修改。

  • 其他欄位是跟使用者欄位一一對應,必須是全小寫。

源表DDL(CDC模式)

該模式下Source消費的Binlog資料,將根據hg_binlog_event_type自動為每行資料設定準確的Flink RowKind類型(INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),這樣就能完成表的資料的鏡像同步,類似MySQL和Postgres的CDC功能。

說明

Hologres Binlog源表(CDC模式)暫不支援定義Watermark。如果您需要進行視窗彙總,您可以採用非視窗彙總的方式,詳情請參見MySQL/Hologres CDC源表不支援視窗函數,如何?類似每分鐘彙總統計的需求?

Hologres表開啟Binlog後,在Flink中源表(CDC模式)使用如下DDL可以即時消費Binlog。

create table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',//Hologres的DB名
  'tablename'='<yourTablename>',//Hologres的表名
  'username'='<yourAccessID>',//當前帳號的access id
  'password'='<yourAccessSecret>',//當前帳號的access key
  'endpoint'='<yourEndpoint>',//Hologres的vpc網路地址
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

全增量一體源表

VVR引擎1.13-vvr-4.0.13版本,Hologres執行個體0.10及以上版本開始,Hologres Binlog CDC源表支援全增量一體的消費,這種方式會先讀取資料庫的歷史全量資料,並平滑切換到Binlog讀取增量資料,詳情請參見即時數倉Hologres

JDBC模式Binlog源表

從Flink 6.0.3版本開始,支援通過JDBC模式消費Hologres Binlog,JDBC模式相比Holohub支援更多的資料類型和支援自訂帳號,詳情使用請參見即時數倉Hologres

Holohub模式切換到JDBC模式

Hologres從V2.0版本起逐步下線Holohub模式。如果您需要升級Hologres版本,需要將Holohub模式的作業切換到JDBC模式。請參考如下方式進行。

Hologres執行個體升級為V2.1版本

您在升級Hologres執行個體版本到V2.1前,請選擇如下兩個方案之一,檢查Flink任務與Hologres執行個體,以保障Flink任務正常運行。

  • (方案一)(推薦)將Flink VVR版本升級到8.0.7及以上版本,Flink會自動將Holohub模式切換為JDBC模式。

  • (方案二)將Flink VVR升級到6.0.7~8.0.5版本,在源表中添加參數'sdkMode'='jdbc'之後重新啟動作業,同時需要授予使用者如下許可權選項中的其中之一,確認作業正常運行之後再對Hologres執行個體進行升級。

    • (選項一)執行個體的Superuser許可權。

    • (選項二)目標表的Owner許可權,CREATE DATABASE許可權及執行個體的Replication Role許可權。

  • (方案三)(不推薦)將Flink VVR版本升級至8.0.6,Flink會自動將Holohub模式切換為JDBC模式。但VVR 8.0.6版本存在已知缺陷,當維表欄位過多時可能導致VVR上線逾時,詳情請參見Hologres Connector Release Note

  • (可選)如果您的Flink VVR作業數量較多,擷取需要升級版本的作業和表資訊請參見如下內容。

Hologres執行個體升級為V2.0版本

  • (方案一)(推薦)將Flink VVR版本升級到8.0.6及以上版本,Flink會自動將Holohub模式切換為JDBC模式,其中VVR 8.0.6版本存在已知缺陷,當維表欄位過多時可能導致VVR作業上線逾時,詳情請參見Hologres Connector Release Note。建議選擇VVR 8.0.7版本。

  • (方案二)將Flink VVR版本升級到8.0.4或8.0.5版本,並重啟Flink作業,同時授予使用者如下許可權選項中的其中之一,確認作業正常運行之後再對Hologres執行個體進行升級。

    • (選項一)執行個體的Superuser許可權。

    • (選項二)目標表的Owner許可權,CREATE DATABASE許可權,及執行個體的Replication Role許可權。

  • (方案三)將Flink VVR版本升級到6.0.7到8.0.3版本,Flink會繼續使用Holohub模式消費Binlog。

如果您的Flink VVR消費Hologres Binlog的作業過多,可以使用如下方式擷取需要升級版本的作業和表資訊。

說明

該工具僅支援擷取如下作業資訊:

  • 通過DDL方式進行表定義的SQL作業。

  • 通過Hints方式指定參數的Catalog作業。

不支援擷取JAR作業資訊,不支援擷取沒有Hints參數的Catalog表資訊。

  1. 下載開源工具find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar

  2. 使用本地命令列進入開源工具目錄,然後運行如下命令,即可查看全部需要升級版本的作業和表資訊。

    說明

    運行如下命令需要安裝Java環境,使用JDK 8及以上版本。

    java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <url> <AccessKeyID> <AccessKeySecret> <binlog/rpc>
    
    # 使用樣本
    java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs 北京 https://vvp.console.aliyun.com/web/xxxxxx/zh/#/workspaces/xxxx/namespaces/xxxx/operations/stream/xxxx my-access-key-id my-access-key-secret binlog

    參數說明如下:

    參數

    說明

    region

    目標Realtime ComputeFlink版專案空間所在地區的中文簡稱,取值請參見region取值對應表

    url

    目標Realtime ComputeFlink版專案任意一個作業的串連地址。

    AccessKeyID

    能訪問Realtime ComputeFlink版專案空間的帳號AccessKey ID。

    AccessKeySecret

    能訪問Realtime ComputeFlink版專案空間的帳號AccessKey Secret。

    binlog/rpc

    需要檢查的作業內容,取值如下:

    • binlog:表示檢查整個專案中所有作業的Hologres Binlog源表。

    • rpc:表示檢查整個專案中所有作業使用了rpc模式的維表或結果表。

    region取值對應表(單擊展開)

    地區

    取值

    華北2(北京)

    北京

    華東2(上海)

    上海

    華東1(杭州)

    杭州

    華南1(深圳)

    深圳

    華北3(張家口)

    張家口

    中國(香港)

    香港

    新加坡

    新加坡

    德國(法蘭克福)

    德國

    印尼(雅加達)

    印尼

    馬來西亞(吉隆坡)

    馬來西亞

    美國(矽谷)

    美國

    上海金融雲

    上海金融雲

  3. 樣本返回結果如下。

    image