This page answers common questions about running Spark on E-MapReduce (EMR).
Spark Core
Spark SQL
PySpark
Spark Streaming
spark-submit
How do I view historical Spark job information?
Go to the EMR console and open the Access Links and Ports tab for your cluster. Find the Spark service and click its URL to open the Spark web UI. For details on accessing component web UIs, see Access the web UIs of open source components in the EMR console.
Can I submit Spark jobs in standalone mode?
No. EMR supports Spark on YARN mode and Spark on Kubernetes mode. Standalone mode and Mesos mode are not supported.
How do I reduce the log output of Spark 2 CLI tools?
Cause: By default, Spark 2 on an EMR DataLake cluster generates INFO-level logs when you use CLI tools like spark-sql and spark-shell.
Resolution: Change the log4j log level.
- Create a log4j.properties file on the node where you run the CLI tools (typically the master node). To start from the existing configuration, copy the default file:
  cp /etc/emr/spark-conf/log4j.properties /new/path/to/log4j.properties
- Open the new file and set the root log level to WARN:
  log4j.rootCategory=WARN, console
- In spark-defaults.conf, update the spark.driver.extraJavaOptions property. Replace -Dlog4j.configuration=file:/etc/emr/spark-conf/log4j.properties with the path to your new file:
  -Dlog4j.configuration=file:/new/path/to/log4j.properties
  The path must include the file: prefix.
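The three steps above can be rehearsed as a script. The following is a minimal sketch that runs against a scratch directory rather than the real configuration: the paths under work_dir and the two-line stand-in for the default file are assumptions for illustration, not the contents of an actual EMR node.

```shell
# Scratch directory so the sketch runs anywhere; on a cluster, use the real paths instead.
work_dir=$(mktemp -d)
default_conf="$work_dir/default-log4j.properties"  # stand-in for /etc/emr/spark-conf/log4j.properties
new_conf="$work_dir/log4j.properties"              # stand-in for /new/path/to/log4j.properties

# Minimal stand-in for the default file, which logs at INFO.
printf 'log4j.rootCategory=INFO, console\nlog4j.appender.console=org.apache.log4j.ConsoleAppender\n' > "$default_conf"

# Step 1: copy the default configuration.
cp "$default_conf" "$new_conf"

# Step 2: rewrite only the root log level line, leaving every other line untouched.
sed 's/^log4j\.rootCategory=.*/log4j.rootCategory=WARN, console/' "$new_conf" > "$new_conf.tmp"
mv "$new_conf.tmp" "$new_conf"

# Step 3: build the value for spark.driver.extraJavaOptions (note the file: prefix).
driver_opt="-Dlog4j.configuration=file:$new_conf"
echo "$driver_opt"
```

Writing to a temporary file and moving it into place avoids relying on sed -i, whose syntax differs between platforms.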
How do I enable small file merging in Spark 3?
Set spark.sql.adaptive.merge.output.small.files.enabled to true. Spark compresses and merges small output files, with a default maximum merged file size of 256 MB. If 256 MB is too small for your workload, increase spark.sql.adaptive.advisoryOutputFileSizeInBytes.
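As a rough illustration, the two parameters could also be passed per job on the command line. This sketch assumes the size parameter accepts a plain byte count (as its name suggests) and that 512 MB is the desired ceiling; my_query.sql is a placeholder file name. The command is printed rather than executed so the sketch runs without a cluster.

```shell
# Hypothetical target: raise the merged-file ceiling from 256 MB to 512 MB.
target_mb=512
target_bytes=$((target_mb * 1024 * 1024))

# Assemble the spark-sql invocation; my_query.sql is a placeholder.
merge_cmd="spark-sql --conf spark.sql.adaptive.merge.output.small.files.enabled=true --conf spark.sql.adaptive.advisoryOutputFileSizeInBytes=${target_bytes} -f my_query.sql"

# Print the command for review instead of running it.
echo "$merge_cmd"
```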
How do I handle data skew in Spark SQL?
The approach depends on your Spark version.
Spark 3
Resolution: Enable Adaptive Query Execution (AQE) skew join handling via the EMR console. Go to the Configure tab of the Spark service and set both parameters to true:
- spark.sql.adaptive.enabled
- spark.sql.adaptive.skewJoin.enabled
Spark 2
Cause: AQE is not available in Spark 2. You must handle data skew manually using one of the following approaches.
Resolution: Choose the approach that matches your data and skew pattern.
Filter out null or irrelevant data when reading from the table. Removing skewed null keys before the join reduces the data imbalance.
Broadcast the smaller table using a query hint:
SELECT /*+ BROADCAST(table1) */ *
FROM table1
JOIN table2 ON table1.id = table2.id
Split skewed data by a known skewed key and handle the skewed partition separately with a broadcast join:
SELECT * FROM table1_1 JOIN table2 ON table1_1.id = table2.id
UNION ALL
SELECT /*+ BROADCAST(table1_2) */ * FROM table1_2 JOIN table2 ON table1_2.id = table2.id
Scatter skewed data with a known skewed key by adding a random suffix to the key, then expand the other table to match:
SELECT id, value, CONCAT(id, CAST(FLOOR(RAND() * 3) AS STRING)) AS new_id FROM A;
SELECT id, value, CONCAT(id, suffix) AS new_id
FROM (
SELECT id, value, suffix
FROM B LATERAL VIEW EXPLODE(ARRAY(0, 1, 2)) tmp AS suffix
);
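The logic of this scatter approach can be checked on toy data outside Spark. The sketch below is an illustration only: it uses awk on made-up rows (six rows in A sharing the hot key k, one row in B), three suffix buckets, and a fixed random seed. None of these values come from a real workload. It shows that once A's keys carry an integer suffix in the range 0 to 2 and B is expanded with every suffix, each row of A still finds exactly one match.

```shell
salt_buckets=3

# Table A stand-in: six rows that all share the hot key "k" (columns: id,value).
table_a=$(printf 'k,a1\nk,a2\nk,a3\nk,a4\nk,a5\nk,a6')
# Table B stand-in: one row per key.
table_b=$(printf 'k,b1')

# Salt A: append a random integer suffix in [0, salt_buckets) to each key.
salted_a=$(printf '%s\n' "$table_a" |
  awk -F, -v OFS=, -v n="$salt_buckets" 'BEGIN { srand(42) } { key = $1 int(rand() * n); print key, $2 }')

# Expand B: emit one copy of each row per possible suffix.
expanded_b=$(printf '%s\n' "$table_b" |
  awk -F, -v OFS=, -v n="$salt_buckets" '{ for (s = 0; s < n; s++) { key = $1 s; print key, $2 } }')

# Join on the salted key: lines before "---" load B, lines after it probe with A.
joined=$(printf '%s\n---\n%s\n' "$expanded_b" "$salted_a" |
  awk -F, -v OFS=, '$0 == "---" { in_a = 1; next }
       !in_a { b[$1] = $2; next }
       ($1 in b) { print $1, $2, b[$1] }')

printf '%s\n' "$joined"
```

The suffix must be an integer drawn from the same set that the other table is expanded with; a fractional suffix would never match any expanded key.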
Scatter skewed data without a known skewed key by replacing null keys with a random value:
SELECT t1.id, t1.id_rand, t2.name
FROM (
SELECT id,
CASE WHEN id IS NULL THEN CONCAT('SkewData_', CAST(RAND() AS STRING))
ELSE id END AS id_rand
FROM test1
WHERE statis_date = '20221130'
) t1
LEFT JOIN test2 t2 ON t1.id_rand = t2.id
How do I use Python 3 in PySpark jobs?
The following steps use Spark 2.x on an EMR V5.7.0 DataLake cluster as an example.
Temporarily switch to Python 3
This change applies only to the current session.
- Log on to the cluster via SSH. For details, see Log on to a cluster.
- Set the Python version for PySpark:
  export PYSPARK_PYTHON=/usr/bin/python3
- Verify the change by starting PySpark:
  pyspark
  The output should include:
  Using Python version 3.6.8
Permanently switch to Python 3
- Log on to the cluster via SSH. For details, see Log on to a cluster.
- Open /etc/profile for editing:
  vi /etc/profile
- Press I to enter insert mode, then add the following line at the end of the file:
  export PYSPARK_PYTHON=/usr/bin/python3
- Press Esc, then enter :wq to save and close the file.
- Reload the profile to apply the change:
  source /etc/profile
- Verify the change by starting PySpark:
  pyspark
  The output should include:
  Using Python version 3.6.8
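If you prefer to script the edit instead of using vi, one possible approach is an append that only fires when the line is missing, so rerunning it does not duplicate the export. This is a sketch under assumptions: profile_file is a temporary stand-in for /etc/profile, and add_export_once is a hypothetical helper name.

```shell
profile_file=$(mktemp)   # stand-in for /etc/profile; point this at the real file on a cluster
export_line='export PYSPARK_PYTHON=/usr/bin/python3'

# Hypothetical helper: append the export only if the exact line is not already present.
add_export_once() {
  # -x matches whole lines only, -F treats the pattern as a fixed string, -q suppresses output.
  grep -qxF "$export_line" "$profile_file" || printf '%s\n' "$export_line" >> "$profile_file"
}

add_export_once
add_export_once   # the second call finds the line and appends nothing

cat "$profile_file"
```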
Why does a Spark Streaming job unexpectedly stop?
Check the following causes:
- Spark version earlier than 1.6: Spark versions before 1.6 have a memory leak bug that can cause YARN containers to stop unexpectedly. Upgrade to a later Spark version.
- Memory inefficiency in application code: Review your application code for memory issues that could exhaust container memory under sustained load.
Why does a stopped Spark Streaming job still show as Running in the EMR console?
Cause: This is a known monitoring issue with Spark Streaming jobs submitted in yarn-client mode. The EMR console does not reliably detect the stopped state for yarn-client jobs.
Resolution: Submit your Spark Streaming jobs in yarn-cluster mode instead of yarn-client mode.
How do I fix java.lang.ClassNotFoundException when running spark-submit in YARN-cluster mode with Kerberos authentication?
Cause: In YARN-cluster mode with Kerberos authentication enabled, the Spark driver's classpath is not automatically extended to include JAR files from specific directories. As a result, classes from the Hive metastore directory are not found at runtime.
Resolution: Add the --jars parameter to include the JAR files from /opt/apps/METASTORE/metastore-current/hive2/ in your spark-submit command.
In YARN-cluster mode, JAR files passed to --jars must be separated by commas (,).
For example, if your Spark JAR is /opt/apps/SPARK3/spark3-current/examples/jars/spark-examples_2.12-3.5.3-emr.jar, run:
spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master yarn \
--jars $(ls /opt/apps/METASTORE/metastore-current/hive2/*.jar | tr '\n' ',') \
/opt/apps/SPARK3/spark3-current/examples/jars/spark-examples_2.12-3.5.3-emr.jar
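The comma-separated list can also be built with paste, which joins lines with a delimiter and, unlike converting every newline with tr, leaves no trailing comma. The sketch below is illustrative only: jar_dir is a scratch directory standing in for /opt/apps/METASTORE/metastore-current/hive2/, and the two JAR file names are made up.

```shell
jar_dir=$(mktemp -d)                             # stand-in for the Hive metastore JAR directory
touch "$jar_dir/first.jar" "$jar_dir/second.jar" # hypothetical JAR names

# paste -s joins all input lines into one line; -d, uses a comma as the separator.
jar_list=$(ls "$jar_dir"/*.jar | paste -sd, -)

echo "$jar_list"
```

The resulting value can then be passed as --jars "$jar_list" in the spark-submit command.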
What's next
If your issue is not covered here:
- Search the Alibaba Cloud Community for similar questions.
- Submit a ticket from the EMR console for technical support.