StarRocks支援從本地直接匯入資料,支援CSV檔案格式,資料量在10 GB以下。本文為您介紹Stream Load匯入的基本原理、使用樣本和最佳實務。
背景資訊
Stream Load是一種同步的匯入方式,通過發送HTTP請求將本地檔案或資料流匯入到StarRocks中。Stream Load同步執行匯入並返回匯入結果。您可以直接通過請求的傳回值判斷匯入是否成功。
基本概念
Coordinator:協調節點。負責接收資料並分發資料到其他資料節點,匯入完成後返回結果。
基本原理
Stream Load通過HTTP協議提交匯入命令。如果提交到FE節點,則FE節點會通過HTTP Redirect指令將請求轉寄給某一個BE節點,您也可以直接提交匯入命令給某一指定BE節點。該BE節點作為Coordinator節點,將資料按表Schema劃分並分發資料到相關的BE節點。匯入的最終結果由Coordinator節點返回給使用者。
Stream Load的主要流程如下圖所示。
匯入樣本
建立匯入任務
Stream Load通過HTTP協議提交和傳輸資料。本樣本通過curl命令展示如何提交匯入任務。您也可以通過其他HTTP Client進行操作。
文法
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT \
http://fe_host:http_port/api/{db}/{table}/_stream_load當前支援HTTP chunked與非chunked兩種上傳方式,對於非chunked方式,必須要有Content-Length來標示上傳的內容長度,保證資料的完整性。
建議設定Expect Header欄位內容為100-continue,可以在某些出錯情境下避免不必要的資料轉送。
Header中支援的屬性見下表的匯入任務參數描述,格式為-H "key1:value1"。如果同時有多個任務參數,需要用多個-H來指示,類似於-H "key1:value1" -H "key2:value2"……。Stream Load中所有與匯入任務相關的參數均設定在Header中。相關參數描述如下表所示。
參數 | 描述 | |
簽名參數 | user:passwd | Stream Load建立匯入任務使用的是HTTP協議,已通過Basic access authentication進行簽名。StarRocks系統會根據簽名來驗證使用者身份和匯入許可權。 |
匯入任務參數 | label | 匯入任務的標籤,相同標籤的資料無法多次匯入。 您可以通過指定Label的方式來避免一份資料重複匯入的問題。當前StarRocks系統會保留最近30分鐘內成功完成的任務的Label。 |
column_separator | 用於指定匯入檔案中的資料行分隔符號,預設為\t。 如果是不可見字元,則需要加\x作為首碼,使用十六進位來表示分隔字元。例如,Hive檔案的分隔字元\x01,需要指定為 | |
row_delimiter | 指定匯入檔案中的行分隔字元,預設為\n。 重要 curl命令無法傳遞\n,分行符號手動指定為\n時,Shell會先傳遞反斜線(\),然後傳遞n而不是直接傳遞分行符號\n。 Bash支援另一種逸出字元串文法,傳遞\n和\t時,使用貨幣符號和全形單引號($')啟動字串並以半形單引號(')結束字串。例如, | |
columns | 用於指定匯入檔案中的列和Table中列的對應關係。 如果源檔案中的列正好對應表中的內容,則無需指定該參數。如果源檔案與表Schema不對應,則需要該參數來配置資料轉換規則。列有兩種形式,一種是直接對應於匯入檔案中的欄位,可以直接使用欄位名表示,一種需要通過計算得出。
| |
where | 用於抽取部分資料。如需過濾掉不需要的資料,則可以通過設定該參數來實現。 例如,只匯入k1列等於20180601的資料,則可以在匯入時指定 | |
max_filter_ratio | 最大容忍可過濾(例如,因為資料不規範等原因而過濾)的資料比例。預設為0,取值範圍是0~1。 說明 此處資料不規範的資料不包括通過WHERE條件過濾的資料。 | |
partitions | 用於指定該匯入所涉及的Partition。 如果您能夠確定資料對應的Partition,則推薦指定該項。不滿足指定分區的資料將被過濾掉。例如,指定匯入到p1和p2分區,可以指定 | |
timeout | 指定匯入的逾時時間。預設是600,單位為秒。 設定範圍為1~259200。 | |
strict_mode | 指定此次匯入是否開啟strict 模式,預設為開啟。 關閉方式為 | |
timezone | 指定本次匯入所使用的時區。預設為東八區。 該參數會影響所有匯入涉及和時區有關的函數結果。 | |
exec_mem_limit | 匯入記憶體限制。預設值為2 GB。 | |
樣本
curl --location-trusted -u root -T date -H "label:123" \
http://abc.com:8030/api/test/date/_stream_load匯入任務完成後,Stream Load會以JSON格式返回匯入任務的相關內容,返回結果樣本如下。
{
"TxnId": 11672,
"Label": "f6b62abf-4e16-4564-9009-b77823f3c024",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 199563535,
"NumberLoadedRows": 199563535,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 50706674331,
"LoadTimeMs": 801327,
"BeginTxnTimeMs": 103,
"StreamLoadPlanTimeMs": 0,
"ReadDataTimeMs": 760189,
"WriteDataTimeMs": 801023,
"CommitAndPublishTimeMs": 199"
}參數 | 描述 |
TxnId | 匯入的事務ID。使用者可不感知。 |
Label | 匯入的Label。由使用者指定或系統自動產生。 |
Status | 匯入完成狀態。
|
ExistingJobStatus | 已存在Label對應的匯入作業的狀態。該欄位只有當Status為Label Already Exists時才會顯示。您可以通過該狀態,知曉已存在Label對應的匯入作業的狀態。
|
Message | 匯入狀態的詳細說明。匯入失敗時會返回具體的失敗原因。 |
NumberTotalRows | 從資料流中讀取到的總行數。 |
NumberLoadedRows | 匯入任務的資料行數,僅在匯入狀態為Success時有效。 |
NumberFilteredRows | 匯入任務過濾掉的行數,即資料品質不合格的行。 |
NumberUnselectedRows | 通過Where條件被過濾掉的行數。 |
LoadBytes | 匯入任務的源檔案資料量大小。 |
LoadTimeMs | 匯入任務所用的時間,單位為ms。 |
ErrorURL | 被過濾資料的具體內容,僅保留前1000條資料。如果匯入任務失敗,可以直接用以下方式擷取被過濾的資料並進行分析,以調整匯入任務。 |
取消匯入任務
Stream Load可以通過停止進程來取消任務,Stream Load在逾時或者匯入錯誤後會被系統自動取消。
ps -ef | grep stream_load最佳實務
應用情境
Stream Load的最佳使用情境是原始檔案在記憶體中或者儲存在本地磁碟中。由於Stream Load是一種同步的匯入方式,所以當您希望用同步方式擷取匯入結果時,也可以使用該匯入方式。
資料量
由於Stream Load是由BE發起的匯入並分發資料,建議的匯入資料量在1 GB到10 GB之間。系統預設的最大Stream Load匯入資料量為10 GB,所以匯入超過10 GB的檔案需要修改BE的配置項streaming_load_max_mb。例如,待匯入檔案大小約為15 GB(15360 MB),則可以修改BE的配置項streaming_load_max_mb大於15 GB即可。
curl --location-trusted -u 'admin:****' -XPOST http://be-c-****-internal.starrocks.aliyuncs.com:8040/api/update_config?streaming_load_max_mb=15360Stream Load的預設逾時為600秒,您可以登入EMR Serverless控制台,通過設定FE的參數來修改該參數值。
完整樣本
資料情況:資料在用戶端本地磁碟路徑/mnt/disk1/customer.tbl中,希望匯入到資料庫stream_load的表customer中。
標準資料下載:customer.tbl
叢集情況:Stream Load的並發數不受叢集大小影響。
樣本如下:
當匯入檔案大小超過預設的最大匯入大小時,需要修改BE的設定檔BE.conf。例如,修改參數streaming_load_max_mb,調整最大匯入為15360 MB。
curl --location-trusted -u 'admin:*****' -XPOST http://be-c-****-internal.starrocks.aliyuncs.com:8040/api/update_config?streaming_load_max_mb=15360在EMR Serverless StarRocks執行個體配置頁面,修改參數stream_load_default_timeout_second,本樣本調整逾時時間為3600。
建立目標表customer。
CREATE TABLE `customer` ( `c_custkey` bigint(20) NULL COMMENT "", `c_name` varchar(65533) NULL COMMENT "", `c_address` varchar(65533) NULL COMMENT "", `c_nationkey` bigint(20) NULL COMMENT "", `c_phone` varchar(65533) NULL COMMENT "", `c_acctbal` double NULL COMMENT "", `c_mktsegment` varchar(65533) NULL COMMENT "", `c_comment` varchar(65533) NULL COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`c_custkey`) COMMENT "OLAP" DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 24 PROPERTIES ( "replication_num" = "1", "in_memory" = "false", "storage_format" = "DEFAULT", "enable_persistent_index" = "false", "compression" = "LZ4" );建立匯入任務。由於資料集較大,可以在後台執行。
curl --location-trusted -u 'admin:*****' -T /mnt/disk1/customer.tbl -H "label:labelname" -H "column_separator:|" http://fe-c-****-internal.starrocks.aliyuncs.com:8030/api/load_test/customer/_stream_load返回資訊如下。
{ "TxnId": 575, "Label": "labelname", "Status": "Success", "Message": "OK", "NumberTotalRows": 150000, "NumberLoadedRows": 150000, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 24196144, "LoadTimeMs": 1081, "BeginTxnTimeMs": 104, "StreamLoadPlanTimeMs": 106, "ReadDataTimeMs": 85, "WriteDataTimeMs": 850, "CommitAndPublishTimeMs": 20 }說明如果報錯
"ErrorURL": "http://***:8040/api/_load_error_log?file=error_log_***",使用curl命令查看詳細資料即可。
代碼整合樣本
Java開發Stream Load,詳情請參見stream_load。
Spark整合Stream Load,詳情請參見01_sparkStreaming2StarRocks。