Broker Load是一種基於MySQL協議的非同步資料匯入方式。在Broker Load模式下,StarRocks通過部署的Broker服務可以讀取對應資料來源(例如,Apache HDFS,阿里雲OSS)上的資料,還能運用其內建的計算能力對資料進行即時預先處理,進而高效地執行匯入操作。本文為您介紹Broker Load匯入的使用樣本。
支援的資料檔案格式
Broker Load支援CSV、ORC和Parquet等檔案格式,建議單次匯入的資料量為幾十GB至上百GB層級。
Broker Load匯入
查看Broker資訊
阿里雲EMR Serverless StarRocks執行個體在建立時已經自動搭建並啟動Broker服務。您可以使用以下SQL命令查看當前執行個體中所有Broker的詳細資料。
SHOW PROC "/brokers";建立匯入任務
文法
LOAD LABEL [<database_name>.]<label_name> ( data_desc[, data_desc ...] ) WITH BROKER ( StorageCredentialParams ) [PROPERTIES ( opt_properties ) ]參數描述
<database_name>:可選,目標StarRocks表所在的資料庫。<label_name>:匯入任務的標籤。每個匯入任務在該資料庫內均具有唯一的標籤。通過該標籤,可以查看相應匯入作業的執行情況,並防止匯入重複的資料。當匯入任務狀態為FINISHED時,該標籤不可再用於其他匯入作業;而當匯入任務狀態為CANCELLED時,該標籤可以被複用至其他匯入作業,但通常情況下是用於重試同一匯入作業(即使用相同標籤匯入相同資料),以實現資料“精確一次(Exactly-Once)”的語義。
data_desc:用於描述一批次待匯入的資料。Broker Load 支援一次匯入多個資料檔案。在一個匯入作業中,您可以使用多個
data_desc來聲明匯入多個資料檔案,也可以使用一個data_desc來聲明匯入一個路徑下的所有資料檔案。Broker Load 還支援保證單次匯入事務的原子性,即單次匯入的多個資料檔案都成功或者都失敗,而不會出現部分匯入成功、部分匯入失敗的情況。DATA INFILE ("<file_path>"[, "<file_path>" ...]) [NEGATIVE] INTO TABLE <table_name> [PARTITION (<partition1_name>[, <partition2_name> ...])] [TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name> ...])] [COLUMNS TERMINATED BY "<column_separator>"] [ROWS TERMINATED BY "<row_separator>"] [FORMAT AS "CSV | Parquet | ORC"] [(format_type_options)] [(column_list)] [COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])] [SET <k1=f1(v1)>[, <k2=f2(v2)> ...]] [WHERE predicate]data_desc中部分參數描述如下表所示。參數
描述
file_path指定來源資料檔案所在的路徑。檔案路徑可以指定到檔案,也可以用星號(*)萬用字元指定某個目錄下的所有檔案。中間的目錄也可以使用萬用字元匹配。
可以使用的萬用字元有
?、*、[]、{}和^,使用規則請參見FileSystem。例如, 通過指定
oss://bucket/data/tablename/*/*路徑可以匹配data/tablename下所有分區內的所有檔案。通過指定oss://bucket/data/tablename/dt=202104*/*路徑可以匹配data/tablename目錄下所有202104分區內的資料檔案。NEGATIVE用於撤銷某一批已經成功匯入的資料。如果想要撤銷某一批已經成功匯入的資料,可以通過指定
NEGATIVE關鍵字來匯入同一批資料。說明該參數僅適用於目標StarRocks表使用彙總表、並且所有Value列的彙總函式均為
sum的情況。PARTITION指定待匯入表的Partition資訊。
指定要把資料匯入哪些分區。如果不指定該參數,則預設匯入到StarRocks表所在的所有分區中。
COLUMNS TERMINATED BY用於指定匯入檔案中的資料行分隔符號。如果不指定該參數,則預設資料行分隔符號為
\t,即Tab。Broker Load通過MySQL協議提交匯入請求,除了StarRocks會做轉義處理以外,MySQL協議也會做轉義處理。因此,如果資料行分隔符號是Tab等不可見字元,則需要在列分隔字元前面多加一個反斜線(\)。例如,如果資料行分隔符號是
\t,這裡必須輸入\\t;如果資料行分隔符號是\n,這裡必須輸入\\n。Apache Hive™ 檔案的資料行分隔符號為\x01,因此,如果來源資料檔案是 Hive 檔案,這裡必須傳入\\x01。FORMAT AS用於指定匯入檔案的格式。取值包括
CSV、Parquet和ORC。如果不指定該參數,則預設通過file_path參數中指定的副檔名(.csv、.parquet和.orc)來判斷檔案格式。COLUMNS FROM PATH AS用於從指定的檔案路徑中提取一個或多個分區欄位的資訊。該參數僅當指定的檔案路徑中存在分區欄位時有效。
例如,匯入檔案路徑為
/path/col_name=col_value/file1,其中col_name可以對應到StarRocks表中的列。這時候,您可以設定參數為col_name。匯入時,StarRocks會將col_value落入col_name對應的列中。說明該參數只有在從HDFS匯入資料時可用。
SET用於將來源資料檔案的某一列依據指定的函數進行轉換,並將轉換後的結果插入到 StarRocks 表中。文法為
column_name = expression。WHERE用於指定過濾條件,對已完成轉換的資料進行篩選。只有符合WHERE子句中所指定的過濾條件的資料,才能匯入到StarRocks表中。
opt_properties:用於指定一些匯入相關的選擇性參數,指定的參數設定作用於整個匯入作業。文法如下所示。PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])部分參數描述如下表所示。
參數
描述
timeout匯入作業的逾時時間(以秒為單位)。
您可以在
opt_properties中自行設定每個匯入的逾時時間。匯入任務在設定的時限內未完成則會被系統取消,變為CANCELLED。Broker Load的預設匯入逾時時間為4小時。重要通常情況下,不需要您手動設定匯入任務的逾時時間。當在預設逾時時間內無法完成匯入時,可以手動設定任務的逾時時間。
推薦逾時時間的計算方式為:
逾時時間 >(匯入檔案的總大小 x匯入檔案及相關物化視圖的個數)/平均匯入速度。例如,要匯入一個1 GB的資料檔案,待匯入表包含2個Rollup表,當前StarRocks執行個體的平均匯入速度為10 MB/s。在這種情況下,根據公式計算出來時間長度為
(1 x 1024 x 3)/10 = 307.2(秒)因此,建議匯入作業的逾時時間大於308秒。
說明由於每個StarRocks執行個體的機器環境不同,且執行個體並發的查詢任務也不同,所以StarRocks執行個體的最慢匯入速度需要您根據歷史的匯入任務速度進行推測。
max_filter_ratio匯入任務的最大容忍率,預設為0容忍,取值範圍是0~1。當匯入的錯誤率超過該值,則匯入失敗。如果您希望忽略錯誤的行,可以設定該參數值大於0,來保證匯入可以成功。
計算公式為:
max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )其中,
dpp.abnorm.ALL表示資料品質不合格的行數,例如類型不符、列數不匹配和長度不匹配等。dpp.norm.ALL指的是匯入處理程序中正確資料的條數,可以通過SHOW LOAD命令查詢匯入任務的正確資料量。dpp.abnorm.ALL和dpp.norm.ALL的總和就等於待匯入的總行數。load_mem_limit匯入作業的記憶體限制,最大不超過BE或CN的記憶體限制。單位:位元組。
strict_mode是否開啟strict 模式。取值範圍:
true: 表示開啟。false(預設值):表示關閉。
strict 模式對匯入處理程序中的列類型轉換實施了嚴格的過濾。strict 模式的過濾策略如下:
如果開啟strict 模式,StarRocks會把錯誤的資料行過濾掉,只匯入正確的資料行,並返回錯誤資料詳情。
如果關閉strict 模式,StarRocks會把轉換失敗的錯誤欄位轉換成
NULL值,並把這些包含NULL值的錯誤資料行跟正確的資料行一起匯入。
查看匯入任務狀態
use <database_name>;
SHOW LOAD;返回參數的描述如下表所示。
參數 | 描述 |
| 匯入任務的唯一ID,每個匯入任務的JobId都不同,由系統自動產生。與Label不同的是,JobId永遠不會相同,而Label則可以在匯入任務失敗後被複用。 |
| 匯入任務的標識。 |
| 匯入任務當前所處的階段。取值如下:
|
| 匯入任務所處的階段。Broker Load匯入任務只有 LOAD的進度的計算公式為 如果所有匯入表均完成匯入,此時LOAD的進度為99%, 匯入進入到最後生效階段,待整個匯入任務完成後,LOAD的進度才會改為100%。 重要 匯入進度並不是線性,所以如果一段時間內進度沒有變化,並不代表匯入任務沒有執行。 |
| 匯入任務的類型。Broker Load的Type取值是BROKER。 |
| 主要顯示匯入的資料量指標 您可以根據 |
| 主要顯示當前置入任務參數,即建立Broker Load匯入任務時您指定的參數,包括cluster,timeout和max-filter-ratio。 |
| 匯入任務的失敗原因。當匯入作業的狀態為
|
| 匯入任務建立的時間。 |
| 由於Broker Load匯入沒有ETL階段,所以該參數值和 |
| 由於Broker Load匯入沒有ETL階段,所以該參數值和 |
|
|
| 匯入任務完成的時間。 |
| 匯入任務中品質不合格資料的訪問地址。您可以使用 |
| 匯入任務的其他資訊,包括:
|
取消匯入任務
當匯入任務狀態不為CANCELLED或FINISHED時,可以通過CANCEL LOAD語句來取消該匯入任務。
CANCEL LOAD FROM <database_name> WHERE LABEL = "<label_name>";匯入任務並發度
一個作業可以拆成一個或者多個任務,任務之間並存執行。拆分由LOAD語句中的DataDescription來決定。例如:
多個DataDescription對應匯入多個不同的表,每個會拆成一個任務。
多個DataDescription對應匯入同一個表的不同分區,每個也會拆成一個任務。
每個任務還會拆分成一個或者多個執行個體,然後將這些執行個體平均分配到BE上並存執行。執行個體的拆分由以下FE配置決定:
min_bytes_per_broker_scanner:單個執行個體處理的最小資料量,預設值為64 MB。max_broker_concurrency:單個任務最大並發執行個體數,預設值為100。load_parallel_instance_num:單個BE上並發執行個體數,預設值為1個。
執行個體總數的計算公式為執行個體的總數 = min(匯入檔案總大小/單個執行個體處理的最小資料量,單個任務最大並發執行個體數,單個BE上並發執行個體數 * BE數)。
通常情況下,一個作業只有一個DataDescription,只會拆分成一個任務。任務會拆成與BE數相等的執行個體,然後分配到所有BE上並存執行。
匯入樣本
通過EMR StarRocks Manager串連StarRocks執行個體,可以在SQL Editor頁面執行以下SQL。
從阿里雲OSS匯入
建立測試表。
create database if not exists load_test; use load_test; create table if not exists customer( c_customer_sk bigint, c_customer_id char(16), c_current_cdemo_sk bigint, c_current_hdemo_sk bigint, c_current_addr_sk bigint, c_first_shipto_date_sk bigint, c_first_sales_date_sk bigint, c_salutation char(10), c_first_name char(20), c_last_name char(30), c_preferred_cust_flag char(1), c_birth_day int, c_birth_month int, c_birth_year int, c_birth_country varchar(20), c_login char(13), c_email_address char(50), c_last_review_date_sk bigint ) duplicate key (c_customer_sk) distributed by hash(c_customer_sk) buckets 5 properties( "replication_num"="1" );建立匯入任務。
請下載並上傳資料檔案customer.orc至OSS,然後執行以下命令建立匯入任務。
LOAD LABEL load_test.customer_label ( DATA INFILE("<file_path>") INTO TABLE customer format as "orc" ) WITH BROKER 'broker' ( "fs.oss.accessKeyId" = "xxxxx", "fs.oss.accessKeySecret" = "xxxxx", "fs.oss.endpoint" = "oss-cn-xxx-internal.aliyuncs.com" );涉及參數如下表所示,請根據實際情況進行替換。
參數
說明
<file_path>customer.orc檔案所在的路徑。例如,oss://<yourBucketName>/data/customer.orc。fs.oss.accessKeyId阿里雲帳號或RAM使用者的AccessKey ID。您可以進入AccessKey管理頁面擷取AccessKey ID。
fs.oss.accessKeySecretAccessKey ID對應的 AccessKey Secret。
fs.oss.endpoint訪問OSS的Endpoint。例如,oss-cn-hangzhou-internal.aliyuncs.com。
如果StarRocks與OSS位於同一地區,則使用VPC網路Endpoint,否則使用公網Endpoint。擷取方法請參見OSS地區和訪問網域名稱。
查看匯入任務狀態。
use load_test; show load where label='customer_label';查詢表資訊。
樣本1:統計load_test資料庫中customer表的總行數。
select count(1) from load_test.customer;樣本2:展示load_test資料庫中customer表的前兩條完整記錄。
select * from load_test.customer limit 2;
從HDFS匯入
使用該方式匯入時,需注意以下資訊:
HDFS叢集和StarRocks執行個體需要在同一個VPC下,並且在同一個可用性區域下。
HDFS叢集建立後,需要為安全性群組開通所有DataNode連接埠,才能訪問HDFS資料。本文樣本使用的是EMR on ECS中包含了HDFS服務的DataLake叢集,因此關於安全性群組的開通詳情,請參見管理安全性群組。
HDFS匯入樣本
建立庫表。
CREATE DATABASE IF NOT EXISTS mydatabase; CREATE TABLE if NOT EXISTS mydatabase.userdata_broker_load ( userId INT, userName VARCHAR(20), registrationDate DATE ) ENGINE = OLAP DUPLICATE KEY(userId) DISTRIBUTED BY HASH(userId);建立匯入任務。
請下載並上傳資料檔案user_data.parquet至HDFS的
/data目錄下,然後執行以下命令建立匯入任務。LOAD LABEL mydatabase.userdata_broker_load_label ( DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/data/user_data.parquet") INTO TABLE userdata_broker_load format AS "parquet" ) WITH BROKER PROPERTIES ( "timeout" = "72000" );其中,以下參數請您根據實際情況替換。
參數
說明
<hdfs_ip>HDFS叢集NameNode節點的內網IP地址。
如果您使用的是EMR on ECS中包含HDFS服務的叢集(DataLake或Custom類型),則可以在節點管理頁簽的Master節點群組下,查看內網IP地址。
<hdfs_port>HDFS叢集的NameNode節點服務監聽的連接埠號碼,預設是
9000。查看匯入任務。
USE mydatabase; SHOW LOAD WHERE label='userdata_broker_load_label';查看匯入資料。
SELECT * FROM mydatabase.userdata_broker_load;
HDFS認證方式
社區版本HDFS支援簡單認證和Kerberos認證兩種認證方式。
簡單認證:使用者的身份由與HDFS建立串連的用戶端作業系統決定。
如果使用簡單認證,請按如下配置
StorageCredentialParams。"hadoop.security.authentication" = "simple", "username" = "<hdfs_username>", "password" = "<hdfs_password>"StorageCredentialParams包含如下參數。參數
描述
hadoop.security.authentication認證方式。取值範圍:
simple和kerberos。預設值:simple。simple表示簡單認證,即無認證。kerberos表示Kerberos認證。username用於訪問HDFS叢集中NameNode節點的使用者名稱。
password用於訪問HDFS叢集中NameNode節點的密碼。
Kerberos認證:用戶端的身份由使用者自己的Kerberos認證決定。
如果使用Kerberos認證,請在Serverless StarRocks執行個體的執行個體配置頁面,為
hdfs-site.xml檔案添加以下配置。"hadoop.security.authentication" = "kerberos", "kerberos_principal" = "nn/zelda1@ZELDA.COM", "kerberos_keytab" = "/keytab/hive.keytab", "kerberos_keytab_content" = "YWFhYWFh"涉及參數說明如下所示。
參數
描述
hadoop.security.authentication認證方式。取值範圍:
simple和kerberos。預設值:simple。simple表示簡單認證,即無認證。kerberos表示Kerberos認證。kerberos_principal用於指定Kerberos的使用者或服務(Principal)。每個Principal在HDFS叢集內唯一,由如下三部分組成:
username或servicename:HDFS叢集中使用者或服務的名稱。instance:HDFS叢集要認證的節點所在伺服器的名稱,用來保證使用者或服務全域唯一。比如,HDFS叢集中有多個DataNode節點,各節點需要各自獨立認證。realm:域,必須全大寫。
例如,
nn/zelda1@ZELDA.COM。kerberos_keytab指定Kerberos的keytab檔案路徑。該檔案必須為Broker進程所在伺服器上的檔案。
kerberos_keytab_content指定Kerberos中keytab檔案內容經過Base64編碼之後的內容。
重要該參數和
kerberos_keytab參數只需配置一個。
HDFS HA配置
通過配置NameNode HA,可以在NameNode切換時,自動識別到新的NameNode。在Serverless StarRocks執行個體的執行個體配置頁面,為hdfs-site.xml 檔案添加以下配置,用於訪問以HA模式部署的HDFS叢集。
"dfs.nameservices" = "ha_cluster",
"dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2",
"dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>",
"dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>",
"dfs.client.failover.proxy.provider.ha_cluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"涉及參數說明如下表所示。
參數 | 描述 |
| 指定HDFS服務的名稱,您可以自訂。 例如,設定 |
| 自訂NameNode的名稱,多個名稱時以逗號(,)分隔。其中 例如,設定 |
| 指定NameNode的RPC地址資訊。其中nn表示 例如,設定 |
| 指定Client串連NameNode的Provider,預設值為 |
如果您使用的是EMR on ECS中包含HDFS服務的叢集,則可以在目的地組群的叢集服務標籤下,進入HDFS服務的配置標籤,在hdfs-site.xml文件中尋找相關參數的值。