This topic describes how to use Spark to read data from and write data to MaxCompute.

Procedure

  1. Initialize an OdpsOps object.

    In Spark, the OdpsOps class provides the methods for reading data from and writing data to MaxCompute. Initialize an OdpsOps object before you perform these operations.

     import com.aliyun.odps.TableSchema
     import com.aliyun.odps.data.Record
     import org.apache.spark.aliyun.odps.OdpsOps
     import org.apache.spark.{SparkContext, SparkConf}

     object Sample {
       def main(args: Array[String]): Unit = {
         // == Step-1 ==
         val accessKeyId = "<accessKeyId>"
         val accessKeySecret = "<accessKeySecret>"
         // Internal URLs are used as examples.
         val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
         val conf = new SparkConf().setAppName("Test Odps")
         val sc = new SparkContext(conf)
         val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
         // Invocation code:
         // == Step-2 ==
         ...
         // == Step-3 ==
         ...
       }
       // == Step-2 ==
       // Method definition 1
       // == Step-3 ==
       // Method definition 2
     }
  2. Load table data from MaxCompute to Spark.

    Use the readTable method of the OdpsOps object to load table data from MaxCompute to Spark.

     // == Step-2 ==
     val project = "<odps-project>"
     val table = "<odps-table>"
     val numPartitions = 2
     val inputData = odpsOps.readTable(project, table, read, numPartitions)
     inputData.top(10).foreach(println)
     // == Step-3 ==
     ...
     In the preceding code, you must define a read function to parse and preprocess each MaxCompute record. The following code defines a read function that returns the first column of each record as a string:
     def read(record: Record, schema: TableSchema): String = {
       record.getString(0)
     }
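
     The read function above returns only the first column of each record. The following is a minimal sketch of a transfer function that reads two columns; it assumes the table's first two columns are of the STRING and BIGINT types, so adjust the getters to match your schema:

     def readPair(record: Record, schema: TableSchema): (String, Long) = {
       // Assumption: column 0 is STRING and column 1 is BIGINT.
       (record.getString(0), record.getBigint(1))
     }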
  3. Save the result data in Spark to a MaxCompute table.

    Use the saveToTable method of the OdpsOps object to save the result data in Spark to a MaxCompute table.

     val resultData = inputData.map(e => s"$e has been processed.")
     odpsOps.saveToTable(project, table, resultData, write)
     In the preceding code, you must define a write function that converts each element of the RDD into a MaxCompute record. The following code defines a write function that writes each string to the first column of a record:
     def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
       val r = emptyRecord
       r.set(0, s)
     }
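
     Similarly, a write function can set multiple columns by index. The following sketch assumes a hypothetical two-column target table whose columns are STRING and BIGINT; the column order and types must match the table schema:

     def writePair(pair: (String, Long), emptyRecord: Record, schema: TableSchema): Unit = {
       // Assumption: the target table's columns are (STRING, BIGINT).
       emptyRecord.set(0, pair._1)
       emptyRecord.set(1, java.lang.Long.valueOf(pair._2))
     }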
  4. Pay attention to the format of partition parameters for a partitioned table.
    When you use Spark to read data from or write data to a partitioned table in MaxCompute, you must specify the partition in the partition column=partition value format, as shown in the sketch after the examples. If the table has multiple levels of partitioning, separate the specifications with commas (,).
    • Example 1: To read data from the partition in which pt is 1, use pt='1'.
    • Example 2: To read data from the partition in which pt is 1 and ps is 2, use pt='1',ps='2'.
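
    The following is a minimal sketch of reading from a single partition. It assumes a readTable overload that accepts a partition specification string; the partition value pt='1' is a placeholder:

     // Sketch: read only the partition pt='1' (assumes an overload of
     // readTable that takes a partition specification string).
     val partition = "pt='1'"
     val partitionedData = odpsOps.readTable(project, table, partition, read, numPartitions)
     partitionedData.top(10).foreach(println)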

Appendix

For the complete sample code, visit GitHub.