全部產品
Search
文件中心

E-MapReduce:Stream Load

更新時間:Jul 15, 2025

當您需要將本地檔案或資料流匯入到StarRocks時,可以使用Stream Load進行資料匯入。本文介紹如何通過Stream Load匯入資料至StarRocks。

背景資訊

Stream Load是一種同步的匯入方式,您可以通過發送HTTP請求將本地檔案或資料流匯入到StarRocks中。該過程同步執行並返回匯入結果,您可以通過請求的傳回值直接判斷匯入是否成功。Stream Load支援CSV和JSON檔案格式,且單次匯入的資料量限制在10 GB以下。

建立匯入任務

Stream Load通過HTTP協議提交和傳輸資料。本文通過curl命令展示如何提交匯入任務。您也可以通過其他HTTP Client進行操作。

文法

curl --location-trusted -u <username>:<password> -XPUT <url>
(
    data_desc
)
[opt_properties]        
說明
  • 建議在HTTP請求的要求標頭欄位Expect中指定100-continue,即"Expect:100-continue"。這樣在伺服器拒絕匯入工作要求的情況下,可以避免不必要的資料轉送,從而減少不必要的資源開銷。

  • 在StarRocks中,部分文字屬於SQL語言的保留關鍵字,因而無法直接用於SQL語句。如果希望在SQL語句中引用這些保留關鍵字,必須使用反引號 (`) 將其包裹起來。更多關鍵字資訊,請參見Keywords

參數說明

  • <username>:<password>:指定StarRocks叢集的使用者名稱和密碼。必選參數。如果帳號沒有設定密碼,這裡只需要傳入<username>:

  • XPUT:用於指定HTTP要求方法。必選參數。Stream Load當前只支援PUT方法。

  • <url>:用於指定StarRocks表的URL地址。必選參數。填寫格式為:http://<fe_host>:<fe_http_port>/api/<database_name>/<table_name>/_stream_load

    涉及參數如下表所示。

    參數

    是否必須

    說明

    <fe_host>

    指定StarRocks叢集中FE的IP地址。

    <fe_http_port>

    指定StarRocks叢集中FE的HTTP連接埠號碼。預設為18030。

    您可以在StarRocks的叢集服務頁面的配置頁簽,通過搜尋http_port參數,查看連接埠號碼。此外,您也可以通過SHOW FRONTENDS命令查看FE節點的IP地址和HTTP連接埠號碼。

    <database_name>

    指定目標StarRocks表所在的資料庫的名稱。

    <table_name>

    指定目標StarRocks表的名稱。

  • desc:用於描述來源資料檔案的各項屬性,包括檔案名稱、格式、資料行分隔符號、行分隔字元、目標資料分割,以及與StarRocks表之間的列對應關係等。填寫格式為如下所示。

    -T <file_path>
    -H "format: CSV | JSON"
    -H "column_separator: <column_separator>"
    -H "row_delimiter: <row_delimiter>"
    -H "columns: <column1_name>[, <column2_name>, ... ]"
    -H "partitions: <partition1_name>[, <partition2_name>, ...]"
    -H "temporary_partitions: <temporary_partition1_name>[, <temporary_partition2_name>, ...]"
    -H "jsonpaths: [ \"<json_path1>\"[, \"<json_path2>\", ...] ]"
    -H "strip_outer_array: true | false"
    -H "json_root: <json_path>"
    -H "ignore_json_size: true | false"
    -H "compression: <compression_algorithm> | Content-Encoding: <compression_algorithm>"

    data_desc中的參數可以分為三類:公用參數、CSV適用的參數、以及JSON適用的參數。

    • 公用參數

      參數

      是否必選

      說明

      <file_path>

      指定來源資料檔案的儲存路徑。檔案名稱裡可選包含或者不包含副檔名。

      format

      指定待匯入資料的格式。取值包括CSVJSON。預設值:CSV

      partitions

      指定需將資料匯入的具體分區。如果未指定該參數,則預設將資料匯入StarRocks表所在的所有分區。

      temporary_partitions

      指定要匯入資料的臨時分區。

      columns

      指定來源資料檔案與StarRocks表之間的列對應關係。

      • 如果來源資料檔案中的列與StarRocks表中的列按順序一一對應,則不需要指定該參數。

      • 如果來源資料檔案與表Schema不對應,則需要該參數來配置資料轉換規則。列有兩種形式,一種是直接對應於匯入檔案中的欄位,可以直接使用欄位名表示,一種需要通過計算得出。

        • 樣本1:表中有3列c1, c2, c3,源檔案中的3列依次對應的是c3,c2,c1,則需要指定-H "columns: c3, c2, c1"

        • 樣本2:表中有3列c1, c2, c3,源檔案中前3列與表中的列一一對應,但是還有多餘1列,則需要指定-H "columns: c1, c2, c3, temp",最後1列隨意指定名稱用於佔位即可。

        • 樣本3:表中有3列year, month, day,源檔案中只有一個時間列,為2018-06-01 01:02:03格式,則可以指定 -H "columns: col, year = year(col), month=month(col), day=day(col)"完成匯入。

    • CSV適用參數

      參數

      是否必選

      說明

      column_separator

      用於指定來源資料檔案中的資料行分隔符號,預設為\t

      如果是不可見字元,則需要加\x作為首碼,使用十六進位來表示分隔字元。例如,Hive檔案的分隔字元\x01,需要指定為-H "column_separator:\x01"

      row_delimiter

      用於指定來源資料檔案中的行分隔字元,預設為\n

      重要

      curl命令無法傳遞\n,分行符號手動指定為\n時,shell會先傳遞反斜線(\),然後傳遞n而不是直接傳遞分行符號\n。

      Bash支援另一種逸出字元串文法,傳遞\n\t時,使用貨幣符號和全形單引號($')啟動字串並以半形單引號(')結束字串。例如,-H $'row_delimiter:\n'

      skip_header

      用於指定跳過CSV檔案開頭的若干行資料。取實值型別為整數(INTEGER),預設值為0。

      在某些CSV檔案中,最開頭的幾行資料用於定義列名、列類型等中繼資料資訊。通過設定該參數,可以使StarRocks在匯入資料時忽略CSV檔案的前幾行。例如,如果將該參數設定為1,則StarRocks在匯入資料時將忽略CSV檔案的第一行。

      這裡的行所使用的分隔字元須與您在匯入命令中所設定的行分隔字元一致。

      where

      用於抽取部分資料。使用者如需將不需要的資料過濾掉,那麼可以通過設定這個選項來達到。

      例如,只匯入k1列等於20180601的資料,則可以在匯入時指定-H "where: k1 = 20180601"

      max_filter_ratio

      最大容忍可過濾(例如,因為資料不規範等原因而過濾)的資料比例。預設零容忍。

      說明

      此處資料不規範的資料不包括通過WHERE條件過濾的資料。

      partitions

      用於指定該匯入所涉及的Partition。

      如果您能夠確定資料對應的Partition,則推薦指定該項。不滿足指定分區的資料將被過濾掉。例如,指定匯入到p1和p2分區,可以指定-H "partitions: p1, p2"

      timeout

      指定匯入的逾時時間。預設是600秒。

      設定範圍為1~259200,單位為秒。

      strict_mode

      指定此次匯入是否開啟strict 模式。取值如下:

      • false(預設值):不開啟。

      • true:開啟。開啟後,會對匯入處理程序中的列類型轉換進行嚴格過濾。

      timezone

      指定本次匯入所使用的時區。預設為東八區。

      該參數會影響所有匯入涉及和時區有關的函數結果。

      exec_mem_limit

      匯入記憶體限制。預設值為2 GB。

    • JSON適用參數

      參數

      是否必選

      說明

      jsonpaths

      用於指定待匯入的欄位的名稱。僅在使用匹配模式匯入 JSON 資料時需要指定該參數。參數取值為 JSON 格式。

      strip_outer_array

      用於指定是否裁剪最外層的數組結構。取值如下:

      • false(預設值):表示會保留JSON資料的原始結構,不剝離外層數組,效果是將整個JSON數組作為單一值匯入。

        例如,樣本資料[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}],在設定strip_outer_array為false後,會解析為一個數組資料匯入表中。

      • true:當匯入資料格式為JSON數組時,需要設定strip_outer_array為 true。

        例如,樣本資料[{"k1" : 1, "k2" : 2},{"k1" : 3, "k2" : 4}],在設定strip_outer_array為true後,會解析為兩條資料匯入表中。

      json_root

      用於指定待匯入 JSON 資料的根項目。僅在使用匹配模式匯入JSON資料時需要指定該參數。參數取值為合法的JsonPath字串。預設值為空白,表示會匯入整個 JSON 資料檔案的資料。

      ignore_json_size

      用於指定是否檢查 HTTP 要求中 JSON Body 的大小。

      說明

      HTTP請求中JSON Body的大小預設不能超過100 MB。如果JSON Body的大小超過100 MB,會提示 "The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Set ignore_json_size to skip check, although it may lead huge memory consuming." 錯誤。為避免該報錯,可以在HTTP要求標頭中添加 "ignore_json_size:true" 設定,忽略對JSON Body大小的檢查。

      compression, Content-Encoding

      指定在STREAM LOAD資料轉送過程中使用哪種壓縮演算法,支援GZIP、BZIP2、LZ4_FRAME、ZSTD演算法。

      例如,curl --location-trusted -u root: -v 'http://127.0.0.1:18030/api/db0/tbl_simple/_stream_load' \-X PUT -H "expect:100-continue" \-H 'format: json' -H 'compression: lz4_frame' -T ./b.json.lz4

  • opt_properties:用於指定一些匯入相關的選擇性參數。指定的參數設定作用於整個匯入任務。

    填寫格式為如下所示。

    -H "label: <label_name>"
    -H "where: <condition1>[, <condition2>, ...]"
    -H "max_filter_ratio: <num>"
    -H "timeout: <num>"
    -H "strict_mode: true | false"
    -H "timezone: <string>"
    -H "load_mem_limit: <num>"
    -H "partial_update: true | false"
    -H "partial_update_mode: row | column"
    -H "merge_condition: <column_name>"

    參數說明如下表所示。

    參數

    是否必選

    說明

    label

    匯入任務的標籤,相同標籤的資料無法多次匯入。

    您可以通過指定Label的方式來避免一份資料重複匯入的問題。當前StarRocks系統會保留最近30分鐘內成功完成的任務的Label。

    where

    用於指定過濾條件。如果指定了該參數,StarRocks將根據所設定的過濾條件對轉換後的資料進行篩選。只有滿足where子句中所定義的過濾條件的資料才會被匯入。

    例如,只匯入k1列等於20180601的資料,則可以在匯入時指定-H "where: k1 = 20180601"

    max_filter_ratio

    最大容忍可過濾(例如,因為資料不規範等原因而過濾)的資料比例。預設零容忍。

    說明

    此處資料不規範的資料不包括通過where條件過濾的資料。

    log_rejected_record_num

    指定最多允許記錄因資料品質不合格而被過濾的資料行數。該參數自3.1版本起支援。取值範圍:0-1、大於 0 的正整數。預設值:0

    • 取值為0表示不記錄過濾掉的資料行。

    • 取值為-1表示記錄所有過濾掉的資料行。

    • 取值為大於0的正整數(比如 n)表示每個 BE(或 CN)節點上最多可以記錄 n 條過濾掉的資料行。

    timeout

    指定匯入的逾時時間。預設是600秒。

    設定範圍為1~259200,單位為秒。

    strict_mode

    指定此次匯入是否開啟strict 模式。

    • false(預設值):不開啟。

    • true:開啟。

    timezone

    指定本次匯入所使用的時區。預設為東八區。

    該參數會影響所有匯入涉及和時區有關的函數結果。

    load_mem_limit

    匯入任務的記憶體限制。預設值為2 GB。

    partial_update

    是否使用部分列更新。取值包括TRUEFALSE。預設值:FALSE

    partial_update_mode

    指定部分更新的模式,取值包括rowcolumn

    • row(預設值),指定使用行模式執行部分更新,比較適用於較多列且小批量的即時更新情境。

    • column,指定使用列模式執行部分更新,比較適用於少數列並且大量行的批處理更新情境。在該情境,開啟列模式,更新速度更快。

      例如,在一個包含100列的表中,每次更新10列(佔比10%)並更新所有行,則開啟列模式,更新效能將提高10倍。

    merge_condition

    用於指定作為更新生效條件的列名。這樣只有當匯入的資料中該列的值大於等於當前值的時候,更新才會生效。

    說明

    指定的列必須為非主鍵列,且僅主鍵表支援條件更新。

樣本

將CSV格式的檔案data.csv匯入至StarRocks叢集的load_test庫的example_table表中,完整樣本請參見匯入資料的完整樣本

curl --location-trusted -u "root:" \
     -H "Expect:100-continue" \
     -H "label:label2" \
     -H "column_separator: ," \
     -T data.csv -XPUT \
     http://172.17.**.**:18030/api/load_test/example_table/_stream_load

傳回值

匯入完成後,將以JSON格式返回匯入任務的結果資訊。返回結果樣本如下。

{
    "TxnId": 9,
    "Label": "label2",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 4,
    "NumberLoadedRows": 4,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 45,
    "LoadTimeMs": 235,
    "BeginTxnTimeMs": 101,
    "StreamLoadPlanTimeMs": 102,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 11,
    "CommitAndPublishTimeMs": 19
}

返回結果中的參數說明如下表所述。

參數

描述

TxnId

匯入的事務ID。

Label

匯入的標籤。

Status

匯入狀態,取值如下:

  • Success:表示資料匯入成功,資料已經可見。

  • Publish Timeout:表示匯入已經成功提交,只是資料可能會延遲可見,無需重試匯入。

  • Label Already Exists:表示Label重複,需更換Label

  • Fail:表示資料匯入失敗。

ExistingJobStatus

已存在Label對應的匯入任務的狀態。該欄位只有當StatusLabel Already Exists時才會顯示。您可以通過該狀態,知曉已存在Label對應的匯入任務的狀態。

  • RUNNING:表示任務在執行中。

  • FINISHED:表示任務成功。

Message

匯入任務的狀態詳情。匯入失敗時會返回具體的失敗原因。

NumberTotalRows

從資料流中讀取到的總行數。

NumberLoadedRows

匯入任務的資料行數,僅在匯入狀態為Success時有效。

NumberFilteredRows

匯入任務過濾掉的行數,即資料品質不合格的行。

NumberUnselectedRows

通過Where條件被過濾掉的行數。

LoadBytes

匯入任務的源檔案資料量大小。

LoadTimeMs

匯入任務所用的時間,單位為ms。

BeginTxnTimeMs

匯入任務開啟事務的時間長度。

StreamLoadPlanTimeMs

匯入任務產生執行計畫的時間長度。

ReadDataTimeMs

匯入任務讀取資料的時間長度。

WriteDataTimeMs

匯入任務寫入資料的時間長度。

ErrorURL

如果任務匯入失敗,會返回該參數。

通過ErrorURL可以查看匯入處理程序中因資料品質不合格而過濾掉的錯誤資料行的具體資訊。您可以在提交匯入任務時,通過選擇性參數log_rejected_record_num來指定最多可以記錄多少條錯誤資料行的資訊。

您可以通過 curl "url" 命令直接查看錯誤資料行的資訊,也可以通過執行 wget "url" 命令匯出錯誤資料行的資訊。

例如,匯出錯誤資料行的資訊。

wget "http://172.17.**.**:18040/api/_load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad"

匯出的錯誤資料行資訊會儲存到一個名為_load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad的本地檔案中。您可以通過cat _load_error_log?file=error_log_b74dccdcf0ceb4de_e82b2709c6c013ad命令查看該檔案的內容。

您可以根據錯誤資訊調整匯入任務,然後重新提交匯入任務。

取消匯入任務

Stream Load無法手動取消,Stream Load在逾時或者匯入錯誤後會被系統自動取消。您可根據返回結果中的ErrorURL下載報錯資訊,進行錯誤排查。

匯入資料的完整樣本

本文樣本通過curl命令匯入任務。

  1. 建立待匯入資料的表。

    1. 使用SSH方式登入StarRocks叢集的Master節點,詳情請參見登入叢集

    2. 執行以下命令,通過MySQL用戶端串連StarRocks叢集。

      mysql -h127.0.0.1  -P 9030 -uroot
    3. 執行以下命令,建立對應的庫表。

      CREATE DATABASE IF NOT EXISTS load_test;
      USE load_test;
      
      CREATE TABLE IF NOT EXISTS example_table (
          id INT,
          name VARCHAR(50),
          age INT
      )
      DUPLICATE KEY(id)
      DISTRIBUTED BY HASH(id) BUCKETS 3
      PROPERTIES (
          "replication_num" = "1"  -- 副本數設為 1
      );
      

      執行完後按Ctrl+D退出MySQL用戶端。

  2. 準備測試資料。

    準備CSV資料

    例如,建立待匯入檔案data.csv,內容如下所示。

    id,name,age
    1,Alice,25
    2,Bob,30
    3,Charlie,35

    準備JSON資料

    例如,建立待匯入檔案json.data,內容如下所示。

    {"id":1,"name":"Emily","age":25}
    {"id":2,"name":"Benjamin","age":35}
    {"id":3,"name":"Olivia","age":28}
    {"id":4,"name":"Alexander","age":60}
    {"id":5,"name":"Ava","age":17}
  3. 執行以下命令,建立匯入任務。

    匯入CSV資料

    curl --location-trusted -u "root:" \
         -H "Expect:100-continue" \
         -H "label:label1" \
         -H "column_separator: ," \
         -T data.csv -XPUT \
         http://172.17.**.**:18030/api/load_test/example_table/_stream_load

    匯入JSON資料

    curl --location-trusted -u "root:" \
         -H "Expect:100-continue" \
         -H "label:label2" \
         -H "format:json" \
         -T json.data -XPUT \
         http://172.17.**.**:18030/api/load_test/example_table/_stream_load

代碼整合樣本