全部产品
Search
文档中心

E-MapReduce:Gunakan library Python pihak ketiga dalam pekerjaan PySpark

更新时间:Nov 10, 2025

Tugas PySpark sering memerlukan pustaka Python pihak ketiga untuk meningkatkan kemampuan pemrosesan dan analisis data. Topik ini memberikan contoh terperinci tentang cara mengintegrasikan pustaka tersebut secara efektif ke dalam lingkungan Serverless Spark melalui lingkungan runtime, isolasi lingkungan Conda, dan metode pengemasan ringan PEX, sehingga menjamin stabilitas tugas dan fleksibilitas dalam skenario komputasi terdistribusi.

Informasi latar belakang

Selama pengembangan PySpark interaktif, Anda dapat menggunakan pustaka Python pihak ketiga untuk meningkatkan fleksibilitas dan kemudahan penggunaan dalam pemrosesan serta analisis data. Tiga metode berikut dapat membantu Anda mencapai tujuan ini. Pilih metode yang paling sesuai berdasarkan kebutuhan aktual Anda.

Metode

Skenario

Metode 1: Gunakan pustaka Python pihak ketiga melalui lingkungan runtime

Konfigurasikan lingkungan standar dengan pustaka yang diperlukan (seperti numpy, pandas) di Konsol Manajemen Alibaba Cloud. Sistem akan secara otomatis membangun lingkungan tersebut, dan Anda dapat menggunakan lingkungan runtime yang telah dibuat saat menambahkan tugas baru.

Metode 2: Kelola lingkungan Python melalui Conda

Conda adalah sistem manajemen paket dan lingkungan lintas platform. Anda dapat menggunakan Conda untuk dengan mudah membuat, menyimpan, memuat, dan beralih antar lingkungan yang memiliki versi Python dan dependensi pustaka berbeda.

Metode 3: Kemas dependensi Python melalui PEX

PEX (Python EXecutable) adalah alat yang dapat Anda gunakan untuk mengemas aplikasi Python beserta dependensinya ke dalam file yang dapat dieksekusi.

Prasyarat

Sebuah ruang kerja telah dibuat. Untuk informasi lebih lanjut, lihat Buat Ruang Kerja.

Batasan

Anda harus menginstal Python 3.8 atau versi yang lebih baru. Dalam contoh ini, Python 3.8 digunakan.

Prosedur

Metode 1: Gunakan library Python pihak ketiga melalui lingkungan runtime

Langkah 1: Buat lingkungan runtime

  1. Buka halaman Manajemen Lingkungan Runtime.

    1. Masuk ke Konsol E-MapReduce.

    2. Pada panel navigasi sebelah kiri, pilih EMR Serverless > Spark.

    3. Pada halaman Spark, klik nama ruang kerja yang dituju.

    4. Pada halaman EMR Serverless Spark, klik Runtime Environment Management pada panel navigasi sebelah kiri.

  2. Klik Create Runtime Environment.

  3. Pada halaman Create Runtime Environment, klik Add Library.

    Untuk informasi selengkapnya tentang parameter, lihat Kelola lingkungan runtime.

  4. Pada kotak dialog New Library, gunakan tipe sumber PyPI, konfigurasikan parameter PyPI Package, lalu klik OK.

    Pada kolom PyPI Package, masukkan nama dan versi pustaka. Jika Anda tidak menentukan versi, versi terbaru akan diinstal secara default.

    Dalam contoh ini, pustaka faker dan geopy ditambahkan.

  5. Klik Create.

    Setelah lingkungan runtime dibuat, sistem akan mulai menginisialisasi lingkungan tersebut.

Langkah 2: Unggah file sumber daya ke OSS

  1. Klik pyspark_third_party_libs_demo.py untuk mengunduh file sumber daya yang diperlukan.

    Contoh ini menunjukkan cara menggunakan PySpark dan pustaka pihak ketiga untuk menghasilkan data analog dan melakukan analisis geografis. Pustaka faker digunakan untuk menghasilkan data analog yang berisi informasi pengguna dan lokasi geografis acak. Pustaka geopy digunakan untuk menghitung jarak geografis antara lokasi setiap pengguna dan Menara Eiffel. Terakhir, pengguna dalam jangkauan 10 kilometer disaring.

    Anda juga dapat membuat skrip sampel pyspark_third_party_libs_demo.py dengan konten berikut:

    pyspark_third_party_libs_demo.py

    from pyspark.sql import SparkSession   
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    from faker import Faker
    import random
    from geopy.distance import geodesic
    
    spark = SparkSession.builder \
            .appName("PySparkThirdPartyLibsDemo") \
            .getOrCreate()
    
    # Gunakan library pihak ketiga faker untuk menghasilkan data analog
    fake = Faker()
    landmark = (48.8584, 2.2945)  # Koordinat Menara Eiffel
    
    # Buat fungsi untuk menghasilkan data analog
    def generate_fake_data(num_records):
        data = []
        for _ in range(num_records):
            # Hasilkan koordinat acak di sekitar Paris
            lat = 48.85 + random.uniform(-0.2, 0.2)
            lon = 2.30 + random.uniform(-0.2, 0.2)
            data.append((
                fake.uuid4(),        # ID Pengguna
                fake.name(),         # Nama
                lat,                 # Latitude
                lon                  # Longitude
            ))
        return data
    
    # Hasilkan 100 catatan analog
    fake_data = generate_fake_data(100)
    
    # Buat DataFrame Spark
    columns = ["user_id", "name", "latitude", "longitude"]
    df = spark.createDataFrame(fake_data, schema=columns)
    
    # Cetak data sampel yang dihasilkan
    print("Generated sample data:")
    df.show(5)
    
    # Gunakan library pihak ketiga geopy untuk menghitung jarak
    def calculate_distance(lat, lon, landmark=landmark):
        """Hitung jarak geografis antara dua titik (kilometer)"""
        user_location = (lat, lon)
        return geodesic(user_location, landmark).kilometers
    
    # Daftarkan UDF (User Defined Function)
    distance_udf = udf(calculate_distance, FloatType())
    
    # Tambahkan kolom jarak
    df_with_distance = df.withColumn(
        "distance_km", 
        distance_udf("latitude", "longitude")
    )
    
    # Temukan pengguna dalam jangkauan 10 kilometer
    nearby_users = df_with_distance.filter("distance_km <= 10")
    
    # Cetak hasilnya
    print(f"\nFound {nearby_users.count()} users within a 10-kilometer range:")
    nearby_users.select("name", "latitude", "longitude", "distance_km").show(10)
    
  2. Unggah pyspark_third_party_libs_demo.py ke OSS. Untuk informasi selengkapnya, lihat Unggah sederhana.

Langkah 3: Kembangkan dan jalankan pekerjaan

  1. Pada panel navigasi sebelah kiri halaman EMR Serverless Spark, klik Data Development.

  2. Pada tab Development, klik ikon image.

  3. Pada kotak dialog Create, masukkan nama, pilih Batch Job > PySpark dari daftar drop-down Jenis, lalu klik OK.

  4. Pada pojok kanan atas, pilih antrian.

  5. Pada tab konfigurasi pekerjaan, konfigurasikan parameter berikut dan klik Run. Anda tidak perlu mengonfigurasi parameter lainnya.

    Parameter

    Deskripsi

    Main Python Resources

    Pilih OSS dan masukkan path OSS file pyspark_third_party_libs_demo.py. Contoh: oss://<yourBucketName>/pyspark_third_party_libs_demo.py.

    Runtime Environment

    Pilih lingkungan runtime yang telah Anda buat dari daftar drop-down.

  6. Setelah pekerjaan dijalankan, klik Log Explorer pada kolom Tindakan pekerjaan di bagian Execution Records.

  7. Pada tab Log Explorer, Anda dapat melihat log terkait.

    Sebagai contoh, pada tab Stdout bagian Driver Log, Anda dapat melihat informasi berikut:

    Generated sample data:
    +--------------------+-------------------+------------------+------------------+
    |             user_id|               name|          latitude|         longitude|
    +--------------------+-------------------+------------------+------------------+
    |73d4565c-8cdf-4bc...|  Garrett Robertson| 48.81845614776422|2.4087517234236064|
    |0fc364b1-6759-416...|      Dawn Gonzalez| 48.68654896170054|2.4708555780468013|
    |2ab1f0aa-5552-4e1...|Alexander Gallagher| 48.87603770688707|2.1209399987431246|
    |1cabbdde-e703-4a8...|       David Morris|48.656356532418116|2.2503952330408175|
    |8b7938a0-b283-401...|    Shannon Perkins| 48.82915001905855| 2.410743969589327|
    +--------------------+-------------------+------------------+------------------+
    only showing top 5 rows
    
    
    Found 24 users within a 10-kilometer range:
    +-----------------+------------------+------------------+-----------+
    |             name|          latitude|         longitude|distance_km|
    +-----------------+------------------+------------------+-----------+
    |Garrett Robertson| 48.81845614776422|2.4087517234236064|   9.490705|
    |  Shannon Perkins| 48.82915001905855| 2.410743969589327|   9.131355|
    |      Alex Harris| 48.82547383207313|2.3579336032430027|   5.923493|
    |      Tammy Ramos| 48.84668267431606|2.3606455536493574|   5.026109|
    |   Ivan Christian| 48.89224239228342|2.2811025348668195|  3.8897192|
    |  Vernon Humphrey| 48.93142188723839| 2.306957802222233|   8.171813|
    |  Shawn Rodriguez|48.919907710882654|2.2270993307836044|   8.439087|
    |    Robert Fisher|48.794216103154646|2.3699024070507906|   9.033209|
    |  Heather Collier|48.822957591865205|2.2993033803043454|   3.957171|
    |       Dawn White|48.877816307255586|2.3743880390928878|   6.246059|
    +-----------------+------------------+------------------+-----------+
    only showing top 10 rows

Metode 2: Kelola lingkungan Python melalui Conda

Langkah 1: Buat dan sebarkan lingkungan Conda

  1. Buat instans Elastic Compute Service (ECS) yang menggunakan OS Alibaba Cloud Linux 3, terhubung ke Internet, dan memiliki arsitektur x86. Untuk informasi selengkapnya, lihat Buat instans pada tab Peluncuran Kustom.

    Catatan

    Anda juga dapat menggunakan node idle di kluster EMR yang sudah ada yang dibuat pada halaman EMR on ECS (pastikan node tersebut memiliki arsitektur x86).

  2. Jalankan perintah berikut untuk menginstal Miniconda:

    wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
    chmod +x Miniconda3-latest-Linux-x86_64.sh
    
    ./Miniconda3-latest-Linux-x86_64.sh -b
    source miniconda3/bin/activate
  3. Buat lingkungan Conda yang menggunakan Python 3.8 dan NumPy.

    conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz

Langkah 2: Unggah file sumber daya ke OSS

  1. Klik kmeans.py dan kmeans_data.txt untuk mengunduh file sumber daya yang diperlukan.

    Anda juga dapat membuat skrip sampel kmeans.py dan file data kmeans_data.txt dengan konten berikut:

    kmeans.py

    """
    Program pengelompokan K-means menggunakan MLlib.
    
    Contoh ini memerlukan NumPy (http://www.numpy.org/).
    """
    import sys
    
    import numpy as np
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans
    
    
    def parseVector(line):
        return np.array([float(x) for x in line.split(' ')])
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Penggunaan: kmeans <file> <k>", file=sys.stderr)
            sys.exit(-1)
        sc = SparkContext(appName="KMeans")
        lines = sc.textFile(sys.argv[1])
        data = lines.map(parseVector)
        k = int(sys.argv[2])
        model = KMeans.train(data, k)
        print("Pusat akhir: " + str(model.clusterCenters))
        print("Total Biaya: " + str(model.computeCost(data)))
        sc.stop()

    kmeans_data.txt

    0.0 0.0 0.0
    0.1 0.1 0.1
    0.2 0.2 0.2
    9.0 9.0 9.0
    9.1 9.1 9.1
    9.2 9.2 9.2
  2. Unggah file pyspark_conda_env.tar.gz, kmeans.py, dan kmeans_data.txt ke OSS. Untuk informasi selengkapnya, lihat Unggah sederhana.

Langkah 3: Kembangkan dan jalankan pekerjaan

  1. Pada panel navigasi sebelah kiri halaman EMR Serverless Spark, klik Data Development.

  2. Pada tab Development, klik ikon image.

  3. Pada kotak dialog Create, masukkan nama, pilih Batch Job > PySpark dari daftar drop-down Jenis, lalu klik OK.

  4. Pada pojok kanan atas, pilih antrian.

  5. Pada tab konfigurasi pekerjaan, konfigurasikan parameter berikut dan klik Run. Anda tidak perlu mengonfigurasi parameter lainnya.

    Parameter

    Deskripsi

    Main Python Resources

    Pilih OSS dan masukkan path OSS file kmeans.py. Contoh: oss://<yourBucketName>/kmeans.py.

    Execution Parameters

    Masukkan path OSS file data kmeans_data.txt.

    Format: oss://<yourBucketName>/kmeans_data.txt 2.

    Archive Resources

    Pilih OSS dan masukkan path OSS file pyspark_conda_env.tar.gz.

    Format: oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv.

    Spark Configuration

    spark.pyspark.driver.python  ./condaenv/bin/python
    spark.pyspark.python         ./condaenv/bin/python
  6. Setelah pekerjaan dijalankan, klik Log Explorer pada kolom Tindakan pekerjaan di bagian Execution Records.

  7. Pada tab Log Explorer, Anda dapat melihat log terkait.

    Sebagai contoh, pada tab Stdout bagian Driver Log, Anda dapat melihat informasi berikut:

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958

Metode 3: Kemas dependensi Python melalui PEX

Langkah 1: Kemas dan eksekusi file PEX

  1. Buat instans Elastic Compute Service (ECS) yang menggunakan OS Alibaba Cloud Linux 3, terhubung ke Internet, dan memiliki arsitektur x86. Untuk informasi selengkapnya, lihat Buat instans pada tab Peluncuran Kustom.

    Catatan

    Anda juga dapat menggunakan node idle di kluster EMR yang sudah ada yang dibuat pada halaman EMR on ECS (pastikan node tersebut memiliki arsitektur x86).

  2. Instal alat PEX dan wheel.

    pip3.8 install --user pex wheel \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  3. Unduh file wheel dari pustaka Python pihak ketiga yang ingin Anda gunakan ke direktori sementara.

    pip3.8 wheel -w /tmp/wheel \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  4. Hasilkan file PEX.

    pex -f /tmp/wheel --no-index \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      -o spark331_pandas153.pex

Langkah 2: Unggah file PEX ke OSS

  1. Klik kmeans.py dan kmeans_data.txt untuk mengunduh file sumber daya yang diperlukan.

  2. Unggah file spark331_pandas153.pex, kmeans.py, dan kmeans_data.txt ke OSS. Untuk informasi selengkapnya, lihat Unggah sederhana.

    Catatan

    Dalam contoh ini, Spark 3.3.1 digunakan dan file PEX berisi pustaka Python pihak ketiga Pandas, PyArrow, dan NumPy. Anda dapat mengemas lingkungan PySpark versi lain berdasarkan versi Spark yang Anda pilih. Untuk informasi lebih lanjut tentang versi mesin, lihat Versi Mesin.

Langkah 3: Kembangkan dan jalankan pekerjaan

  1. Pada panel navigasi sebelah kiri halaman EMR Serverless Spark, klik Data Development.

  2. Pada tab Development, klik ikon image.

  3. Pada kotak dialog Create, masukkan nama, pilih Batch Job > PySpark dari daftar drop-down Jenis, lalu klik OK.

  4. Pada pojok kanan atas, pilih antrian.

  5. Pada tab konfigurasi pekerjaan, konfigurasikan parameter berikut dan klik Run. Anda tidak perlu mengonfigurasi parameter lainnya.

    Parameter

    Deskripsi

    Main Python Resources

    Pilih OSS dan masukkan path OSS file kmeans.py. Contoh: oss://<yourBucketName>/kmeans.py.

    Execution Parameters

    Masukkan path OSS file data kmeans_data.txt.

    Format: oss://<yourBucketName>/kmeans_data.txt 2.

    File Resources

    Pilih OSS dan masukkan path OSS file spark331_pandas153.pex. Contoh: oss://<yourBucketName>/spark331_pandas153.pex.

    Spark Configuration

    spark.pyspark.driver.python            ./spark331_pandas153.pex
    spark.pyspark.python                   ./spark331_pandas153.pex
  6. Setelah pekerjaan dijalankan, klik Log Explorer pada kolom Tindakan pekerjaan di bagian Execution Records.

  7. Pada tab Log Explorer, Anda dapat melihat log terkait.

    Sebagai contoh, pada tab Stdout bagian Driver Log, Anda dapat melihat informasi berikut:

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958

Referensi

Topik ini menggunakan pengembangan PySpark sebagai contoh. Jika Anda ingin mengembangkan pekerjaan dengan cara lain, lihat Kembangkan pekerjaan batch atau streaming.