
Spark + MaxCompute

Last Updated: Apr 02, 2018

Spark access to MaxCompute

This section describes how to use the E-MapReduce SDK to read and write MaxCompute data in Spark.

  1. Initialize an OdpsOps object. In Spark, MaxCompute data operations are performed through the OdpsOps class. Create an OdpsOps object as follows:

    import com.aliyun.odps.TableSchema
    import com.aliyun.odps.data.Record
    import org.apache.spark.aliyun.odps.OdpsOps
    import org.apache.spark.{SparkContext, SparkConf}

    object Sample {
      def main(args: Array[String]): Unit = {
        // == Step-1 ==
        val accessKeyId = "<accessKeyId>"
        val accessKeySecret = "<accessKeySecret>"
        // Taking the intranet endpoints as an example
        val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
        val conf = new SparkConf().setAppName("Test Odps")
        val sc = new SparkContext(conf)
        val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
        // The calling code follows
        // == Step-2 ==
        ...
        // == Step-3 ==
        ...
      }
      // == Step-2 ==
      // Method definition 1
      // == Step-3 ==
      // Method definition 2
    }
  2. Load a MaxCompute table into Spark. Use the readTable method of the OdpsOps object to load a MaxCompute table into Spark as an RDD, as follows:

    // == Step-2 ==
    val project = "<odps-project>"
    val table = "<odps-table>"
    val numPartitions = 2
    val inputData = odpsOps.readTable(project, table, read, numPartitions)
    inputData.top(10).foreach(println)
    // == Step-3 ==
    ...

    In the preceding code, you also need to define a read function to parse and pre-process MaxCompute table data, as shown:

    def read(record: Record, schema: TableSchema): String = {
      record.getString(0)
    }

    This function loads the first column of the MaxCompute table into the Spark running environment. A multi-column read variant is sketched after this list.

  3. Save the result data from Spark to a MaxCompute table. You can persist a Spark RDD to MaxCompute through the saveToTable method of the OdpsOps object.

    val resultData = inputData.map(e => s"$e has been processed.")
    odpsOps.saveToTable(project, table, resultData, write)

    In the preceding code, you also need to define a write function that prepares each element before it is written to the MaxCompute table, as shown:

    def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
      emptyRecord.set(0, s)
    }

    This function writes each RDD element into the first column of a row in the target MaxCompute table. A multi-column write variant is sketched below.
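
The read transfer function is not limited to a single string column. As a minimal sketch, assuming a source table whose first column is a string and whose second column is a bigint (this table layout and the name readTwoColumns are illustrative, not part of the SDK), the first two columns can be loaded as pairs:

    // Sketch: assumes column 0 is a string and column 1 is a bigint.
    def readTwoColumns(record: Record, schema: TableSchema): (String, Long) = {
      (record.getString(0), record.getBigint(1))
    }

    // Yields an RDD[(String, Long)]
    val pairData = odpsOps.readTable(project, table, readTwoColumns, numPartitions)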
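
Symmetrically, a write transfer function can populate several columns of the empty record it receives. This sketch assumes the same illustrative two-column layout (string, bigint) in the target table:

    // Sketch: assumes column 0 is a string and column 1 is a bigint.
    def writeTwoColumns(pair: (String, Long), emptyRecord: Record, schema: TableSchema): Unit = {
      emptyRecord.set(0, pair._1)
      emptyRecord.set(1, pair._2)
    }

    // Persists an RDD[(String, Long)]
    odpsOps.saveToTable(project, table, pairData, writeTwoColumns)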

Appendix

For the complete sample code, see:
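
In the meantime, the following self-contained sketch assembles the three steps above into a single program; the credentials, endpoints, and project/table names are placeholders that you must replace with your own values:

    import com.aliyun.odps.TableSchema
    import com.aliyun.odps.data.Record
    import org.apache.spark.aliyun.odps.OdpsOps
    import org.apache.spark.{SparkContext, SparkConf}

    object Sample {
      def read(record: Record, schema: TableSchema): String = record.getString(0)

      def write(s: String, emptyRecord: Record, schema: TableSchema): Unit =
        emptyRecord.set(0, s)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Test Odps")
        val sc = new SparkContext(conf)
        // Replace the credentials and intranet endpoints with your own.
        val odpsOps = OdpsOps(sc, "<accessKeyId>", "<accessKeySecret>",
          "http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
        val project = "<odps-project>"
        val table = "<odps-table>"
        // Step 2: load the first column of the table as an RDD[String].
        val inputData = odpsOps.readTable(project, table, read, 2)
        inputData.top(10).foreach(println)
        // Step 3: process the data and write the result back.
        val resultData = inputData.map(e => s"$e has been processed.")
        odpsOps.saveToTable(project, table, resultData, write)
        sc.stop()
      }
    }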
