全部產品
Search
文件中心

E-MapReduce:Stream Load

更新時間:Jul 01, 2024

Stream Load是一種同步的匯入方式,您可以通過HTTP協議發送請求將本地檔案或資料流匯入到Doris中。Stream Load同步執行匯入並返回匯入結果。您可以直接通過請求的返回體判斷本次匯入是否成功。本文為您介紹Stream Load匯入的基本原理、基本操作、系統配置以及最佳實務。

適用情境

Stream Load主要適用於匯入本地檔案或通過程式匯入資料流中的資料。

基本原理

下面為您展示了Stream Load的主要流程,省略了部分匯入細節。

^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
5. Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distrbute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+

Stream Load中,Doris會選定一個節點作為Coordinator節點,該節點負責接收資料並分發資料到其他資料節點。您可以通過HTTP協議提交匯入命令。如果提交到FE,則FE會通過HTTP redirect指令將請求轉寄給某一個BE。您也可以直接提交匯入命令給某一指定BE。匯入的最終結果由Coordinator BE返回給您。

支援的資料格式

Stream Load支援CSV(文本)和JSON兩個資料格式。

基本操作

建立匯入任務

Stream Load通過HTTP協議提交和傳輸資料。本樣本通過curl命令展示如何提交匯入任務。您也可以通過其他HTTP Client進行操作。

  • curl命令

    curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
    
    # Header中支援屬性見下表。
    # 格式為: -H "key1:value1"

    建立匯入任務的詳細文法可以通過HELP STREAM LOAD命令查看。Stream Load中所有與匯入任務相關的參數均設定在Header中。相關參數描述如下表所示。

    參數

    說明

    簽名參數

    user:passwd

    Stream Load建立匯入任務使用的是HTTP協議,已通過Basic access authentication進行簽名。Doris會根據簽名來驗證使用者身份和匯入許可權。

    匯入任務參數

    label

    匯入任務的標識。

    每個匯入任務,都有一個在單database內部唯一的Label。Label是您在匯入命令中自訂的名稱。通過該Label,您可以查看對應匯入任務的執行情況。Label的另一個作用是防止您重複匯入相同的資料。強烈推薦您同一批次資料使用相同Label,這樣同一批次資料的重複請求只會被接受一次,保證了At-Most-Once。當Label對應的匯入作業狀態為CANCELLED時,該Label可以再次被使用。

    column_separator

    用於指定匯入檔案中的資料行分隔符號,預設為\t。

    如果是不可見字元,則需要加\x作為首碼,使用十六進位來表示分隔字元。例如,Hive檔案的分隔字元\x01,需要指定為-H "column_separator:\x01"。可以使用多個字元的組合作為資料行分隔符號。

    line_delimiter

    用於指定匯入檔案中的分行符號,預設為\n。可以使用多個字元的組合作為分行符號。

    max_filter_ratio

    匯入任務的最大容忍率,預設為0容忍,取值範圍是0~1。

    當匯入的錯誤率超過該值,則匯入失敗。如果您希望忽略錯誤的行,可以通過設定該參數大於0來保證匯入成功。計算公式為(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio,其中dpp.abnorm.ALL表示資料品質不合格的行數,例如類型不符、列數不匹配、長度不匹配等;dpp.norm.ALL表示匯入處理程序中正確資料的條數,可以通過SHOW LOAD命令查詢匯入任務的正確資料量。

    原始檔案的行數 = dpp.abnorm.ALL + dpp.norm.ALL

    where

    匯入任務指定的過濾條件。

    Stream Load支援對未經處理資料指定where語句進行過濾。被過濾的資料將不會被匯入,也不會參與filter ratio的計算,但會被計入num_rows_unselected。

    Partitions

    待匯入表的Partition資訊,如果待匯入資料不屬於指定的Partition,則不會被匯入。未被匯入的資料將計入 dpp.abnorm.ALL。

    columns

    待匯入資料的函數變換配置,目前Stream Load支援的函數變換方法包含列的順序變化以及運算式變換,其中運算式變換的方法與查詢語句的一致。

    exec_mem_limit

    匯入記憶體限制。預設為2 GB,單位為位元組。

    strict_mode

    指定此次匯入是否開啟strict mode模式,預設關閉。

    Stream Load匯入可以開啟strict mode模式,開啟方式為在HEADER中聲明strict_mode=true。strict mode模式的意思是對於匯入處理程序中的列類型轉換進行嚴格過濾。嚴格過濾的策略如下:

    • 對於列類型轉換來說,如果strict mode為true,則錯誤資料將被filter。這裡的錯誤資料是指未經處理資料並不為空白值,在參與列類型轉換後結果為空白值的這一類資料。

    • 對於匯入的某列由函數變換產生時,strict mode對其不產生影響。

    • 對於匯入的某列類型包含範圍限制的,如果未經處理資料能正常通過類型轉換,但無法通過範圍限制的,strict mode對其也不產生影響。例如,如果類型是decimal(1,0),未經處理資料為10,則屬於可以通過類型轉換但不在列聲明的範圍內,strict mode對其不產生影響。

    merge_type

    資料的合并類型,共支援APPEND、DELETE、MERGE三種類型。

    • APPEND(預設值):表示這批資料全部需要追加到現有資料中。

    • DELETE:表示刪除與這批資料key相同的所有行。

    • MERGE:需要與DELETE條件聯合使用,表示滿足DELETE條件的資料按照DELETE語義處理,其餘的按照APPEND語義處理。

    two_phase_commit

    Stream Load匯入可以開啟兩階段事務提交模式:在Stream load過程中,資料寫入完成即會返回資訊,此時資料不可見,事務狀態為PRECOMMITTED,您手動觸發commit操作之後,資料才可見。預設的兩階段批量事務提交為關閉。

    開啟方式是在be.conf中配置disable_stream_load_2pc=false,並且在HEADER中聲明two_phase_commit=true

    樣本:

    1. 發起Stream Load預提交操作。

      說明

      列順序變換例子:未經處理資料有三列src_c1、src_c2、rc_c3,目前Doris表也有三列dst_c1、dst_c2、dst_c3。

      • 如果原始表的src_c1列對應目標表dst_c1列,原始表的src_c2列對應目標表dst_c2列,原始表的src_c3列對應目標表dst_c3列,則寫法為columns: dst_c1, dst_c2, dst_c3

      • 如果原始表的src_c1列對應目標表dst_c2列,原始表的src_c2列對應目標表dst_c3列,原始表的src_c3列對應目標表dst_c1列,則寫法為columns: dst_c2, dst_c3, dst_c1

      • 運算式變換例子:原始檔案有兩列,目標表也有兩列(c1,c2),但是原始檔案的兩列均需要經過函數變換才能對應目標表的兩列,則寫法如下為columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2),其中tmp_*是一個預留位置,代表的是原始檔案中的兩個原始列。

      curl  --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
      {
          "TxnId": 18036,
          "Label": "55c8ffc9-1c40-4d51-b75e-f2265b36****",
          "TwoPhaseCommit": "true",
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 100,
          "NumberLoadedRows": 100,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 1031,
          "LoadTimeMs": 77,
          "BeginTxnTimeMs": 1,
          "StreamLoadPutTimeMs": 1,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 58,
          "CommitAndPublishTimeMs": 0
      }
    2. 對事務觸發commit操作。

    3. 對事務觸發abort操作。

  • 樣本

    curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
  • 返回結果

    由於Stream Load是一種同步的匯入方式,所以匯入的結果會通過建立匯入的傳回值直接返回給使用者。樣本如下。

    {
        "TxnId": 1003,
        "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a014****",
        "Status": "Success",
        "ExistingJobStatus": "FINISHED", // optional
        "Message": "OK",
        "NumberTotalRows": 1000000,
        "NumberLoadedRows": 1000000,
        "NumberFilteredRows": 1,
        "NumberUnselectedRows": 0,
        "LoadBytes": 40888898,
        "LoadTimeMs": 2144,
        "BeginTxnTimeMs": 1,
        "StreamLoadPutTimeMs": 2,
        "ReadDataTimeMs": 325,
        "WriteDataTimeMs": 1933,
        "CommitAndPublishTimeMs": 106,
        "ErrorURL": "http://192.168.**.**:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bd****"
    }

    Stream load匯入結果參數如下表。

    參數

    說明

    TxnId

    匯入的事務ID。使用者可不感知。

    Label

    匯入的Label。由使用者指定或系統自動產生。

    Status

    匯入完成狀態。

    • Success:表示匯入成功。

    • Publish Timeout:表示匯入已經完成,只是資料可能會延遲可見,無需重試。

    • Label Already Exists:Label重複,需更換Label。

    • Fail:匯入失敗。

    ExistingJobStatus

    已存在Label對應的匯入作業的狀態。該欄位只有當Status為Label Already Exists時才會顯示。您可以通過該狀態,知曉已存在Label對應的匯入作業的狀態。

    • RUNNING:表示作業在執行中。

    • FINISHED:表示作業成功。

    Message

    匯入錯誤資訊。

    NumberTotalRows

    匯入總處理的行數。

    NumberLoadedRows

    成功匯入的行數。

    NumberFilteredRows

    資料品質不合格的行數。

    NumberUnselectedRows

    被where條件過濾的行數。

    LoadBytes

    匯入的位元組數。

    LoadTimeMs

    匯入完成時間。單位毫秒。

    BeginTxnTimeMs

    向FE請求開始一個事務所花費的時間,單位毫秒。

    StreamLoadPutTimeMs

    向FE請求擷取匯入資料執行計畫所花費的時間,單位毫秒。

    ReadDataTimeMs

    讀取資料所花費的時間,單位毫秒。

    WriteDataTimeMs

    執行寫入資料操作所花費的時間,單位毫秒。

    CommitAndPublishTimeMs

    向FE請求提交並且發布事務所花費的時間,單位毫秒。

    ErrorURL

    如果有資料品質問題,通過訪問該URL查看具體錯誤行。

    重要

    由於Stream Load是同步的匯入方式,所以並不會在Doris中記錄匯入資訊,您無法非同步通過查看匯入命令看到Stream Load。使用時需監聽建立匯入請求的傳回值擷取匯入結果。

取消匯入

您無法手動取消Stream Load,Stream Load在逾時或者匯入錯誤後會被系統自動取消。

查看Stream Load

您可以通過show stream load來查看已經完成的Stream Load任務。

預設BE是不記錄Stream Load的記錄,如果您要查看需要在BE上啟用記錄,配置參數enable_stream_load_record=true ,具體配置詳情請參見BE參數配置

相關係統配置

FE配置

stream_load_default_timeout_second:匯入任務的逾時時間(以秒為單位),匯入任務在設定的timeout時間內未完成則會被系統取消,變成CANCELLED。預設的timeout時間為600秒。如果匯入的源檔案無法在規定時間內完成匯入,您可以在Stream Load 請求中設定單獨的逾時時間,或者調整FE的stream_load_default_timeout_second參數來設定全域的預設逾時時間。

BE配置

streaming_load_max_mbStream:Stream Load的最大匯入大小,預設為10 GB,單位是MB。如果您的原始檔案超過該值,則需要調整BE的streaming_load_max_mb參數。

最佳實務

應用情境

使用Stream Load最合適的情境就是原始檔案在記憶體中或者在磁碟中。其次,由於Stream Load是一種同步的匯入方式,所以如果您希望用同步方式擷取匯入結果,也可以使用這種匯入。

資料量

由於Stream Load的原理是由BE發起的匯入並分發資料,建議的匯入資料量在1 GB到10 GB之間。由於預設的最大Stream Load匯入資料量為 10 GB,所以匯入超過10 GB的檔案就需要修改BE的配置streaming_load_max_mb

例如,如果待匯入檔案大小為15 GB,則需修改BE配置streaming_load_max_mb為16000即可。

Stream Load的預設逾時為300秒,按照Doris目前最大的匯入限速來看,約超過3 GB的檔案就需要修改匯入任務預設逾時時間。

匯入任務逾時時間 = 匯入資料量 / 10M/s (具體的平均匯入速度需要您根據自己的叢集情況計算)

例如,如果匯入一個10 GB的檔案,則timeout = 1000s ,為10G / 10M/s

完整樣本

資料情況:資料在用戶端本地磁碟路徑/home/store-sales中,匯入的資料量約為15 GB,希望匯入到資料庫bj-sales的表store-sales中。

叢集情況:Stream Load的並發數不受叢集大小影響。

樣本如下:

  1. 因為匯入檔案大小超過預設的最大匯入大小10 GB,所以需要修改BE的設定檔BE.conf

    streaming_load_max_mb = 16000
  2. 計算大概的匯入時間是否超過預設timeout值,匯入時間為15000 / 10 = 1500s,如果超過了預設的timeout時間,則需要修改FE的配置FE.conf,修改參數stream_load_default_timeout_second,將匯入時間調整為1500。

  3. 建立匯入任務。

    curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8030/api/bj_sales/store_sales/_stream_load