Spark on MaxCompute lets you query data in existing Hadoop data sources (Hive on E-MapReduce) and in Data Lake Formation (DLF) data sources backed by Object Storage Service (OSS), without migrating your data or rewriting your Spark jobs.
When to use this approach
Use Spark on MaxCompute to access external data sources when:
Your Spark jobs already run against a Hive metastore on E-MapReduce (EMR) or an OSS-backed DLF catalog, and you want to switch to MaxCompute as the compute engine without moving the data.
You need to query existing partitioned or non-partitioned tables during a migration transition period.
If you plan to fully migrate your data into MaxCompute, use MaxCompute SQL or the standard MaxCompute data import tools instead.
Prerequisites
Before you begin, make sure you have:
An external project in MaxCompute mapped to your Hive database (for Hadoop sources) or your DLF database (for DLF + OSS sources)
The Spark on MaxCompute runtime configured and accessible
The spark-2.4.5-odps0.34.0 Spark version available in your environment
Key concepts
External project: A MaxCompute project that maps to an external metadata catalog (Hive or DLF) rather than storing data in MaxCompute itself. Spark on MaxCompute queries data through this mapping at runtime.
External table access: The ability to read tables defined in an external project. External table access and external project access are both disabled by default and must be enabled explicitly.
Configuration parameters
Set these parameters in your Spark configuration before running your job.
Required parameters (all scenarios)
| Parameter | Default | Description |
|---|---|---|
| spark.sql.odps.enableExternalTable | false | Enables access to external tables. Set to true. |
| spark.sql.odps.enableExternalProject | false | Enables access to external projects. Set to true. |
| spark.hadoop.odps.spark.version | — | Specifies the Spark version. Set to spark-2.4.5-odps0.34.0. |
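Because both access switches default to false, a job that omits either one runs with external table or external project access disabled. As a sketch, assuming your parameters are kept as key=value lines like the examples in this document, the hypothetical helper below (ExternalProjectConfCheck, not part of any Spark or MaxCompute API) reports each required parameter that is missing or set to a different value, so you can check a configuration before submitting the job.

```scala
// Hypothetical helper (not part of Spark or MaxCompute): checks key=value
// configuration text for the parameters every external-project job needs.
object ExternalProjectConfCheck {

  // Required keys and the values this document says to use.
  val required: Map[String, String] = Map(
    "spark.sql.odps.enableExternalTable"   -> "true",
    "spark.sql.odps.enableExternalProject" -> "true",
    "spark.hadoop.odps.spark.version"      -> "spark-2.4.5-odps0.34.0"
  )

  // Parses key=value lines, skipping blank lines and lines starting with '#'.
  def parse(text: String): Map[String, String] =
    text.split("\\r?\\n").toList
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))
      .flatMap { line =>
        line.split("=", 2) match {
          case Array(key, value) => Some(key.trim -> value.trim)
          case _                 => None // no '=' on this line
        }
      }
      .toMap

  // One message per required key that is missing or has an unexpected value,
  // sorted by key so the output is stable.
  def problems(text: String): List[String] = {
    val conf = parse(text)
    required.toList.sortBy(_._1).flatMap { case (key, expected) =>
      conf.get(key) match {
        case None                     => Some(s"missing: $key")
        case Some(v) if v != expected => Some(s"$key=$v (expected $expected)")
        case Some(_)                  => None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf =
      """spark.sql.odps.enableExternalTable=true
        |spark.sql.odps.enableExternalProject=false
        |""".stripMargin
    // Prints:
    // missing: spark.hadoop.odps.spark.version
    // spark.sql.odps.enableExternalProject=false (expected true)
    problems(conf).foreach(println)
  }
}
```

The check compares exact strings, so it flags values such as TRUE as mismatches. Relax the comparison if your environment accepts other spellings.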
Additional parameters for DLF + OSS scenarios
| Parameter | Default | Description |
|---|---|---|
| spark.hadoop.odps.oss.location.uri.style | — | Set to emr if the OSS directory containing data files was created by EMR. |
| spark.hadoop.odps.oss.endpoint | — | OSS endpoint for the region where your data files are stored (for example, oss-cn-shanghai-internal.aliyuncs.com). |
| spark.hadoop.odps.region.id | — | Region ID (for example, cn-shanghai). |
| spark.hadoop.odps.oss.region.default | — | Default OSS region (for example, cn-shanghai). |
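In the examples, all three endpoint and region parameters refer to cn-shanghai. One way to keep them consistent is to derive them from a single region ID. The sketch below assumes that the internal OSS endpoint follows the oss-<region>-internal.aliyuncs.com pattern seen in the cn-shanghai example. Confirm the endpoint for your region before relying on it. DlfOssConf and its methods are hypothetical names used only for illustration.

```scala
// Hypothetical helper: builds the DLF + OSS parameters from one region ID
// so that the endpoint and region settings cannot drift apart.
object DlfOssConf {

  // Assumption: the internal endpoint follows the pattern
  // oss-<region>-internal.aliyuncs.com, as in the cn-shanghai example.
  def internalEndpoint(regionId: String): String =
    s"oss-$regionId-internal.aliyuncs.com"

  // ossDirCreatedByEmr: true only if EMR created the OSS directory that holds
  // the data files; otherwise the uri.style parameter is left out.
  def params(regionId: String, ossDirCreatedByEmr: Boolean): Map[String, String] = {
    val base = Map(
      "spark.hadoop.odps.oss.endpoint"       -> internalEndpoint(regionId),
      "spark.hadoop.odps.region.id"          -> regionId,
      "spark.hadoop.odps.oss.region.default" -> regionId
    )
    if (ossDirCreatedByEmr) base + ("spark.hadoop.odps.oss.location.uri.style" -> "emr")
    else base
  }

  // Renders the parameters as key=value lines, sorted by key for stable output.
  def render(conf: Map[String, String]): String =
    conf.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString("\n")

  def main(args: Array[String]): Unit =
    println(render(params("cn-shanghai", ossDirCreatedByEmr = true)))
}
```

The rendered lines can be appended to the required parameters from the previous table.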
SparkSession parameters
| Parameter | Example value | Description |
|---|---|---|
| spark.sql.broadcastTimeout | 20 * 60 | Broadcast join timeout in seconds. Default is 300 seconds (5 minutes). The example sets it to 1,200 seconds. |
| odps.exec.dynamic.partition.mode | nonstrict | Partition mode. In nonstrict mode, all partitions can be dynamic. In strict mode, at least one static partition is required. |
| oss.endpoint | oss-cn-shanghai-internal.aliyuncs.com | OSS endpoint, set directly in the SparkSession configuration. |
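The table mixes an arithmetic expression with plain string values. This small sketch keeps the three settings in one place. It assumes the broadcast timeout is meant to be 20 minutes and converts it to seconds with scala.concurrent.duration (20 * 60 = 1,200). Every value is stored as a string. SessionSettings is a hypothetical name.

```scala
import scala.concurrent.duration._

// Hypothetical holder for the SparkSession settings listed in the table.
object SessionSettings {

  // 20 minutes in seconds, the unit spark.sql.broadcastTimeout uses (default 300).
  val broadcastTimeoutSeconds: Long = 20.minutes.toSeconds

  // All values as strings, the form accepted by Builder.config(key, value).
  val settings: Map[String, String] = Map(
    "spark.sql.broadcastTimeout"       -> broadcastTimeoutSeconds.toString,
    "odps.exec.dynamic.partition.mode" -> "nonstrict",
    "oss.endpoint"                     -> "oss-cn-shanghai-internal.aliyuncs.com"
  )

  def main(args: Array[String]): Unit =
    settings.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k -> $v") }
}
```

With Spark on the classpath, you could apply a map like this with settings.foldLeft(SparkSession.builder())((b, kv) => b.config(kv._1, kv._2)), because Builder.config(key, value) returns the builder.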
Access an external project based on a Hadoop data source
The external project hadoop_external_project maps to a Hive database on E-MapReduce. The following examples read from a non-partitioned table (testtbl) and a partitioned table (testtbl_par).
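Every query in this section addresses tables as <project>.<table>, and the partitioned table is filtered with b='20220914'. If you build these statements in code, the following sketch keeps the project name in one place. ExternalQueries is a hypothetical helper. It rejects partition values that contain a single quote instead of assuming a particular escaping rule in MaxCompute SQL or Spark SQL.

```scala
// Hypothetical helper: builds the fully qualified queries used in this section.
// Tables in an external project are addressed as <project>.<table>.
object ExternalQueries {

  // Wraps a partition value in single quotes. Values that already contain a
  // quote are rejected rather than guessing at an escaping rule.
  private def quote(value: String): String = {
    require(!value.contains("'"), s"unsupported partition value: $value")
    s"'$value'"
  }

  def selectAll(project: String, table: String): String =
    s"SELECT * FROM $project.$table"

  // partitionSpec maps column name -> value, for example Map("b" -> "20220914").
  def selectPartition(project: String, table: String,
                      partitionSpec: Map[String, String]): String = {
    require(partitionSpec.nonEmpty, "partitionSpec must not be empty")
    val predicate = partitionSpec.toSeq.sortBy(_._1)
      .map { case (column, value) => s"$column=${quote(value)}" }
      .mkString(" AND ")
    s"${selectAll(project, table)} WHERE $predicate"
  }

  def main(args: Array[String]): Unit = {
    println(selectAll("hadoop_external_project", "testtbl"))
    println(selectPartition("hadoop_external_project", "testtbl_par", Map("b" -> "20220914")))
  }
}
```

The resulting strings could be passed to spark.sql(...) in the same way as the literal queries in the examples.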
Query with MaxCompute SQL
-- Read from a non-partitioned table
SELECT * FROM hadoop_external_project.testtbl;
-- Read from a partitioned table
SELECT * FROM hadoop_external_project.testtbl_par WHERE b='20220914';
Query with Spark on MaxCompute
Add the following parameters to your Spark configuration:
spark.sql.odps.enableExternalTable=true
spark.sql.odps.enableExternalProject=true
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
Then run the following Scala code:
import org.apache.spark.sql.SparkSession

object external_Project_ReadTableHadoop {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("external_Table-on-MaxCompute")
      // Broadcast join timeout in seconds (default: 300). 20 * 60 = 1,200 seconds.
      .config("spark.sql.broadcastTimeout", 20 * 60)
      // nonstrict: all partitions can be dynamic. strict: at least one static partition is required.
      .config("odps.exec.dynamic.partition.mode", "nonstrict")
      .config("oss.endpoint", "oss-cn-shanghai-internal.aliyuncs.com")
      .getOrCreate()

    // List the tables in the external project
    println("===== show tables in hadoop_external_project =====")
    spark.sql("SHOW TABLES IN hadoop_external_project").show()

    // Describe and read the non-partitioned table
    println("===== desc extended hadoop_external_project.testtbl =====")
    spark.sql("DESC EXTENDED hadoop_external_project.testtbl").show()
    println("===== select from hadoop_external_project.testtbl =====")
    spark.sql("SELECT * FROM hadoop_external_project.testtbl").show()

    // Describe and read the partitioned table, filtering on b
    println("===== desc extended hadoop_external_project.testtbl_par =====")
    spark.sql("DESC EXTENDED hadoop_external_project.testtbl_par").show()
    println("===== select from hadoop_external_project.testtbl_par =====")
    spark.sql("SELECT * FROM hadoop_external_project.testtbl_par WHERE b='20220914'").show()

    spark.stop()
  }
}
Access an external project based on DLF and OSS
The external project ext_dlf_0713 maps to a Data Lake Formation (DLF) database backed by OSS. The following example reads from a non-partitioned table (tbl_oss1).
Query with MaxCompute SQL
-- Read from a non-partitioned table in the DLF-backed external project
SELECT * FROM ext_dlf_0713.tbl_oss1;
Query with Spark on MaxCompute
Add the following parameters to your Spark configuration. The OSS endpoint and region parameters are required for DLF + OSS scenarios.
spark.sql.odps.enableExternalTable=true
spark.sql.odps.enableExternalProject=true
spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0
# Add this parameter only if the OSS directory was created by EMR
spark.hadoop.odps.oss.location.uri.style=emr
spark.hadoop.odps.oss.endpoint=oss-cn-shanghai-internal.aliyuncs.com
spark.hadoop.odps.region.id=cn-shanghai
spark.hadoop.odps.oss.region.default=cn-shanghai
Then run the following Scala code:
import org.apache.spark.sql.SparkSession

object external_Project_ReadTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("external_Table-on-MaxCompute")
      // Broadcast join timeout in seconds (default: 300). 20 * 60 = 1,200 seconds.
      .config("spark.sql.broadcastTimeout", 20 * 60)
      // nonstrict: all partitions can be dynamic. strict: at least one static partition is required.
      .config("odps.exec.dynamic.partition.mode", "nonstrict")
      .config("oss.endpoint", "oss-cn-shanghai-internal.aliyuncs.com")
      .getOrCreate()

    // List the tables in the DLF-backed external project
    println("===== show tables in ext_dlf_0713 =====")
    spark.sql("SHOW TABLES IN ext_dlf_0713").show()

    // Describe and read the non-partitioned table
    println("===== desc extended ext_dlf_0713.tbl_oss1 =====")
    spark.sql("DESC EXTENDED ext_dlf_0713.tbl_oss1").show()
    println("===== select from ext_dlf_0713.tbl_oss1 =====")
    spark.sql("SELECT * FROM ext_dlf_0713.tbl_oss1").show()

    spark.stop()
  }
}