
E-MapReduce: Enhanced features of Spark in EMR

Last Updated: Jun 03, 2024

Alibaba Cloud E-MapReduce (EMR) runs on Alibaba Cloud Elastic Compute Service (ECS) instances and provides enhanced features on top of open source Apache Hadoop and Apache Spark. This topic describes the enhanced features of Spark in EMR.

Background information

EMR is built on open source components, and new EMR versions are released as those components are updated. EMR also provides an enhanced deployment environment for these open source components on Alibaba Cloud.

Enhanced features

The following tables describe the enhanced features of Spark in each EMR series.

EMR V5.X series

EMR version

Spark version

Enhanced features

EMR V5.17.0

Spark 3.4.2

Spark 3 is updated to 3.4.2.

EMR V5.16.0

Spark 3.3.1

Vulnerabilities in the Commons Text library are fixed.

EMR V5.15.1

Spark 3.3.1

JDO-related configurations are removed from the hive-site.xml configuration file.

EMR V5.12.1

Spark 3.3.1

  • By default, Spark History Server data is stored in OSS-HDFS.

  • Spark3 Native Engine data is stored in OSS or OSS-HDFS.

EMR V5.10.0

Spark 3.3.1

Spark is updated to 3.3.1.

EMR V5.9.0

Spark 3.3.0

  • Spark is updated to 3.3.0.

  • Kerberos authentication is supported.

EMR V5.8.0

Spark 3.2.1

LDAP authentication can be enabled with one click.

EMR V5.6.0

Spark 3.2.1

Spark is updated to 3.2.1.

EMR V5.5.0

Spark 3.2.0

  • An IF expression can be used in the COUNT DISTINCT function, and the CASE WHEN syntax for the COUNT DISTINCT function is optimized.

    To use this feature, set spark.sql.optimizer.rewriteConditionalDistinctAggregates to true.

  • Fallback from Shuffle Hash Join to Sort Merge Join is supported.

    To use this feature, set spark.sql.join.preferSortMergeJoin to false and set spark.sql.join.enableShuffledHashJoinFallback to true.

  • Automatic merging of small files in non-dynamic partitions is supported.

    To use this feature, set spark.sql.adaptive.merge.output.small.files.enabled to true.

  • Concurrency can be automatically adjusted for queries that use the GROUPING SETS clause or DISTINCT.

    To use this feature, set spark.sql.execution.optimizeExpand to true.

  • Hive on Spark is optimized.

  • Time travel syntax is supported.

  • Spark is adapted to JindoSDK.
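Each switch above is a Spark SQL property. The following Python sketch, which uses only the standard library, turns the five property/value pairs into spark-submit --conf arguments and into SET statements for a SQL session. The property names and values come from the list above; the helper names and the my_job.py application name are hypothetical. The source does not say whether every property can be changed at runtime with SET, so passing them at submit time is the safer assumption.

```python
"""Apply the EMR V5.5.0 Spark SQL switches as spark-submit or SQL settings.

Property names and values are copied from the EMR V5.5.0 feature list.
The helper names are hypothetical; they only illustrate two common ways to
pass Spark SQL properties.
"""
from typing import Dict, List

# Switches listed for EMR V5.5.0 (Spark 3.2.0).
V550_FEATURES: Dict[str, str] = {
    # IF / CASE WHEN inside COUNT DISTINCT
    "spark.sql.optimizer.rewriteConditionalDistinctAggregates": "true",
    # Shuffle Hash Join with fallback to Sort Merge Join: both settings are needed
    "spark.sql.join.preferSortMergeJoin": "false",
    "spark.sql.join.enableShuffledHashJoinFallback": "true",
    # Automatic merging of small files in non-dynamic partitions
    "spark.sql.adaptive.merge.output.small.files.enabled": "true",
    # Automatic concurrency adjustment for GROUPING SETS / DISTINCT
    "spark.sql.execution.optimizeExpand": "true",
}


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Return spark-submit arguments of the form: --conf key=value."""
    args: List[str] = []
    for key, value in conf.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


def to_set_statements(conf: Dict[str, str]) -> List[str]:
    """Return Spark SQL SET statements, e.g. for spark-sql or Beeline sessions."""
    return [f"SET {key}={value};" for key, value in conf.items()]


if __name__ == "__main__":
    # "my_job.py" is a placeholder application name.
    print(" ".join(["spark-submit", *to_submit_args(V550_FEATURES), "my_job.py"]))
    for statement in to_set_statements(V550_FEATURES):
        print(statement)
```

Keeping the switches in one dictionary means the same settings can be reused for batch jobs and interactive sessions without drifting apart.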

EMR V5.4.0

Spark 3.1.2

  • Spark is updated to 3.1.2.

  • In Spark 3.x, the performance of DISTINCT computation in Spark SQL is optimized. The optimization is triggered when an aggregate operator contains multiple count(distinct case ... when ...) expressions.

  • The array index out-of-bounds error that occurs when statistics required by Adaptive Query Execution (AQE) are missing is fixed.

  • Errors related to AQE and data caching in specific scenarios are fixed.
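To make the count(distinct case ... when ...) pattern concrete, the following Python sketch computes what several such aggregates return per group, first with one distinct set per condition and then with a single deduplication pass over (group, value) that records which conditions each value satisfied. The single-pass form is one equivalent rewrite that avoids processing the rows once per distinct aggregate; the source does not describe how EMR implements its optimization, so treat this as an illustration of the semantics only. The row layout and function names are hypothetical.

```python
"""Semantics of several count(distinct case when ... then x end) aggregates.

The row layout and function names are hypothetical. The single-pass version
shows one equivalent rewrite; it is not EMR's implementation.
"""
from typing import Callable, Dict, Iterable, List, Set, Tuple

Row = Dict[str, object]
Condition = Callable[[Row], bool]


def naive_conditional_distinct(
    rows: Iterable[Row], key: str, value: str, conds: List[Condition]
) -> Dict[object, List[int]]:
    """One distinct set per condition, like:
    SELECT key, count(distinct case when cond_i then value end), ... GROUP BY key
    """
    sets: Dict[object, List[Set[object]]] = {}
    for row in rows:
        group = sets.setdefault(row[key], [set() for _ in conds])
        for i, cond in enumerate(conds):
            # CASE WHEN yields NULL when cond fails; COUNT DISTINCT skips NULLs.
            if cond(row) and row[value] is not None:
                group[i].add(row[value])
    return {k: [len(s) for s in v] for k, v in sets.items()}


def single_pass_conditional_distinct(
    rows: Iterable[Row], key: str, value: str, conds: List[Condition]
) -> Dict[object, List[int]]:
    """Deduplicate once on (key, value), OR-ing the condition flags, then count."""
    flags: Dict[Tuple[object, object], List[bool]] = {}
    groups: Dict[object, None] = {}  # insertion-ordered set of group keys
    for row in rows:
        groups.setdefault(row[key], None)
        if row[value] is None:
            continue
        hits = flags.setdefault((row[key], row[value]), [False] * len(conds))
        for i, cond in enumerate(conds):
            hits[i] = hits[i] or cond(row)
    result = {g: [0] * len(conds) for g in groups}
    for (g, _), hits in flags.items():
        for i, hit in enumerate(hits):
            if hit:
                result[g][i] += 1
    return result


if __name__ == "__main__":
    rows: List[Row] = [
        {"shop": "a", "user": 1, "paid": True},
        {"shop": "a", "user": 1, "paid": False},
        {"shop": "a", "user": 2, "paid": False},
        {"shop": "b", "user": 3, "paid": True},
    ]
    conds: List[Condition] = [lambda r: bool(r["paid"]), lambda r: not r["paid"]]
    expected = naive_conditional_distinct(rows, "shop", "user", conds)
    assert single_pass_conditional_distinct(rows, "shop", "user", conds) == expected
    print(expected)  # {'a': [1, 2], 'b': [1, 0]}
```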

EMR V5.3.0

Spark 3.1.1

The incompatibility issue between Spark and Delta Lake is fixed.

EMR V5.2.1

Spark 3.1.1

Important

In EMR V5.2.1, Spark (3.1.1) and Kudu (1.11.1) are incompatible with each other.

  • Delta Lake and Hudi are supported.

  • Remote Shuffle Service is supported.

  • Livy is supported.

  • In the EMR console, the parameter names on the spark-defaults tab of the Configure tab for the Spark service are optimized.

  • The cost-based optimization (CBO), dynamic partition pruning, and Z-order features are optimized. The performance of these features is 50% higher than in open source Spark 3.

  • Log Service, DataHub, and Message Queue for Apache RocketMQ can be used as data sources.

EMR V4.X series

EMR version

Spark version

Enhanced features

EMR V4.10.0

Spark 2.4.8

  • Spark is updated to 2.4.8.

  • The issue where adaptive execution does not take effect in some scenarios is fixed.

  • The issue where statistical aggregate functions behave differently in Spark and Hive is fixed.

  • The issue where Spark cannot read valid CHAR-type data from a Hive ORC table is fixed.

  • The default configurations of Thrift Server are optimized.

  • In the EMR console, the parameter names on the spark-defaults tab of the Configure tab for the Spark service are optimized.

  • Hive on Spark is optimized.

  • The array index out-of-bounds error that occurs when statistics required by Adaptive Query Execution (AQE) are missing is fixed.

  • Errors related to AQE and data caching in specific scenarios are fixed.

  • Log4j Metrics Appender is removed because the configuration is invalid.

  • The null pointer exception that occurs when SparkContext is started is fixed.

  • The data compression algorithm Zstandard is supported.
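The entry above does not say which Spark setting the Zstandard support applies to. In open source Spark 2.4, zstd is one of the short names accepted by spark.io.compression.codec, the codec Spark uses for internal data such as shuffle outputs, broadcast variables, and RDD partitions. The Python sketch below assumes that is the setting you want and renders it as a spark-defaults.conf line; the helper names are hypothetical, and table file formats such as ORC or Parquet use separate settings that this sketch does not cover.

```python
from typing import Dict

# Short codec names accepted by spark.io.compression.codec in open source Spark 2.4.
SPARK_IO_CODECS = {"lz4", "lzf", "snappy", "zstd"}


def io_codec_conf(codec: str = "zstd") -> Dict[str, str]:
    """Return the spark-defaults entry that selects Spark's internal I/O codec."""
    if codec not in SPARK_IO_CODECS:
        raise ValueError(f"unsupported codec: {codec!r}")
    return {"spark.io.compression.codec": codec}


def render_spark_defaults(conf: Dict[str, str]) -> str:
    """Format entries the way spark-defaults.conf expects: key, whitespace, value."""
    return "\n".join(f"{key} {value}" for key, value in sorted(conf.items()))


if __name__ == "__main__":
    print(render_spark_defaults(io_codec_conf()))  # spark.io.compression.codec zstd
```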

EMR V4.9.0

Spark 2.4.7

  • The issue where adaptive execution does not take effect in some scenarios is fixed.

  • The issue where statistical aggregate functions behave differently in Spark and Hive is fixed.

  • The issue where Spark cannot read valid CHAR-type data from a Hive ORC table is fixed.

EMR V4.8.0

Spark 2.4.7

  • Some default configurations are optimized.

  • Window-based top-k queries can be pushed down to improve performance.

  • The capability of reading data from and writing data to Hive tables in the CSV or JSON format is enhanced.

  • All the column names of a table can be omitted in the ANALYZE statement.

  • LDAP authentication can be enabled or disabled with a click.

  • Spark Beeline is easier to use.
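A window-based top-k query typically filters on row_number() OVER (PARTITION BY k ORDER BY v DESC) <= N. Pushing the limit down means each input partition keeps only its local top N rows per key before the shuffle, so fewer rows are exchanged and sorted while the final answer stays the same. The Python sketch below models that idea with heapq on in-memory lists standing in for Spark partitions; it is a conceptual model, not EMR's implementation, and the function names are hypothetical.

```python
"""Conceptual model of pushing a window top-k (row_number() <= k) below a shuffle.

Lists of (key, value) tuples stand in for Spark partitions; names are hypothetical.
"""
import heapq
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, int]  # (PARTITION BY key, ORDER BY value)


def local_top_k(partition: List[Record], k: int) -> List[Record]:
    """Map side: keep at most k largest values per key within one partition."""
    by_key: Dict[str, List[int]] = defaultdict(list)
    for key, value in partition:
        by_key[key].append(value)
    return [(key, v) for key, values in by_key.items() for v in heapq.nlargest(k, values)]


def top_k_per_key(
    partitions: List[List[Record]], k: int, pushdown: bool = True
) -> Tuple[Dict[str, List[int]], int]:
    """Return (top-k values per key in descending order, number of rows shuffled)."""
    shuffled: Dict[str, List[int]] = defaultdict(list)
    rows_shuffled = 0
    for partition in partitions:
        candidates = local_top_k(partition, k) if pushdown else partition
        rows_shuffled += len(candidates)
        for key, value in candidates:
            shuffled[key].append(value)
    # Reduce side: the global top k is contained in the union of local top-k sets.
    result = {key: heapq.nlargest(k, values) for key, values in shuffled.items()}
    return result, rows_shuffled


if __name__ == "__main__":
    partitions = [
        [("a", 5), ("a", 1), ("b", 7), ("a", 9), ("a", 4)],
        [("a", 3), ("b", 2), ("b", 8), ("a", 6), ("a", 2), ("b", 1)],
    ]
    pushed, pushed_rows = top_k_per_key(partitions, k=2, pushdown=True)
    plain, plain_rows = top_k_per_key(partitions, k=2, pushdown=False)
    assert pushed == plain
    print(pushed, pushed_rows, plain_rows)  # {'a': [9, 6], 'b': [8, 7]} 7 11
```

The saving grows with the number of rows per key in each partition, because the shuffle volume is bounded by k per key per partition instead of by the input size.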

EMR V4.6.0

Spark 2.4.7

  • Spark is updated to 2.4.7.

  • jQuery is updated to 3.5.1.

  • Spark can automatically update table and partition sizes in a way that is compatible with Hive.

  • Spark metadata and job running information can be sent to DataWorks.

EMR V4.5.0

Spark 2.4.5

Metadata stored in Alibaba Cloud Data Lake Formation (DLF) is supported.

EMR V4.3.0

Spark 2.4.5

  • Spark is updated to 2.4.5.

  • The bundled Delta Lake is updated to 0.6.0.

  • The issue where PySpark cannot run properly after Ranger Hive is enabled is fixed.

EMR V3.X series

EMR version

Spark version

Enhanced features

EMR V3.51.0

Spark 3.4.2

Spark 3 is updated to 3.4.2.

EMR V3.50.0

Spark 3.3.1

Vulnerabilities in the Commons Text library are fixed.

EMR V3.49.0

Spark 3.3.1

JDO-related configurations are removed from the hive-site.xml configuration file.

EMR V3.46.1

Spark 3.3.1

  • By default, Spark History Server data is stored in OSS-HDFS.

  • Spark3 Native Engine data is stored in OSS or OSS-HDFS.

EMR V3.44.0

Spark 3.3.1

Spark is updated to 3.3.1.

EMR V3.43.0

Spark 3.3.0

  • Spark is updated to 3.3.0.

  • Kerberos authentication is supported.

EMR V3.40.0

Spark 3.2.1

Spark is updated to 3.2.1.

EMR V3.39.1

Spark 2.4.8

  • Hive on Spark is optimized.

  • Spark is adapted to JindoSDK.

EMR V3.38.1

Spark 2.4.8

  • Log4j Metrics Appender is removed because the configuration is invalid.

  • The null pointer exception that occurs when SparkContext is started is fixed.

EMR V3.38.0

Spark 2.4.8

  • Spark is updated to 2.4.8.

  • Both Spark 2.4.8 and Spark 3.1.2 are supported.

    Note

    Delta and Remote Shuffle Service are not supported in Spark 3.

  • In Spark 3.x, the performance of DISTINCT computation in Spark SQL is optimized. The optimization is triggered when an aggregate operator contains multiple count(distinct case ... when ...) expressions.

  • The array index out-of-bounds error that occurs when statistics required by Adaptive Query Execution (AQE) are missing is fixed.

  • Errors related to AQE and data caching in specific scenarios are fixed.

EMR V3.37.0

Spark 2.4.7

The incompatibility issue between Spark and Delta Lake is fixed.

EMR V3.36.1

Spark 2.4.7

  • In the EMR console, the parameter names on the spark-defaults tab of the Configure tab for the Spark service are optimized.

  • The performance of log collection is optimized.

  • The data compression algorithm Zstandard is supported.

EMR V3.35.0

Spark 2.4.7

  • The issue where adaptive execution does not take effect in some scenarios is fixed.
  • The issue where statistical aggregate functions behave differently in Spark and Hive is fixed.
  • The issue where Spark cannot read valid CHAR-type data from a Hive ORC table is fixed.

EMR V3.34.0

Spark 2.4.7

  • Some default configurations are optimized.

  • Window-based top-k queries can be pushed down to improve performance.

  • The capability of reading data from and writing data to Hive tables in the CSV or JSON format is enhanced.

  • All the column names of a table can be omitted in the ANALYZE statement.

  • LDAP authentication can be enabled or disabled with a click.

  • Spark Beeline is easier to use.

EMR V3.33.0

Spark 2.4.7

  • Spark is updated to 2.4.7.
  • jQuery is updated to 3.5.1.
  • Spark can automatically update table and partition sizes in a way that is compatible with Hive.
  • Spark metadata and job running information can be sent to DataWorks.

EMR V3.32.0

Spark 2.4.5

The data collection feature of JindoTable can be enabled or disabled.

EMR V3.30.0

Spark 2.4.5

  • Metadata from Alibaba Cloud Data Lake Formation (DLF) is supported.
  • Has dependencies are updated to 2.0.1.
  • Issues caused by backticks (`) in Streaming SQL are fixed.
  • The Delta JAR package is removed, and Delta is deployed separately.
  • Logs are stored in HDFS directories.

EMR V3.29.0

Spark 2.4.5

  • Spark is updated to 2.4.5.2.0.

  • A third-party metastore is supported.

  • The datalake metastore-client parameter is added.

EMR V3.28.0

Spark 2.4.5

  • Spark is updated to 2.4.5.
  • Spark is compatible with the Streaming SQL scripts of DataFactory.
  • Delta 0.6.0 is supported.

EMR V3.27.0

Spark 2.4.3

  • Partition fields of the date type are supported in the cube.
  • The stack depth in the spark-submit scripts is increased.

EMR V3.25.0

Spark 2.4.3

  • Parameters related to Delta, such as spark.sql.extensions, can be configured in the EMR console.
  • Hive can read data from Delta tables without manual configuration of InputFormat.
  • The ALTER TABLE SET TBLPROPERTIES and UNSET TBLPROPERTIES statements are supported.
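The statements follow the standard Spark SQL forms ALTER TABLE ... SET TBLPROPERTIES (...) and ALTER TABLE ... UNSET TBLPROPERTIES [IF EXISTS] (...). The Python sketch below builds them with single-quoted keys and values. The table name and property keys in the usage lines are hypothetical, and instead of guessing at string escaping rules, the sketch simply rejects keys or values that contain quotes or backslashes.

```python
from typing import Dict, Iterable


def _quote(text: str) -> str:
    """Wrap text in single quotes; reject characters that would need escaping."""
    if "'" in text or "\\" in text:
        raise ValueError(f"quote or backslash not handled in {text!r}")
    return f"'{text}'"


def set_tblproperties(table: str, props: Dict[str, str]) -> str:
    """Build ALTER TABLE ... SET TBLPROPERTIES ('k' = 'v', ...)."""
    if not props:
        raise ValueError("at least one property is required")
    pairs = ", ".join(f"{_quote(k)} = {_quote(v)}" for k, v in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({pairs})"


def unset_tblproperties(table: str, keys: Iterable[str], if_exists: bool = False) -> str:
    """Build ALTER TABLE ... UNSET TBLPROPERTIES [IF EXISTS] ('k', ...)."""
    quoted = ", ".join(_quote(k) for k in keys)
    if not quoted:
        raise ValueError("at least one key is required")
    clause = " IF EXISTS" if if_exists else ""
    return f"ALTER TABLE {table} UNSET TBLPROPERTIES{clause} ({quoted})"


if __name__ == "__main__":
    # Table name and property keys are hypothetical examples.
    print(set_tblproperties("db.events", {"etl.source": "kafka", "retention.days": "30"}))
    # ALTER TABLE db.events SET TBLPROPERTIES ('etl.source' = 'kafka', 'retention.days' = '30')
    print(unset_tblproperties("db.events", ["retention.days"], if_exists=True))
    # ALTER TABLE db.events UNSET TBLPROPERTIES IF EXISTS ('retention.days')
```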

EMR V3.24.0

Spark 2.4.3

  • Parameters related to Delta are supported.
  • The Spark plug-in can be configured in Ranger.
  • JindoCube is updated to 0.3.0.

EMR V3.23.0

Spark 2.4.3

  • The code of Spark SQL Thrift Server is updated to fix the issue where IsolatedClassLoader cannot load classes in certain cases.
  • The code related to Spark transactions is refactored to improve stability.
  • The issue where optimized row columnar (ORC) files cannot be read or written after the built-in Hive is upgraded to version 2.3 is fixed.
  • The MERGE INTO syntax is supported.
  • The SCAN and STREAM syntaxes are supported.
  • Exactly-once semantics (EOS) are supported for the Structured Streaming Kafka sink.
  • Delta is upgraded to 0.4.0.
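MERGE INTO combines update and insert in one statement: a source row that matches a target row on the join condition updates that row, and an unmatched source row is inserted. The Python sketch below models that upsert behavior on a dictionary keyed by the join column, roughly what a statement shaped like MERGE INTO target USING source ON target.id = source.id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * does. The source does not list the exact MERGE INTO clauses that EMR V3.23.0 accepts, so that SQL shape (taken from Delta Lake's dialect) and the helper name are assumptions.

```python
"""Conceptual model of MERGE INTO ... WHEN MATCHED UPDATE / WHEN NOT MATCHED INSERT.

The target table is a dict keyed by the join column; names are hypothetical.
"""
from typing import Dict, List, Set, Tuple

Row = Dict[str, object]


def merge_into(target: Dict[object, Row], source: List[Row], key: str) -> Tuple[int, int]:
    """Upsert source rows into target in place; return (rows_updated, rows_inserted)."""
    seen: Set[object] = set()
    for row in source:
        # Standard MERGE fails when several source rows match one target row;
        # this sketch is stricter and rejects any duplicate source key up front.
        if row[key] in seen:
            raise ValueError(f"duplicate source key: {row[key]!r}")
        seen.add(row[key])

    updated = inserted = 0
    for row in source:
        if row[key] in target:
            updated += 1   # WHEN MATCHED THEN UPDATE SET *
        else:
            inserted += 1  # WHEN NOT MATCHED THEN INSERT *
        target[row[key]] = dict(row)
    return updated, inserted


if __name__ == "__main__":
    target = {1: {"id": 1, "qty": 5}, 2: {"id": 2, "qty": 7}}
    source = [{"id": 2, "qty": 9}, {"id": 3, "qty": 1}]
    print(merge_into(target, source, "id"))  # (1, 1)
    print(target[2], target[3])  # {'id': 2, 'qty': 9} {'id': 3, 'qty': 1}
```

Validating the source before touching the target keeps the operation all-or-nothing, which mirrors the transactional behavior expected of MERGE.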

EMR V3.22.0

Spark 2.4.3

  • Relational cache

    Relational caches can be used to accelerate queries through pre-computation. You can create a relational cache to pre-compute data. When a query runs, Spark Optimizer automatically finds a suitable relational cache, optimizes the SQL execution plan to use it, and continues the computation from the cached data, which speeds up the query. For example, you can use relational caches to implement multidimensional online analytical processing (MOLAP), generate data reports, create data dashboards, and synchronize data across clusters.

    • DDL operations such as CACHE, UNCACHE, ALTER, and SHOW are supported. Relational caches support all Spark data sources and data formats.
    • Caches can be updated automatically or by running the REFRESH command, and incremental caching based on specified partitions is supported.
    • SQL execution plans can be optimized based on relational caches.
  • Streaming SQL
    • The parameter settings of Stream Query Writer are normalized.
    • The schema compatibility check for Kafka data tables is optimized.
    • A schema can be automatically registered with Schema Registry for a Kafka data table that does not have one.
    • The log messages recorded when a Kafka schema is incompatible are improved.
    • The issue where column names must be explicitly specified when query results are written to a Kafka data table is fixed.
    • The restriction that streaming SQL queries support only the Kafka and LogHub data sources is removed.
  • Delta

    The Delta component is added. You can use Spark to create a Delta data source for streaming writes, transactional reads and writes, data verification, and data backtracking. For more information, see Delta details.

    • You can call the DataFrame API to read data from or write data to Delta.
    • You can call the Structured Streaming API to read or write data by using Delta as the data source or sink.
    • You can call the Delta API to update, delete, merge, vacuum, and optimize data.
    • You can use SQL statements to create Delta tables, import data to Delta, and read data from Delta tables.
  • Others
    • Primary key and foreign key constraints are supported.
    • JAR conflicts, such as the servlet conflict, are resolved.
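The relational cache mechanism described for EMR V3.22.0 (pre-compute an aggregate once, then let the optimizer answer matching queries from it) can be modeled in a few lines. In the Python sketch below, a cache holds SUM(measure) grouped by a set of dimensions, and a query that groups by a subset of those dimensions is answered by rolling up the smaller cached result instead of scanning the base rows. It is a conceptual model only: the class and function names are hypothetical, it handles only SUM, and it does not reproduce EMR's CACHE DDL or the way Spark Optimizer matches caches.

```python
"""Conceptual model of a relational cache: a pre-computed aggregate that can
answer later queries. Names are hypothetical; this is not EMR's CACHE syntax.
"""
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, Tuple

Row = Dict[str, object]
Result = Dict[Tuple[object, ...], float]


class SumCache:
    """Pre-computed SUM(measure) GROUP BY dims over the base rows."""

    def __init__(self, rows: List[Row], dims: Sequence[str], measure: str) -> None:
        self.dims = tuple(dims)
        self.measure = measure
        self.groups: Result = defaultdict(float)
        for row in rows:
            self.groups[tuple(row[d] for d in self.dims)] += float(row[measure])

    def can_answer(self, group_by: Sequence[str], measure: str) -> bool:
        """SUM can be re-aggregated, so grouping by any subset of dims matches."""
        return measure == self.measure and set(group_by) <= set(self.dims)

    def answer(self, group_by: Sequence[str]) -> Result:
        """Roll the cached groups up to the requested dimensions."""
        positions = [self.dims.index(d) for d in group_by]
        out: Result = defaultdict(float)
        for key, total in self.groups.items():
            out[tuple(key[p] for p in positions)] += total
        return dict(out)


def sum_by(rows: List[Row], group_by: Sequence[str], measure: str,
           cache: Optional[SumCache] = None) -> Result:
    """Answer from the cache when it matches; otherwise scan the base rows."""
    if cache is not None and cache.can_answer(group_by, measure):
        return cache.answer(group_by)
    out: Result = defaultdict(float)
    for row in rows:
        out[tuple(row[d] for d in group_by)] += float(row[measure])
    return dict(out)


if __name__ == "__main__":
    rows: List[Row] = [
        {"region": "east", "day": "d1", "sales": 10},
        {"region": "east", "day": "d2", "sales": 5},
        {"region": "west", "day": "d1", "sales": 7},
    ]
    cache = SumCache(rows, ["region", "day"], "sales")
    assert sum_by(rows, ["region"], "sales", cache) == sum_by(rows, ["region"], "sales")
    print(sum_by(rows, ["region"], "sales", cache))  # {('east',): 15.0, ('west',): 7.0}
```

Restricting the sketch to SUM keeps the rollup obviously correct; aggregates such as COUNT DISTINCT cannot be rolled up this way, which is one reason a real optimizer must check more than column names before reusing a cache.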