全部產品
Search
文件中心

E-MapReduce:常見問題

更新時間:Feb 22, 2025

本文匯總了Spark使用時的常見問題。

在哪裡可以查看Spark歷史作業?

您可以在EMR控制台目的地組群的訪問連結與連接埠頁簽,單擊Spark UI連結,即可查看到Spark歷史作業運行資訊。訪問UI詳情,請參見通過控制台訪問開源組件Web介面

是否支援Standalone模式提交Spark作業?

不支援。E-MapReduce支援使用Spark on YARN以及Spark on Kubernetes模式提交作業,不支援Standalone和Mesos模式。

如何減少Spark2命令列工具的日誌輸出?

EMR DataLake叢集選擇Spark2服務後,使用spark-sql和spark-shell等命令列工具時預設輸出INFO層級日誌,如果想減少日誌輸出,可以修改log4j記錄層級。具體操作如下:

  1. 在運行命令列工具的節點(例如,master節點)建立一個log4j.properties設定檔,也可以從預設設定檔複製,複製命令如下所示。

    cp /etc/emr/spark-conf/log4j.properties /new/path/to/log4j.properties
  2. 修改新設定檔的記錄層級。

    log4j.rootCategory=WARN, console
  3. 修改Spark服務spark-defaults.conf設定檔中的配置項spark.driver.extraJavaOptions,將參數值中的-Dlog4j.configuration=file:/etc/emr/spark-conf/log4j.properties替換為-Dlog4j.configuration=file:/new/path/to/log4j.properties

    重要

    路徑需要添加file:首碼。

如何使用Spark3的小檔案合并功能?

您可以通過設定參數spark.sql.adaptive.merge.output.small.files.enabled為true,來自動合并小檔案。由於合并後的檔案會壓縮,如果您覺得合并後的檔案太小,可以適當調大參數spark.sql.adaptive.advisoryOutputFileSizeInBytes的值,預設值為256 MB。

如何處理SparkSQL資料扭曲?

  • 針對Spark2,處理方式如下:

    • 讀取表時過濾無關資料,例如null。

    • 廣播小表(Broadcast)。

      select /*+ BROADCAST (table1) */ * from table1 join table2 on table1.id = table2.id
    • 根據傾斜key,分離傾斜資料。

      select * from table1_1 join table2 on table1_1.id = table2.id
      union all
      select /*+ BROADCAST (table1_2) */ * from table1_2 join table2 on table1_2.id = table2.id
    • 傾斜key已知時,打散資料。

      select id, value, concat(id, (rand() * 10000) % 3) as new_id from A
      select id, value, concat(id, suffix) as new_id
      from ( 
      select id, value, suffix
       from B Lateral View explode(array(0, 1, 2)) tmp as suffix)
    • 傾斜key未知時,打散資料。

      select t1.id, t1.id_rand, t2.name
      from (
      select id ,
      case when id = null then concat(‘SkewData_’, cast(rand() as string))
      else id end as id_rand
      from test1
      where statis_date = ‘20221130’) t1
      left join test2 t2
      on t1.id_rand = t2.id
  • 針對Spark3,可以在EMR控制台Spark3服務的配置頁簽,修改spark.sql.adaptive.enabledspark.sql.adaptive.skewJoin.enabled的參數值為true。

如何指定PySpark使用Python 3版本?

下面內容以可選服務為Spark2,EMR-5.7.0版本的DataLake叢集為例,介紹如何指定PySpark使用Python 3版本。

您可以通過以下兩種方式修改Python的版本:

臨時生效方式

  1. 通過SSH方式登入叢集,詳情請參見登入叢集

  2. 執行以下命令,修改Python的版本。

    export PYSPARK_PYTHON=/usr/bin/python3
  3. 執行以下命令,查看Python的版本。

    pyspark

    當返回資訊中包含如下資訊時,表示已修改Python版本為Python 3。

    Using Python version 3.6.8

永久生效方式

  1. 通過SSH方式登入叢集,詳情請參見登入叢集

  2. 修改設定檔。

    1. 執行以下命令,開啟檔案profile

      vi /etc/profile
    2. 按下i鍵進入編輯模式。

    3. profile檔案末尾添加以下資訊,以修改Python的版本。

      export PYSPARK_PYTHON=/usr/bin/python3

      export

    4. 按下Esc鍵退出編輯模式,輸入:wq儲存並關閉檔案。

  3. 執行以下命令,重新執行剛修改的設定檔,使之立即生效。

    source /etc/profile
  4. 執行以下命令,查看Python的版本。

    pyspark

    當返回資訊中包含如下資訊時,表示已修改Python版本為Python 3。

    Using Python version 3.6.8

為什麼Spark Streaming作業運行一段時間後無故結束?

  • 首先檢查Spark版本是否是1.6之前版本,如果是的話,請更新Spark版本。

    Spark 1.6之前版本存在記憶體流失的問題,會導致Container被中止掉。

  • 檢查自己的代碼在記憶體使用量上有沒有做好最佳化。

為什麼Spark Streaming作業已經結束,但是E-MapReduce控制台顯示作業狀態還處於“運行中”?

檢查作業提交方式是否為Yarn-Client模式,因為E-MapReduce對Yarn-Client模式的Spark Streaming作業的狀態監控存在問題,所以請修改為Yarn-Cluster模式。

為什麼在啟用了Kerberos的EMR叢集中,使用YARN-Cluster模式執行spark-submit時會報錯java.lang.ClassNotFoundException

具體報錯資訊如下圖。

image

原因分析:EMR叢集開啟Kerberos之後,在YARN-Cluster模式下,Driver的classpath不會自動擴充以包含指定目錄中的JAR檔案,從而導致執行Spark任務時報錯。

解決方案:在EMR叢集開啟Kerberos之後,使用spark-submit在YARN-Cluster模式下提交任務時,需要增加--jars參數。除了使用者程式本身依賴的JAR包外,還需要添加/opt/apps/METASTORE/metastore-current/hive2目錄中的所有JAR包。

重要

在YARN-Cluster模式下,--jars參數中的所有依賴必須以“,”分隔,不支援目錄形式。

例如,如果您的應用程式JAR包為 /opt/apps/SPARK3/spark3-current/examples/jars/spark-examples_2.12-3.5.3-emr.jar,則 spark-submit 命令如下:

spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master yarn \
--jars $(ls /opt/apps/METASTORE/metastore-current/hive2/*.jar | tr '\n' ',') \
/opt/apps/SPARK3/spark3-current/examples/jars/spark-examples_2.12-3.5.3-emr.jar