Topik ini menjelaskan cara mengembangkan pekerjaan Python Spark pada AnalyticDB for MySQL serta cara mengemas lingkungan runtime untuk pekerjaan Python menggunakan lingkungan virtual.
Prasyarat
Kluster Edisi Perusahaan, Edisi Dasar, atau Edisi Data Lakehouse AnalyticDB for MySQL telah dibuat.
Bucket Object Storage Service (OSS) telah dibuat di wilayah yang sama dengan kluster AnalyticDB for MySQL.
Kelompok sumber daya pekerjaan telah dibuat untuk kluster AnalyticDB for MySQL Edisi Perusahaan, Edisi Dasar, atau Edisi Data Lakehouse.
Akun database telah dibuat untuk kluster AnalyticDB for MySQL.
Jika Anda menggunakan akun Alibaba Cloud, cukup membuat akun istimewa.
Jika Anda menggunakan pengguna Resource Access Management (RAM), Anda harus membuat akun istimewa dan akun standar, lalu mengaitkan akun standar tersebut dengan pengguna RAM.
Penggunaan dasar PySpark
Tulis program sampel berikut dan simpan sebagai
example.py.from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() df = spark.sql("SELECT 1+1") df.printSchema() df.show()Unggah program
example.pyke OSS. Untuk informasi selengkapnya, lihat Unggah file menggunakan konsol.Buka editor pengembangan Spark.
Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.
Di panel navigasi sebelah kiri, klik Job Development > Spark Jar Development.
Di bagian atas jendela editor, pilih kelompok sumber daya pekerjaan dan jenis pekerjaan Spark. Contoh ini menggunakan tipe Batch.
Di editor, jalankan pekerjaan dengan konfigurasi berikut.
{ "name": "Spark Python Test", "file": "oss://testBucketName/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }Untuk informasi selengkapnya tentang parameter, lihat Parameter.
Gunakan dependensi Python
Metode
Jika program Python Anda memerlukan dependensi kustom atau pihak ketiga, unggah ke OSS dan konfigurasikan parameter pyFiles saat mengirimkan pekerjaan Spark.
Contoh
Contoh ini menunjukkan cara mengimpor user-defined function untuk menghitung pendapatan setelah pajak karyawan. File data bernama staff.csv diunggah ke OSS untuk contoh ini. File staff.csv berisi data sampel berikut:
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200Kembangkan dependensi dan unggah ke OSS.
Buat folder bernama
tools, lalu buat program bernamafunc.pydi dalam folder tersebut.def tax(salary): """ mengonversi string menjadi int dan memotong pajak 15% dari gaji :param salary: Gaji staf :return: """ return 0.15 * int(salary)Kompres folder
toolsdan unggah ke OSS. Dalam contoh ini, file terkompresi adalahtools.zip.CatatanJika pekerjaan Anda bergantung pada beberapa file Python, kami menyarankan Anda mengompresnya menjadi satu file .zip. Anda kemudian dapat mereferensikan file Python tersebut sebagai modul dalam kode Python Anda.
Tulis program sampel bernama
example.py.from __future__ import print_function from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import FloatType import sys # import file pihak ketiga from tools import func if __name__ == "__main__": # inisialisasi konteks pyspark spark = SparkSession.builder.appName("Python Example").getOrCreate() # baca csv dari oss ke dataframe, tampilkan tabel cvs_file = sys.argv[1] df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True) # cetak skema dan data ke konsol df.printSchema() df.show() # buat udf taxCut = udf(lambda salary: func.tax(salary), FloatType()) # potong pajak dari gaji dan tampilkan hasilnya df.select("name", taxCut("salary").alias("final salary")).show() spark.stop()Unggah program
example.pyke OSS. Untuk informasi selengkapnya, lihat Unggah file menggunakan konsol.Buka editor pengembangan Spark.
Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.
Di panel navigasi sebelah kiri, klik Job Development > Spark Jar Development.
Di bagian atas jendela editor, pilih kelompok sumber daya pekerjaan dan jenis pekerjaan Spark. Contoh ini menggunakan tipe Batch.
Di editor, jalankan pekerjaan dengan konfigurasi berikut.
{ "name": "Spark Python", "file": "oss://testBucketName/example.py", "pyFiles": ["oss://testBucketName/tools.zip"], "args": [ "oss://testBucketName/staff.csv" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small" } }Parameter:
file: Path OSS tempat program Python disimpan.
pyFiles: Path OSS file Python yang menjadi dependensi pekerjaan PySpark. File harus memiliki ekstensi .zip. Pisahkan beberapa paket terkompresi dengan koma (,).
CatatanSemua file Python yang menjadi dependensi aplikasi PySpark harus disimpan di OSS.
args: Argumen untuk program utama. Dalam contoh ini, ini adalah path OSS file data sampel
staff.csv.
Untuk informasi selengkapnya tentang parameter, lihat Parameter.
Kemas lingkungan dependensi menggunakan Lingkungan Virtual
Jika pekerjaan Python Anda memiliki dependensi kompleks, Anda dapat menggunakan fitur lingkungan virtual Python untuk manajemen dan isolasi lingkungan. Spark AnalyticDB for MySQL memungkinkan Anda menggunakan lingkungan virtual untuk mengemas lingkungan dependensi lokal dan mengunggahnya ke OSS. Untuk informasi selengkapnya tentang lingkungan virtual, lihat dokumentasi resmi Python.
AnalyticDB for MySQL Spark menggunakan glibc-devel 2.28. Jika lingkungan virtual Anda tidak kompatibel dengan versi ini, tugas PySpark mungkin gagal.
Metode
Untuk mengemas lingkungan Python menggunakan lingkungan virtual, unggah paket terkompresi ke OSS. Kemudian, saat mengirimkan pekerjaan Spark, konfigurasikan parameter terkait untuk menentukan path OSS paket terkompresi dan path lokal interpreter Python yang akan digunakan.
Tentukan path OSS paket lingkungan Python terkompresi:
Jika paket terkompresi berukuran kecil, konfigurasikan parameter
archives.Jika paket terkompresi berukuran besar, konfigurasikan parameter
spark.executorEnv.ADB_SPARK_DOWNLOAD_FILESdanspark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES.
Tentukan path lokal interpreter Python yang akan digunakan melalui parameter
spark.pyspark.python.
Contoh
Persiapkan lingkungan Linux.
Anda harus mengemas lingkungan Python pada sistem operasi Linux. Anda dapat menyiapkan lingkungan Linux dengan salah satu metode berikut. Contoh ini menggunakan instance ECS Alibaba Cloud.
Beli instance ECS Alibaba Cloud yang menjalankan CentOS 7 atau AnolisOS 8. Untuk informasi selengkapnya, lihat Buat instance menggunakan wizard.
Instal CentOS 7, AnolisOS 8, atau versi yang lebih baru di mesin lokal Anda.
Gunakan image Docker resmi untuk CentOS atau AnolisOS guna mengemas lingkungan Python.
Kemas lingkungan runtime Python menggunakan lingkungan virtual dan unggah paket terkompresi ke OSS.
Gunakan Virtualenv atau Conda untuk mengemas lingkungan Python yang menjadi dependensi proyek Anda. Anda dapat menyesuaikan versi Python selama proses pengemasan. Contoh ini menggunakan Virtualenv.
# Buat direktori venv di path saat ini dengan python3 # HARUS MENAMBAHKAN --copies ! virtualenv --copies --download --python python3.7 venv # aktifkan lingkungan source venv/bin/activate # instal modul pihak ketiga pip install scikit-spark==0.4.0 # periksa hasilnya pip list # kompres lingkungan tar -czvf venv.tar.gz venvCatatanJika Anda ingin mengemas dependensi proyek dengan Conda, lihat Managing environments.
Buka editor pengembangan Spark.
Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.
Di panel navigasi sebelah kiri, klik Job Development > Spark Jar Development.
Di bagian atas jendela editor, pilih kelompok sumber daya pekerjaan dan jenis pekerjaan Spark. Contoh ini menggunakan tipe Batch.
Di editor, jalankan pekerjaan dengan konfigurasi berikut.
{ "name": "venv example", "archives": [ "oss://testBucketname/venv.tar.gz#PY3" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }atau
CatatanJika paket terkompresi lingkungan Python berukuran besar, gunakan konfigurasi berikut.
{ "name": "venv example", "conf": { "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3", "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }Parameter:
archives: Path OSS paket lingkungan Python terkompresi. Dalam contoh ini, ini adalah path OSS paket
venv.tar.gz.spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES: Menentukan path OSS paket lingkungan Python terkompresi untuk node Spark Executor.
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: Menentukan path OSS paket lingkungan Python terkompresi untuk node Spark Driver.
spark.pyspark.python: Menentukan path lokal interpreter Python yang akan digunakan.
Untuk informasi selengkapnya tentang parameter lainnya, lihat Parameter.