Apache Spark on E-MapReduce (EMR) can read data from and write data to MaxCompute using the OdpsOps class.
Prerequisites
Before you begin, make sure you have:

- An EMR cluster with Spark enabled
- A MaxCompute project and table
- An Alibaba Cloud AccessKey ID and AccessKey Secret
Store your credentials as environment variables rather than hardcoding them in your code. Hardcoded credentials can be accidentally exposed in version control or logs.
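`System.getenv` returns `null` when a variable is not set, and nothing stops that `null` from being passed on to `OdpsOps`. A small helper can fail fast with a readable message instead. This is a minimal sketch: `Credentials.requireEnv` is a hypothetical helper, not part of the EMR SDK, and the environment is passed in as a `Map` (defaulting to `sys.env`) so the lookup can be tested without setting real variables.

```scala
object Credentials {
  // Hypothetical helper (not an EMR SDK API): returns the value of a required
  // environment variable, or throws if it is missing or blank.
  def requireEnv(name: String, env: Map[String, String] = sys.env): String =
    env.get(name).filter(_.trim.nonEmpty).getOrElse {
      // The message names the variable but never prints any credential value.
      throw new IllegalStateException(s"Environment variable $name is not set")
    }
}
```

In the Step 1 sample, `Credentials.requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID")` could replace the direct `System.getenv` call.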
How it works
OdpsOps is the EMR SDK class for managing MaxCompute data in Spark. To move data between Spark and MaxCompute:
1. Initialize an `OdpsOps` object with your credentials and MaxCompute endpoints.
2. Call `readTable` to load a MaxCompute table into a Spark RDD (Resilient Distributed Dataset).
3. Process the data in Spark.
4. Call `saveToTable` to write the result RDD back to a MaxCompute table.
Read and write MaxCompute data
Step 1: Initialize OdpsOps
Set your credentials as environment variables, then import the required classes and create an OdpsOps object:
```shell
export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>
```

```scala
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkConf, SparkContext}

object Sample {
  def main(args: Array[String]): Unit = {
    // Read credentials from environment variables
    val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Fail fast if either variable is missing (System.getenv returns null)
    require(accessKeyId != null && accessKeySecret != null,
      "Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    // Example internal network endpoints for MaxCompute;
    // check which endpoints apply to your region and network type
    val odpsEndpoint = "http://odps-ext.aliyun-inc.com/api"
    val tunnelEndpoint = "http://dt-ext.odps.aliyun-inc.com"

    val conf = new SparkConf().setAppName("Test Odps")
    val sc = new SparkContext(conf)
    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsEndpoint, tunnelEndpoint)

    // Step 2: read data
    // Step 3: write data

    sc.stop()
  }
}
```

`OdpsOps` constructor parameters

| Parameter | Description |
|---|---|
| `sc` | The SparkContext instance |
| `accessKeyId` | Your Alibaba Cloud AccessKey ID, read from an environment variable |
| `accessKeySecret` | Your Alibaba Cloud AccessKey Secret, read from an environment variable |
| `odpsEndpoint` | The MaxCompute service endpoint |
| `tunnelEndpoint` | The MaxCompute Tunnel endpoint for data transfer |
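Endpoints differ by region and network type, so you may also prefer to keep them out of the code. The sketch below reads them from environment variables and falls back to the internal endpoints from the Step 1 sample. `MaxComputeEndpoints` and the variable names `MAXCOMPUTE_ENDPOINT` and `MAXCOMPUTE_TUNNEL_ENDPOINT` are hypothetical choices for illustration, not names the EMR SDK reads.

```scala
object MaxComputeEndpoints {
  // Defaults: the internal endpoints used in the Step 1 sample.
  val DefaultOdpsEndpoint = "http://odps-ext.aliyun-inc.com/api"
  val DefaultTunnelEndpoint = "http://dt-ext.odps.aliyun-inc.com"

  // Hypothetical variable names; not read by the EMR SDK itself.
  val OdpsEndpointVar = "MAXCOMPUTE_ENDPOINT"
  val TunnelEndpointVar = "MAXCOMPUTE_TUNNEL_ENDPOINT"

  /** Returns (odpsEndpoint, tunnelEndpoint), preferring non-blank environment values. */
  def fromEnv(env: Map[String, String] = sys.env): (String, String) = {
    // Trim surrounding whitespace and ignore blank values.
    def pick(name: String, default: String): String =
      env.get(name).map(_.trim).filter(_.nonEmpty).getOrElse(default)
    (pick(OdpsEndpointVar, DefaultOdpsEndpoint), pick(TunnelEndpointVar, DefaultTunnelEndpoint))
  }
}
```

`val (odpsEndpoint, tunnelEndpoint) = MaxComputeEndpoints.fromEnv()` would then supply the last two `OdpsOps` arguments.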
Step 2: Load data from MaxCompute
Use readTable to load a MaxCompute table into a Spark RDD. Define a read function that maps each Record to the Scala type you need:
```scala
// Inside main(), after odpsOps is created.
// Define how to parse each MaxCompute record into a Scala value.
// This sample assumes the first column of the table is of type STRING.
def read(record: Record, schema: TableSchema): String = {
  record.getString(0)
}

val project = "<odps-project>"
val table = "<odps-table>"
val numPartitions = 2
val inputData = odpsOps.readTable(project, table, read, numPartitions)
// top(10) returns the 10 largest values by natural ordering
inputData.top(10).foreach(println)
```

`readTable` parameters

| Parameter | Description |
|---|---|
| `project` | The MaxCompute project name |
| `table` | The MaxCompute table name |
| `read` | A function with signature `(Record, TableSchema) => T` that parses each record |
| `numPartitions` | The number of RDD partitions for parallel reads |
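Because `readTable` applies the `read` function to every record, that function is the natural place to turn raw columns into a typed value and to handle `NULL`s. The sketch below runs without a cluster, under these assumptions: `FakeInRecord` and `FakeSchema` are hypothetical stand-ins for `Record` and `TableSchema` that expose only the getters used here, the table has a `STRING` column followed by a `BIGINT` column, and `readLocally` imitates `readTable` with a plain `Seq.map`.

```scala
// Hypothetical stand-in for com.aliyun.odps.data.Record (read side only).
final class FakeInRecord(values: Array[AnyRef]) {
  def getString(i: Int): String = values(i).asInstanceOf[String]
  def getBigint(i: Int): java.lang.Long = values(i).asInstanceOf[java.lang.Long]
}

// Hypothetical stand-in for com.aliyun.odps.TableSchema.
final case class FakeSchema(columnNames: Seq[String])

// Typed value produced for each row (assumed columns: user STRING, cnt BIGINT).
final case class Visit(user: String, count: Long)

object ReadSketch {
  // Same shape as the read function passed to readTable: (record, schema) => T.
  def read(record: FakeInRecord, schema: FakeSchema): Visit = {
    // NULL column values come back as null, so wrap them before unboxing.
    val user = Option(record.getString(0)).getOrElse("")
    val count = Option(record.getBigint(1)).map(_.longValue).getOrElse(0L)
    Visit(user, count)
  }

  // Local imitation of readTable: apply read to every record.
  def readLocally(records: Seq[FakeInRecord], schema: FakeSchema): Seq[Visit] =
    records.map(r => read(r, schema))
}
```

With the real SDK types, `odpsOps.readTable(project, table, read, numPartitions)` would return an RDD of `Visit` values instead of strings.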
Step 3: Write data to MaxCompute
Use saveToTable to write a Spark RDD to a MaxCompute table. Define a write function that populates a Record from each element in the RDD:
```scala
// Define how to convert each Scala value into a MaxCompute record.
// saveToTable supplies emptyRecord; fill it in place.
def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
  emptyRecord.set(0, s)
}

val resultData = inputData.map(e => s"$e has been processed.")
odpsOps.saveToTable(project, table, resultData, write)
```

`saveToTable` parameters

| Parameter | Description |
|---|---|
| `project` | The MaxCompute project name |
| `table` | The MaxCompute table name |
| `dataRDD` | The Spark RDD to write |
| `write` | A function with signature `(T, Record, TableSchema) => Unit` that populates each record |
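The `write` function receives a record and fills it in place rather than returning a new one. The sketch below shows that contract for a two-column table (a `STRING` and a `BIGINT`), assuming hypothetical stand-ins `FakeOutRecord` and `FakeSchema` in place of `Record` and `TableSchema`. `saveLocally` creates one record per element purely for illustration and does not describe how the SDK manages records internally, which is why `write` sets every column on every call instead of relying on the record starting out empty.

```scala
// Hypothetical stand-in for com.aliyun.odps.data.Record (write side only).
final class FakeOutRecord(size: Int) {
  private val values = new Array[AnyRef](size)
  def set(i: Int, v: AnyRef): Unit = values(i) = v
  def get(i: Int): AnyRef = values(i)
}

// Hypothetical stand-in for com.aliyun.odps.TableSchema.
final case class FakeSchema(columnNames: Seq[String])

object WriteSketch {
  // Same shape as the write function passed to saveToTable:
  // (value, emptyRecord, schema) => Unit. Sets every column on every call.
  def write(value: (String, Long), emptyRecord: FakeOutRecord, schema: FakeSchema): Unit = {
    emptyRecord.set(0, value._1)
    emptyRecord.set(1, java.lang.Long.valueOf(value._2))
  }

  // Local imitation only: one record per element, filled by write.
  def saveLocally(data: Seq[(String, Long)], schema: FakeSchema): Seq[FakeOutRecord] =
    data.map { value =>
      val record = new FakeOutRecord(schema.columnNames.size)
      write(value, record, schema)
      record
    }
}
```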
Work with partitioned tables
When reading from or writing to a partitioned table, pass the partition specification as `<partition-key>='<partition-value>'`. If the table has more than one partition key, separate the key-value pairs with commas.

| Case | Partition specification |
|---|---|
| Single partition key | pt='1' |
| Two partition keys (`pt` and `ps`) | pt='1', ps='2' |
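When partition values come from job arguments, building the specification string in one place avoids quoting mistakes. This is a minimal sketch: `PartitionSpecs.partitionSpec` is a hypothetical helper, not an SDK method, and it assumes partition values never need to contain a single quote.

```scala
object PartitionSpecs {
  // Hypothetical helper: builds a partition specification such as pt='1',ps='2'
  // from (key, value) pairs, preserving the order given.
  def partitionSpec(parts: Seq[(String, String)]): String = {
    require(parts.nonEmpty, "at least one partition key is required")
    parts.map { case (key, value) =>
      require(key.nonEmpty, "partition key must not be empty")
      // This sketch does not escape quotes, so it rejects values that contain one.
      require(!value.contains("'"), s"partition value must not contain a single quote: $value")
      s"$key='$value'"
    }.mkString(",")
  }
}
```

The resulting string is what you would pass wherever your SDK version expects the partition specification.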
Next steps
For the complete sample code, see SparkMaxComputeDemo on GitHub.