Hologres推出的新版Flink Connector外掛程式,支援通過Flink將資料大量匯入到Hologres,實現高效且低負載的資料匯入。
背景資訊
在巨量資料處理領域,Hologres作為一款強大的線上分析處理(OLAP)系統,與Flink的整合提供了強大的即時資料流處理能力。然而,對資料時效性要求不高的情境,如歷史資料的批量載入、離線資料處理或日誌彙總等任務,推薦使用Flink大量匯入Hologres。大量匯入能夠以更高效、節約資源的方式將大量資料一次性寫入Hologres,不僅提升了匯入效率,還兼顧了資源使用率。您可以根據自身的業務特性和資源狀況,靈活選擇即時匯入或大量匯入。關於即時匯入詳情,請參見Flink全託管。
前提條件
已購買Hologres執行個體。具體操作,請參見購買Hologres。
已部署Flink 1.15及以上版本叢集環境。具體操作,詳情請參見
開源Flink:Deploy Flink。
阿里雲Realtime ComputeFlink版:開通Realtime ComputeFlink版。
阿里雲Realtime ComputeFlink版大量匯入
通過串連HoloWeb並執行查詢建立Hologres結果表,用於接收Flink匯入的資料。本文以
test_sink_customer表為例。-- 建立Hologres結果表 CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );說明Flink源表需要與Hologres的結果表的欄位名稱和類型保持一致。
登入Realtime Compute控制台,在作業營運頁面單擊部署作業,配置部署作業並單擊部署。配置參數詳情,請參見部署JAR作業。
其中主要參數介紹,如下表所示。
參數
說明
部署作業類型
選擇為JAR。
部署模式
支援流模式或批模式,本文選擇批模式。
引擎版本
JAR URI
上傳開源Flink Connector:hologres-connector-flink-repartition.jar。
說明通過開源Flink Connector支援將資料大量匯入Hologres,Flink Connector外掛程式相關開原始碼,詳情請參見Hologres GitHub官方庫。
Entry Point Class
程式的入口類。Flink Connector指定主類名稱為:
com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample。Entry Point Main Arguments
傳入
repartition.sql檔案的路徑參數。Realtime Compute Flink運行時,附加依賴檔案的路徑在/flink/usrlib/下,因此完整的參數為:--sqlFilePath="/flink/usrlib/repartition.sql"。附加依賴檔案
上傳
repartition.sql檔案。repartition.sql是Flink SQL指令碼的檔案,主要用於定義資料來源、結果表聲明以及Hologres的串連資訊,本文repartition.sql檔案的樣本內容如下。--sourceDDL,本文使用了Flink DataGen公用測試資料作為來源資料。 CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); --sourceDql,源表查詢語句應當保證查詢結果與sinkDDL聲明的結果表對應,包括欄位數量和欄位類型。 SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL,結果表聲明以及配置串連Hologres資訊。 CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'yourAccessKeyId' ,'password' = 'yourAccessKeySecret' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );說明repartition.sql檔案中串連Hologres的更多參數介紹詳情,請參見Hologres Flink Connector參數說明。單擊作業名稱,進入部署詳情看板,編輯資源配置,修改並發度。
說明建議與Hologres結果表的ShardCount數保持一致。
查詢Hologres結果表。
Flink作業提交成功後,您可以在Hologres中查詢寫入的資料。樣本語句如下。
SELECT * FROM test_sink_customer;
開源Flink大量匯入
通過串連HoloWeb並執行查詢建立Hologres結果表,用於接收Flink匯入的資料。本文以
test_sink_customer表為例。-- 建立Hologres結果表 CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );說明您可以根據資料量合理設定Shard數,關於Shard詳情,請參見Table Group與Shard Count操作指南。
建立
repartition.sql檔案並上傳至Flink叢集環境中的任意位置。本文以上傳至/flink-1.15.4/src/repartition.sql路徑為例,repartition.sql檔案的樣本內容如下。說明repartition.sql是Flink SQL指令碼的檔案,主要用於定義資料來源、結果表聲明以及Hologres的串連資訊。--sourceDDL,本文使用了Flink DataGen公用測試資料作為來源資料。 CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); --sourceDql,源表查詢語句應當保證查詢結果與sinkDDL聲明的結果表對應,包括欄位數量和欄位類型。 SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL,結果表聲明以及配置串連Hologres資訊。 CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'yourAccessKeyId' ,'password' = 'yourAccessKeySecret' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );其中主要參數解釋如下:
參數
是否必填
說明
connector
是
結果表類型,固定值為hologres。
dbname
是
Hologres的資料庫名稱。
tablename
是
Hologres接收資料的表名稱。
username
是
當前阿里雲帳號的AccessKey ID。
您可以單擊AccessKey 管理,擷取AccessKey ID。
password
是
當前阿里雲帳號AccessKey ID對應的AccessKey Secret。
endpoint
是
Hologres的VPC網路地址。進入Hologres管理主控台的執行個體詳情頁,從執行個體配置擷取Endpoint。
說明endpoint需包含連接埠號碼,格式為
ip:port同一個地區使用VPC網路地址,跨地區請使用公用網路。jdbccopywritemode
否
資料寫入方式。取值說明如下:
false(預設值):使用INSERT方式寫入。
true:使用Copy方式寫入,Copy寫入方式分為流式Copy(Fixed Copy)和批量Copy,當前預設使用流式Copy(Fixed Copy)方式寫入。
說明與使用INSERT方式寫入相比,Fixed Copy方式可以實現更高的吞吐(因為採用流模式),更低的資料延時以及更低的用戶端記憶體消耗(因為不需要攢批資料),但不支援資料回撤功能。
bulkload
否
是否使用批量Copy方式寫入。取值說明如下:
true:使用批量Copy方式寫入,僅jdbccopywritemode參數也設定為true時,該參數才會生效,否則使用Fixed Copy方式寫入。
說明批量Copy相較於流式Copy(Fixed Copy),具備更高的效率,能更好地利用Hologres的資源,從而在資料寫入過程中提供更優的效能,您可以根據業務需要,選擇合適的資料寫入方式。
在對主鍵表進行批量Copy寫入時,通常會出現表鎖的情況,您可以通過配置target-shards.enabled參數為true,將寫入鎖粒度降至Shard層級,從而允許並發執行多個大量匯入任務,減少了表鎖的發生。相比Fixed Copy模式,批量Copy寫入有主鍵表時,通過這種方式能夠顯著降低Hologres執行個體的負載,實測顯示,可以減少約66.7%的負載。
批量Copy寫入時,如果目標表包含主鍵,要求在寫入之前目標表為空白表,否則寫入過程中進行主鍵去重會影響寫入效能。
false(預設值):不使用。
target-shards.enabled
否
是否啟用Target Shard批量寫入。取值說明如下:
true:啟用Target Shard批量寫入,當來源資料已按Shard重新分區時,可以將寫入鎖粒度降至Shard層級。
false(預設值):不啟用。
說明repartition.sql檔案中串連Hologres的更多參數介紹詳情,請參見Hologres Flink Connector參數說明。在Flink叢集環境中,上傳開源Flink Connector:hologres-connector-flink-repartition.jar至任意目錄下。本文以上傳至根目錄為例。
說明通過開源Flink Connector支援將資料大量匯入Hologres,Flink Connector外掛程式相關開原始碼,詳情請參見Hologres GitHub官方庫。
提交Flink作業,程式碼範例如下。
./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"上述參數說明:
Dexecution.runtime-mode:Flink作業的執行模式,詳情請參見Execution Mode。
p:作業並發數。建議在配置作業並發數時取值與結果表的ShardCount相同,或者可以被ShardCount整除。
c:hologres-connector-flink-repartition.jar的主類名稱以及所在的路徑。
sqlFilePath:
repartition.sql檔案的路徑。
查詢Hologres結果表。
Flink作業提交成功後,您可以在Hologres中查詢寫入的資料。樣本語句如下。
SELECT * FROM test_sink_customer;