本文匯總了Spark使用時的常見問題。
Spark Core
Spark SQL
PySpark
Spark Streaming
spark-submit
在哪裡可以查看Spark歷史作業?
您可以在EMR控制台目的地組群的訪問連結與連接埠頁簽,單擊Spark UI連結,即可查看到Spark歷史作業運行資訊。訪問UI詳情,請參見通過控制台訪問開源組件Web介面。
是否支援Standalone模式提交Spark作業?
不支援。E-MapReduce支援使用Spark on YARN以及Spark on Kubernetes模式提交作業,不支援Standalone和Mesos模式。
如何減少Spark2命令列工具的日誌輸出?
EMR DataLake叢集選擇Spark2服務後,使用spark-sql和spark-shell等命令列工具時預設輸出INFO層級日誌,如果想減少日誌輸出,可以修改log4j記錄層級。具體操作如下:
在運行命令列工具的節點(例如,master節點)建立一個log4j.properties設定檔,也可以從預設設定檔複製,複製命令如下所示。
cp /etc/emr/spark-conf/log4j.properties /new/path/to/log4j.properties修改新設定檔的記錄層級。
log4j.rootCategory=WARN, console修改Spark服務spark-defaults.conf設定檔中的配置項spark.driver.extraJavaOptions,將參數值中的-Dlog4j.configuration=file:/etc/emr/spark-conf/log4j.properties替換為-Dlog4j.configuration=file:/new/path/to/log4j.properties。
重要路徑需要添加file:首碼。
如何使用Spark3的小檔案合并功能?
您可以通過設定參數spark.sql.adaptive.merge.output.small.files.enabled為true,來自動合并小檔案。由於合并後的檔案會壓縮,如果您覺得合并後的檔案太小,可以適當調大參數spark.sql.adaptive.advisoryOutputFileSizeInBytes的值,預設值為256 MB。
如何處理SparkSQL資料扭曲?
針對Spark2,處理方式如下:
讀取表時過濾無關資料,例如null。
廣播小表(Broadcast)。
select /*+ BROADCAST (table1) */ * from table1 join table2 on table1.id = table2.id根據傾斜key,分離傾斜資料。
select * from table1_1 join table2 on table1_1.id = table2.id union all select /*+ BROADCAST (table1_2) */ * from table1_2 join table2 on table1_2.id = table2.id傾斜key已知時,打散資料。
select id, value, concat(id, (rand() * 10000) % 3) as new_id from A select id, value, concat(id, suffix) as new_id from ( select id, value, suffix from B Lateral View explode(array(0, 1, 2)) tmp as suffix)傾斜key未知時,打散資料。
select t1.id, t1.id_rand, t2.name from ( select id , case when id = null then concat(‘SkewData_’, cast(rand() as string)) else id end as id_rand from test1 where statis_date = ‘20221130’) t1 left join test2 t2 on t1.id_rand = t2.id
針對Spark3,可以在EMR控制台Spark3服務的配置頁簽,修改spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled的參數值為true。
如何指定PySpark使用Python 3版本?
下面內容以可選服務為Spark2,EMR-5.7.0版本的DataLake叢集為例,介紹如何指定PySpark使用Python 3版本。
您可以通過以下兩種方式修改Python的版本:
臨時生效方式
通過SSH方式登入叢集,詳情請參見登入叢集。
執行以下命令,修改Python的版本。
export PYSPARK_PYTHON=/usr/bin/python3執行以下命令,查看Python的版本。
pyspark當返回資訊中包含如下資訊時,表示已修改Python版本為Python 3。
Using Python version 3.6.8
永久生效方式
通過SSH方式登入叢集,詳情請參見登入叢集。
修改設定檔。
執行以下命令,開啟檔案profile。
vi /etc/profile按下
i鍵進入編輯模式。在profile檔案末尾添加以下資訊,以修改Python的版本。
export PYSPARK_PYTHON=/usr/bin/python3
按下
Esc鍵退出編輯模式,輸入:wq儲存並關閉檔案。
執行以下命令,重新執行剛修改的設定檔,使之立即生效。
source /etc/profile執行以下命令,查看Python的版本。
pyspark當返回資訊中包含如下資訊時,表示已修改Python版本為Python 3。
Using Python version 3.6.8
為什麼Spark Streaming作業運行一段時間後無故結束?
首先檢查Spark版本是否是1.6之前版本,如果是的話,請更新Spark版本。
Spark 1.6之前版本存在記憶體流失的問題,會導致Container被中止掉。
檢查自己的代碼在記憶體使用量上有沒有做好最佳化。
為什麼Spark Streaming作業已經結束,但是E-MapReduce控制台顯示作業狀態還處於“運行中”?
檢查作業提交方式是否為Yarn-Client模式,因為E-MapReduce對Yarn-Client模式的Spark Streaming作業的狀態監控存在問題,所以請修改為Yarn-Cluster模式。
為什麼在啟用了Kerberos的EMR叢集中,使用YARN-Cluster模式執行spark-submit時會報錯java.lang.ClassNotFoundException?
具體報錯資訊如下圖。

原因分析:EMR叢集開啟Kerberos之後,在YARN-Cluster模式下,Driver的classpath不會自動擴充以包含指定目錄中的JAR檔案,從而導致執行Spark任務時報錯。
解決方案:在EMR叢集開啟Kerberos之後,使用spark-submit在YARN-Cluster模式下提交任務時,需要增加--jars參數。除了使用者程式本身依賴的JAR包外,還需要添加/opt/apps/METASTORE/metastore-current/hive2目錄中的所有JAR包。
在YARN-Cluster模式下,--jars參數中的所有依賴必須以“,”分隔,不支援目錄形式。
例如,如果您的應用程式JAR包為 /opt/apps/SPARK3/spark3-current/examples/jars/spark-examples_2.12-3.5.3-emr.jar,則 spark-submit 命令如下:
spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master yarn \
--jars $(ls /opt/apps/METASTORE/metastore-current/hive2/*.jar | tr '\n' ',') \
/opt/apps/SPARK3/spark3-current/examples/jars/spark-examples_2.12-3.5.3-emr.jar