E-MapReduce: Use Spark to access MaxCompute

Last Updated: Aug 15, 2023

This topic describes how to use Spark to read data from and write data to MaxCompute.

Procedure

  1. Initialize an OdpsOps object.

    In Spark, the OdpsOps class is used to manage data in MaxCompute.

    import com.aliyun.odps.TableSchema
    import com.aliyun.odps.data.Record
    import org.apache.spark.aliyun.odps.OdpsOps
    import org.apache.spark.{SparkContext, SparkConf}

    object Sample {
      def main(args: Array[String]): Unit = {
        // == Step-1 ==
        val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
        val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        // Internal endpoints are used as examples.
        val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
        val conf = new SparkConf().setAppName("Test Odps")
        val sc = new SparkContext(conf)
        val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
        // Invocation code:
        // == Step-2 ==
        ...
        // == Step-3 ==
        ...
      }
    }
    Note

    You must configure environment variables before you can run the sample code. For more information about how to configure environment variables, see the Configure environment variables section in this topic.
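    As a quick pre-flight check (a minimal sketch, not part of the official sample), you can fail fast when the credentials are missing:

    // Sketch: abort early if the credential environment variables are not set.
    require(sys.env.contains("ALIBABA_CLOUD_ACCESS_KEY_ID"),
      "ALIBABA_CLOUD_ACCESS_KEY_ID is not set")
    require(sys.env.contains("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
      "ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set")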

  2. Load table data from MaxCompute to Spark.

    Use the readTable method of the OdpsOps object to load table data from MaxCompute to Spark.

    // == Step-2 ==
    val project = "<odps-project>"   // Replace with your MaxCompute project name.
    val table = "<odps-table>"       // Replace with your MaxCompute table name.
    val numPartitions = 2
    val inputData = odpsOps.readTable(project, table, read, numPartitions)
    inputData.top(10).foreach(println)
    // == Step-3 ==
    ...

    In the preceding code, you must define a read function to parse and preprocess the MaxCompute table data. The following code defines a read function that returns the first column of each record as a string:

    def read(record: Record, schema: TableSchema): String = {
      record.getString(0)
    }
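    If a table has multiple columns, the read function can return a tuple instead of a single string. The following is a minimal sketch that assumes a hypothetical table whose first column is of the STRING type and whose second column is of the BIGINT type:

    // Sketch: parse two columns into a (String, Long) pair. The column
    // types (STRING, BIGINT) are assumptions about a hypothetical table.
    def readPair(record: Record, schema: TableSchema): (String, Long) = {
      (record.getString(0), record.getBigint(1))
    }

    // Usage sketch: pass readPair in place of read.
    val pairData = odpsOps.readTable(project, table, readPair, numPartitions)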
  3. Save the result data in Spark to a MaxCompute table.

    Use the saveToTable method of the OdpsOps object to save the result data in Spark to a MaxCompute table.

    val resultData = inputData.map(e => s"$e has been processed.")
    odpsOps.saveToTable(project, table, resultData, write)

    In the preceding code, you must define a write function that fills each element of the RDD into a MaxCompute record. The following code defines a write function that writes the string to the first column of the record:

    def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
      val r = emptyRecord
      r.set(0, s)
    }
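    By analogy, a write function for multiple columns sets one value per column index. A minimal sketch, assuming a hypothetical destination table whose first column is of the STRING type and whose second column is of the BIGINT type:

    // Sketch: write a (String, Long) pair into a two-column record.
    // The column types are assumptions about a hypothetical table.
    def writePair(pair: (String, Long), emptyRecord: Record, schema: TableSchema): Unit = {
      emptyRecord.set(0, pair._1)
      emptyRecord.set(1, Long.box(pair._2))
    }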
  4. Pay attention to the format of parameters for a partitioned table.

    When you use Spark to read data from or write data to a partitioned table in MaxCompute, you must specify the partition in the partition key column=partition value format. If multiple partition key columns are involved, separate the key-value pairs with commas (,). An example follows the list below.

    • Example 1: To read data from the partition in which pt is 1, use pt='1'.

    • Example 2: To read data from the partition in which pt is 1 and ps is 2, use pt='1', ps='2'.
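    The following is a minimal sketch of a partitioned read, assuming the overloads of readTable and saveToTable that accept a partition string (check the OdpsOps API of your SDK version for the exact signatures):

    // Sketch: read only the pt='1' partition of the table.
    val partition = "pt='1'"
    val partInput = odpsOps.readTable(project, table, partition, read, numPartitions)

    // Sketch: a multi-level partition specification uses comma-separated pairs.
    val multiLevelPartition = "pt='1',ps='2'"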

Appendix

For the complete sample code, visit GitHub.