全部產品
Search
文件中心

Hologres:開源Flink 1.11及以上版本即時寫入

更新時間:Nov 19, 2025

本文為您介紹開源Flink 1.11如何即時寫入資料至Hologres。

前提條件

  • 開通Hologres執行個體,並串連開發工具,詳情請參見串連HoloWeb並執行查詢

  • 搭建Flink叢集(本次樣本使用的是1.15版本),可以前往Flink官網下載二進位包,啟動一個Standalone叢集,詳情請參見文檔叢集搭建

背景資訊

從開源Flink1.11版本開始,Hologres代碼已開源,相應版本的Connector已經在中央倉庫發布Release包,可在專案中參照如下pom檔案進行配置。詳細內容請參見Hologres GitHub官方庫

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.15</artifactId>
    <version>1.4.0</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

開源Flink版本與hologres-connector-flink最新版本對應關係如下,建議使用1.15及以上版本,功能更豐富:

Flink版本

Connector版本

Flink 1.11

hologres-connector-flink-1.11:1.0.1

Flink 1.12

hologres-connector-flink-1.12:1.0.1

Flink 1.13

hologres-connector-flink-1.13:1.3.2

Flink 1.14

hologres-connector-flink-1.14:1.3.2

Flink 1.15

hologres-connector-flink-1.15:1.4.1

Flink 1.17

hologres-connector-flink-1.17:1.4.1

Flink SQL寫入資料至Hologres程式碼範例

您可以參照如下程式碼範例,通過將Flink SQL將資料寫入Hologres。其中,更多詳細的程式碼範例請參見Hologres GitHub官方庫

        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        tEnv.executeSql(createHologresTable);

        createScanTable(tEnv);

        tEnv.executeSql("insert into sink select * from source");

更多詳盡的程式碼範例請參見hologres-connector-flink-examples,包括如下樣本。

  • FlinkSQLToHoloExample:一個使用純Flink SQL介面實現的應用,將資料寫入至Hologres。

  • FlinkDSAndSQLToHoloExample:一個混合Flink DataStream以及SQL 介面實現的應用,寫入Hologres前,將DataStream轉換成Table,之後再用Flink SQL寫入Hologres。

  • FlinkDataStreamToHoloExample:一個使用純Flink DataStream介面實現的應用,將資料寫入至Hologres。

  • FlinkRoaringBitmapAggJob:一個使用Flink及RoaringBitmap,結合Hologres維表,實現即時去重統計UV的應用,並將統計結果寫入Hologres。

  • 通過Flink DataStream介面的即時資料寫入方法,可以對資料進行基於Hologres Shard的Repartition操作,有效減少寫入Hologres執行個體時的小檔案數量,從而提升寫入效能並降低系統負載。該方法適用於需要大量匯入具有主鍵的空表,實作類別似Insert Overwrite的情境。

Hologres Flink Connector參數說明

您可以將Flink資料寫入Hologres,Hologres Flink Connector相關參數具體內容如下:

參數

是否必填

說明

connector

結果表類型,固定值為hologres。

dbname

Hologres的資料庫名稱。

tablename

Hologres接收資料的表名稱。

username

當前阿里雲帳號的AccessKey ID。

您可以單擊AccessKey 管理,擷取AccessKey ID。

password

當前阿里雲帳號的AccessKey Secret。

您可以單擊AccessKey 管理,擷取AccessKey Secret。

endpoint

Hologres的VPC網路地址。進入Hologres管理主控台的執行個體詳情頁,擷取Endpoint。

說明

endpoint需包含連接埠號碼,格式為ip:port。同一個地區使用VPC網路地址,跨地區請使用公用網路。

  • 串連參數

    參數

    是否必填

    說明

    connectionSize

    單個Flink Hologres Task所建立的JDBC串連池大小。

    預設值:3,和吞吐成正比。

    connectionPoolName

    串連池名稱,同一個TaskManager中,表配置同名的串連池名稱可以共用串連池。

    無預設值,每個表預設使用自己的串連池。如果設定串連池名稱,則所有表的connectionSize需要相同

    fixedConnectionMode

    寫入和點查不佔用串連數(beta功能,需要connector版本>=1.2.0,hologres引擎版本>=1.3)

    預設值:false

    jdbcRetryCount

    當串連故障時,寫入和查詢的重試次數。

    預設值:10。

    jdbcRetrySleepInitMs

    每次重試的等待時間=retrySleepInitMs+retry*retrySleepStepMs。

    預設值:1000ms。

    jdbcRetrySleepStepMs

    每次重試的等待時間=retrySleepInitMs+retry*retrySleepStepMs。

    預設值:5000ms。

    jdbcConnectionMaxIdleMs

    寫入線程和點查線程資料庫連接的最大Idle時間,超過串連將被釋放。

    預設值:60000ms。

    jdbcMetaCacheTTL

    TableSchema資訊的本機快取時間。

    預設值:60000ms。

    jdbcMetaAutoRefreshFactor

    當TableSchema cache剩餘存活時間短於metaCacheTTL/metaAutoRefreshFactor 將自動重新整理cache。

    預設值:-1,表示不自動重新整理。

    connection.ssl.mode

    是否啟用資料轉送加密。取值說明如下:

    • disable(預設值):不啟用傳輸加密。

    • require:啟用SSL,只對資料鏈路加密。

    • verify-ca:啟用SSL,加密資料鏈路,同時使用CA認證驗證Hologres服務端的真實性。

    • verify-full:啟用SSL,加密資料鏈路,使用CA認證驗證Hologres服務端的真實性,同時比對認證內的CN或DNS與串連時配置的Hologres串連地址是否一致。

    connection.ssl.root-cert.location

    CA認證的路徑,且確保已經將CA認證上傳至Flink叢集環境。

    說明

    當connection.ssl.mode參數配置為verify-ca或verify-full時,需要配置此參數。

    jdbcDirectConnect

    是否開啟直連。取值說明如下:

    • false(預設值):不開啟。

    • true:開啟。

      Flink批量寫入的瓶頸是VIP Endpoint的網路吞吐。開啟此參數會測試當前環境能否直連Hologres FE,若支援預設使用直連。

  • 寫入參數

    參數

    是否必填

    說明

    mutatetype

    資料寫入模式,詳情請參見流式語義

    預設值:insertorignore。

    ignoredelete

    是否忽略撤回訊息。通常Flink的Group By會產生回撤訊息,回撤訊息到Hologres connector會產生Delete請求。

    預設值:true,僅在使用流式語義時生效。

    createparttable

    當寫入分區表時,是否自動根據分區值自動建立分區表。

    • false(預設值):不會自動建立。

    • true:自動建立。

    建議慎用該功能,確保分區值不會出現髒資料導致建立錯誤的分區表。

    ignoreNullWhenUpdate

    mutatetype='insertOrUpdate'時,是否忽略更新寫入資料中的Null值。

    預設值:false。

    jdbcWriteBatchSize

    Hologres Sink節點資料攢批的最大批大小。

    預設值:256

    jdbcWriteBatchByteSize

    Hologres Sink節點單個線程資料攢批的最大位元組大小。

    預設值:2097152(2 * 1024 * 1024),2MB。

    jdbcWriteBatchTotalByteSize

    Hologres Sink節點所有資料攢批的最大位元組大小。

    預設值:20971520(20 * 1024 * 1024),20MB。

    jdbcWriteFlushInterval

    Hologres Sink節點資料攢批的最長Flush等待時間。

    預設值:10000,即10秒。

    jdbcUseLegacyPutHandler

    寫入SQL格式,取值說明如下:

    • true:寫入SQL格式為insert into xxx(c0,c1,...) values (?,?,...),... on conflict;

    • false(預設值):寫入SQL格式為insert into xxx(c0,c1,...) select unnest(?),unnest(?),... on conflict

    jdbcEnableDefaultForNotNullColumn

    設定為true時,not null且未在表上設定default的欄位傳入null時,將以預設值寫入。String類型預設為"",Number類型預設為0,Date、timestamp、timestamptz 類型預設為1970-01-01 00:00:00。

    預設值:true。

    remove-u0000-in-text.enabled

    是否自動替換TEXT資料類型中的非UTF-8的u0000字元。取值說明如下:

    • true:替換。

    • false(預設值):不替換。

    deduplication.enabled

    若一批寫入的資料中有主鍵相同的資料,是否進行去重。取值說明如下:

    • true(預設值):進行去重,只保留後到達的這條資料。

    • false:不進行去重。

      首先將批處理資料寫入,待寫入完成後再繼續寫入後到達的資料。

      說明

      在不進行去重時,極端情況下(例如所有資料的主鍵都相同),寫入操作會退化為非批量寫入,對效能會產生一定影響。

    aggressive.enabled

    是否啟用激進提交模式。取值說明如下:

    • true:啟用。

      啟用後,即使批量處理沒有達到預期的條數,只要檢測到串連空閑,系統就會強制提交資料。在流量較小時,這種方法可以有效減少資料轉送的延時。

    • false(預設值):不啟用。

    jdbcCopyWriteMode

    資料寫入方式。取值說明如下:

    • true:使用Copy方式寫入,Copy寫入方式分為流式Copy(Fixed Copy)和批量Copy,當前預設使用流式Copy(Fixed Copy)方式寫入。

      說明

      與使用INSERT方式寫入相比,Fixed Copy方式可以實現更高的吞吐(因為採用流模式),更低的資料延時以及更低的用戶端記憶體消耗(因為不需要攢批資料),但不支援資料回撤功能。

    • false(預設值):使用INSERT方式寫入。

    說明

    僅Hologres V1.3.1及以上版本支援此參數。

    jdbcCopyWriteFormat

    底層是否採用二進位協議。

    • binary(預設值):表示使用二進位模式,二進位會更快。

    • text:為文字模式。

    說明

    僅Hologres V1.3.1及以上版本支援此參數。

    bulkLoad

    是否使用批量Copy方式寫入。取值說明如下:

    • true:使用批量Copy方式寫入,僅jdbcCopyWriteMode參數也設定為true時,該參數才會生效,否則使用Fixed Copy方式寫入。

      說明
      • 批量Copy相較於流式Copy(Fixed Copy),具備更高的效率,能更好地利用Hologres的資源,從而在資料寫入過程中提供更優的效能,您可以根據業務需要,選擇合適的資料寫入方式。

      • 在對主鍵表進行批量Copy寫入時,通常會出現表鎖的情況,您可以通過配置target-shards.enabled參數為true,將寫入鎖粒度降至Shard層級,從而允許並發執行多個大量匯入任務,減少了表鎖的發生。相比Fixed Copy模式,批量Copy寫入有主鍵表時,通過這種方式能夠顯著降低Hologres執行個體的負載,實測顯示,可以減少約66.7%的負載。

      • 批量Copy寫入時,如果目標表包含主鍵,要求在寫入之前目標表為空白表,否則寫入過程中進行主鍵去重會影響寫入效能。

    • false(預設值):不使用。

    說明

    僅Hologres V1.4.0及以上版本支援此參數。

    target-shards.enabled

    是否啟用Target Shard批量寫入。取值說明如下:

    • true:啟用Target Shard批量寫入,當來源資料已按Shard重新分區時,可以將寫入鎖粒度降至Shard層級。

    • false(預設值):不啟用。

    說明

    僅Hologres V1.4.1及以上版本支援此參數。

  • 點查參數

    參數

    是否必填

    說明

    jdbcReadBatchSize

    維表點查最大批次大小。

    預設值:128。

    jdbcReadBatchQueueSize

    維表點查請求緩衝隊列大小。

    預設值:256。

    async

    是否採用非同步方式同步資料。

    預設值:false。非同步模式可以並發地處理多個請求和響應,從而連續的請求之間不需要阻塞等待,提高查詢的吞吐。但在非同步模式下,無法保證請求的絕對順序。

    cache

    緩衝策略。

    預設值:None。Hologres僅支援以下兩種緩衝策略:None:無緩衝。LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果未找到,則去物理維表中尋找。

    cachesize

    緩衝大小。

    預設值:10000。選擇LRU緩衝策略後,可以設定緩衝大小。

    cachettlms

    更新緩衝的時間間隔,單位為毫秒。

    當選擇LRU緩衝策略後,可以設定緩衝失效的逾時時間,預設不到期。

    cacheempty

    是否緩衝join結果為空白的資料。

    預設值:true,表示緩衝join結果為空白的資料。false:表示不緩衝join結果為空白的資料。

  • 資料類型映射

    當前Flink全託管與Hologres的資料類型映射請參見Blink/Flink與Hologres的資料類型映射