Stream Load是一種同步的匯入方式,您可以通過HTTP協議發送請求將本地檔案或資料流匯入到Doris中。Stream Load同步執行匯入並返回匯入結果。您可以直接通過請求的返回體判斷本次匯入是否成功。本文為您介紹Stream Load匯入的基本原理、基本操作、系統配置以及最佳實務。
適用情境
Stream Load主要適用於匯入本地檔案或通過程式匯入資料流中的資料。
基本原理
下面為您展示了Stream Load的主要流程,省略了部分匯入細節。
^ +
| |
| | 1A. User submit load to FE
| |
| +--v-----------+
| | FE |
5. Return result to user | +--+-----------+
| |
| | 2. Redirect to BE
| |
| +--v-----------+
+---+Coordinator BE| 1B. User submit load to BE
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. Distrbute data
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+Stream Load中,Doris會選定一個節點作為Coordinator節點,該節點負責接收資料並分發資料到其他資料節點。您可以通過HTTP協議提交匯入命令。如果提交到FE,則FE會通過HTTP redirect指令將請求轉寄給某一個BE。您也可以直接提交匯入命令給某一指定BE。匯入的最終結果由Coordinator BE返回給您。
支援的資料格式
Stream Load支援CSV(文本)和JSON兩個資料格式。
基本操作
建立匯入任務
Stream Load通過HTTP協議提交和傳輸資料。本樣本通過curl命令展示如何提交匯入任務。您也可以通過其他HTTP Client進行操作。
curl命令curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load # Header中支援屬性見下表。 # 格式為: -H "key1:value1"建立匯入任務的詳細文法可以通過
HELP STREAM LOAD命令查看。Stream Load中所有與匯入任務相關的參數均設定在Header中。相關參數描述如下表所示。參數
說明
簽名參數
user:passwd
Stream Load建立匯入任務使用的是HTTP協議,已通過Basic access authentication進行簽名。Doris會根據簽名來驗證使用者身份和匯入許可權。
匯入任務參數
label
匯入任務的標識。
每個匯入任務,都有一個在單database內部唯一的Label。Label是您在匯入命令中自訂的名稱。通過該Label,您可以查看對應匯入任務的執行情況。Label的另一個作用是防止您重複匯入相同的資料。強烈推薦您同一批次資料使用相同Label,這樣同一批次資料的重複請求只會被接受一次,保證了At-Most-Once。當Label對應的匯入作業狀態為CANCELLED時,該Label可以再次被使用。
column_separator
用於指定匯入檔案中的資料行分隔符號,預設為\t。
如果是不可見字元,則需要加\x作為首碼,使用十六進位來表示分隔字元。例如,Hive檔案的分隔字元\x01,需要指定為
-H "column_separator:\x01"。可以使用多個字元的組合作為資料行分隔符號。line_delimiter
用於指定匯入檔案中的分行符號,預設為\n。可以使用多個字元的組合作為分行符號。
max_filter_ratio
匯入任務的最大容忍率,預設為0容忍,取值範圍是0~1。
當匯入的錯誤率超過該值,則匯入失敗。如果您希望忽略錯誤的行,可以通過設定該參數大於0來保證匯入成功。計算公式為
(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio,其中dpp.abnorm.ALL表示資料品質不合格的行數,例如類型不符、列數不匹配、長度不匹配等;dpp.norm.ALL表示匯入處理程序中正確資料的條數,可以通過SHOW LOAD命令查詢匯入任務的正確資料量。原始檔案的行數 = dpp.abnorm.ALL + dpp.norm.ALLwhere
匯入任務指定的過濾條件。
Stream Load支援對未經處理資料指定where語句進行過濾。被過濾的資料將不會被匯入,也不會參與filter ratio的計算,但會被計入num_rows_unselected。
Partitions
待匯入表的Partition資訊,如果待匯入資料不屬於指定的Partition,則不會被匯入。未被匯入的資料將計入 dpp.abnorm.ALL。
columns
待匯入資料的函數變換配置,目前Stream Load支援的函數變換方法包含列的順序變化以及運算式變換,其中運算式變換的方法與查詢語句的一致。
exec_mem_limit
匯入記憶體限制。預設為2 GB,單位為位元組。
strict_mode
指定此次匯入是否開啟strict mode模式,預設關閉。
Stream Load匯入可以開啟strict mode模式,開啟方式為在HEADER中聲明
strict_mode=true。strict mode模式的意思是對於匯入處理程序中的列類型轉換進行嚴格過濾。嚴格過濾的策略如下:對於列類型轉換來說,如果strict mode為true,則錯誤資料將被filter。這裡的錯誤資料是指未經處理資料並不為空白值,在參與列類型轉換後結果為空白值的這一類資料。
對於匯入的某列由函數變換產生時,strict mode對其不產生影響。
對於匯入的某列類型包含範圍限制的,如果未經處理資料能正常通過類型轉換,但無法通過範圍限制的,strict mode對其也不產生影響。例如,如果類型是decimal(1,0),未經處理資料為10,則屬於可以通過類型轉換但不在列聲明的範圍內,strict mode對其不產生影響。
merge_type
資料的合并類型,共支援APPEND、DELETE、MERGE三種類型。
APPEND(預設值):表示這批資料全部需要追加到現有資料中。
DELETE:表示刪除與這批資料key相同的所有行。
MERGE:需要與DELETE條件聯合使用,表示滿足DELETE條件的資料按照DELETE語義處理,其餘的按照APPEND語義處理。
two_phase_commit
Stream Load匯入可以開啟兩階段事務提交模式:在Stream load過程中,資料寫入完成即會返回資訊,此時資料不可見,事務狀態為PRECOMMITTED,您手動觸發commit操作之後,資料才可見。預設的兩階段批量事務提交為關閉。
開啟方式是在be.conf中配置
disable_stream_load_2pc=false,並且在HEADER中聲明two_phase_commit=true。樣本:
發起Stream Load預提交操作。
說明列順序變換例子:未經處理資料有三列src_c1、src_c2、rc_c3,目前Doris表也有三列dst_c1、dst_c2、dst_c3。
如果原始表的src_c1列對應目標表dst_c1列,原始表的src_c2列對應目標表dst_c2列,原始表的src_c3列對應目標表dst_c3列,則寫法為
columns: dst_c1, dst_c2, dst_c3。如果原始表的src_c1列對應目標表dst_c2列,原始表的src_c2列對應目標表dst_c3列,原始表的src_c3列對應目標表dst_c1列,則寫法為
columns: dst_c2, dst_c3, dst_c1。運算式變換例子:原始檔案有兩列,目標表也有兩列(c1,c2),但是原始檔案的兩列均需要經過函數變換才能對應目標表的兩列,則寫法如下為
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2),其中tmp_*是一個預留位置,代表的是原始檔案中的兩個原始列。
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load { "TxnId": 18036, "Label": "55c8ffc9-1c40-4d51-b75e-f2265b36****", "TwoPhaseCommit": "true", "Status": "Success", "Message": "OK", "NumberTotalRows": 100, "NumberLoadedRows": 100, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 1031, "LoadTimeMs": 77, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 1, "ReadDataTimeMs": 0, "WriteDataTimeMs": 58, "CommitAndPublishTimeMs": 0 }對事務觸發commit操作。
對事務觸發abort操作。
樣本
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load返回結果
由於Stream Load是一種同步的匯入方式,所以匯入的結果會通過建立匯入的傳回值直接返回給使用者。樣本如下。
{ "TxnId": 1003, "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a014****", "Status": "Success", "ExistingJobStatus": "FINISHED", // optional "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106, "ErrorURL": "http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bd****" }Stream load匯入結果參數如下表。
參數
說明
TxnId
匯入的事務ID。使用者可不感知。
Label
匯入的Label。由使用者指定或系統自動產生。
Status
匯入完成狀態。
Success:表示匯入成功。
Publish Timeout:表示匯入已經完成,只是資料可能會延遲可見,無需重試。
Label Already Exists:Label重複,需更換Label。
Fail:匯入失敗。
ExistingJobStatus
已存在Label對應的匯入作業的狀態。該欄位只有當Status為Label Already Exists時才會顯示。您可以通過該狀態,知曉已存在Label對應的匯入作業的狀態。
RUNNING:表示作業在執行中。
FINISHED:表示作業成功。
Message
匯入錯誤資訊。
NumberTotalRows
匯入總處理的行數。
NumberLoadedRows
成功匯入的行數。
NumberFilteredRows
資料品質不合格的行數。
NumberUnselectedRows
被where條件過濾的行數。
LoadBytes
匯入的位元組數。
LoadTimeMs
匯入完成時間。單位毫秒。
BeginTxnTimeMs
向FE請求開始一個事務所花費的時間,單位毫秒。
StreamLoadPutTimeMs
向FE請求擷取匯入資料執行計畫所花費的時間,單位毫秒。
ReadDataTimeMs
讀取資料所花費的時間,單位毫秒。
WriteDataTimeMs
執行寫入資料操作所花費的時間,單位毫秒。
CommitAndPublishTimeMs
向FE請求提交並且發布事務所花費的時間,單位毫秒。
ErrorURL
如果有資料品質問題,通過訪問該URL查看具體錯誤行。
重要由於Stream Load是同步的匯入方式,所以並不會在Doris中記錄匯入資訊,您無法非同步通過查看匯入命令看到Stream Load。使用時需監聽建立匯入請求的傳回值擷取匯入結果。
取消匯入
您無法手動取消Stream Load,Stream Load在逾時或者匯入錯誤後會被系統自動取消。
查看Stream Load
您可以通過show stream load來查看已經完成的Stream Load任務。
預設BE是不記錄Stream Load的記錄,如果您要查看需要在BE上啟用記錄,配置參數enable_stream_load_record=true ,具體配置詳情請參見BE參數配置。
相關係統配置
FE配置
stream_load_default_timeout_second:匯入任務的逾時時間(以秒為單位),匯入任務在設定的timeout時間內未完成則會被系統取消,變成CANCELLED。預設的timeout時間為600秒。如果匯入的源檔案無法在規定時間內完成匯入,您可以在Stream Load 請求中設定單獨的逾時時間,或者調整FE的stream_load_default_timeout_second參數來設定全域的預設逾時時間。
BE配置
streaming_load_max_mbStream:Stream Load的最大匯入大小,預設為10 GB,單位是MB。如果您的原始檔案超過該值,則需要調整BE的streaming_load_max_mb參數。
最佳實務
應用情境
使用Stream Load最合適的情境就是原始檔案在記憶體中或者在磁碟中。其次,由於Stream Load是一種同步的匯入方式,所以如果您希望用同步方式擷取匯入結果,也可以使用這種匯入。
資料量
由於Stream Load的原理是由BE發起的匯入並分發資料,建議的匯入資料量在1 GB到10 GB之間。由於預設的最大Stream Load匯入資料量為 10 GB,所以匯入超過10 GB的檔案就需要修改BE的配置streaming_load_max_mb。
例如,如果待匯入檔案大小為15 GB,則需修改BE配置streaming_load_max_mb為16000即可。
Stream Load的預設逾時為300秒,按照Doris目前最大的匯入限速來看,約超過3 GB的檔案就需要修改匯入任務預設逾時時間。
匯入任務逾時時間 = 匯入資料量 / 10M/s (具體的平均匯入速度需要您根據自己的叢集情況計算)例如,如果匯入一個10 GB的檔案,則timeout = 1000s ,為10G / 10M/s。
完整樣本
資料情況:資料在用戶端本地磁碟路徑/home/store-sales中,匯入的資料量約為15 GB,希望匯入到資料庫bj-sales的表store-sales中。
叢集情況:Stream Load的並發數不受叢集大小影響。
樣本如下:
因為匯入檔案大小超過預設的最大匯入大小10 GB,所以需要修改BE的設定檔BE.conf。
streaming_load_max_mb = 16000計算大概的匯入時間是否超過預設timeout值,匯入時間為
15000 / 10 = 1500s,如果超過了預設的timeout時間,則需要修改FE的配置FE.conf,修改參數stream_load_default_timeout_second,將匯入時間調整為1500。建立匯入任務。
curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8030/api/bj_sales/store_sales/_stream_load