This topic describes how to use Spark to read data from and write data to MaxCompute.
Procedure
- Initialize an OdpsOps object.
In Spark, the OdpsOps class is used to perform operations related to data in MaxCompute.
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkContext, SparkConf}

object Sample {
  def main(args: Array[String]): Unit = {
    // == Step-1 ==
    val accessKeyId = "<accessKeyId>"
    val accessKeySecret = "<accessKeySecret>"
    // Use internal URLs as examples.
    val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
    val conf = new SparkConf().setAppName("Test Odps")
    val sc = new SparkContext(conf)
    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
    // Code for invocation:
    // == Step-2 ==
    ...
    // == Step-3 ==
    ...
  }

  // == Step-2 ==
  // Method definition 1

  // == Step-3 ==
  // Method definition 2
}
- Load table data from MaxCompute to Spark.
Use the readTable method of the OdpsOps object to load table data from MaxCompute to Spark.
// == Step-2 ==
val project = <odps-project>
val table = <odps-table>
val numPartitions = 2
val inputData = odpsOps.readTable(project, table, read, numPartitions)
inputData.top(10).foreach(println)

// == Step-3 ==
...
In the preceding code, you must define a read function to parse and preprocess table data in MaxCompute. Code to define the read function:

def read(record: Record, schema: TableSchema): String = {
  record.getString(0)
}
- Save the result data in Spark to a MaxCompute table.
Use the saveToTable method of the OdpsOps object to save the result data in Spark to a MaxCompute table.
val resultData = inputData.map(e => s"$e has been processed.")
odpsOps.saveToTable(project, table, resultData, write)
In the preceding code, you must define a write function to preprocess the data before it is written to the table. Code to define the write function:

def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
  val r = emptyRecord
  r.set(0, s)
}
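The map step in the preceding code is plain Scala and is independent of MaxCompute: it appends a suffix to every record string before the result is saved. The same transformation on an ordinary Scala collection, shown here as a standalone sketch, behaves identically:

```scala
object MapSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the contents of the inputData RDD.
    val inputData = Seq("row1", "row2")
    // Same transformation as in the saveToTable example.
    val resultData = inputData.map(e => s"$e has been processed.")
    resultData.foreach(println)
    // Prints:
    // row1 has been processed.
    // row2 has been processed.
  }
}
```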
- Pay attention to the format of parameters for a partitioned table.
When you use Spark to read data from or write data to a partitioned table in MaxCompute, you must specify each partition in the <partition column name>=<partition value> format. If multiple partitions are involved, separate them with commas (,).
- Example 1: To read data from the partition in which pt is 1, use pt='1'.
- Example 2: To read data from the partition in which pt is 1 and the partition in which ps is 2, use pt='1', ps='2'.
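The partition specification above is a plain string, so it can be assembled programmatically. The following sketch builds such strings from column-value pairs; the helper name partitionSpec is a hypothetical convenience, not part of the OdpsOps API:

```scala
object PartitionSpecExample {
  // Build a partition spec string in the column='value' format,
  // joining multiple partitions with commas as described above.
  def partitionSpec(parts: (String, String)*): String =
    parts.map { case (col, value) => s"$col='$value'" }.mkString(",")

  def main(args: Array[String]): Unit = {
    println(partitionSpec("pt" -> "1"))              // pt='1'
    println(partitionSpec("pt" -> "1", "ps" -> "2")) // pt='1',ps='2'
  }
}
```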
Appendix
For the complete sample code, visit GitHub.