全部产品
Search
文档中心

AnalyticDB:Kembangkan aplikasi Spark menggunakan PySpark

更新时间:Nov 10, 2025

Topik ini menjelaskan cara mengembangkan pekerjaan Python Spark pada AnalyticDB for MySQL serta cara mengemas lingkungan runtime untuk pekerjaan Python menggunakan lingkungan virtual.

Prasyarat

Penggunaan dasar PySpark

  1. Tulis program sampel berikut dan simpan sebagai example.py.

    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession.builder.getOrCreate()
        df = spark.sql("SELECT 1+1")
        df.printSchema()
        df.show()
    
  2. Unggah program example.py ke OSS. Untuk informasi selengkapnya, lihat Unggah file menggunakan konsol.

  3. Buka editor pengembangan Spark.

    1. Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.

    2. Di panel navigasi sebelah kiri, klik Job Development > Spark Jar Development.

  4. Di bagian atas jendela editor, pilih kelompok sumber daya pekerjaan dan jenis pekerjaan Spark. Contoh ini menggunakan tipe Batch.

  5. Di editor, jalankan pekerjaan dengan konfigurasi berikut.

    {
     "name": "Spark Python Test",
     "file": "oss://testBucketName/example.py",
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.executor.resourceSpec": "small"
     }
    }

    Untuk informasi selengkapnya tentang parameter, lihat Parameter.

Gunakan dependensi Python

Metode

Jika program Python Anda memerlukan dependensi kustom atau pihak ketiga, unggah ke OSS dan konfigurasikan parameter pyFiles saat mengirimkan pekerjaan Spark.

Contoh

Contoh ini menunjukkan cara mengimpor user-defined function untuk menghitung pendapatan setelah pajak karyawan. File data bernama staff.csv diunggah ke OSS untuk contoh ini. File staff.csv berisi data sampel berikut:

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
  1. Kembangkan dependensi dan unggah ke OSS.

    1. Buat folder bernama tools, lalu buat program bernama func.py di dalam folder tersebut.

      def tax(salary):
          """
          mengonversi string menjadi int dan memotong pajak 15% dari gaji
      
          :param salary: Gaji staf
          :return:
          """
          return 0.15 * int(salary)
      
    2. Kompres folder tools dan unggah ke OSS. Dalam contoh ini, file terkompresi adalah tools.zip.

      Catatan

      Jika pekerjaan Anda bergantung pada beberapa file Python, kami menyarankan Anda mengompresnya menjadi satu file .zip. Anda kemudian dapat mereferensikan file Python tersebut sebagai modul dalam kode Python Anda.

  2. Tulis program sampel bernama example.py.

    from __future__ import print_function
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    
    import sys
    
    # import file pihak ketiga
    from tools import func
    
    if __name__ == "__main__":
        # inisialisasi konteks pyspark
        spark = SparkSession.builder.appName("Python Example").getOrCreate()
        # baca csv dari oss ke dataframe, tampilkan tabel
        cvs_file = sys.argv[1]
        df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True)
        # cetak skema dan data ke konsol
        df.printSchema()
        df.show()
        # buat udf
        taxCut = udf(lambda salary: func.tax(salary), FloatType())
        # potong pajak dari gaji dan tampilkan hasilnya
        df.select("name", taxCut("salary").alias("final salary")).show()
        spark.stop()
    
  3. Unggah program example.py ke OSS. Untuk informasi selengkapnya, lihat Unggah file menggunakan konsol.

  4. Buka editor pengembangan Spark.

    1. Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.

    2. Di panel navigasi sebelah kiri, klik Job Development > Spark Jar Development.

  5. Di bagian atas jendela editor, pilih kelompok sumber daya pekerjaan dan jenis pekerjaan Spark. Contoh ini menggunakan tipe Batch.

  6. Di editor, jalankan pekerjaan dengan konfigurasi berikut.

    {
     "name": "Spark Python",
     "file": "oss://testBucketName/example.py",
     "pyFiles": ["oss://testBucketName/tools.zip"],
     "args": [
     "oss://testBucketName/staff.csv"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 2,
     "spark.executor.resourceSpec": "small"
     }
    }

    Parameter:

    • file: Path OSS tempat program Python disimpan.

    • pyFiles: Path OSS file Python yang menjadi dependensi pekerjaan PySpark. File harus memiliki ekstensi .zip. Pisahkan beberapa paket terkompresi dengan koma (,).

      Catatan

      Semua file Python yang menjadi dependensi aplikasi PySpark harus disimpan di OSS.

    • args: Argumen untuk program utama. Dalam contoh ini, ini adalah path OSS file data sampel staff.csv.

    Untuk informasi selengkapnya tentang parameter, lihat Parameter.

Kemas lingkungan dependensi menggunakan Lingkungan Virtual

Jika pekerjaan Python Anda memiliki dependensi kompleks, Anda dapat menggunakan fitur lingkungan virtual Python untuk manajemen dan isolasi lingkungan. Spark AnalyticDB for MySQL memungkinkan Anda menggunakan lingkungan virtual untuk mengemas lingkungan dependensi lokal dan mengunggahnya ke OSS. Untuk informasi selengkapnya tentang lingkungan virtual, lihat dokumentasi resmi Python.

Penting

AnalyticDB for MySQL Spark menggunakan glibc-devel 2.28. Jika lingkungan virtual Anda tidak kompatibel dengan versi ini, tugas PySpark mungkin gagal.

Metode

Untuk mengemas lingkungan Python menggunakan lingkungan virtual, unggah paket terkompresi ke OSS. Kemudian, saat mengirimkan pekerjaan Spark, konfigurasikan parameter terkait untuk menentukan path OSS paket terkompresi dan path lokal interpreter Python yang akan digunakan.

  • Tentukan path OSS paket lingkungan Python terkompresi:

    • Jika paket terkompresi berukuran kecil, konfigurasikan parameter archives.

    • Jika paket terkompresi berukuran besar, konfigurasikan parameter spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES dan spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES.

  • Tentukan path lokal interpreter Python yang akan digunakan melalui parameter spark.pyspark.python.

Contoh

  1. Persiapkan lingkungan Linux.

    Anda harus mengemas lingkungan Python pada sistem operasi Linux. Anda dapat menyiapkan lingkungan Linux dengan salah satu metode berikut. Contoh ini menggunakan instance ECS Alibaba Cloud.

    • Beli instance ECS Alibaba Cloud yang menjalankan CentOS 7 atau AnolisOS 8. Untuk informasi selengkapnya, lihat Buat instance menggunakan wizard.

    • Instal CentOS 7, AnolisOS 8, atau versi yang lebih baru di mesin lokal Anda.

    • Gunakan image Docker resmi untuk CentOS atau AnolisOS guna mengemas lingkungan Python.

  2. Kemas lingkungan runtime Python menggunakan lingkungan virtual dan unggah paket terkompresi ke OSS.

    Gunakan Virtualenv atau Conda untuk mengemas lingkungan Python yang menjadi dependensi proyek Anda. Anda dapat menyesuaikan versi Python selama proses pengemasan. Contoh ini menggunakan Virtualenv.

    # Buat direktori venv di path saat ini dengan python3
    # HARUS MENAMBAHKAN --copies !
    virtualenv --copies --download --python python3.7 venv
    
    # aktifkan lingkungan
    source venv/bin/activate
    
    # instal modul pihak ketiga
    pip install scikit-spark==0.4.0
    
    # periksa hasilnya
    pip list
    
    # kompres lingkungan
    tar -czvf venv.tar.gz venv
    Catatan

    Jika Anda ingin mengemas dependensi proyek dengan Conda, lihat Managing environments.

  3. Buka editor pengembangan Spark.

    1. Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.

    2. Di panel navigasi sebelah kiri, klik Job Development > Spark Jar Development.

  4. Di bagian atas jendela editor, pilih kelompok sumber daya pekerjaan dan jenis pekerjaan Spark. Contoh ini menggunakan tipe Batch.

  5. Di editor, jalankan pekerjaan dengan konfigurasi berikut.

    {
     "name": "venv example",
     "archives": [
     "oss://testBucketname/venv.tar.gz#PY3"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    atau

    Catatan

    Jika paket terkompresi lingkungan Python berukuran besar, gunakan konfigurasi berikut.

    {
     "name": "venv example",
     "conf": {
     "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
     "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,",
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    Parameter:

    • archives: Path OSS paket lingkungan Python terkompresi. Dalam contoh ini, ini adalah path OSS paket venv.tar.gz.

    • spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES: Menentukan path OSS paket lingkungan Python terkompresi untuk node Spark Executor.

    • spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: Menentukan path OSS paket lingkungan Python terkompresi untuk node Spark Driver.

    • spark.pyspark.python: Menentukan path lokal interpreter Python yang akan digunakan.

    Untuk informasi selengkapnya tentang parameter lainnya, lihat Parameter.