全部產品
Search
文件中心

E-MapReduce:Spark Load

更新時間:Jul 01, 2024

本文主要為您介紹Spark Load匯入方式。

背景資訊

  • Spark Load是一種非同步匯入方式,您需要通過MySQL協議建立Spark類型匯入任務,並通過SHOW LOAD查看匯入結果。

  • Spark Load利用了Spark叢集資源對要匯入的資料進行了排序,Doris BE直接寫檔案,這樣能大大降低Doris叢集的資源使用,對於歷史海量資料移轉降低Doris叢集資源使用及負載有很好的效果。

如果您在沒有Spark叢集這種資源的情況下,又想方便、快速的完成外部儲存歷史資料的遷移,可以使用Broker Load。相比於Spark Load匯入,Broker Load對Doris叢集的資源佔用會更高。

適用情境

Spark Load通過外部的Spark資源實現對匯入資料的預先處理,提高Doris巨量資料量的匯入效能並且節省Doris叢集的計算資源。主要用於初次遷移、巨量資料量匯入Doris的情境。

  • 來源資料在Spark可以訪問的儲存系統中,如HDFS。

  • 資料量達到10 GB以上至TB層級的業務情境。

說明

資料量較小或不滿足上述情況時,建議使用Stream Load或者Broker Load

基本流程

您可以通過MySQL用戶端提交Spark類型匯入任務,FE記錄中繼資料並返回提交成功,基本流程如下所示。

+
                 | 
            +----v----+
            |   FE    |---------------------------------+
            +----+----+                                 |
                 | 3. FE send push tasks                |
                 | 5. FE publish version                |
    +------------+------------+                         |
    |            |            |                         |
+---v---+    +---v---+    +---v---+                     |
|  BE   |    |  BE   |    |  BE   |                     |1. FE submit Spark ETL job
+---^---+    +---^---+    +---^---+                     |
    |4. BE push with broker   |                         |
+---+---+    +---+---+    +---+---+                     |
|Broker |    |Broker |    |Broker |                     |
+---^---+    +---^---+    +---^---+                     |
    |            |            |                         |
+---+------------+------------+---+ 2.ETL +-------------v---------------+
|               HDFS              +------->       Spark cluster         |
|                                 <-------+                             |
+---------------------------------+       +-----------------------------+

Spark Load任務的執行主要分為以下5個階段。

  1. FE調度提交ETL任務到Spark叢集執行。

  2. Spark叢集執行ETL完成對匯入資料的預先處理。包括全域字典構建(BITMAP類型)、分區、排序、彙總等。

  3. ETL任務完成後,FE擷取預先處理過的每個分區的資料路徑,並調度相關的BE執行Push任務。

  4. BE通過Broker讀取資料,轉化為Doris底層儲存格式。

  5. FE調度生效版本,完成匯入任務。

全域字典

適用情境

  • 目前Doris中Bitmap列是使用類庫Roaringbitmap實現的,而Roaringbitmap的輸入資料類型只能是整型,因此如果要在匯入流程中實現對於Bitmap列的預計算,則需要將輸入資料的類型轉換成整型。

  • 在Doris現有的匯入流程中,全域字典的資料結構是基於Hive表實現的,儲存了原始值到編碼值的映射。

構建流程

  1. 讀取上遊資料來源的資料,產生一張Hive暫存資料表,記為hive_table

  2. hive_table中抽取待去重欄位的去重值,產生一張新的Hive表,記為distinct_value_table

  3. 建立一張全域字典表,記為dict_table。字典表一列為原始值,另一列為編碼後的值。

  4. distinct_value_tabledict_tableleft join,計算出新增的去重值集合,然後對這個集合使用視窗函數進行編碼,此時去重列原始值就多了一列編碼後的值,最後將這兩列的資料寫回dict_table。

  5. dict_tablehive_table串連,完成hive_table中原始值替換成整型編碼值的工作。

  6. hive_table會被下一步資料預先處理的流程所讀取,經過計算後匯入到Doris中。

資料預先處理(DPP)

基本流程

  1. 從資料來源讀取資料,上遊資料來源可以是HDFS檔案,也可以是Hive表。

  2. 對讀取到的資料進列欄位映射,運算式計算以及根據分區資訊產生分桶欄位bucket_id

  3. 根據Doris表的rollup中繼資料產生RollupTree。

  4. 遍曆RollupTree,進行分層的彙總操作,下一個層級的rollup可以由上一個層的rollup計算得來。

  5. 每次完成彙總計算後,會對資料根據bucket_id進行分桶然後寫入HDFS中。

  6. 後續broker會拉取HDFS中的檔案然後匯入Doris BE中。

Hive Bitmap UDF

Spark支援將Hive產生的Bitmap資料直接匯入到Doris。

配置ETL叢集

配置叢集

Spark作為一種外部計算資源在Doris中用來完成ETL工作,引入Resource Management來管理Doris使用的這些外部資源。

提交Spark匯入任務之前,需要配置執行ETL任務的Spark叢集,參數配置如下所示,詳情參見下面的建立資源配置。

-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
  type = spark,
  spark_conf_key = spark_conf_value,
  working_dir = path,
  broker = broker_name,
  broker.property_key = property_value,
  broker.hadoop.security.authentication = kerberos,
  broker.kerberos_principal = doris@YOUR.COM,
  broker.kerberos_keytab = /home/doris/my.keytab
  broker.kerberos_keytab_content = ASDOWHDLAWI********ALDJSDIWALD
)

-- drop spark resource
DROP RESOURCE resource_name

-- show resources
SHOW RESOURCES
SHOW PROC "/resources"

-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name

REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name

建立資源配置

resource_name為Doris中配置的Spark資源的名字。

PROPERTIES是Spark資源相關參數,參數如下:

  • type:資源類型,必填,目前僅支援Spark。

  • Spark相關參數如下:

    • spark.master:必填,參數值為yarn或spark://host:port

    • spark.submit.deployMode:Spark程式的部署模式,必填,支援Cluster,Client兩種模式。

    • spark.hadoop.yarn.resourcemanager.address:參數spark.master為yarn時,該參數必填。

    • spark.hadoop.fs.defaultFS:參數spark.master為yarn時,該參數必填。

    • 其他參數為可選,詳情請參見Spark Configuration

  • working_dir: ETL使用的目錄。Spark作為ETL資源使用時必填。例如hdfs://host:port/tmp/doris

  • broker.hadoop.security.authentication:指定認證方式為kerberos。

  • broker.kerberos_principal:指定kerberos的principal。

  • broker.kerberos_keytab:指定kerberos的keytab檔案路徑。該檔案必須為Broker進程所在伺服器上的檔案的絕對路徑。並且可以被Broker進程訪問。

  • broker.kerberos_keytab_content:指定kerberos中keytab檔案內容經過base64編碼之後的內容。該參數和broker.kerberos_keytab配置二選一即可。

  • broker: broker名字。Spark作為ETL資源使用時必填。需要使用ALTER SYSTEM ADD BROKER命令提前完成配置。

  • broker.property_key:broker讀取ETL產生的中間檔案時需要指定的認證資訊等。

參數樣本。

  • YARN Cluster模式

    CREATE EXTERNAL RESOURCE "spark0"
    PROPERTIES
    (
      "type" = "spark",
      "spark.master" = "yarn",
      "spark.submit.deployMode" = "cluster",
      "spark.jars" = "xxx.jar,yyy.jar",
      "spark.files" = "/tmp/aaa,/tmp/bbb",
      "spark.executor.memory" = "1g",
      "spark.yarn.queue" = "queue0",
      "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
      "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
      "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
      "broker" = "broker0",
      "broker.username" = "user0",
      "broker.password" = "password0"
    );
  • Spark Standalone Client模式

    CREATE EXTERNAL RESOURCE "spark1"
    PROPERTIES
    (
      "type" = "spark",
      "spark.master" = "spark://127.0.0.1:7777",
      "spark.submit.deployMode" = "client",
      "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
      "broker" = "broker1"
    );

Spark Load支援Kerberos認證

如果是Spark Load訪問帶有Kerberos認證的Hadoop叢集資源,您只需要在建立Spark resource的時候指定以下參數即可:

  • broker.hadoop.security.authentication:指定認證方式為kerberos。

  • broker.kerberos_principal:指定kerberos的principal。

  • broker.kerberos_keytab:指定kerberos的keytab檔案路徑。該檔案必須為Broker進程所在伺服器上的檔案的絕對路徑。並且可以被Broker進程訪問。

  • broker.kerberos_keytab_content:指定kerberos中keytab檔案內容經過base64編碼之後的內容。該參數和kerberos_keytab配置二選一即可。

參數樣本

CREATE EXTERNAL RESOURCE "spark_on_kerberos"
PROPERTIES
(
  "type" = "spark",
  "spark.master" = "yarn",
  "spark.submit.deployMode" = "cluster",
  "spark.jars" = "xxx.jar,yyy.jar",
  "spark.files" = "/tmp/aaa,/tmp/bbb",
  "spark.executor.memory" = "1g",
  "spark.yarn.queue" = "queue0",
  "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
  "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
  "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
  "broker" = "broker0",
  "broker.hadoop.security.authentication" = "kerberos",
  "broker.kerberos_principal" = "doris@YOUR.COM",
  "broker.kerberos_keytab" = "/home/doris/my.keytab"
);

查看資源

  • 普通賬戶只能看到自己有USAGE_PRIV使用許可權的資源。

  • root和admin賬戶可以看到所有的資源。

資源許可權

資源許可權通過GRANT REVOKE來管理,目前僅支援USAGE_PRIV使用許可權。

可以將USAGE_PRIV許可權賦予某個賬戶或者某個角色,角色的使用與之前一致。

-- 授予spark0資源的使用許可權給賬戶user0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";

-- 授予spark0資源的使用許可權給角色role0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";

-- 授予所有資源的使用許可權給賬戶user0
GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";

-- 授予所有資源的使用許可權給角色role0
GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";

-- 撤銷賬戶user0的spark0資源使用許可權
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";

配置SPARK用戶端

FE底層通過執行spark-submit命令去提交Spark任務,因此需要為FE配置Spark用戶端,建議使用2.4.5或以上的Spark2官方版本,您可以通過Spark下載地址進行下載,下載完成後,請按以下步驟完成配置。

  1. 配置SPARK_HOME環境變數

    將spark用戶端放在FE同一台機器上的目錄下,並在FE的設定檔配置spark_home_default_dir項指向此目錄,此配置項預設為FE根目錄下的lib/spark2x路徑,此項不可為空白。

  2. 配置SPARK依賴包

    將Spark用戶端下的jars檔案夾內所有JAR包歸檔打包成一個ZIP檔案,並在FE的設定檔配置spark_resource_path項指向此ZIP檔案,若此配置項為空白,則FE會嘗試尋找FE根目錄下的lib/spark2x/jars/spark-2x.zip檔案,若沒有找到則會報檔案不存在的錯誤。

    當提交Spark Load任務時,會將歸檔好的依賴檔案上傳至遠端倉庫,預設倉庫路徑掛在working_dir/{cluster_id}目錄下,並以__spark_repository__{resource_name}命名,表示叢集內的一個resource對應一個遠端倉庫,遠端倉庫目錄結構參考如下:

    __spark_repository__spark0/
        |-__archive_1.0.0/
        |        |-__lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp-1.0.0-jar-with-dependencies.jar
        |        |-__lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip
        |-__archive_1.1.0/
        |        |-__lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp-1.1.0-jar-with-dependencies.jar
        |        |-__lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip
        |-__archive_1.2.0/
        |        |-...
    說明

    除了Spark依賴(預設以spark-2x.zip命名),FE還會上傳DPP的依賴包至遠端倉庫,若此次Spark Load提交的所有依賴檔案都已存在遠端倉庫,那麼就不需要再上傳依賴,節省了每次重複上傳大量檔案的時間。

配置YARN用戶端

  1. FE底層通過執行yarn命令去擷取正在啟動並執行Application的狀態以及終止Application,因此需要為FE配置YARN用戶端,建議使用hadoop-2.5.2或hadoop-2.0以上的官方版本,下載詳情請參見hadoop下載地址

  2. 將下載好的YARN用戶端放在FE同一台機器的目錄下,並在FE設定檔配置yarn_client_path項指向yarn的二進位可執行檔,預設為FE根目錄下的lib/yarn-client/hadoop/bin/yarn路徑。

  3. (可選操作)當FE通過yarn用戶端去擷取Application的狀態或者終止Application時,預設會在FE根目錄下的lib/yarn-config路徑下產生執行yarn命令所需的設定檔,此路徑可通過在FE設定檔配置yarn_config_dir項修改,目前產生的設定檔包括core-site.xmlyarn-site.xml

建立匯入

主要介紹Spark Load的建立匯入文法中參數意義和注意事項。

說明

建立匯入的詳細文法執行HELP SPARK LOAD查看文法協助。

  • Spark Load匯入文法

    LOAD LABEL load_label
        (data_desc, ...)
        WITH RESOURCE resource_name
        [resource_properties]
        [PROPERTIES (key1=value1, ... )]
    
    * load_label:
        db_name.label_name
    
    * data_desc:
        DATA INFILE ('file_path', ...)
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [COLUMNS TERMINATED BY separator ]
        [(col1, ...)]
        [COLUMNS FROM PATH AS (col2, ...)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
        DATA FROM TABLE hive_external_tbl
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
    * resource_properties:
        (key2=value2, ...)
  • 樣本一:上遊資料來源為HDFS檔案的情況

    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (tmp_c1,tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        ),
        DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
        INTO TABLE tbl2
        COLUMNS TERMINATED BY ","
        (col1, col2)
        where col1 > 1
    )
    WITH RESOURCE 'spark0'
    (
        "spark.executor.memory" = "2g",
        "spark.shuffle.compress" = "true"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
  • 樣本二:上遊資料來源是Hive表的情況

    1. 建立Hive外部表格。

      CREATE EXTERNAL TABLE hive_t1
      (
          k1 INT,
          K2 SMALLINT,
          k3 varchar(50),
          uuid varchar(100)
      )
      ENGINE=hive
      properties
      (
      "database" = "tmp",
      "table" = "t1",
      "hive.metastore.uris" = "thrift://0.0.0.0:8080"
      );
                                          
    2. 提交Load命令,要求匯入的Doris表中的列必須在Hive外部表格中存在。

      LOAD LABEL db1.label1
      (
          DATA FROM TABLE hive_t1
          INTO TABLE tbl1
          SET
          (
              uuid=bitmap_dict(uuid)
          )
      )
      WITH RESOURCE 'spark0'
      (
          "spark.executor.memory" = "2g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
  • 樣本三:上遊資料來源是Hive Binary類型情況

    1. 建立Hive外部表格。

      CREATE EXTERNAL TABLE hive_t1
      (
          k1 INT,
          K2 SMALLINT,
          k3 varchar(50),
          uuid varchar(100) //hive中的類型為binary
      )
      ENGINE=hive
      properties
      (
      "database" = "tmp",
      "table" = "t1",
      "hive.metastore.uris" = "thrift://0.0.0.0:8080"
      );
    2. 提交Load命令,要求匯入的Doris表中的列必須在Hive外部表格中存在。

      LOAD LABEL db1.label1
      (
          DATA FROM TABLE hive_t1
          INTO TABLE tbl1
          SET
          (
              uuid=binary_bitmap(uuid)
          )
      )
      WITH RESOURCE 'spark0'
      (
          "spark.executor.memory" = "2g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
  • 樣本四:匯入Hive分區表的資料

    • Hive建表語句

      create table test_partition(
          id int,
          name string,
          age int
      )
      partitioned by (dt string)
      row format delimited fields terminated by ','
      stored as textfile;
                                          
    • Doris建表語句

      CREATE TABLE IF NOT EXISTS test_partition_04
      (
          dt date,
          id int,
          name string,
          age int
      )
      UNIQUE KEY(`dt`, `id`)
      DISTRIBUTED BY HASH(`id`) BUCKETS 1
      PROPERTIES (
          "replication_allocation" = "tag.location.default: 1"
      );
    • Spark Load語句

      CREATE EXTERNAL RESOURCE "spark_resource"
      PROPERTIES
      (
      "type" = "spark",
      "spark.master" = "yarn",
      "spark.submit.deployMode" = "cluster",
      "spark.executor.memory" = "1g",
      "spark.yarn.queue" = "default",
      "spark.hadoop.yarn.resourcemanager.address" = "localhost:50056",
      "spark.hadoop.fs.defaultFS" = "hdfs://localhost:9000",
      "working_dir" = "hdfs://localhost:9000/tmp/doris",
      "broker" = "broker_01"
      );
      LOAD LABEL demo.test_hive_partition_table_18
      (
          DATA INFILE("hdfs://localhost:9000/user/hive/warehouse/demo.db/test/dt=2022-08-01/*")
          INTO TABLE test_partition_04
          COLUMNS TERMINATED BY ","
          FORMAT AS "csv"
          (id,name,age)
          COLUMNS FROM PATH AS (`dt`)
          SET
          (
              dt=dt,
              id=id,
              name=name,
              age=age
          )
      )
      WITH RESOURCE 'spark_resource'
      (
          "spark.executor.memory" = "1g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );

參數和相關匯入說明

  • 資料描述類參數

    目前支援的資料來源有CSV和Hive table。其他規則與Broker Load一致。

  • 匯入作業參數

    匯入作業參數主要指的是Spark Load建立匯入語句中的屬於opt_properties部分的參數。匯入作業參數是作用於整個匯入作業的。規則與Broker Load一致。

  • Spark資源參數

    Spark資源需要提前配置到Doris系統中並且賦予您USAGE_PRIV許可權後才能使用Spark Load。

    當您有臨時性的需求,比如增加任務使用的資源而修改Spark configs,可參考如下樣本進行設定。

    WITH RESOURCE 'spark0'
    (
      "spark.driver.memory" = "1g",
      "spark.executor.memory" = "3g"
    )
    說明

    設定僅對本次任務生效,並不影響Doris叢集中已有的配置。

  • Hive表作為資料來源匯入

    目前如果期望在匯入流程中將Hive表作為資料來源,那麼需要先建立一張類型為Hive的外部表格,然後提交匯入命令時指定外部表格的表名即可。

  • 匯入流程構建全域字典

    適用於Doris表彙總列的資料類型為Bitmap類型。在load命令中指定需要構建全域字典的欄位即可,格式為:Doris欄位名稱=bitmap_dict(Hive表欄位名稱)。

    重要

    目前只有在上遊資料來源為Hive表時才支援全域字典的構建。

  • Hive Binary(bitmap)類型列的匯入

    適用於Doris表彙總列的資料類型為Bitmap類型,且資料來源Hive表中對應列的資料類型為binary(通過FE中spark-dpp中的org.apache.doris.load.loadv2.dpp.BitmapValue類序列化)類型。

    全域字典,在load命令中指定相應欄位即可,格式為:Doris欄位名稱=binary_bitmap(Hive表欄位名稱)。

    重要

    目前只有在上遊資料來源為Hive表時才支援Binary(Bitmap)類型的資料匯入。

查看匯入

Spark Load匯入方式同Broker Load一樣都是非同步,所以您必須將建立匯入的Label記錄,並且在查看匯入命令中使用Label來查看匯入結果。查看匯入命令在所有匯入方式中是通用的,具體文法可執行HELP SHOW LOAD查看,查看匯入樣本如下。

show load order by createtime desc limit 1\G

返回如下資訊。

*************************** 1. row ***************************
         JobId: 76391
         Label: label1
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: SPARK
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:49:44
 LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://1.1.*.*:80**/proxy/application_15866****3848_0035/
    JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}

返回結果集中參數意義可以參見Broker Load。不同點如下:

  • State匯入任務當前所處的階段。任務提交之後狀態為PENDING,提交Spark ETL之後狀態變為ETL,ETL完成之後FE調度BE執行push操作狀態變為LOADING,push完成並且版本生效後狀態變為FINISHED。

    匯入任務的最終階段有兩個:CANCELLED和FINISHED。當Load job處於這兩個階段時匯入完成。其中CANCELLED為匯入失敗,FINISHED為匯入成功。

  • Progress匯入任務的進度描述。分為兩種進度:ETL和LOAD,對應了匯入流程的兩個階段ETL和LOADING。

    LOAD進度=當前已完成所有replica匯入的tablet個數/本次匯入任務的總tablet個數*100%   

    LOAD的進度範圍為0~100%。如果所有匯入表均完成匯入,此時LOAD的進度為99%,表示匯入進入到最後生效階段,整個匯入完成後,LOAD的進度才會改為100%。

    說明

    匯入進度並不是線性,所以如果一段時間內進度沒有變化,並不代表匯入沒有在執行。

  • Type匯入任務的類型。Spark Load為SPARK。

  • 以下參數值含義:

    • CreateTime:匯入建立的時間。

    • EtlStartTime:ETL階段開始的時間。

    • EtlFinishTime:ETL階段完成的時間。

    • LoadStartTime:LOADING階段開始的時間。

    • LoadFinishTime:整個匯入任務完成的時間。

  • JobDetails顯示一些作業的詳細運行狀態,ETL結束的時候更新。包括匯入檔案的個數、總大小(位元組)、子任務個數、已處理的原始行數等,如{"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}

  • URL可複製輸入到瀏覽器,跳轉至相應Application的Web介面。

查看Spark launcher提交日誌

Spark任務提交過程中產生的詳細日誌,預設儲存在FE根目錄下log/spark_launcher_log路徑下,並以spark_launcher_{load_job_id}_{label}.log命名。

說明

日誌會在此目錄下儲存一段時間,當FE中繼資料中的匯入資訊被清理時,相應的日誌也會被清理,預設儲存時間為3天。

取消匯入

當Spark Load作業狀態不為CANCELLED或FINISHED時,您可以手動取消。取消時需要指定待取消匯入任務的Label。取消匯入命令文法可執行HELP CANCEL LOAD查看。

相關係統配置

下面配置屬於Spark Load的系統層級配置,即作用於所有Spark Load匯入任務的配置。主要通過修改fe.conf來調整配置值。

  • enable_spark_load:開啟Spark Load和建立Resource功能。預設為False,關閉此功能。

  • spark_load_default_timeout_second:任務預設逾時時間為259200秒(3天)。

  • spark_home_default_dirspark:用戶端路徑fe/lib/spark2x

  • spark_resource_path:打包好的Spark依賴檔案路徑(預設為空白)。

  • spark_launcher_log_dirspark:用戶端的提交日誌存放的目錄fe/log/spark_launcher_log

  • yarn_client_pathyarn:二進位可執行檔路徑fe/lib/yarn-client/hadoop/bin/yarn

  • yarn_config_diryarn:設定檔產生路徑fe/lib/yarn-config