本文為您介紹開源Flink 1.11如何即時寫入資料至Hologres。
前提條件
開通Hologres執行個體,並串連開發工具,詳情請參見串連HoloWeb並執行查詢。
搭建Flink叢集(本次樣本使用的是1.15版本),可以前往Flink官網下載二進位包,啟動一個Standalone叢集,詳情請參見文檔叢集搭建。
背景資訊
從開源Flink1.11版本開始,Hologres代碼已開源,相應版本的Connector已經在中央倉庫發布Release包,可在專案中參照如下pom檔案進行配置。詳細內容請參見Hologres GitHub官方庫。
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>hologres-connector-flink-1.15</artifactId>
<version>1.4.0</version>
<classifier>jar-with-dependencies</classifier>
</dependency>開源Flink版本與hologres-connector-flink最新版本對應關係如下,建議使用1.15及以上版本,功能更豐富:
Flink版本 | Connector版本 |
Flink 1.11 | hologres-connector-flink-1.11:1.0.1 |
Flink 1.12 | hologres-connector-flink-1.12:1.0.1 |
Flink 1.13 | hologres-connector-flink-1.13:1.3.2 |
Flink 1.14 | hologres-connector-flink-1.14:1.3.2 |
Flink 1.15 | hologres-connector-flink-1.15:1.4.1 |
Flink 1.17 | hologres-connector-flink-1.17:1.4.1 |
Flink SQL寫入資料至Hologres程式碼範例
您可以參照如下程式碼範例,通過將Flink SQL將資料寫入Hologres。其中,更多詳細的程式碼範例請參見Hologres GitHub官方庫。
String createHologresTable =
String.format(
"create table sink("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s'"
+ ")",
database, tableName, userName, password, endPoint);
tEnv.executeSql(createHologresTable);
createScanTable(tEnv);
tEnv.executeSql("insert into sink select * from source");更多詳盡的程式碼範例請參見hologres-connector-flink-examples,包括如下樣本。
FlinkSQLToHoloExample:一個使用純Flink SQL介面實現的應用,將資料寫入至Hologres。
FlinkDSAndSQLToHoloExample:一個混合Flink DataStream以及SQL 介面實現的應用,寫入Hologres前,將DataStream轉換成Table,之後再用Flink SQL寫入Hologres。
FlinkDataStreamToHoloExample:一個使用純Flink DataStream介面實現的應用,將資料寫入至Hologres。
FlinkRoaringBitmapAggJob:一個使用Flink及RoaringBitmap,結合Hologres維表,實現即時去重統計UV的應用,並將統計結果寫入Hologres。
通過Flink DataStream介面的即時資料寫入方法,可以對資料進行基於Hologres Shard的Repartition操作,有效減少寫入Hologres執行個體時的小檔案數量,從而提升寫入效能並降低系統負載。該方法適用於需要大量匯入具有主鍵的空表,實作類別似Insert Overwrite的情境。
Hologres Flink Connector參數說明
您可以將Flink資料寫入Hologres,Hologres Flink Connector相關參數具體內容如下:
參數 | 是否必填 | 說明 |
connector | 是 | 結果表類型,固定值為hologres。 |
dbname | 是 | Hologres的資料庫名稱。 |
tablename | 是 | Hologres接收資料的表名稱。 |
username | 是 | 當前阿里雲帳號的AccessKey ID。 您可以單擊AccessKey 管理,擷取AccessKey ID。 |
password | 是 | 當前阿里雲帳號的AccessKey Secret。 您可以單擊AccessKey 管理,擷取AccessKey Secret。 |
endpoint | 是 | Hologres的VPC網路地址。進入Hologres管理主控台的執行個體詳情頁,擷取Endpoint。 說明 endpoint需包含連接埠號碼,格式為 |
串連參數
參數
是否必填
說明
connectionSize
否
單個Flink Hologres Task所建立的JDBC串連池大小。
預設值:3,和吞吐成正比。
connectionPoolName
否
串連池名稱,同一個TaskManager中,表配置同名的串連池名稱可以共用串連池。
無預設值,每個表預設使用自己的串連池。如果設定串連池名稱,則所有表的connectionSize需要相同
fixedConnectionMode
否
寫入和點查不佔用串連數(beta功能,需要connector版本>=1.2.0,hologres引擎版本>=1.3)
預設值:false
jdbcRetryCount
否
當串連故障時,寫入和查詢的重試次數。
預設值:10。
jdbcRetrySleepInitMs
否
每次重試的等待時間=retrySleepInitMs+retry*retrySleepStepMs。
預設值:1000ms。
jdbcRetrySleepStepMs
否
每次重試的等待時間=retrySleepInitMs+retry*retrySleepStepMs。
預設值:5000ms。
jdbcConnectionMaxIdleMs
否
寫入線程和點查線程資料庫連接的最大Idle時間,超過串連將被釋放。
預設值:60000ms。
jdbcMetaCacheTTL
否
TableSchema資訊的本機快取時間。
預設值:60000ms。
jdbcMetaAutoRefreshFactor
否
當TableSchema cache剩餘存活時間短於
metaCacheTTL/metaAutoRefreshFactor將自動重新整理cache。預設值:-1,表示不自動重新整理。
connection.ssl.mode
否
是否啟用資料轉送加密。取值說明如下:
disable(預設值):不啟用傳輸加密。
require:啟用SSL,只對資料鏈路加密。
verify-ca:啟用SSL,加密資料鏈路,同時使用CA認證驗證Hologres服務端的真實性。
verify-full:啟用SSL,加密資料鏈路,使用CA認證驗證Hologres服務端的真實性,同時比對認證內的CN或DNS與串連時配置的Hologres串連地址是否一致。
connection.ssl.root-cert.location
否
CA認證的路徑,且確保已經將CA認證上傳至Flink叢集環境。
說明當connection.ssl.mode參數配置為verify-ca或verify-full時,需要配置此參數。
jdbcDirectConnect
否
是否開啟直連。取值說明如下:
false(預設值):不開啟。
true:開啟。
Flink批量寫入的瓶頸是VIP Endpoint的網路吞吐。開啟此參數會測試當前環境能否直連Hologres FE,若支援預設使用直連。
寫入參數
參數
是否必填
說明
mutatetype
否
資料寫入模式,詳情請參見流式語義。
預設值:insertorignore。
ignoredelete
否
是否忽略撤回訊息。通常Flink的Group By會產生回撤訊息,回撤訊息到Hologres connector會產生Delete請求。
預設值:true,僅在使用流式語義時生效。
createparttable
否
當寫入分區表時,是否自動根據分區值自動建立分區表。
false(預設值):不會自動建立。
true:自動建立。
建議慎用該功能,確保分區值不會出現髒資料導致建立錯誤的分區表。
ignoreNullWhenUpdate
否
當
mutatetype='insertOrUpdate'時,是否忽略更新寫入資料中的Null值。預設值:false。
jdbcWriteBatchSize
否
Hologres Sink節點資料攢批的最大批大小。
預設值:256
jdbcWriteBatchByteSize
否
Hologres Sink節點單個線程資料攢批的最大位元組大小。
預設值:2097152(2 * 1024 * 1024),2MB。
jdbcWriteBatchTotalByteSize
否
Hologres Sink節點所有資料攢批的最大位元組大小。
預設值:20971520(20 * 1024 * 1024),20MB。
jdbcWriteFlushInterval
否
Hologres Sink節點資料攢批的最長Flush等待時間。
預設值:10000,即10秒。
jdbcUseLegacyPutHandler
否
寫入SQL格式,取值說明如下:
true:寫入SQL格式為
insert into xxx(c0,c1,...) values (?,?,...),... on conflict;。false(預設值):寫入SQL格式為
insert into xxx(c0,c1,...) select unnest(?),unnest(?),... on conflict。
jdbcEnableDefaultForNotNullColumn
否
設定為true時,not null且未在表上設定default的欄位傳入null時,將以預設值寫入。String類型預設為"",Number類型預設為0,Date、timestamp、timestamptz 類型預設為1970-01-01 00:00:00。
預設值:true。
remove-u0000-in-text.enabled
否
是否自動替換TEXT資料類型中的非UTF-8的u0000字元。取值說明如下:
true:替換。
false(預設值):不替換。
deduplication.enabled
否
若一批寫入的資料中有主鍵相同的資料,是否進行去重。取值說明如下:
true(預設值):進行去重,只保留後到達的這條資料。
false:不進行去重。
首先將批處理資料寫入,待寫入完成後再繼續寫入後到達的資料。
說明在不進行去重時,極端情況下(例如所有資料的主鍵都相同),寫入操作會退化為非批量寫入,對效能會產生一定影響。
aggressive.enabled
否
是否啟用激進提交模式。取值說明如下:
true:啟用。
啟用後,即使批量處理沒有達到預期的條數,只要檢測到串連空閑,系統就會強制提交資料。在流量較小時,這種方法可以有效減少資料轉送的延時。
false(預設值):不啟用。
jdbcCopyWriteMode
否
資料寫入方式。取值說明如下:
true:使用Copy方式寫入,Copy寫入方式分為流式Copy(Fixed Copy)和批量Copy,當前預設使用流式Copy(Fixed Copy)方式寫入。
說明與使用INSERT方式寫入相比,Fixed Copy方式可以實現更高的吞吐(因為採用流模式),更低的資料延時以及更低的用戶端記憶體消耗(因為不需要攢批資料),但不支援資料回撤功能。
false(預設值):使用INSERT方式寫入。
說明僅Hologres V1.3.1及以上版本支援此參數。
jdbcCopyWriteFormat
否
底層是否採用二進位協議。
binary(預設值):表示使用二進位模式,二進位會更快。
text:為文字模式。
說明僅Hologres V1.3.1及以上版本支援此參數。
bulkLoad
是否使用批量Copy方式寫入。取值說明如下:
true:使用批量Copy方式寫入,僅jdbcCopyWriteMode參數也設定為true時,該參數才會生效,否則使用Fixed Copy方式寫入。
說明批量Copy相較於流式Copy(Fixed Copy),具備更高的效率,能更好地利用Hologres的資源,從而在資料寫入過程中提供更優的效能,您可以根據業務需要,選擇合適的資料寫入方式。
在對主鍵表進行批量Copy寫入時,通常會出現表鎖的情況,您可以通過配置target-shards.enabled參數為true,將寫入鎖粒度降至Shard層級,從而允許並發執行多個大量匯入任務,減少了表鎖的發生。相比Fixed Copy模式,批量Copy寫入有主鍵表時,通過這種方式能夠顯著降低Hologres執行個體的負載,實測顯示,可以減少約66.7%的負載。
批量Copy寫入時,如果目標表包含主鍵,要求在寫入之前目標表為空白表,否則寫入過程中進行主鍵去重會影響寫入效能。
false(預設值):不使用。
說明僅Hologres V1.4.0及以上版本支援此參數。
target-shards.enabled
是否啟用Target Shard批量寫入。取值說明如下:
true:啟用Target Shard批量寫入,當來源資料已按Shard重新分區時,可以將寫入鎖粒度降至Shard層級。
false(預設值):不啟用。
說明僅Hologres V1.4.1及以上版本支援此參數。
點查參數
參數
是否必填
說明
jdbcReadBatchSize
否
維表點查最大批次大小。
預設值:128。
jdbcReadBatchQueueSize
否
維表點查請求緩衝隊列大小。
預設值:256。
async
否
是否採用非同步方式同步資料。
預設值:false。非同步模式可以並發地處理多個請求和響應,從而連續的請求之間不需要阻塞等待,提高查詢的吞吐。但在非同步模式下,無法保證請求的絕對順序。
cache
否
緩衝策略。
預設值:None。Hologres僅支援以下兩種緩衝策略:None:無緩衝。LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果未找到,則去物理維表中尋找。
cachesize
否
緩衝大小。
預設值:10000。選擇LRU緩衝策略後,可以設定緩衝大小。
cachettlms
否
更新緩衝的時間間隔,單位為毫秒。
當選擇LRU緩衝策略後,可以設定緩衝失效的逾時時間,預設不到期。
cacheempty
否
是否緩衝join結果為空白的資料。
預設值:true,表示緩衝join結果為空白的資料。false:表示不緩衝join結果為空白的資料。
資料類型映射
當前Flink全託管與Hologres的資料類型映射請參見Blink/Flink與Hologres的資料類型映射。