すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:PySpark ジョブでサードパーティの Python ライブラリを使用する

最終更新日:Nov 09, 2025

PySpark タスクでは、データ処理と分析機能を強化するために、サードパーティの Python ライブラリが必要になることがよくあります。このトピックでは、ランタイム環境、Conda 環境の分離、PEX 軽量パッケージングメソッドを通じて、これらのライブラリを Serverless Spark 環境に効果的に統合する方法の詳細な例を示し、分散コンピューティングシナリオにおけるタスクの安定性と柔軟性を確保します。

背景情報

インタラクティブな PySpark 開発中に、サードパーティの Python ライブラリを使用して、データ処理と分析の柔軟性と使いやすさを向上させることができます。次の 3 つの方法でこの目標を達成できます。実際のニーズに基づいて最適な方法を選択することをお勧めします。

方法

シナリオ

方法 1: ランタイム環境を通じてサードパーティの Python ライブラリを使用する

Alibaba Cloud 管理コンソールで、必要なライブラリ (たとえば、numpypandas) を使用して標準化された環境を構成します。システムは自動的に環境を構築し、新しいタスクを追加するときに作成されたランタイム環境を使用できます。

方法 2: Conda を通じて Python 環境を管理する

Conda は、クロスプラットフォームのパッケージおよび環境管理システムです。Conda を使用すると、異なる Python バージョンとライブラリ依存関係を持つ環境を簡単に作成、保存、ロード、および切り替えることができます。

方法 3: PEX を通じて Python 依存関係をパッケージ化する

PEX (Python EXecutable) は、Python アプリケーションと対応する依存関係を実行可能ファイルにパッケージ化するために使用できるツールです。

前提条件

ワークスペースが作成されていること。詳細については、「ワークスペースの作成」をご参照ください。

制限事項

Python 3.8 以降をインストールする必要があります。この例では、Python 3.8 を使用します。

手順

方法 1: ランタイム環境を通じてサードパーティの Python ライブラリを使用する

ステップ 1: ランタイム環境の作成

  1. ランタイム環境管理ページに移動します。

    1. E-MapReduce コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[EMR Serverless] > [Spark] を選択します。

    3. [Spark] ページで、対象のワークスペースの名前をクリックします。

    4. [EMR Serverless Spark] ページで、左側のナビゲーションウィンドウにある [ランタイム環境管理] をクリックします。

  2. [ランタイム環境の作成] をクリックします。

  3. [ランタイム環境の作成] ページで、[ライブラリの追加] をクリックします。

    パラメーターの詳細については、「ランタイム環境の管理」をご参照ください。

  4. [新しいライブラリ] ダイアログボックスで、[PyPI] ソースタイプを使用し、[PyPI パッケージ] パラメーターを設定してから、[OK] をクリックします。

    [PyPI パッケージ] フィールドに、ライブラリの名前とバージョンを入力します。バージョンを指定しない場合、デフォルトで最新バージョンがインストールされます。

    この例では、ライブラリ fakergeopy が追加されます。

  5. [作成] をクリックします。

    ランタイム環境が作成されると、システムは環境の初期化を開始します。

ステップ 2: リソースファイルを OSS にアップロードする

  1. pyspark_third_party_libs_demo.py をクリックして、必要なリソースファイルをダウンロードします。

    この例では、PySpark とサードパーティのライブラリを使用してアナログデータを生成し、地理分析を実行する方法を示します。faker ライブラリは、ユーザー情報とランダムな地理的位置を含むアナログデータを生成するために使用されます。geopy ライブラリは、各ユーザーの場所とエッフェル塔の間の地理的距離を計算するために使用されます。最後に、10 キロメートル以内のユーザーがフィルターで除外されます。

    次の内容で pyspark_third_party_libs_demo.py サンプルスクリプトを作成することもできます。

    pyspark_third_party_libs_demo.py

    from pyspark.sql import SparkSession   
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    from faker import Faker
    import random
    from geopy.distance import geodesic
    
    spark = SparkSession.builder \
            .appName("PySparkThirdPartyLibsDemo") \
            .getOrCreate()
    
    # サードパーティの faker ライブラリを使用してアナログデータを生成する
    fake = Faker()
    landmark = (48.8584, 2.2945)  # エッフェル塔の座標
    
    # アナログデータを生成する関数を作成する
    def generate_fake_data(num_records):
        data = []
        for _ in range(num_records):
            # パリ近郊のランダムな座標を生成する
            lat = 48.85 + random.uniform(-0.2, 0.2)
            lon = 2.30 + random.uniform(-0.2, 0.2)
            data.append((
                fake.uuid4(),        # ユーザー ID
                fake.name(),         # 名前
                lat,                 # 緯度
                lon                  # 経度
            ))
        return data
    
    # 100 件のアナログレコードを生成する
    fake_data = generate_fake_data(100)
    
    # Spark DataFrame を作成する
    columns = ["user_id", "name", "latitude", "longitude"]
    df = spark.createDataFrame(fake_data, schema=columns)
    
    # 生成されたサンプルデータを表示する
    print("Generated sample data:")
    df.show(5)
    
    # サードパーティの geopy ライブラリを使用して距離を計算する
    def calculate_distance(lat, lon, landmark=landmark):
        """2 点間の地理的距離 (キロメートル) を計算する"""
        user_location = (lat, lon)
        return geodesic(user_location, landmark).kilometers
    
    # UDF (ユーザー定義関数) を登録する
    distance_udf = udf(calculate_distance, FloatType())
    
    # 距離列を追加する
    df_with_distance = df.withColumn(
        "distance_km", 
        distance_udf("latitude", "longitude")
    )
    
    # 10 キロメートル以内のユーザーを検索する
    nearby_users = df_with_distance.filter("distance_km <= 10")
    
    # 結果を表示する
    print(f"\nFound {nearby_users.count()} users within a 10-kilometer range:")
    nearby_users.select("name", "latitude", "longitude", "distance_km").show(10)
    
  2. pyspark_third_party_libs_demo.py を OSS にアップロードします。詳細については、「単純アップロード」をご参照ください。

ステップ 3: ジョブを開発して実行する

  1. EMR Serverless Spark ページの左側のナビゲーションウィンドウで、[データ開発] をクリックします。

  2. [開発] タブで、image アイコンをクリックします。

  3. [作成] ダイアログボックスで、名前を入力し、[タイプ] ドロップダウンリストから [バッチジョブ] > [PySpark] を選択して、[OK] をクリックします。

  4. 右上隅で、キューを選択します。

  5. ジョブの設定タブで、次のパラメーターを設定し、[実行] をクリックします。他のパラメーターを設定する必要はありません。

    パラメーター

    説明

    メイン Python リソース

    [OSS] を選択し、pyspark_third_party_libs_demo.py ファイルの OSS パスを入力します。例: oss://<yourBucketName>/pyspark_third_party_libs_demo.py。

    ランタイム環境

    ドロップダウンリストから作成したランタイム環境を選択します。

  6. ジョブの実行後、[実行記録] セクションのジョブの [アクション] 列で [ログエクスプローラー] をクリックします。

  7. [ログエクスプローラー] タブで、関連するログを表示できます。

    たとえば、[ドライバーログ] セクションの [Stdout] タブで、次の情報を表示できます。

    Generated sample data:
    +--------------------+-------------------+------------------+------------------+
    |             user_id|               name|          latitude|         longitude|
    +--------------------+-------------------+------------------+------------------+
    |73d4565c-8cdf-4bc...|  Garrett Robertson| 48.81845614776422|2.4087517234236064|
    |0fc364b1-6759-416...|      Dawn Gonzalez| 48.68654896170054|2.4708555780468013|
    |2ab1f0aa-5552-4e1...|Alexander Gallagher| 48.87603770688707|2.1209399987431246|
    |1cabbdde-e703-4a8...|       David Morris|48.656356532418116|2.2503952330408175|
    |8b7938a0-b283-401...|    Shannon Perkins| 48.82915001905855| 2.410743969589327|
    +--------------------+-------------------+------------------+------------------+
    only showing top 5 rows
    
    
    Found 24 users within a 10-kilometer range:
    +-----------------+------------------+------------------+-----------+
    |             name|          latitude|         longitude|distance_km|
    +-----------------+------------------+------------------+-----------+
    |Garrett Robertson| 48.81845614776422|2.4087517234236064|   9.490705|
    |  Shannon Perkins| 48.82915001905855| 2.410743969589327|   9.131355|
    |      Alex Harris| 48.82547383207313|2.3579336032430027|   5.923493|
    |      Tammy Ramos| 48.84668267431606|2.3606455536493574|   5.026109|
    |   Ivan Christian| 48.89224239228342|2.2811025348668195|  3.8897192|
    |  Vernon Humphrey| 48.93142188723839| 2.306957802222233|   8.171813|
    |  Shawn Rodriguez|48.919907710882654|2.2270993307836044|   8.439087|
    |    Robert Fisher|48.794216103154646|2.3699024070507906|   9.033209|
    |  Heather Collier|48.822957591865205|2.2993033803043454|   3.957171|
    |       Dawn White|48.877816307255586|2.3743880390928878|   6.246059|
    +-----------------+------------------+------------------+-----------+
    only showing top 10 rows

方法 2: Conda を通じて Python 環境を管理する

ステップ 1: Conda 環境を作成してデプロイする

  1. Alibaba Cloud Linux 3 OS を使用し、インターネットに接続され、x86 アーキテクチャを持つ Elastic Compute Service (ECS) インスタンスを作成します。詳細については、「カスタム起動タブでインスタンスを作成する」をご参照ください。

    説明

    EMR on ECS ページで作成された既存の EMR クラスターのアイドルノードを使用することもできます (ノードが x86 アーキテクチャであることを確認してください)。

  2. 次のコマンドを実行して Miniconda をインストールします。

    wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
    chmod +x Miniconda3-latest-Linux-x86_64.sh
    
    ./Miniconda3-latest-Linux-x86_64.sh -b
    source miniconda3/bin/activate
  3. Python 3.8 と NumPy を使用する Conda 環境を構築します。

    conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz

ステップ 2: リソースファイルを OSS にアップロードする

  1. kmeans.pykmeans_data.txt をクリックして、必要なリソースファイルをダウンロードします。

    次の内容で kmeans.py サンプルスクリプトと kmeans_data.txt データファイルを作成することもできます。

    kmeans.py

    """
    MLlib を使用した K-means クラスタリングプログラム。
    
    この例には NumPy (http://www.numpy.org/) が必要です。
    """
    import sys
    
    import numpy as np
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans
    
    
    def parseVector(line):
        return np.array([float(x) for x in line.split(' ')])
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kmeans <file> <k>", file=sys.stderr)
            sys.exit(-1)
        sc = SparkContext(appName="KMeans")
        lines = sc.textFile(sys.argv[1])
        data = lines.map(parseVector)
        k = int(sys.argv[2])
        model = KMeans.train(data, k)
        print("Final centers: " + str(model.clusterCenters))
        print("Total Cost: " + str(model.computeCost(data)))
        sc.stop()

    kmeans_data.txt

    0.0 0.0 0.0
    0.1 0.1 0.1
    0.2 0.2 0.2
    9.0 9.0 9.0
    9.1 9.1 9.1
    9.2 9.2 9.2
  2. pyspark_conda_env.tar.gzkmeans.py、および kmeans_data.txt ファイルを OSS にアップロードします。詳細については、「単純アップロード」をご参照ください。

ステップ 3: ジョブを開発して実行する

  1. EMR Serverless Spark ページの左側のナビゲーションウィンドウで、[データ開発] をクリックします。

  2. [開発] タブで、image アイコンをクリックします。

  3. [作成] ダイアログボックスで、名前を入力し、[タイプ] ドロップダウンリストから [バッチジョブ] > [PySpark] を選択して、[OK] をクリックします。

  4. 右上隅で、キューを選択します。

  5. ジョブの設定タブで、次のパラメーターを設定し、[実行] をクリックします。他のパラメーターを設定する必要はありません。

    パラメーター

    説明

    メイン Python リソース

    [OSS] を選択し、kmeans.py ファイルの OSS パスを入力します。例: oss://<yourBucketName>/kmeans.py。

    実行パラメーター

    kmeans_data.txt データファイルの OSS パスを入力します。

    フォーマット: oss://<yourBucketName>/kmeans_data.txt 2

    アーカイブのリソース

    [OSS] を選択し、pyspark_conda_env.tar.gz ファイルの OSS パスを入力します。

    フォーマット: oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv

    Spark 構成

    spark.pyspark.driver.python  ./condaenv/bin/python
    spark.pyspark.python         ./condaenv/bin/python
  6. ジョブの実行後、[実行記録] セクションのジョブの [アクション] 列で [ログエクスプローラー] をクリックします。

  7. [ログエクスプローラー] タブで、関連するログを表示できます。

    たとえば、[ドライバーログ] セクションの [Stdout] タブで、次の情報を表示できます。

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958

方法 3: PEX を通じて Python 依存関係をパッケージ化する

ステップ 1: PEX ファイルをパッケージ化して実行する

  1. Alibaba Cloud Linux 3 OS を使用し、インターネットに接続され、x86 アーキテクチャを持つ Elastic Compute Service (ECS) インスタンスを作成します。詳細については、「カスタム起動タブでインスタンスを作成する」をご参照ください。

    説明

    EMR on ECS ページで作成された既存の EMR クラスターのアイドルノードを使用することもできます (ノードが x86 アーキテクチャであることを確認してください)。

  2. PEX と wheel ツールをインストールします。

    pip3.8 install --user pex wheel \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  3. 使用したいサードパーティの Python ライブラリの wheel ファイルを一時ディレクトリにダウンロードします。

    pip3.8 wheel -w /tmp/wheel \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  4. PEX ファイルを生成します。

    pex -f /tmp/wheel --no-index \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      -o spark331_pandas153.pex

ステップ 2: PEX ファイルを OSS にアップロードする

  1. kmeans.pykmeans_data.txt をクリックして、必要なリソースファイルをダウンロードします。

  2. spark331_pandas153.pexkmeans.py、および kmeans_data.txt ファイルを OSS にアップロードします。詳細については、「単純アップロード」をご参照ください。

    説明

    この例では、Spark 3.3.1 が使用され、PEX ファイルには Pandas、PyArrow、および NumPy のサードパーティ Python ライブラリが含まれています。選択した Spark バージョンに基づいて、他のバージョンの PySpark 環境をパッケージ化できます。エンジンバージョンの詳細については、「エンジンバージョン」をご参照ください。

ステップ 3: ジョブを開発して実行する

  1. EMR Serverless Spark ページの左側のナビゲーションウィンドウで、[データ開発] をクリックします。

  2. [開発] タブで、image アイコンをクリックします。

  3. [作成] ダイアログボックスで、名前を入力し、[タイプ] ドロップダウンリストから [バッチジョブ] > [PySpark] を選択して、[OK] をクリックします。

  4. 右上隅で、キューを選択します。

  5. ジョブの設定タブで、次のパラメーターを設定し、[実行] をクリックします。他のパラメーターを設定する必要はありません。

    パラメーター

    説明

    メイン Python リソース

    [OSS] を選択し、kmeans.py ファイルの OSS パスを入力します。例: oss://<yourBucketName>/kmeans.py。

    実行パラメーター

    kmeans_data.txt データファイルの OSS パスを入力します。

    フォーマット: oss://<yourBucketName>/kmeans_data.txt 2

    ファイルリソース

    [OSS] を選択し、spark331_pandas153.pex ファイルの OSS パスを入力します。例: oss://<yourBucketName>/spark331_pandas153.pex

    Spark 構成

    spark.pyspark.driver.python            ./spark331_pandas153.pex
    spark.pyspark.python                   ./spark331_pandas153.pex
  6. ジョブの実行後、[実行記録] セクションのジョブの [アクション] 列で [ログエクスプローラー] をクリックします。

  7. [ログエクスプローラー] タブで、関連するログを表示できます。

    たとえば、[ドライバーログ] セクションの [Stdout] タブで、次の情報を表示できます。

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958

参考

このトピックでは、PySpark 開発を例として使用します。他の方法でジョブを開発する場合は、「バッチジョブまたはストリーミングジョブを開発する」をご参照ください。