Alibaba Cloud E-MapReduce (EMR) runs on Alibaba Cloud Elastic Compute Service (ECS) instances and provides enhanced features on top of open source Apache Hadoop and Apache Spark. This topic describes the enhanced features of Spark in EMR.

Background information

Open source components are used in EMR, and new EMR versions are released as those components are updated. EMR provides an enhanced deployment environment on Alibaba Cloud for the open source components.

Enhanced features

The following list describes the enhanced features of Spark in each EMR version. The Spark version that each EMR version provides is shown in parentheses.

EMR V5.2.1 (Spark 3.1.1):
  • Delta Lake and Hudi are supported.
  • Remote Shuffle Service is supported.
  • Livy is supported.
  • In the EMR console, the parameter names on the spark-defaults tab of the Configure tab for the Spark service are optimized.
  • The cost-based optimization (CBO), dynamic partition pruning, and Z-order features are optimized and deliver up to 50% higher performance than open source Spark 3. A dynamic partition pruning sketch follows this list.
  • Log Service, DataHub, and Message Queue for Apache RocketMQ can be used as data sources.
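
The dynamic partition pruning sketch mentioned above, in Scala. The table and column names are hypothetical, and the configuration key is the open source Spark 3 one; EMR may tune this feature further:

    // Dynamic partition pruning (DPP) is enabled by default in Spark 3;
    // the key below is the open source Spark 3 configuration name.
    spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

    // "sales" is partitioned by dt; "dims" is a small dimension table.
    // At runtime, the filter on dims.region is converted into a partition
    // filter on sales.dt, so only the matching partitions are scanned.
    spark.sql("""
      SELECT s.dt, SUM(s.amount) AS total
      FROM sales s JOIN dims d ON s.dt = d.dt
      WHERE d.region = 'east'
      GROUP BY s.dt
    """).show()
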
EMR V4.9.0 (Spark 2.4.7):
  • The issue that adaptive execution does not take effect in some scenarios is fixed.
  • The issue that statistical aggregate functions behave inconsistently between Spark and Hive is fixed.
  • The issue that Spark cannot read valid data of the CHAR type from a Hive ORC table is fixed.
EMR V4.8.0 (Spark 2.4.7):
  • Some default configurations are optimized.
  • Performance is optimized: window-based top-k queries can be pushed down.
  • The capability of reading data from and writing data to Hive tables in the CSV or JSON format is enhanced.
  • All the column names of a table can be omitted in the ANALYZE statement, as shown in the sketch after this list.
  • LDAP authentication can be enabled or disabled with a click.

    For more information, see Manage LDAP authentication.

  • Spark Beeline is easier to use.
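
A minimal sketch of the ANALYZE enhancement, assuming that the EMR syntax matches the open source FOR ALL COLUMNS form; the table name is hypothetical:

    // Before this enhancement, each column had to be listed after FOR COLUMNS.
    // FOR ALL COLUMNS collects statistics for every column in one statement.
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR ALL COLUMNS")
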
EMR V4.6.0 (Spark 2.4.7):
  • Spark is updated to 2.4.7.
  • jQuery is updated to 3.5.1.
  • Spark is made compatible with Hive so that table and partition sizes are automatically updated.
  • Spark metadata and job running information can be sent to DataWorks.
EMR V4.5.0 (Spark 2.4.5): Metadata stored in Alibaba Cloud Data Lake Formation (DLF) is supported.
EMR V4.4.0 (Spark 2.4.5):
  • A Java agent is added for the Thrift Server process.
  • The NOT IN subquery performance of Spark SQL is optimized.
  • The issue that a stack overflow occurs when a large number of empty files are read is fixed.
  • Explicit HAS dependencies are removed from the code, or HAS dependencies are updated.
EMR V4.3.0 (Spark 2.4.5):
  • Spark is updated to 2.4.5.
  • The associated Delta Lake is updated to 0.6.0.
  • The issue that PySpark cannot run properly after Ranger is enabled for Hive is fixed.
EMR V3.36.1 (Spark 2.4.7):
  • In the EMR console, the parameter names on the spark-defaults tab of the Configure tab for the Spark service are optimized.
  • The performance of log collection is optimized.
  • The Zstandard data compression algorithm is supported, as shown in the sketch after this list.
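
A minimal sketch of enabling Zstandard, using configuration names from open source Spark rather than EMR-specific ones; the output path is hypothetical, and Parquet Zstandard output may require Hadoop native Zstandard support:

    // Compress shuffle and broadcast data with Zstandard.
    spark.conf.set("spark.io.compression.codec", "zstd")

    // Write Parquet files compressed with Zstandard.
    val df = spark.range(1000).toDF("id")
    df.write.option("compression", "zstd").parquet("oss://my-bucket/zstd-demo")
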
EMR V3.35.0 (Spark 2.4.7):
  • The issue that adaptive execution does not take effect in some scenarios is fixed.
  • The issue that statistical aggregate functions behave inconsistently between Spark and Hive is fixed.
  • The issue that Spark cannot read valid data of the CHAR type from a Hive ORC table is fixed.
EMR V3.34.0 (Spark 2.4.7):
  • Some default configurations are optimized.
  • Performance is optimized: window-based top-k queries can be pushed down.
  • The capability of reading data from and writing data to Hive tables in the CSV or JSON format is enhanced.
  • All the column names of a table can be omitted in the ANALYZE statement.
  • LDAP authentication can be enabled or disabled with a click.

    For more information, see Manage LDAP authentication.

  • Spark Beeline is easier to use.
EMR V3.33.0 (Spark 2.4.7):
  • Spark is updated to 2.4.7.
  • jQuery is updated to 3.5.1.
  • Spark is made compatible with Hive so that table and partition sizes are automatically updated.
  • Spark metadata and job running information can be sent to DataWorks.
EMR V3.32.0 (Spark 2.4.5): The data collection feature of JindoTable can be enabled or disabled.
EMR V3.30.0 (Spark 2.4.5):
  • Metadata from Alibaba Cloud Data Lake Formation (DLF) is supported.
  • HAS dependencies are updated to 2.0.1.
  • Issues caused by backticks (`) in Streaming SQL are fixed.
  • The JAR package of Delta is removed, and Delta is deployed separately.
  • Logs are stored in HDFS directories.
EMR V3.29.0 (Spark 2.4.5):
  • Spark is updated to 2.4.5.2.0.
  • A third-party metastore is supported.
  • The datalake metastore-client parameter is added.
EMR V3.28.0 (Spark 2.4.5):
  • Spark is updated to 2.4.5.
  • Spark is compatible with the Streaming SQL scripts of DataFactory.
  • Delta 0.6.0 is supported.
EMR V3.27.2 (Spark 2.4.3):
  • Partition fields of the DATE type are supported in a cube.
  • The stack depth in the spark-submit scripts is increased.
EMR V3.26.3 (Spark 2.4.3): JindoOssCommitter is used as the default committer.
EMR V3.25.0 (Spark 2.4.3):
  • Parameters related to Delta, such as spark.sql.extensions, can be configured in the EMR console.
  • Data from a Delta table can be read by using Hive, without manual configuration of the InputFormat.
  • The ALTER TABLE SET TBLPROPERTIES and UNSET TBLPROPERTIES statements are supported, as shown in the sketch after this list.
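
A minimal sketch of the TBLPROPERTIES statements; the table and property names are hypothetical:

    // Attach a custom property to a Delta table, then remove it.
    spark.sql("ALTER TABLE delta_tbl SET TBLPROPERTIES ('owner' = 'data-team')")
    spark.sql("ALTER TABLE delta_tbl UNSET TBLPROPERTIES ('owner')")
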
EMR V3.24.0 (Spark 2.4.3):
  • Parameters related to Delta are supported.
  • The Spark plug-in can be configured in Ranger.
  • JindoCube is updated to 0.3.0.
EMR V3.23.0 (Spark 2.4.3):
  • The code for Spark SQL Thrift Server is updated to fix the issue that IsolatedClassLoader cannot load classes in certain cases.
  • The code related to Spark transactions is refactored to improve stability.
  • The issue that optimized row columnar (ORC) files cannot be read or written after the built-in Hive is updated to version 2.3 is fixed.
  • The MERGE INTO syntax is supported, as shown in the sketch after this list.
  • The SCAN and STREAM syntaxes are supported.
  • Exactly-once semantics (EOS) is supported for the Structured Streaming Kafka sink.
  • Delta is updated to 0.4.0.
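
A minimal sketch of the MERGE INTO syntax; the table and column names are hypothetical, and the statement follows the common MERGE INTO shape, which may differ in detail from the exact EMR grammar:

    // Upsert rows from "updates" into "target", keyed on id.
    spark.sql("""
      MERGE INTO target t
      USING updates u
      ON t.id = u.id
      WHEN MATCHED THEN UPDATE SET t.value = u.value
      WHEN NOT MATCHED THEN INSERT (id, value) VALUES (u.id, u.value)
    """)
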
EMR V3.22.1 (Spark 2.4.3):
  • Relational cache

    A relational cache accelerates data queries through pre-computing. You can create a relational cache to pre-compute data. During a data query, Spark Optimizer automatically detects an appropriate relational cache, optimizes the SQL execution plan, and continues data computing based on the relational cache, which accelerates the query. For example, you can use relational caches to implement multidimensional online analytical processing (MOLAP), generate data reports, create data dashboards, and synchronize data across clusters. A sketch of this workflow follows the list below.

    • DDL operations such as CACHE, UNCACHE, ALTER, and SHOW are supported. A relational cache supports all data sources and data formats of Spark.
    • Caches can be updated automatically or by using the REFRESH command. Incremental caching based on specified partitions is supported.
    • The SQL execution plan can be optimized based on a relational cache.
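
    A hypothetical sketch of this workflow; the cache, table, and column names are invented, and the DDL is modeled on the CACHE and UNCACHE operations listed above rather than copied from the EMR grammar:

        // Pre-compute a daily revenue aggregate as a relational cache.
        spark.sql("""
          CACHE TABLE daily_revenue
          AS SELECT dt, SUM(amount) AS revenue FROM orders GROUP BY dt
        """)

        // A later query that aggregates orders by dt can be rewritten by
        // Spark Optimizer to read from daily_revenue instead of recomputing.
        spark.sql("SELECT dt, SUM(amount) FROM orders GROUP BY dt").show()
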
  • Streaming SQL
    • The parameter settings of Stream Query Writer are normalized.
    • The schema compatibility check of Kafka data tables is optimized.
    • A schema can be automatically registered with Schema Registry for a Kafka data table that does not have a schema.
    • The log information that is recorded when a Kafka schema is incompatible is optimized.
    • The issue that the column name must be explicitly specified when a query result is written to a Kafka data table is fixed.
    • The restriction that streaming SQL queries support only the Kafka and LogHub data sources is removed.
  • Delta

    The Delta component is added. You can use Spark to create a Delta data source to perform streaming data writes, transactional reads and writes, data verification, and data backtracking. For more information, see Delta details. A sketch of the DataFrame and Structured Streaming usage follows the list below.

    • You can call the DataFrame API to read data from or write data to Delta.
    • You can call the Structured Streaming API to read or write data by using Delta as the data source or sink.
    • You can call the Delta API to update, delete, merge, vacuum, and optimize data.
    • You can use SQL statements to create Delta tables, import data to Delta, and read data from Delta tables.
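
    A minimal sketch of the DataFrame and Structured Streaming usage; the paths are hypothetical, and the calls follow the open source Delta Lake 0.4.0 API:

        // Batch: write a DataFrame as a Delta table, then read it back.
        val df = spark.range(100).toDF("id")
        df.write.format("delta").save("oss://my-bucket/delta/events")
        val events = spark.read.format("delta").load("oss://my-bucket/delta/events")

        // Streaming: use a Delta table as a Structured Streaming sink.
        val streamDf = spark.readStream.format("rate").load()
        streamDf.writeStream
          .format("delta")
          .option("checkpointLocation", "oss://my-bucket/ckpt/rate")
          .start("oss://my-bucket/delta/rate")
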
  • Others
    • Primary key and foreign key constraints are supported.
    • JAR conflicts, such as the servlet conflict, are resolved.