全部產品
Search
文件中心

Hologres:Flink大量匯入Hologres

更新時間:Mar 29, 2025

Hologres推出的新版Flink Connector外掛程式,支援通過Flink將資料大量匯入到Hologres,實現高效且低負載的資料匯入。

背景資訊

在巨量資料處理領域,Hologres作為一款強大的線上分析處理(OLAP)系統,與Flink的整合提供了強大的即時資料流處理能力。然而,對資料時效性要求不高的情境,如歷史資料的批量載入、離線資料處理或日誌彙總等任務,推薦使用Flink大量匯入Hologres。大量匯入能夠以更高效、節約資源的方式將大量資料一次性寫入Hologres,不僅提升了匯入效率,還兼顧了資源使用率。您可以根據自身的業務特性和資源狀況,靈活選擇即時匯入或大量匯入。關於即時匯入詳情,請參見Flink全託管

前提條件

阿里雲Realtime ComputeFlink版大量匯入

  1. 通過串連HoloWeb並執行查詢建立Hologres結果表,用於接收Flink匯入的資料。本文以test_sink_customer表為例。

    -- 建立Hologres結果表
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
      orientation="column"
    );
    說明

    Flink源表需要與Hologres的結果表的欄位名稱和類型保持一致。

  2. 登入Realtime Compute控制台,在作業營運頁面單擊部署作業,配置部署作業並單擊部署。配置參數詳情,請參見部署JAR作業

    其中主要參數介紹,如下表所示。

    參數

    說明

    部署作業類型

    選擇為JAR。

    部署模式

    支援流模式或批模式,本文選擇批模式。

    引擎版本

    引擎版本詳情請參見引擎版本介紹生命週期策略。本文以vvr-8.0.7-flink-1.17版本為例。

    JAR URI

    上傳開源Flink Connector:hologres-connector-flink-repartition.jar

    說明

    通過開源Flink Connector支援將資料大量匯入Hologres,Flink Connector外掛程式相關開原始碼,詳情請參見Hologres GitHub官方庫

    Entry Point Class

    程式的入口類。Flink Connector指定主類名稱為:com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample

    Entry Point Main Arguments

    傳入repartition.sql檔案的路徑參數。Realtime Compute Flink運行時,附加依賴檔案的路徑在/flink/usrlib/下,因此完整的參數為:--sqlFilePath="/flink/usrlib/repartition.sql"

    附加依賴檔案

    上傳repartition.sql檔案。repartition.sql是Flink SQL指令碼的檔案,主要用於定義資料來源、結果表聲明以及Hologres的串連資訊,本文repartition.sql檔案的樣本內容如下。

    --sourceDDL,本文使用了Flink DataGen公用測試資料作為來源資料。
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    --sourceDql,源表查詢語句應當保證查詢結果與sinkDDL聲明的結果表對應,包括欄位數量和欄位類型。
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL,結果表聲明以及配置串連Hologres資訊。
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );
    說明

    repartition.sql檔案中串連Hologres的更多參數介紹詳情,請參見Hologres Flink Connector參數說明

  3. 單擊作業名稱,進入部署詳情看板,編輯資源配置,修改並發度

    說明

    建議與Hologres結果表的ShardCount數保持一致。

  4. 查詢Hologres結果表。

    Flink作業提交成功後,您可以在Hologres中查詢寫入的資料。樣本語句如下。

    SELECT * FROM test_sink_customer;

開源Flink大量匯入

  1. 通過串連HoloWeb並執行查詢建立Hologres結果表,用於接收Flink匯入的資料。本文以test_sink_customer表為例。

    -- 建立Hologres結果表
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    說明

    您可以根據資料量合理設定Shard數,關於Shard詳情,請參見Table Group與Shard Count操作指南

  2. 建立repartition.sql檔案並上傳至Flink叢集環境中的任意位置。本文以上傳至/flink-1.15.4/src/repartition.sql路徑為例,repartition.sql檔案的樣本內容如下。

    說明

    repartition.sql是Flink SQL指令碼的檔案,主要用於定義資料來源、結果表聲明以及Hologres的串連資訊。

    --sourceDDL,本文使用了Flink DataGen公用測試資料作為來源資料。
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    --sourceDql,源表查詢語句應當保證查詢結果與sinkDDL聲明的結果表對應,包括欄位數量和欄位類型。
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL,結果表聲明以及配置串連Hologres資訊。
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );

    其中主要參數解釋如下:

    參數

    是否必填

    說明

    connector

    結果表類型,固定值為hologres。

    dbname

    Hologres的資料庫名稱。

    tablename

    Hologres接收資料的表名稱。

    username

    當前阿里雲帳號的AccessKey ID。

    您可以單擊AccessKey 管理,擷取AccessKey ID。

    password

    當前阿里雲帳號AccessKey ID對應的AccessKey Secret。

    endpoint

    Hologres的VPC網路地址。進入Hologres管理主控台的執行個體詳情頁,從執行個體配置擷取Endpoint。

    說明

    endpoint需包含連接埠號碼,格式為ip:port同一個地區使用VPC網路地址,跨地區請使用公用網路。

    jdbccopywritemode

    資料寫入方式。取值說明如下:

    • false(預設值):使用INSERT方式寫入。

    • true:使用Copy方式寫入,Copy寫入方式分為流式Copy(Fixed Copy)和批量Copy,當前預設使用流式Copy(Fixed Copy)方式寫入。

      說明

      與使用INSERT方式寫入相比,Fixed Copy方式可以實現更高的吞吐(因為採用流模式),更低的資料延時以及更低的用戶端記憶體消耗(因為不需要攢批資料),但不支援資料回撤功能。

    bulkload

    是否使用批量Copy方式寫入。取值說明如下:

    • true:使用批量Copy方式寫入,僅jdbccopywritemode參數也設定為true時,該參數才會生效,否則使用Fixed Copy方式寫入。

      說明
      • 批量Copy相較於流式Copy(Fixed Copy),具備更高的效率,能更好地利用Hologres的資源,從而在資料寫入過程中提供更優的效能,您可以根據業務需要,選擇合適的資料寫入方式。

      • 在對主鍵表進行批量Copy寫入時,通常會出現表鎖的情況,您可以通過配置target-shards.enabled參數為true,將寫入鎖粒度降至Shard層級,從而允許並發執行多個大量匯入任務,減少了表鎖的發生。相比Fixed Copy模式,批量Copy寫入有主鍵表時,通過這種方式能夠顯著降低Hologres執行個體的負載,實測顯示,可以減少約66.7%的負載。

      • 批量Copy寫入時,如果目標表包含主鍵,要求在寫入之前目標表為空白表,否則寫入過程中進行主鍵去重會影響寫入效能。

    • false(預設值):不使用。

    target-shards.enabled

    是否啟用Target Shard批量寫入。取值說明如下:

    • true:啟用Target Shard批量寫入,當來源資料已按Shard重新分區時,可以將寫入鎖粒度降至Shard層級。

    • false(預設值):不啟用。

    說明

    repartition.sql檔案中串連Hologres的更多參數介紹詳情,請參見Hologres Flink Connector參數說明

  3. 在Flink叢集環境中,上傳開源Flink Connector:hologres-connector-flink-repartition.jar至任意目錄下。本文以上傳至根目錄為例。

    說明

    通過開源Flink Connector支援將資料大量匯入Hologres,Flink Connector外掛程式相關開原始碼,詳情請參見Hologres GitHub官方庫

  4. 提交Flink作業,程式碼範例如下。

    ./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"

    上述參數說明:

    • Dexecution.runtime-mode:Flink作業的執行模式,詳情請參見Execution Mode

    • p:作業並發數。建議在配置作業並發數時取值與結果表的ShardCount相同,或者可以被ShardCount整除。

    • c:hologres-connector-flink-repartition.jar的主類名稱以及所在的路徑。

    • sqlFilePath:repartition.sql檔案的路徑。

  5. 查詢Hologres結果表。

    Flink作業提交成功後,您可以在Hologres中查詢寫入的資料。樣本語句如下。

    SELECT * FROM test_sink_customer;