This topic describes how to use the E-MapReduce SDK to read and write MaxCompute data in Spark.
Allow Spark to access MaxCompute
- Initialize an OdpsOps object. In Spark, MaxCompute data is read and written through the OdpsOps class. Create an OdpsOps object as follows:
```scala
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkContext, SparkConf}

object Sample {
  def main(args: Array[String]): Unit = {
    // == Step-1 ==
    val accessKeyId = "<accessKeyId>"
    val accessKeySecret = "<accessKeySecret>"
    // The internal network addresses are used as an example
    val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
    val conf = new SparkConf().setAppName("Test Odps")
    val sc = new SparkContext(conf)
    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))

    // Part of the calling code is shown as follows
    // == Step-2 ==
    ...
    // == Step-3 ==
    ...
  }

  // == Step-2 ==
  // Method definition 1

  // == Step-3 ==
  // Method definition 2
}
```
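In this sample, urls(0) is the MaxCompute service endpoint and urls(1) is the Tunnel endpoint used for bulk data transfer. If your job runs outside the Alibaba Cloud internal network, public endpoints can be used instead. The addresses below are the commonly documented public endpoints and are an assumption to verify for your region:

```scala
// Public-network endpoints (verify against the MaxCompute documentation for your region)
val publicUrls = Seq(
  "http://service.odps.aliyun.com/api", // MaxCompute (ODPS) service endpoint
  "http://dt.odps.aliyun.com"           // Tunnel endpoint
)
val odpsOpsPublic = OdpsOps(sc, accessKeyId, accessKeySecret, publicUrls(0), publicUrls(1))
```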
- Load MaxCompute table data into Spark. Using the readTable method of the OdpsOps object, you can load a MaxCompute table into Spark as an RDD, as follows:
```scala
// == Step-2 ==
val project = "<odps-project>"
val table = "<odps-table>"
val numPartitions = 2
val inputData = odpsOps.readTable(project, table, read, numPartitions)
inputData.top(10).foreach(println)
// == Step-3 ==
...
```
In the preceding code, you must also define a read function that parses and preprocesses the MaxCompute table data:

```scala
def read(record: Record, schema: TableSchema): String = {
  record.getString(0)
}
```
This function loads the first column of the MaxCompute table into the Spark runtime environment.
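For tables with more than one column, the read function can assemble several fields into a composite value. The following is a minimal sketch, assuming a hypothetical table whose first two columns are a STRING and a BIGINT (getString and getBigint are the corresponding Record accessors):

```scala
// Sketch: read the first two columns of a (STRING, BIGINT) table as a tuple
def readPair(record: Record, schema: TableSchema): (String, Long) = {
  (record.getString(0), record.getBigint(1).longValue())
}

// Used in the same way as read:
// val pairData = odpsOps.readTable(project, table, readPair, numPartitions)
```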
- Save the result data in Spark to a MaxCompute table. Using the saveToTable method of the OdpsOps object, you can save a Spark RDD to a MaxCompute table:
```scala
val resultData = inputData.map(e => s"$e has been processed.")
odpsOps.saveToTable(project, table, resultData, write)
```
In the preceding code, you must also define a write function that preprocesses the data before it is written to the MaxCompute table:

```scala
def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
  val r = emptyRecord
  r.set(0, s)
}
```
This function writes each row of the RDD to the first column of the corresponding row in the MaxCompute table.
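A write function can also populate several columns of the empty record. The following is a hedged sketch, assuming a hypothetical two-column (STRING, BIGINT) target table and an RDD of (String, Long) pairs; Record.set(index, value) is the same setter used above:

```scala
// Sketch: fill a (String, Long) pair into columns 0 and 1 of the record
def writePair(kv: (String, Long), emptyRecord: Record, schema: TableSchema): Unit = {
  emptyRecord.set(0, kv._1)
  emptyRecord.set(1, java.lang.Long.valueOf(kv._2)) // BIGINT columns take java.lang.Long
}

// odpsOps.saveToTable(project, table, pairRDD, writePair)
```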
- Notation for partition table parameters.
  The SDK supports reading and writing MaxCompute partition tables. A partition is specified as partition_column_name=partition_value, with multiple partitions separated by commas. Assume that the table has partition columns pt and ps (see the sketch after this list):
  - To read the data of the partition where pt is 1, specify pt='1'.
  - To read the data of the partition where pt is 1 and ps is 2, specify pt='1',ps='2'.
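As a sketch of how the partition specification is passed, the snippet below assumes a readTable overload that accepts a partition-spec string in addition to the arguments used earlier; check the exact signature against the SDK version you use:

```scala
// Read only the pt='1' partition (assumed overload with a partition-spec argument)
val pt1Data = odpsOps.readTable(project, table, "pt='1'", read, numPartitions)

// Read only the partition where pt='1' and ps='2'
val pt1ps2Data = odpsOps.readTable(project, table, "pt='1',ps='2'", read, numPartitions)
```

The saveToTable method is assumed to accept a partition specification in the same format when writing to a partitioned table.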
Appendix
For the complete sample code, see Allow Spark to access MaxCompute.