全部产品
Search
文档中心

E-MapReduce:Menggunakan Spark untuk mengakses MaxCompute

更新时间:Jun 24, 2025

Topik ini menjelaskan cara menggunakan Spark untuk membaca data dari dan menulis data ke MaxCompute.

Prosedur

  1. Inisialisasi objek OdpsOps.

    Di dalam Spark, kelas OdpsOps digunakan untuk mengelola data di MaxCompute.

    import com.aliyun.odps.TableSchema
         import com.aliyun.odps.data.Record
         import org.apache.spark.aliyun.odps.OdpsOps
         import org.apache.spark.{SparkContext, SparkConf}
         object Sample {
           def main(args: Array[String]): Unit = {    
             // == Langkah-1 ==
             val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
             val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
             // Gunakan URL internal sebagai contoh. 
             val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") 
             val conf = new SparkConf().setAppName("Test Odps")
             val sc = new SparkContext(conf)
             val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
             // Kode untuk pemanggilan: 
             // == Langkah-2 ==
             ...
             // == Langkah-3 ==
             ...
           }
         }
    null

    Anda harus mengonfigurasi variabel lingkungan sebelum menjalankan kode contoh. Untuk informasi lebih lanjut tentang cara mengonfigurasi variabel lingkungan, lihat bagian Mengonfigurasi Variabel Lingkungan dalam topik ini.

  2. Muat data tabel dari MaxCompute ke Spark.

    Gunakan metode readTable dari objek OdpsOps untuk memuat data tabel dari MaxCompute ke Spark.

    // == Langkah-2 ==
             val project = <odps-project>
             val table = <odps-table>
             val numPartitions = 2
             val inputData = odpsOps.readTable(project, table, read, numPartitions)
             inputData.top(10).foreach(println)
             // == Langkah-3 ==
             ...

    Dalam kode sebelumnya, Anda perlu mendefinisikan fungsi read untuk mengurai dan memproses data tabel di MaxCompute. Kode untuk mendefinisikan fungsi read:

    def read(record: Record, schema: TableSchema): String = {
               record.getString(0)
             }
  3. Simpan data hasil di Spark ke tabel MaxCompute.

    Gunakan metode saveToTable dari objek OdpsOps untuk menyimpan data hasil di Spark ke tabel MaxCompute.

    val resultData = inputData.map(e => s"$e telah diproses.")
             odpsOps.saveToTable(project, table, dataRDD, write)

    Dalam kode sebelumnya, Anda perlu mendefinisikan fungsi write untuk memproses data. Kode untuk mendefinisikan fungsi write:

    def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
               val r = emptyRecord
               r.set(0, s)
             }
  4. Perhatikan format parameter untuk tabel terpartisi.

    Saat menggunakan Spark untuk membaca atau menulis data ke tabel terpartisi di MaxCompute, Anda harus menentukan partisi dalam format Nama kolom kunci Partisi=Nama Partisi. Jika beberapa partisi terlibat, pisahkan mereka dengan koma (,).

    • Contoh 1: Untuk membaca data dari partisi di mana pt adalah 1, gunakan pt='1'.

    • Contoh 2: Untuk membaca data dari partisi di mana pt adalah 1 dan ps adalah 2, gunakan pt='1', ps='2'.

Lampiran

Untuk kode contoh lengkap, kunjungi GitHub.