本文主要為您介紹Spark Load匯入方式。
背景資訊
Spark Load是一種非同步匯入方式,您需要通過MySQL協議建立Spark類型匯入任務,並通過
SHOW LOAD查看匯入結果。Spark Load利用了Spark叢集資源對要匯入的資料進行了排序,Doris BE直接寫檔案,這樣能大大降低Doris叢集的資源使用,對於歷史海量資料移轉降低Doris叢集資源使用及負載有很好的效果。
如果您在沒有Spark叢集這種資源的情況下,又想方便、快速的完成外部儲存歷史資料的遷移,可以使用Broker Load。相比於Spark Load匯入,Broker Load對Doris叢集的資源佔用會更高。
適用情境
Spark Load通過外部的Spark資源實現對匯入資料的預先處理,提高Doris巨量資料量的匯入效能並且節省Doris叢集的計算資源。主要用於初次遷移、巨量資料量匯入Doris的情境。
來源資料在Spark可以訪問的儲存系統中,如HDFS。
資料量達到10 GB以上至TB層級的業務情境。
資料量較小或不滿足上述情況時,建議使用Stream Load或者Broker Load。
基本流程
您可以通過MySQL用戶端提交Spark類型匯入任務,FE記錄中繼資料並返回提交成功,基本流程如下所示。
+
|
+----v----+
| FE |---------------------------------+
+----+----+ |
| 3. FE send push tasks |
| 5. FE publish version |
+------------+------------+ |
| | | |
+---v---+ +---v---+ +---v---+ |
| BE | | BE | | BE | |1. FE submit Spark ETL job
+---^---+ +---^---+ +---^---+ |
|4. BE push with broker | |
+---+---+ +---+---+ +---+---+ |
|Broker | |Broker | |Broker | |
+---^---+ +---^---+ +---^---+ |
| | | |
+---+------------+------------+---+ 2.ETL +-------------v---------------+
| HDFS +-------> Spark cluster |
| <-------+ |
+---------------------------------+ +-----------------------------+Spark Load任務的執行主要分為以下5個階段。
FE調度提交ETL任務到Spark叢集執行。
Spark叢集執行ETL完成對匯入資料的預先處理。包括全域字典構建(BITMAP類型)、分區、排序、彙總等。
ETL任務完成後,FE擷取預先處理過的每個分區的資料路徑,並調度相關的BE執行Push任務。
BE通過Broker讀取資料,轉化為Doris底層儲存格式。
FE調度生效版本,完成匯入任務。
全域字典
適用情境
目前Doris中Bitmap列是使用類庫Roaringbitmap實現的,而Roaringbitmap的輸入資料類型只能是整型,因此如果要在匯入流程中實現對於Bitmap列的預計算,則需要將輸入資料的類型轉換成整型。
在Doris現有的匯入流程中,全域字典的資料結構是基於Hive表實現的,儲存了原始值到編碼值的映射。
構建流程
讀取上遊資料來源的資料,產生一張Hive暫存資料表,記為hive_table。
從hive_table中抽取待去重欄位的去重值,產生一張新的Hive表,記為distinct_value_table。
建立一張全域字典表,記為dict_table。字典表一列為原始值,另一列為編碼後的值。
將distinct_value_table與dict_table做
left join,計算出新增的去重值集合,然後對這個集合使用視窗函數進行編碼,此時去重列原始值就多了一列編碼後的值,最後將這兩列的資料寫回dict_table。將dict_table與hive_table串連,完成hive_table中原始值替換成整型編碼值的工作。
hive_table會被下一步資料預先處理的流程所讀取,經過計算後匯入到Doris中。
資料預先處理(DPP)
基本流程
從資料來源讀取資料,上遊資料來源可以是HDFS檔案,也可以是Hive表。
對讀取到的資料進列欄位映射,運算式計算以及根據分區資訊產生分桶欄位
bucket_id。根據Doris表的rollup中繼資料產生RollupTree。
遍曆RollupTree,進行分層的彙總操作,下一個層級的rollup可以由上一個層的rollup計算得來。
每次完成彙總計算後,會對資料根據
bucket_id進行分桶然後寫入HDFS中。後續broker會拉取HDFS中的檔案然後匯入Doris BE中。
Hive Bitmap UDF
Spark支援將Hive產生的Bitmap資料直接匯入到Doris。
配置ETL叢集
配置叢集
Spark作為一種外部計算資源在Doris中用來完成ETL工作,引入Resource Management來管理Doris使用的這些外部資源。
提交Spark匯入任務之前,需要配置執行ETL任務的Spark叢集,參數配置如下所示,詳情參見下面的建立資源配置。
-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
type = spark,
spark_conf_key = spark_conf_value,
working_dir = path,
broker = broker_name,
broker.property_key = property_value,
broker.hadoop.security.authentication = kerberos,
broker.kerberos_principal = doris@YOUR.COM,
broker.kerberos_keytab = /home/doris/my.keytab
broker.kerberos_keytab_content = ASDOWHDLAWI********ALDJSDIWALD
)
-- drop spark resource
DROP RESOURCE resource_name
-- show resources
SHOW RESOURCES
SHOW PROC "/resources"
-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name建立資源配置
resource_name為Doris中配置的Spark資源的名字。
PROPERTIES是Spark資源相關參數,參數如下:
type:資源類型,必填,目前僅支援Spark。
Spark相關參數如下:
spark.master:必填,參數值為yarn或
spark://host:port。spark.submit.deployMode:Spark程式的部署模式,必填,支援Cluster,Client兩種模式。
spark.hadoop.yarn.resourcemanager.address:參數spark.master為yarn時,該參數必填。
spark.hadoop.fs.defaultFS:參數spark.master為yarn時,該參數必填。
其他參數為可選,詳情請參見Spark Configuration。
working_dir: ETL使用的目錄。Spark作為ETL資源使用時必填。例如
hdfs://host:port/tmp/doris。broker.hadoop.security.authentication:指定認證方式為kerberos。
broker.kerberos_principal:指定kerberos的principal。
broker.kerberos_keytab:指定kerberos的keytab檔案路徑。該檔案必須為Broker進程所在伺服器上的檔案的絕對路徑。並且可以被Broker進程訪問。
broker.kerberos_keytab_content:指定kerberos中keytab檔案內容經過base64編碼之後的內容。該參數和broker.kerberos_keytab配置二選一即可。
broker: broker名字。Spark作為ETL資源使用時必填。需要使用
ALTER SYSTEM ADD BROKER命令提前完成配置。broker.property_key:broker讀取ETL產生的中間檔案時需要指定的認證資訊等。
參數樣本。
YARN Cluster模式
CREATE EXTERNAL RESOURCE "spark0" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.jars" = "xxx.jar,yyy.jar", "spark.files" = "/tmp/aaa,/tmp/bbb", "spark.executor.memory" = "1g", "spark.yarn.queue" = "queue0", "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999", "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000", "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris", "broker" = "broker0", "broker.username" = "user0", "broker.password" = "password0" );Spark Standalone Client模式
CREATE EXTERNAL RESOURCE "spark1" PROPERTIES ( "type" = "spark", "spark.master" = "spark://127.0.0.1:7777", "spark.submit.deployMode" = "client", "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris", "broker" = "broker1" );
Spark Load支援Kerberos認證
如果是Spark Load訪問帶有Kerberos認證的Hadoop叢集資源,您只需要在建立Spark resource的時候指定以下參數即可:
broker.hadoop.security.authentication:指定認證方式為kerberos。
broker.kerberos_principal:指定kerberos的principal。
broker.kerberos_keytab:指定kerberos的keytab檔案路徑。該檔案必須為Broker進程所在伺服器上的檔案的絕對路徑。並且可以被Broker進程訪問。
broker.kerberos_keytab_content:指定kerberos中keytab檔案內容經過base64編碼之後的內容。該參數和kerberos_keytab配置二選一即可。
參數樣本
CREATE EXTERNAL RESOURCE "spark_on_kerberos"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker0",
"broker.hadoop.security.authentication" = "kerberos",
"broker.kerberos_principal" = "doris@YOUR.COM",
"broker.kerberos_keytab" = "/home/doris/my.keytab"
);查看資源
普通賬戶只能看到自己有USAGE_PRIV使用許可權的資源。
root和admin賬戶可以看到所有的資源。
資源許可權
資源許可權通過GRANT REVOKE來管理,目前僅支援USAGE_PRIV使用許可權。
可以將USAGE_PRIV許可權賦予某個賬戶或者某個角色,角色的使用與之前一致。
-- 授予spark0資源的使用許可權給賬戶user0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
-- 授予spark0資源的使用許可權給角色role0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
-- 授予所有資源的使用許可權給賬戶user0
GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
-- 授予所有資源的使用許可權給角色role0
GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
-- 撤銷賬戶user0的spark0資源使用許可權
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";配置SPARK用戶端
FE底層通過執行spark-submit命令去提交Spark任務,因此需要為FE配置Spark用戶端,建議使用2.4.5或以上的Spark2官方版本,您可以通過Spark下載地址進行下載,下載完成後,請按以下步驟完成配置。
配置SPARK_HOME環境變數
將spark用戶端放在FE同一台機器上的目錄下,並在FE的設定檔配置spark_home_default_dir項指向此目錄,此配置項預設為FE根目錄下的lib/spark2x路徑,此項不可為空白。
配置SPARK依賴包
將Spark用戶端下的
jars檔案夾內所有JAR包歸檔打包成一個ZIP檔案,並在FE的設定檔配置spark_resource_path項指向此ZIP檔案,若此配置項為空白,則FE會嘗試尋找FE根目錄下的lib/spark2x/jars/spark-2x.zip檔案,若沒有找到則會報檔案不存在的錯誤。當提交Spark Load任務時,會將歸檔好的依賴檔案上傳至遠端倉庫,預設倉庫路徑掛在
working_dir/{cluster_id}目錄下,並以__spark_repository__{resource_name}命名,表示叢集內的一個resource對應一個遠端倉庫,遠端倉庫目錄結構參考如下:__spark_repository__spark0/ |-__archive_1.0.0/ | |-__lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp-1.0.0-jar-with-dependencies.jar | |-__lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip |-__archive_1.1.0/ | |-__lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp-1.1.0-jar-with-dependencies.jar | |-__lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip |-__archive_1.2.0/ | |-...說明除了Spark依賴(預設以spark-2x.zip命名),FE還會上傳DPP的依賴包至遠端倉庫,若此次Spark Load提交的所有依賴檔案都已存在遠端倉庫,那麼就不需要再上傳依賴,節省了每次重複上傳大量檔案的時間。
配置YARN用戶端
FE底層通過執行
yarn命令去擷取正在啟動並執行Application的狀態以及終止Application,因此需要為FE配置YARN用戶端,建議使用hadoop-2.5.2或hadoop-2.0以上的官方版本,下載詳情請參見hadoop下載地址。將下載好的YARN用戶端放在FE同一台機器的目錄下,並在FE設定檔配置yarn_client_path項指向yarn的二進位可執行檔,預設為FE根目錄下的lib/yarn-client/hadoop/bin/yarn路徑。
(可選操作)當FE通過yarn用戶端去擷取Application的狀態或者終止Application時,預設會在FE根目錄下的lib/yarn-config路徑下產生執行yarn命令所需的設定檔,此路徑可通過在FE設定檔配置yarn_config_dir項修改,目前產生的設定檔包括core-site.xml和yarn-site.xml。
建立匯入
主要介紹Spark Load的建立匯入文法中參數意義和注意事項。
建立匯入的詳細文法執行HELP SPARK LOAD查看文法協助。
Spark Load匯入文法
LOAD LABEL load_label (data_desc, ...) WITH RESOURCE resource_name [resource_properties] [PROPERTIES (key1=value1, ... )] * load_label: db_name.label_name * data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY separator ] [(col1, ...)] [COLUMNS FROM PATH AS (col2, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] DATA FROM TABLE hive_external_tbl [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] * resource_properties: (key2=value2, ...)樣本一:上遊資料來源為HDFS檔案的情況
LOAD LABEL db1.label1 ( DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1") INTO TABLE tbl1 COLUMNS TERMINATED BY "," (tmp_c1,tmp_c2) SET ( id=tmp_c2, name=tmp_c1 ), DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2") INTO TABLE tbl2 COLUMNS TERMINATED BY "," (col1, col2) where col1 > 1 ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );樣本二:上遊資料來源是Hive表的情況
建立Hive外部表格。
CREATE EXTERNAL TABLE hive_t1 ( k1 INT, K2 SMALLINT, k3 varchar(50), uuid varchar(100) ) ENGINE=hive properties ( "database" = "tmp", "table" = "t1", "hive.metastore.uris" = "thrift://0.0.0.0:8080" );提交Load命令,要求匯入的Doris表中的列必須在Hive外部表格中存在。
LOAD LABEL db1.label1 ( DATA FROM TABLE hive_t1 INTO TABLE tbl1 SET ( uuid=bitmap_dict(uuid) ) ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
樣本三:上遊資料來源是Hive Binary類型情況
建立Hive外部表格。
CREATE EXTERNAL TABLE hive_t1 ( k1 INT, K2 SMALLINT, k3 varchar(50), uuid varchar(100) //hive中的類型為binary ) ENGINE=hive properties ( "database" = "tmp", "table" = "t1", "hive.metastore.uris" = "thrift://0.0.0.0:8080" );提交Load命令,要求匯入的Doris表中的列必須在Hive外部表格中存在。
LOAD LABEL db1.label1 ( DATA FROM TABLE hive_t1 INTO TABLE tbl1 SET ( uuid=binary_bitmap(uuid) ) ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
樣本四:匯入Hive分區表的資料
Hive建表語句
create table test_partition( id int, name string, age int ) partitioned by (dt string) row format delimited fields terminated by ',' stored as textfile;Doris建表語句
CREATE TABLE IF NOT EXISTS test_partition_04 ( dt date, id int, name string, age int ) UNIQUE KEY(`dt`, `id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );Spark Load語句
CREATE EXTERNAL RESOURCE "spark_resource" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.executor.memory" = "1g", "spark.yarn.queue" = "default", "spark.hadoop.yarn.resourcemanager.address" = "localhost:50056", "spark.hadoop.fs.defaultFS" = "hdfs://localhost:9000", "working_dir" = "hdfs://localhost:9000/tmp/doris", "broker" = "broker_01" ); LOAD LABEL demo.test_hive_partition_table_18 ( DATA INFILE("hdfs://localhost:9000/user/hive/warehouse/demo.db/test/dt=2022-08-01/*") INTO TABLE test_partition_04 COLUMNS TERMINATED BY "," FORMAT AS "csv" (id,name,age) COLUMNS FROM PATH AS (`dt`) SET ( dt=dt, id=id, name=name, age=age ) ) WITH RESOURCE 'spark_resource' ( "spark.executor.memory" = "1g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
參數和相關匯入說明
資料描述類參數
目前支援的資料來源有CSV和Hive table。其他規則與Broker Load一致。
匯入作業參數
匯入作業參數主要指的是Spark Load建立匯入語句中的屬於opt_properties部分的參數。匯入作業參數是作用於整個匯入作業的。規則與Broker Load一致。
Spark資源參數
Spark資源需要提前配置到Doris系統中並且賦予您USAGE_PRIV許可權後才能使用Spark Load。
當您有臨時性的需求,比如增加任務使用的資源而修改Spark configs,可參考如下樣本進行設定。
WITH RESOURCE 'spark0' ( "spark.driver.memory" = "1g", "spark.executor.memory" = "3g" )說明設定僅對本次任務生效,並不影響Doris叢集中已有的配置。
Hive表作為資料來源匯入
目前如果期望在匯入流程中將Hive表作為資料來源,那麼需要先建立一張類型為Hive的外部表格,然後提交匯入命令時指定外部表格的表名即可。
匯入流程構建全域字典
適用於Doris表彙總列的資料類型為Bitmap類型。在
load命令中指定需要構建全域字典的欄位即可,格式為:Doris欄位名稱=bitmap_dict(Hive表欄位名稱)。重要目前只有在上遊資料來源為Hive表時才支援全域字典的構建。
Hive Binary(bitmap)類型列的匯入
適用於Doris表彙總列的資料類型為Bitmap類型,且資料來源Hive表中對應列的資料類型為binary(通過FE中spark-dpp中的
org.apache.doris.load.loadv2.dpp.BitmapValue類序列化)類型。全域字典,在
load命令中指定相應欄位即可,格式為:Doris欄位名稱=binary_bitmap(Hive表欄位名稱)。重要目前只有在上遊資料來源為Hive表時才支援Binary(Bitmap)類型的資料匯入。
查看匯入
Spark Load匯入方式同Broker Load一樣都是非同步,所以您必須將建立匯入的Label記錄,並且在查看匯入命令中使用Label來查看匯入結果。查看匯入命令在所有匯入方式中是通用的,具體文法可執行HELP SHOW LOAD查看,查看匯入樣本如下。
show load order by createtime desc limit 1\G返回如下資訊。
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:49:44
LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://1.1.*.*:80**/proxy/application_15866****3848_0035/
JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}返回結果集中參數意義可以參見Broker Load。不同點如下:
State匯入任務當前所處的階段。任務提交之後狀態為PENDING,提交Spark ETL之後狀態變為ETL,ETL完成之後FE調度BE執行push操作狀態變為LOADING,push完成並且版本生效後狀態變為FINISHED。
匯入任務的最終階段有兩個:CANCELLED和FINISHED。當Load job處於這兩個階段時匯入完成。其中CANCELLED為匯入失敗,FINISHED為匯入成功。
Progress匯入任務的進度描述。分為兩種進度:ETL和LOAD,對應了匯入流程的兩個階段ETL和LOADING。
LOAD進度=當前已完成所有replica匯入的tablet個數/本次匯入任務的總tablet個數*100%LOAD的進度範圍為0~100%。如果所有匯入表均完成匯入,此時LOAD的進度為99%,表示匯入進入到最後生效階段,整個匯入完成後,LOAD的進度才會改為100%。
說明匯入進度並不是線性,所以如果一段時間內進度沒有變化,並不代表匯入沒有在執行。
Type匯入任務的類型。Spark Load為SPARK。
以下參數值含義:
CreateTime:匯入建立的時間。
EtlStartTime:ETL階段開始的時間。
EtlFinishTime:ETL階段完成的時間。
LoadStartTime:LOADING階段開始的時間。
LoadFinishTime:整個匯入任務完成的時間。
JobDetails顯示一些作業的詳細運行狀態,ETL結束的時候更新。包括匯入檔案的個數、總大小(位元組)、子任務個數、已處理的原始行數等,如
{"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}。URL可複製輸入到瀏覽器,跳轉至相應Application的Web介面。
查看Spark launcher提交日誌
Spark任務提交過程中產生的詳細日誌,預設儲存在FE根目錄下log/spark_launcher_log路徑下,並以spark_launcher_{load_job_id}_{label}.log命名。
日誌會在此目錄下儲存一段時間,當FE中繼資料中的匯入資訊被清理時,相應的日誌也會被清理,預設儲存時間為3天。
取消匯入
當Spark Load作業狀態不為CANCELLED或FINISHED時,您可以手動取消。取消時需要指定待取消匯入任務的Label。取消匯入命令文法可執行HELP CANCEL LOAD查看。
相關係統配置
下面配置屬於Spark Load的系統層級配置,即作用於所有Spark Load匯入任務的配置。主要通過修改fe.conf來調整配置值。
enable_spark_load:開啟Spark Load和建立Resource功能。預設為False,關閉此功能。
spark_load_default_timeout_second:任務預設逾時時間為259200秒(3天)。
spark_home_default_dirspark:用戶端路徑
fe/lib/spark2x。spark_resource_path:打包好的Spark依賴檔案路徑(預設為空白)。
spark_launcher_log_dirspark:用戶端的提交日誌存放的目錄
fe/log/spark_launcher_log。yarn_client_pathyarn:二進位可執行檔路徑
fe/lib/yarn-client/hadoop/bin/yarn。yarn_config_diryarn:設定檔產生路徑
fe/lib/yarn-config。