全部產品
Search
文件中心

E-MapReduce:Broker Load

更新時間:Jun 06, 2025

Broker Load是一種基於MySQL協議的非同步資料匯入方式。在Broker Load模式下,StarRocks通過部署的Broker服務可以讀取對應資料來源(例如,Apache HDFS,阿里雲OSS)上的資料,還能運用其內建的計算能力對資料進行即時預先處理,進而高效地執行匯入操作。本文為您介紹Broker Load匯入的使用樣本。

支援的資料檔案格式

Broker Load支援CSV、ORC和Parquet等檔案格式,建議單次匯入的資料量為幾十GB至上百GB層級。

Broker Load匯入

查看Broker資訊

阿里雲EMR Serverless StarRocks執行個體在建立時已經自動搭建並啟動Broker服務。您可以使用以下SQL命令查看當前執行個體中所有Broker的詳細資料。

SHOW PROC "/brokers";

建立匯入任務

  • 文法

    LOAD LABEL [<database_name>.]<label_name>
    (
        data_desc[, data_desc ...]
    )
    WITH BROKER
    (
        StorageCredentialParams
    )
    [PROPERTIES
    (
        opt_properties
    )
    ]
  • 參數描述

    • <database_name>:可選,目標StarRocks表所在的資料庫。

    • <label_name>:匯入任務的標籤。

      每個匯入任務在該資料庫內均具有唯一的標籤。通過該標籤,可以查看相應匯入作業的執行情況,並防止匯入重複的資料。當匯入任務狀態為FINISHED時,該標籤不可再用於其他匯入作業;而當匯入任務狀態為CANCELLED時,該標籤可以被複用至其他匯入作業,但通常情況下是用於重試同一匯入作業(即使用相同標籤匯入相同資料),以實現資料“精確一次(Exactly-Once)”的語義。

    • data_desc:用於描述一批次待匯入的資料。

      Broker Load 支援一次匯入多個資料檔案。在一個匯入作業中,您可以使用多個 data_desc 來聲明匯入多個資料檔案,也可以使用一個 data_desc 來聲明匯入一個路徑下的所有資料檔案。Broker Load 還支援保證單次匯入事務的原子性,即單次匯入的多個資料檔案都成功或者都失敗,而不會出現部分匯入成功、部分匯入失敗的情況。

      DATA INFILE ("<file_path>"[, "<file_path>" ...])
      [NEGATIVE]
      INTO TABLE <table_name>
      [PARTITION (<partition1_name>[, <partition2_name> ...])]
      [TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name> ...])]
      [COLUMNS TERMINATED BY "<column_separator>"]
      [ROWS TERMINATED BY "<row_separator>"]
      [FORMAT AS "CSV | Parquet | ORC"]
      [(format_type_options)]
      [(column_list)]
      [COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])]
      [SET <k1=f1(v1)>[, <k2=f2(v2)> ...]]
      [WHERE predicate]

      data_desc 中部分參數描述如下表所示。

      參數

      描述

      file_path

      指定來源資料檔案所在的路徑。檔案路徑可以指定到檔案,也可以用星號(*)萬用字元指定某個目錄下的所有檔案。中間的目錄也可以使用萬用字元匹配。

      可以使用的萬用字元有?*[]{}^,使用規則請參見FileSystem

      例如, 通過指定oss://bucket/data/tablename/*/*路徑可以匹配data/tablename下所有分區內的所有檔案。通過指定oss://bucket/data/tablename/dt=202104*/*路徑可以匹配data/tablename目錄下所有 202104 分區內的資料檔案。

      NEGATIVE

      用於撤銷某一批已經成功匯入的資料。如果想要撤銷某一批已經成功匯入的資料,可以通過指定 NEGATIVE 關鍵字來匯入同一批資料。

      說明

      該參數僅適用於目標StarRocks表使用彙總表、並且所有Value列的彙總函式均為sum的情況。

      PARTITION

      指定待匯入表的Partition資訊。

      指定要把資料匯入哪些分區。如果不指定該參數,則預設匯入到StarRocks表所在的所有分區中。

      COLUMNS TERMINATED BY

      用於指定匯入檔案中的資料行分隔符號。如果不指定該參數,則預設資料行分隔符號為\t,即Tab。

      Broker Load通過MySQL協議提交匯入請求,除了StarRocks會做轉義處理以外,MySQL協議也會做轉義處理。因此,如果資料行分隔符號是Tab等不可見字元,則需要在列分隔字元前面多加一個反斜線(\)。例如,如果資料行分隔符號是 \t,這裡必須輸入 \\t;如果資料行分隔符號是 \n,這裡必須輸入 \\n。Apache Hive™ 檔案的資料行分隔符號為 \x01,因此,如果來源資料檔案是 Hive 檔案,這裡必須傳入 \\x01

      FORMAT AS

      用於指定匯入檔案的格式。取值包括 CSVParquetORC。如果不指定該參數,則預設通過file_path參數中指定的副檔名(.csv.parquet.orc)來判斷檔案格式。

      COLUMNS FROM PATH AS

      用於從指定的檔案路徑中提取一個或多個分區欄位的資訊。該參數僅當指定的檔案路徑中存在分區欄位時有效。

      例如,匯入檔案路徑為/path/col_name=col_value/file1,其中col_name可以對應到StarRocks表中的列。這時候,您可以設定參數為col_name。匯入時,StarRocks會將col_value落入col_name對應的列中。

      說明

      該參數只有在從HDFS匯入資料時可用。

      SET

      用於將來源資料檔案的某一列依據指定的函數進行轉換,並將轉換後的結果插入到 StarRocks 表中。文法為column_name = expression

      WHERE

      用於指定過濾條件,對已完成轉換的資料進行篩選。只有符合WHERE子句中所指定的過濾條件的資料,才能匯入到StarRocks表中。

    • opt_properties:用於指定一些匯入相關的選擇性參數,指定的參數設定作用於整個匯入作業。文法如下所示。

      PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])

      部分參數描述如下表所示。

      參數

      描述

      timeout

      匯入作業的逾時時間(以秒為單位)。

      您可以在opt_properties中自行設定每個匯入的逾時時間。匯入任務在設定的時限內未完成則會被系統取消,變為CANCELLED。Broker Load的預設匯入逾時時間為4小時。

      重要

      通常情況下,不需要您手動設定匯入任務的逾時時間。當在預設逾時時間內無法完成匯入時,可以手動設定任務的逾時時間。

      推薦逾時時間的計算方式為:逾時時間 >(匯入檔案的總大小 x匯入檔案及相關物化視圖的個數)/平均匯入速度

      例如,要匯入一個1 GB的資料檔案,待匯入表包含2個Rollup表,當前StarRocks執行個體的平均匯入速度為10 MB/s。在這種情況下,根據公式計算出來時間長度為(1 x 1024 x 3)/10 = 307.2(秒)

      因此,建議匯入作業的逾時時間大於308秒。

      說明

      由於每個StarRocks執行個體的機器環境不同,且執行個體並發的查詢任務也不同,所以StarRocks執行個體的最慢匯入速度需要您根據歷史的匯入任務速度進行推測。

      max_filter_ratio

      匯入任務的最大容忍率,預設為0容忍,取值範圍是0~1。當匯入的錯誤率超過該值,則匯入失敗。如果您希望忽略錯誤的行,可以設定該參數值大於0,來保證匯入可以成功。

      計算公式為:max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )

      其中,dpp.abnorm.ALL表示資料品質不合格的行數,例如類型不符、列數不匹配和長度不匹配等。dpp.norm.ALL指的是匯入處理程序中正確資料的條數,可以通過SHOW LOAD命令查詢匯入任務的正確資料量。

      dpp.abnorm.ALL 和 dpp.norm.ALL 的總和就等於待匯入的總行數。

      load_mem_limit

      匯入作業的記憶體限制,最大不超過BE或CN的記憶體限制。單位:位元組。

      strict_mode

      是否開啟strict 模式。取值範圍:

      • true: 表示開啟。

      • false(預設值):表示關閉。

      strict 模式對匯入處理程序中的列類型轉換實施了嚴格的過濾。strict 模式的過濾策略如下:

      • 如果開啟strict 模式,StarRocks會把錯誤的資料行過濾掉,只匯入正確的資料行,並返回錯誤資料詳情。

      • 如果關閉strict 模式,StarRocks會把轉換失敗的錯誤欄位轉換成NULL值,並把這些包含NULL值的錯誤資料行跟正確的資料行一起匯入。

查看匯入任務狀態

use <database_name>;
SHOW LOAD;

返回參數的描述如下表所示。

參數

描述

JobId

匯入任務的唯一ID,每個匯入任務的JobId都不同,由系統自動產生。與Label不同的是,JobId永遠不會相同,而Label則可以在匯入任務失敗後被複用。

Label

匯入任務的標識。

State

匯入任務當前所處的階段。取值如下:

  • PENDING:匯入任務已建立。

  • QUEUEING:匯入任務正在等待執行中。

  • LOADING:表示正在執行中。

  • CANCELLED:表示匯入失敗。

  • FINISHED:表示匯入成功。

Progress

匯入任務所處的階段。Broker Load匯入任務只有LOAD階段,對應匯入作業狀態中的LOADINGLOAD進度為0~100%。

LOAD的進度的計算公式為LOAD進度 = 當前完成匯入的表個數 / 本次匯入任務設計的總表個數 * 100%

如果所有匯入表均完成匯入,此時LOAD的進度為99%, 匯入進入到最後生效階段,待整個匯入任務完成後,LOAD的進度才會改為100%。

重要

匯入進度並不是線性,所以如果一段時間內進度沒有變化,並不代表匯入任務沒有執行。

Type

匯入任務的類型。Broker Load的Type取值是BROKER。

EtlInfo

主要顯示匯入的資料量指標unselected.rowsdpp.norm.ALLdpp.abnorm.ALL

您可以根據unselected.rows的參數值判斷where條件過濾了多少行,根據dpp.norm.ALLdpp.abnorm.ALL兩個指標可以驗證當前置入任務的錯誤率是否超過max-filter-ratio。三個指標之和就是未經處理資料量的總行數。

TaskInfo

主要顯示當前置入任務參數,即建立Broker Load匯入任務時您指定的參數,包括cluster,timeout和max-filter-ratio。

ErrorMsg

匯入任務的失敗原因。當匯入作業的狀態為PENDINGLOADINGFINISHED時,該參數值為NULL。當匯入作業的狀態為CANCELLED時,該參數值包括typemsg兩部分:

  • type包括以下取值:

    • USER_CANCEL:取消的任務。

    • ETL_SUBMIT_FAIL:匯入任務提交失敗。

    • ETL_QUALITY_UNSATISFIED:資料品質不合格,即錯誤資料率超過了max-filter-ratio。

    • LOAD-RUN-FAIL:在LOAD階段失敗的匯入任務。

    • TIMEOUT:沒在逾時時間內完成的匯入任務。

    • UNKNOWN:未知的匯入錯誤。

  • msg顯示有關失敗原因的詳細資料。

CreateTime

匯入任務建立的時間。

EtlStartTime

由於Broker Load匯入沒有ETL階段,所以該參數值和LoadStartTime相同。

EtlFinishTime

由於Broker Load匯入沒有ETL階段,所以該參數值和LoadStartTime相同。

LoadStartTime

LOAD階段開始的時間。

LoadFinishTime

匯入任務完成的時間。

URL

匯入任務中品質不合格資料的訪問地址。您可以使用curlwget命令開啟該地址。如果匯入作業不存在品質不合格資料,該參數值為NULL

JobDetails

匯入任務的其他資訊,包括:

  • Unfinished backends:未完成匯入的BE節點ID。

  • ScannedRows:實際處理的行數,包括匯入的行數以及過濾掉的行數。

  • TaskNumber:子作業個數。

  • All backends:正在運行子作業的BE節點的 ID。

  • FileNumber:源檔案的個數。

  • FileSize:所有源檔案的總資料量,單位:位元組。

取消匯入任務

當匯入任務狀態不為CANCELLED或FINISHED時,可以通過CANCEL LOAD語句來取消該匯入任務。

CANCEL LOAD FROM <database_name> WHERE LABEL = "<label_name>";

匯入任務並發度

一個作業可以拆成一個或者多個任務,任務之間並存執行。拆分由LOAD語句中的DataDescription來決定。例如:

  • 多個DataDescription對應匯入多個不同的表,每個會拆成一個任務。

  • 多個DataDescription對應匯入同一個表的不同分區,每個也會拆成一個任務。

每個任務還會拆分成一個或者多個執行個體,然後將這些執行個體平均分配到BE上並存執行。執行個體的拆分由以下FE配置決定:

  • min_bytes_per_broker_scanner:單個執行個體處理的最小資料量,預設值為64 MB。

  • max_broker_concurrency:單個任務最大並發執行個體數,預設值為100。

  • load_parallel_instance_num:單個BE上並發執行個體數,預設值為1個。

執行個體總數的計算公式為執行個體的總數 = min(匯入檔案總大小/單個執行個體處理的最小資料量,單個任務最大並發執行個體數,單個BE上並發執行個體數 * BE數)

通常情況下,一個作業只有一個DataDescription,只會拆分成一個任務。任務會拆成與BE數相等的執行個體,然後分配到所有BE上並存執行。

匯入樣本

通過EMR StarRocks Manager串連StarRocks執行個體,可以在SQL Editor頁面執行以下SQL。

從阿里雲OSS匯入

  1. 建立測試表。

    create database if not exists load_test;
    use load_test;
    create table if not exists customer(
      c_customer_sk bigint,
      c_customer_id char(16),
      c_current_cdemo_sk bigint,
      c_current_hdemo_sk bigint,
      c_current_addr_sk bigint,
      c_first_shipto_date_sk bigint,
      c_first_sales_date_sk bigint,
      c_salutation char(10),
      c_first_name char(20),
      c_last_name char(30),
      c_preferred_cust_flag char(1),
      c_birth_day int,
      c_birth_month int,
      c_birth_year int,
      c_birth_country varchar(20),
      c_login char(13),
      c_email_address char(50),
      c_last_review_date_sk bigint
    )
    duplicate key (c_customer_sk)
    distributed by hash(c_customer_sk) buckets 5
    properties(
      "replication_num"="1"
    );
  2. 建立匯入任務。

    請下載並上傳資料檔案customer.orc至OSS,然後執行以下命令建立匯入任務。

    LOAD LABEL load_test.customer_label
    (
      DATA INFILE("<file_path>")
      INTO TABLE customer
      format as "orc"
    )
    WITH BROKER 'broker'
    (
      "fs.oss.accessKeyId" = "xxxxx",
      "fs.oss.accessKeySecret" = "xxxxx",
      "fs.oss.endpoint" = "oss-cn-xxx-internal.aliyuncs.com"
    );

    涉及參數如下表所示,請根據實際情況進行替換。

    參數

    說明

    <file_path>

    customer.orc檔案所在的路徑。例如,oss://<yourBucketName>/data/customer.orc

    fs.oss.accessKeyId

    阿里雲帳號或RAM使用者的AccessKey ID。您可以進入AccessKey管理頁面擷取AccessKey ID。

    fs.oss.accessKeySecret

    AccessKey ID對應的 AccessKey Secret。

    fs.oss.endpoint

    訪問OSS的Endpoint。例如,oss-cn-hangzhou-internal.aliyuncs.com。

    如果StarRocks與OSS位於同一地區,則使用VPC網路Endpoint,否則使用公網Endpoint。擷取方法請參見OSS地區和訪問網域名稱

  3. 查看匯入任務狀態。

    use load_test;
    show load where label='customer_label';
  4. 查詢表資訊。

    • 樣本1:統計load_test資料庫中customer表的總行數。

      select count(1) from load_test.customer;
    • 樣本2:展示load_test資料庫中customer表的前兩條完整記錄。

      select * from load_test.customer limit 2;

從HDFS匯入

說明

使用該方式匯入時,需注意以下資訊:

  • HDFS叢集和StarRocks執行個體需要在同一個VPC下,並且在同一個可用性區域下。

  • HDFS叢集建立後,需要為安全性群組開通所有DataNode連接埠,才能訪問HDFS資料。本文樣本使用的是EMR on ECS中包含了HDFS服務的DataLake叢集,因此關於安全性群組的開通詳情,請參見管理安全性群組

HDFS匯入樣本

  1. 建立庫表。

    CREATE DATABASE IF NOT EXISTS mydatabase;
    
    CREATE TABLE if NOT EXISTS mydatabase.userdata_broker_load (
        userId INT,
        userName VARCHAR(20),
        registrationDate DATE
    )
    ENGINE = OLAP
    DUPLICATE KEY(userId)
    DISTRIBUTED BY HASH(userId);
  2. 建立匯入任務。

    請下載並上傳資料檔案user_data.parquet至HDFS的/data目錄下,然後執行以下命令建立匯入任務。

    LOAD LABEL mydatabase.userdata_broker_load_label
    (
      DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/data/user_data.parquet")
      INTO TABLE userdata_broker_load
      format AS "parquet"
    )
    WITH BROKER
    PROPERTIES
    (
        "timeout" = "72000"
    );

    其中,以下參數請您根據實際情況替換。

    參數

    說明

    <hdfs_ip>

    HDFS叢集NameNode節點的內網IP地址。

    如果您使用的是EMR on ECS中包含HDFS服務的叢集(DataLake或Custom類型),則可以在節點管理頁簽的Master節點群組下,查看內網IP地址。

    <hdfs_port>

    HDFS叢集的NameNode節點服務監聽的連接埠號碼,預設是 9000

  3. 查看匯入任務。

    USE mydatabase;
    SHOW LOAD WHERE label='userdata_broker_load_label';
  4. 查看匯入資料。

    SELECT * FROM mydatabase.userdata_broker_load;

HDFS認證方式

社區版本HDFS支援簡單認證和Kerberos認證兩種認證方式。

  • 簡單認證:使用者的身份由與HDFS建立串連的用戶端作業系統決定。

    如果使用簡單認證,請按如下配置 StorageCredentialParams

    "hadoop.security.authentication" = "simple",
    "username" = "<hdfs_username>",
    "password" = "<hdfs_password>"

    StorageCredentialParams 包含如下參數。

    參數

    描述

    hadoop.security.authentication

    認證方式。取值範圍:simple 和 kerberos。預設值:simplesimple表示簡單認證,即無認證。kerberos表示Kerberos認證。

    username

    用於訪問HDFS叢集中NameNode節點的使用者名稱。

    password

    用於訪問HDFS叢集中NameNode節點的密碼。

  • Kerberos認證:用戶端的身份由使用者自己的Kerberos認證決定。

    如果使用Kerberos認證,請在Serverless StarRocks執行個體的執行個體配置頁面,為hdfs-site.xml 檔案添加以下配置。

    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "nn/zelda1@ZELDA.COM",
    "kerberos_keytab" = "/keytab/hive.keytab",
    "kerberos_keytab_content" = "YWFhYWFh"

    涉及參數說明如下所示。

    參數

    描述

    hadoop.security.authentication

    認證方式。取值範圍:simple 和 kerberos。預設值:simplesimple表示簡單認證,即無認證。kerberos表示Kerberos認證。

    kerberos_principal

    用於指定Kerberos的使用者或服務(Principal)。每個Principal在HDFS叢集內唯一,由如下三部分組成:

    • usernameservicename:HDFS叢集中使用者或服務的名稱。

    • instance:HDFS叢集要認證的節點所在伺服器的名稱,用來保證使用者或服務全域唯一。比如,HDFS叢集中有多個DataNode節點,各節點需要各自獨立認證。

    • realm:域,必須全大寫。

    例如,nn/zelda1@ZELDA.COM

    kerberos_keytab

    指定Kerberos的keytab檔案路徑。該檔案必須為Broker進程所在伺服器上的檔案。

    kerberos_keytab_content

    指定Kerberos中keytab檔案內容經過Base64編碼之後的內容。

    重要

    該參數和kerberos_keytab參數只需配置一個。

HDFS HA配置

通過配置NameNode HA,可以在NameNode切換時,自動識別到新的NameNode。在Serverless StarRocks執行個體的執行個體配置頁面,為hdfs-site.xml 檔案添加以下配置,用於訪問以HA模式部署的HDFS叢集。

"dfs.nameservices" = "ha_cluster",
"dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2",
"dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>",
"dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>",
"dfs.client.failover.proxy.provider.ha_cluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"

涉及參數說明如下表所示。

參數

描述

dfs.nameservices

指定HDFS服務的名稱,您可以自訂。

例如,設定dfs.nameservices為my_ha。

dfs.ha.namenodes.xxx

自訂NameNode的名稱,多個名稱時以逗號(,)分隔。其中xxxdfs.nameservices自訂的名稱。

例如,設定dfs.ha.namenodes.my_ha為my_nn。

dfs.namenode.rpc-address.xxx.nn

指定NameNode的RPC地址資訊。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名稱。

例如,設定dfs.namenode.rpc-address.my_ha.my_nn參數值的格式為<hdfs_host>:<hdfs_port>

dfs.client.failover.proxy.provider.xxx

指定Client串連NameNode的Provider,預設值為org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

說明

如果您使用的是EMR on ECS中包含HDFS服務的叢集,則可以在目的地組群的叢集服務標籤下,進入HDFS服務的配置標籤,在hdfs-site.xml文件中尋找相關參數的值。