すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:PySpark を使用して Spark アプリケーションを開発する

最終更新日:Nov 09, 2025

このトピックでは、AnalyticDB for MySQL Spark Python ジョブを開発する方法と、仮想環境を使用して Python ジョブのランタイム環境をパッケージングする方法について説明します。

前提条件

PySpark の基本的な使用方法

  1. 次のサンプルプログラムを記述し、example.py として保存します。

    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession.builder.getOrCreate()
        df = spark.sql("SELECT 1+1")
        df.printSchema()
        df.show()
    
  2. example.py プログラムを OSS にアップロードします。詳細については、「コンソールを使用してファイルをアップロードする」をご参照ください。

  3. Spark 開発エディターに移動します。

    1. AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけ、クラスター ID をクリックします。

    2. 左側のナビゲーションウィンドウで、[ジョブ開発] > [Spark Jar 開発] をクリックします。

  4. エディターウィンドウの上部で、ジョブのリソースグループと Spark ジョブのタイプを選択します。この例では、Batch タイプを使用します。

  5. エディターで、次の構成でジョブを実行します。

    {
     "name": "Spark Python Test",
     "file": "oss://testBucketName/example.py",
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.executor.resourceSpec": "small"
     }
    }

    パラメーターの詳細については、「パラメーター」をご参照ください。

Python 依存関係の使用

メソッド

Python プログラムにカスタムまたはサードパーティの依存関係が必要な場合は、それらを OSS にアップロードし、Spark ジョブを送信するときに pyFiles パラメーターを構成します。

この例では、ユーザー定義関数をインポートして従業員の税引き後所得を計算する方法を示します。この例では、staff.csv という名前のデータファイルが OSS にアップロードされています。staff.csv ファイルには、次のサンプルデータが含まれています。

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
  1. 依存関係を開発し、OSS にアップロードします。

    1. tools という名前のフォルダを作成し、そのフォルダに func.py という名前のプログラムを作成します。

      def tax(salary):
          """
          文字列を整数に変換し、給与から 15% の税金を差し引きます
      
          :param salary: 従業員の給与
          :return:
          """
          return 0.15 * int(salary)
      
    2. tools フォルダを圧縮して OSS にアップロードします。この例では、圧縮ファイルは tools.zip です。

      説明

      ジョブが複数の Python ファイルに依存している場合は、それらを .zip ファイルに圧縮することをお勧めします。これにより、Python コード内で Python ファイルをモジュールとして参照できます。

  2. example.py という名前のサンプルプログラムを記述します。

    from __future__ import print_function
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    
    import sys
    
    # サードパーティファイルをインポート
    from tools import func
    
    if __name__ == "__main__":
        # pyspark コンテキストを初期化
        spark = SparkSession.builder.appName("Python Example").getOrCreate()
        # oss から csv をデータフレームに読み込み、テーブルを表示
        cvs_file = sys.argv[1]
        df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True)
        # スキーマとデータをコンソールに出力
        df.printSchema()
        df.show()
        # udf を作成
        taxCut = udf(lambda salary: func.tax(salary), FloatType())
        # 給与から税金を差し引き、結果を表示
        df.select("name", taxCut("salary").alias("final salary")).show()
        spark.stop()
    
  3. example.py プログラムを OSS にアップロードします。詳細については、「コンソールを使用してファイルをアップロードする」をご参照ください。

  4. Spark 開発エディターに移動します。

    1. AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけ、クラスター ID をクリックします。

    2. 左側のナビゲーションウィンドウで、[ジョブ開発] > [Spark Jar 開発] をクリックします。

  5. エディターウィンドウの上部で、ジョブのリソースグループと Spark ジョブのタイプを選択します。この例では、Batch タイプを使用します。

  6. エディターで、次の構成でジョブを実行します。

    {
     "name": "Spark Python",
     "file": "oss://testBucketName/example.py",
     "pyFiles": ["oss://testBucketName/tools.zip"],
     "args": [
     "oss://testBucketName/staff.csv"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 2,
     "spark.executor.resourceSpec": "small"
     }
    }

    パラメーター:

    • file: Python プログラムが格納されている OSS パス。

    • pyFiles: PySpark ジョブが依存する Python ファイルの OSS パス。ファイルには .zip 拡張子が必要です。複数の圧縮パッケージはコンマ (,) で区切ります。

      説明

      PySpark アプリケーションが依存するすべての Python ファイルは OSS に保存する必要があります。

    • args: メインプログラムの引数。この例では、staff.csv サンプルデータファイルの OSS パスです。

    パラメーターの詳細については、「パラメーター」をご参照ください。

仮想環境を使用した依存環境のパッケージング

Python ジョブに複雑な依存関係がある場合、Python の仮想環境機能を使用して環境の管理と隔離を行うことができます。AnalyticDB for MySQL Spark では、仮想環境を使用してローカルの依存関係環境をパッケージングし、OSS にアップロードできます。仮想環境の詳細については、「Python 公式ドキュメント」をご参照ください。

重要

AnalyticDB for MySQL Spark は glibc-devel 2.28 を使用します。仮想環境がこのバージョンと互換性がない場合、PySpark タスクが失敗する可能性があります。

メソッド

仮想環境を使用して Python 環境をパッケージングするには、圧縮パッケージを OSS にアップロードします。次に、Spark ジョブを送信するときに、関連するパラメーターを構成して、圧縮パッケージの OSS パスと使用する Python インタープリターのローカルパスを指定します。

  • 圧縮された Python 環境パッケージの OSS パスを指定します:

    • 圧縮パッケージが小さい場合は、archives パラメーターを構成します。

    • 圧縮パッケージが大きい場合は、spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES および spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES パラメーターを構成します。

  • spark.pyspark.python パラメーターを使用して、使用する Python インタープリターのローカルパスを指定します。

  1. Linux 環境を準備します。

    Linux オペレーティングシステムで Python 環境をパッケージングする必要があります。次のいずれかの方法で Linux 環境を準備できます。この例では、Alibaba Cloud ECS インスタンスを使用します。

    • CentOS 7 または AnolisOS 8 を実行する Alibaba Cloud ECS インスタンスを購入します。詳細については、「ウィザードを使用してインスタンスを作成する」をご参照ください。

    • ローカルマシンに CentOS 7、AnolisOS 8、またはそれ以降のバージョンをインストールします。

    • CentOS または AnolisOS の公式 Docker イメージを使用して Python 環境をパッケージングします。

  2. 仮想環境を使用して Python ランタイム環境をパッケージングし、圧縮パッケージを OSS にアップロードします。

    Virtualenv または Conda を使用して、プロジェクトが依存する Python 環境をパッケージングします。パッケージング中に Python のバージョンをカスタマイズできます。この例では Virtualenv を使用します。

    # 現在のパスに python3 で venv ディレクトリを作成
    # --copies を必ず追加してください!
    virtualenv --copies --download --python python3.7 venv
    
    # 環境をアクティブ化
    source venv/bin/activate
    
    # サードパーティモジュールをインストール
    pip install scikit-spark==0.4.0
    
    # 結果を確認
    pip list
    
    # 環境を圧縮
    tar -czvf venv.tar.gz venv
    説明

    Conda を使用してプロジェクトの依存関係をパッケージングする場合は、「環境の管理」をご参照ください。

  3. Spark 開発エディターに移動します。

    1. AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけ、クラスター ID をクリックします。

    2. 左側のナビゲーションウィンドウで、[ジョブ開発] > [Spark Jar 開発] をクリックします。

  4. エディターウィンドウの上部で、ジョブのリソースグループと Spark ジョブのタイプを選択します。この例では、Batch タイプを使用します。

  5. エディターで、次の構成でジョブを実行します。

    {
     "name": "venv example",
     "archives": [
     "oss://testBucketname/venv.tar.gz#PY3"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    または

    説明

    Python 環境の圧縮パッケージが大きい場合は、次の構成を使用します。

    {
     "name": "venv example",
     "conf": {
     "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
     "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,",
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    パラメーター:

    • archives: 圧縮された Python 環境パッケージの OSS パス。この例では、venv.tar.gz パッケージの OSS パスです。

    • spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES: Spark Executor ノード用の圧縮された Python 環境パッケージの OSS パスを指定します。

    • spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: Spark Driver ノード用の圧縮された Python 環境パッケージの OSS パスを指定します。

    • spark.pyspark.python: 使用する Python インタープリターのローカルパスを指定します。

    その他のパラメーターの詳細については、「パラメーター」をご参照ください。