Sistem Pemrosesan Terdistribusi Lindorm (LDPS) menyediakan API RESTful yang memungkinkan Anda mengirimkan pekerjaan Spark berbasis Python. Anda dapat menjalankan tugas pemrosesan aliran, pemrosesan batch, pembelajaran mesin, dan komputasi graf menggunakan pekerjaan Spark berbasis Python. Topik ini menjelaskan cara membuat pekerjaan dalam Python dan mengirimkannya ke LDPS untuk dieksekusi.
Prasyarat
LDPS harus diaktifkan untuk instance Lindorm Anda. Untuk informasi lebih lanjut, lihat Aktifkan LDPS dan Modifikasi Konfigurasi.
Prosedur
Langkah 1: Tentukan pekerjaan Spark berbasis Python
Unduh demo yang disediakan oleh LDPS dengan mengklik Contoh Pekerjaan Spark.
Ekstrak file dari paket yang diunduh. Contoh pekerjaan terdapat dalam direktori
lindorm-spark-examples. Masuk ke direktorilindorm-spark-examples/pythondan tinjau file di folder tersebut.Direktori root proyek dalam contoh ini adalah
your_project. Ikuti langkah-langkah berikut untuk membuat pekerjaan Spark berbasis Python:Buat file kosong bernama
__init__.pydi direktoriyour_project.Modifikasi file main.py.
Buka file
lindorm-spark-examples/python/your_project/main.pydan tambahkan jalur fileyour_projectkesys.path. Untuk detail lebih lanjut, lihat bagian Notice1 dalam file main.py.# Notice1: Anda perlu melakukan langkah berikut untuk menyelesaikan modifikasi kode: # Langkah1: Harap tambahkan "__init__.py" ke direktori proyek Anda, proyek Anda akan bertindak sebagai modul dari launcher.py # Langkah2: Harap tambahkan direktori saat ini ke sys.path, Anda harus menambahkan kode berikut ke file utama Anda current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("direktori saat ini di your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))Kapsulkan logika entri ke metode
main(argv)dalam file main.py. Untuk detail lebih lanjut, lihat bagian Notice2 dalam filelindorm-spark-examples/python/your_project/main.py.# Notice2: Pindahkan kode di cabang `if __name__ == "__main__":` ke fungsi main(argv) yang baru didefinisikan, # sehingga launcher.py di direktori induk hanya memanggil main(sys.argv) def main(argv): print("Menerima argumen: %s \n" % str(argv)) print("direktori saat ini di main: %s \n" % os.path.abspath(os.path.dirname(__file__))) # Tulis kode Anda di sini if __name__ == "__main__": main(sys.argv)
Buat file entri yang digunakan untuk memanggil metode
main(argv)untuk memulai pekerjaan Spark berbasis Python. Dalam demo, filelauncher.pydi direktorilindorm-spark-examples/pythonadalah file entri. Gunakan kode dalam filelauncher.pyuntuk memulai pekerjaan Spark berbasis Python.
Langkah 2: Kemas pekerjaan Spark berbasis Python
Kemas runtime Python dan pustaka kelas pihak ketiga tempat proyek bergantung. Kami merekomendasikan penggunaan Conda atau Virtualenv untuk mengemas pustaka dependen ke dalam paket tar. Untuk informasi lebih lanjut, lihat Manajemen Paket Python.
PentingAnda dapat mengonfigurasi parameter spark.archives untuk menentukan paket yang dihasilkan oleh Conda atau Virtualenv sebagai file yang akan diekstraksi dan didistribusikan. Oleh karena itu, pekerjaan dapat dikemas menjadi file dari semua jenis yang didukung oleh parameter ini. Untuk informasi lebih lanjut, lihat spark.archives.
Operasi ini harus diselesaikan di Linux untuk memastikan bahwa LDPS dapat mengenali file biner dalam Python.
Kemas file proyek
your_projectke fileZIPatauEGG.Jalankan perintah berikut untuk mengemas file proyek
your_projectke file ZIP:zip -r project.zip your_projectUntuk informasi tentang cara mengemas file proyek
your_projectke file EGG, lihat Membangun Eggs.
Langkah 3: Unggah file pekerjaan Spark berbasis Python
Unggah file berikut ke Bucket Object Storage Service (OSS). Untuk informasi lebih lanjut, lihat Unggah Sederhana.
Langkah 4: Kirimkan pekerjaan Spark berbasis Python
LDPS memungkinkan Anda mengirimkan dan mengelola pekerjaan menggunakan dua metode berikut:
Kirimkan pekerjaan di konsol Lindorm. Untuk informasi lebih lanjut, lihat Kelola Pekerjaan di Konsol Lindorm.
Kirimkan pekerjaan menggunakan Data Management (DMS). Untuk informasi lebih lanjut, lihat Gunakan DMS untuk Mengelola Pekerjaan.
Konfigurasikan dua jenis parameter permintaan berikut saat mengirimkan pekerjaan:
Parameter untuk menentukan lingkungan runtime pekerjaan berbasis Python. Contoh:
{"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}Untuk mengirimkan file proyek, konfigurasikan parameter spark.submit.pyFiles dalam nilai parameter configs. Tentukan jalur OSS file
ZIP,EGG, atau filelauncher.pyproyek.Untuk mengirimkan file .tar yang berisi runtime Python dan pustaka kelas pihak ketiga, konfigurasikan parameter spark.archives dan parameter spark.kubernetes.driverEnv.PYSPARK_PYTHON dalam nilai parameter configs.
Gunakan tanda pagar (#) untuk menentukan nilai parameter targetDir saat mengonfigurasi parameter spark.archives.
Parameter spark.kubernetes.driverEnv.PYSPARK_PYTHON menentukan direktori tempat file lingkungan runtime Python disimpan.
Parameter yang diperlukan untuk mengunggah file ke OSS. Sertakan parameter dalam nilai parameter configs.
Tabel 1 Parameter dalam configs
Parameter
Contoh
Deskripsi
spark.hadoop.fs.oss.endpoint
oss-cn-beijing-internal.aliyuncs.com
Titik akhir Bucket OSS tempat Anda ingin menyimpan file.
spark.hadoop.fs.oss.accessKeyId
testAccessKey ID
ID Access Key dan Secret Access Key yang digunakan untuk otorisasi identitas. Anda dapat membuat atau mendapatkan ID Access Key dan Secret Access Key di Konsol Manajemen Alibaba Cloud. Untuk informasi lebih lanjut, lihat Dapatkan pasangan AccessKey.
spark.hadoop.fs.oss.accessKeySecret
testAccessKey Secret
spark.hadoop.fs.oss.impl
Atur nilainya menjadi org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.
Kelas yang digunakan untuk mengakses OSS.
CatatanUntuk informasi lebih lanjut tentang parameter, lihat Parameter.
Contoh
Unduh file yang berisi demo dengan mengklik Contoh Pekerjaan Spark, lalu ekstrak file yang diunduh.
Buka file your_project/main.py dan modifikasi titik entri proyek Python.
Tambahkan jalur file your_project ke sys.path.
current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("direktori saat ini di your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))Tentukan logika entri dalam file main.py. Contoh kode berikut menunjukkan cara menginisialisasi sesi SparkSession.
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("PythonImportTest") \ .getOrCreate() print(spark.conf) spark.stop()
Kemas file your_project di direktori Python ke file .zip.
zip -r your_project.zip your_projectDi Linux, gunakan Conda untuk mengemas file lingkungan runtime Python.
conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack conda activate pyspark_conda_env conda pack -f -o pyspark_conda_env.tar.gzUnggah file your_project.zip, file pyspark_conda_env.tar.gz, dan file launcher.py ke OSS.
Kirimkan pekerjaan menggunakan salah satu metode berikut:
Kirimkan pekerjaan di konsol Lindorm. Untuk informasi lebih lanjut, lihat Kelola Pekerjaan di Konsol Lindorm.
Kirimkan pekerjaan menggunakan DMS. Untuk informasi lebih lanjut, lihat Gunakan DMS untuk Mengelola Pekerjaan.
Diagnosis pekerjaan
Setelah pekerjaan dikirimkan, Anda dapat melihat status pekerjaan dan alamat antarmuka pengguna web Spark di halaman Jobs. Untuk informasi lebih lanjut, lihat Lihat Detail Pekerjaan Spark. Jika Anda mengalami masalah saat mengirimkan pekerjaan, kirim tiket dan berikan ID pekerjaan serta alamat antarmuka web Spark.