E-MapReduce: Use Spark to access MaxCompute

Last Updated: Mar 26, 2026

Apache Spark on E-MapReduce (EMR) can read data from and write data to MaxCompute using the OdpsOps class.

Prerequisites

Before you begin, make sure you have:

  • An EMR cluster with Spark enabled

  • A MaxCompute project and table

  • An Alibaba Cloud AccessKey ID and AccessKey Secret

Important

Store your credentials as environment variables rather than hardcoding them in your code. Hardcoded credentials can be accidentally exposed in version control or logs.

How it works

OdpsOps is the EMR SDK class for managing MaxCompute data in Spark. To move data between Spark and MaxCompute:

  1. Initialize an OdpsOps object with your credentials and MaxCompute endpoints.

  2. Call readTable to load a MaxCompute table into a Spark RDD (Resilient Distributed Dataset).

  3. Process the data in Spark.

  4. Call saveToTable to write the result RDD back to a MaxCompute table.

Read and write MaxCompute data

Step 1: Initialize OdpsOps

Set your credentials as environment variables, then import the required classes and create an OdpsOps object:

export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>

import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkContext, SparkConf}

object Sample {
  def main(args: Array[String]): Unit = {

    // Read credentials from environment variables.
    // System.getenv returns null if a variable is not set.
    val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    // Internal network endpoints for MaxCompute
    val odpsEndpoint = "http://odps-ext.aliyun-inc.com/api"
    val tunnelEndpoint = "http://dt-ext.odps.aliyun-inc.com"

    val conf = new SparkConf().setAppName("Test Odps")
    val sc = new SparkContext(conf)
    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsEndpoint, tunnelEndpoint)

    // Step 2: read data
    // Step 3: write data
  }
}

`OdpsOps` constructor parameters

| Parameter | Description |
| --- | --- |
| sc | The SparkContext instance |
| accessKeyId | Your Alibaba Cloud AccessKey ID, read from an environment variable |
| accessKeySecret | Your Alibaba Cloud AccessKey Secret, read from an environment variable |
| odpsEndpoint | The MaxCompute service endpoint |
| tunnelEndpoint | The MaxCompute Tunnel endpoint for data transfer |
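
Because System.getenv returns null when a variable is unset, a missing credential may only surface later, when the job first contacts MaxCompute. The following is a minimal sketch of a fail-fast lookup that could run before OdpsOps is created. The names Credentials and requireEnv are hypothetical and are not part of the EMR SDK; the optional env parameter exists only so the helper can be exercised without touching the real environment.

```scala
object Credentials {
  // Hypothetical helper (not part of the EMR SDK).
  // Returns the trimmed value of a required environment variable, or throws
  // an IllegalStateException if the variable is missing or blank.
  def requireEnv(name: String, env: Map[String, String] = sys.env): String =
    env.get(name)
      .map(_.trim)
      .filter(_.nonEmpty)
      .getOrElse(
        throw new IllegalStateException(
          s"Environment variable $name is not set or is empty"))

  // Loads both credentials used in this topic as (accessKeyId, accessKeySecret).
  def load(env: Map[String, String] = sys.env): (String, String) =
    (requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID", env),
     requireEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET", env))
}
```

With this sketch, val (accessKeyId, accessKeySecret) = Credentials.load() could stand in for the two System.getenv calls, so the job stops with a descriptive message instead of passing null to OdpsOps.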

Step 2: Load data from MaxCompute

Use readTable to load a MaxCompute table into a Spark RDD. Define a read function that maps each Record to the Scala type you need:

// Define how to parse each MaxCompute record into a Scala value
def read(record: Record, schema: TableSchema): String = {
  record.getString(0)
}

val project = "<odps-project>"
val table = "<odps-table>"
val numPartitions = 2

val inputData = odpsOps.readTable(project, table, read, numPartitions)
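// Quick check: print the 10 largest values by String ordering
// (top does not return the first 10 rows of the table)
inputData.top(10).foreach(println)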

`readTable` parameters

| Parameter | Description |
| --- | --- |
| project | The MaxCompute project name |
| table | The MaxCompute table name |
| read | A function with signature (Record, TableSchema) => T that parses each record |
| numPartitions | The number of RDD partitions for parallel reads |

Step 3: Write data to MaxCompute

Use saveToTable to write a Spark RDD to a MaxCompute table. Define a write function that populates a Record from each element in the RDD:

// Define how to convert each Scala value into a MaxCompute record
def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
  // Write the string into column 0 of the pre-allocated record
  emptyRecord.set(0, s)
}

val resultData = inputData.map(e => s"$e has been processed.")
odpsOps.saveToTable(project, table, resultData, write)

`saveToTable` parameters

| Parameter | Description |
| --- | --- |
| project | The MaxCompute project name |
| table | The MaxCompute table name |
| dataRDD | The Spark RDD to write |
| write | A function with signature (T, Record, TableSchema) => Unit that populates each record |

Work with partitioned tables

When reading from or writing to a partitioned table, pass the partition specification as a string in the form <partition-key>='<partition-value>'. If the table has more than one partition key, separate the key-value pairs with commas.

| Partition specification | Example |
| --- | --- |
| Single partition | pt='1' |
| Multiple partitions | pt='1', ps='2' |
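
Assembling the specification string by hand is easy to get wrong when the partition values come from variables. The following is a minimal sketch that builds the string in the format shown above. PartitionSpec and render are hypothetical names, not part of the EMR SDK, and the quoting rule (single quotes around each value, no quotes allowed inside a value) is an assumption based on the examples in this topic.

```scala
object PartitionSpec {
  // Hypothetical helper (not part of the EMR SDK).
  // Renders ordered (key, value) pairs as key='value' pairs joined by ", ".
  def render(parts: Seq[(String, String)]): String = {
    require(parts.nonEmpty, "at least one partition key is required")
    parts.foreach { case (k, v) =>
      require(k.trim.nonEmpty, "partition key must not be blank")
      // Assumption: values are wrapped in single quotes, so a quote
      // inside a value would produce an ambiguous specification.
      require(!v.contains("'"), s"partition value for $k must not contain a single quote")
    }
    parts.map { case (k, v) => s"$k='$v'" }.mkString(", ")
  }
}

// PartitionSpec.render(Seq("pt" -> "1"))              gives pt='1'
// PartitionSpec.render(Seq("pt" -> "1", "ps" -> "2")) gives pt='1', ps='2'
```

The resulting string is what you would pass as the partition specification when reading or writing. Check the OdpsOps version you use for the exact position of that argument.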

Next steps

For the complete sample code, see SparkMaxComputeDemo on GitHub.