このトピックでは、AnalyticDB for MySQL Spark Python ジョブを開発する方法と、仮想環境を使用して Python ジョブのランタイム環境をパッケージングする方法について説明します。
前提条件
AnalyticDB for MySQL Enterprise Edition、Basic Edition、または Data Lakehouse Edition クラスターが作成されていること。
Object Storage Service (OSS) バケットが AnalyticDB for MySQL クラスターと同じリージョンに作成されていること。
AnalyticDB for MySQL Enterprise Edition、Basic Edition、または Data Lakehouse Edition クラスター用にジョブのリソースグループが作成されていること。
AnalyticDB for MySQL クラスター用にデータベースアカウントが作成されていること。
Alibaba Cloud アカウントを使用する場合、特権アカウントを作成するだけで済みます。
RAM ユーザーを使用する場合、特権アカウントと標準アカウントを作成し、標準アカウントを RAM ユーザーに関連付ける必要があります。
PySpark の基本的な使用方法
次のサンプルプログラムを記述し、
example.pyとして保存します。from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() df = spark.sql("SELECT 1+1") df.printSchema() df.show()example.pyプログラムを OSS にアップロードします。詳細については、「コンソールを使用してファイルをアップロードする」をご参照ください。Spark 開発エディターに移動します。
AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけ、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、[ジョブ開発] > [Spark Jar 開発] をクリックします。
エディターウィンドウの上部で、ジョブのリソースグループと Spark ジョブのタイプを選択します。この例では、Batch タイプを使用します。
エディターで、次の構成でジョブを実行します。
{ "name": "Spark Python Test", "file": "oss://testBucketName/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }パラメーターの詳細については、「パラメーター」をご参照ください。
Python 依存関係の使用
メソッド
Python プログラムにカスタムまたはサードパーティの依存関係が必要な場合は、それらを OSS にアップロードし、Spark ジョブを送信するときに pyFiles パラメーターを構成します。
例
この例では、ユーザー定義関数をインポートして従業員の税引き後所得を計算する方法を示します。この例では、staff.csv という名前のデータファイルが OSS にアップロードされています。staff.csv ファイルには、次のサンプルデータが含まれています。
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200依存関係を開発し、OSS にアップロードします。
toolsという名前のフォルダを作成し、そのフォルダにfunc.pyという名前のプログラムを作成します。def tax(salary): """ 文字列を整数に変換し、給与から 15% の税金を差し引きます :param salary: 従業員の給与 :return: """ return 0.15 * int(salary)toolsフォルダを圧縮して OSS にアップロードします。この例では、圧縮ファイルはtools.zipです。説明ジョブが複数の Python ファイルに依存している場合は、それらを .zip ファイルに圧縮することをお勧めします。これにより、Python コード内で Python ファイルをモジュールとして参照できます。
example.pyという名前のサンプルプログラムを記述します。from __future__ import print_function from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import FloatType import sys # サードパーティファイルをインポート from tools import func if __name__ == "__main__": # pyspark コンテキストを初期化 spark = SparkSession.builder.appName("Python Example").getOrCreate() # oss から csv をデータフレームに読み込み、テーブルを表示 cvs_file = sys.argv[1] df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True) # スキーマとデータをコンソールに出力 df.printSchema() df.show() # udf を作成 taxCut = udf(lambda salary: func.tax(salary), FloatType()) # 給与から税金を差し引き、結果を表示 df.select("name", taxCut("salary").alias("final salary")).show() spark.stop()example.pyプログラムを OSS にアップロードします。詳細については、「コンソールを使用してファイルをアップロードする」をご参照ください。Spark 開発エディターに移動します。
AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけ、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、[ジョブ開発] > [Spark Jar 開発] をクリックします。
エディターウィンドウの上部で、ジョブのリソースグループと Spark ジョブのタイプを選択します。この例では、Batch タイプを使用します。
エディターで、次の構成でジョブを実行します。
{ "name": "Spark Python", "file": "oss://testBucketName/example.py", "pyFiles": ["oss://testBucketName/tools.zip"], "args": [ "oss://testBucketName/staff.csv" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small" } }パラメーター:
file: Python プログラムが格納されている OSS パス。
pyFiles: PySpark ジョブが依存する Python ファイルの OSS パス。ファイルには .zip 拡張子が必要です。複数の圧縮パッケージはコンマ (,) で区切ります。
説明PySpark アプリケーションが依存するすべての Python ファイルは OSS に保存する必要があります。
args: メインプログラムの引数。この例では、
staff.csvサンプルデータファイルの OSS パスです。
パラメーターの詳細については、「パラメーター」をご参照ください。
仮想環境を使用した依存環境のパッケージング
Python ジョブに複雑な依存関係がある場合、Python の仮想環境機能を使用して環境の管理と隔離を行うことができます。AnalyticDB for MySQL Spark では、仮想環境を使用してローカルの依存関係環境をパッケージングし、OSS にアップロードできます。仮想環境の詳細については、「Python 公式ドキュメント」をご参照ください。
AnalyticDB for MySQL Spark は glibc-devel 2.28 を使用します。仮想環境がこのバージョンと互換性がない場合、PySpark タスクが失敗する可能性があります。
メソッド
仮想環境を使用して Python 環境をパッケージングするには、圧縮パッケージを OSS にアップロードします。次に、Spark ジョブを送信するときに、関連するパラメーターを構成して、圧縮パッケージの OSS パスと使用する Python インタープリターのローカルパスを指定します。
圧縮された Python 環境パッケージの OSS パスを指定します:
圧縮パッケージが小さい場合は、
archivesパラメーターを構成します。圧縮パッケージが大きい場合は、
spark.executorEnv.ADB_SPARK_DOWNLOAD_FILESおよびspark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILESパラメーターを構成します。
spark.pyspark.pythonパラメーターを使用して、使用する Python インタープリターのローカルパスを指定します。
例
Linux 環境を準備します。
Linux オペレーティングシステムで Python 環境をパッケージングする必要があります。次のいずれかの方法で Linux 環境を準備できます。この例では、Alibaba Cloud ECS インスタンスを使用します。
CentOS 7 または AnolisOS 8 を実行する Alibaba Cloud ECS インスタンスを購入します。詳細については、「ウィザードを使用してインスタンスを作成する」をご参照ください。
ローカルマシンに CentOS 7、AnolisOS 8、またはそれ以降のバージョンをインストールします。
CentOS または AnolisOS の公式 Docker イメージを使用して Python 環境をパッケージングします。
仮想環境を使用して Python ランタイム環境をパッケージングし、圧縮パッケージを OSS にアップロードします。
Virtualenv または Conda を使用して、プロジェクトが依存する Python 環境をパッケージングします。パッケージング中に Python のバージョンをカスタマイズできます。この例では Virtualenv を使用します。
# 現在のパスに python3 で venv ディレクトリを作成 # --copies を必ず追加してください! virtualenv --copies --download --python python3.7 venv # 環境をアクティブ化 source venv/bin/activate # サードパーティモジュールをインストール pip install scikit-spark==0.4.0 # 結果を確認 pip list # 環境を圧縮 tar -czvf venv.tar.gz venv説明Conda を使用してプロジェクトの依存関係をパッケージングする場合は、「環境の管理」をご参照ください。
Spark 開発エディターに移動します。
AnalyticDB for MySQL コンソールにログインします。コンソールの左上隅でリージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。管理するクラスターを見つけ、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、[ジョブ開発] > [Spark Jar 開発] をクリックします。
エディターウィンドウの上部で、ジョブのリソースグループと Spark ジョブのタイプを選択します。この例では、Batch タイプを使用します。
エディターで、次の構成でジョブを実行します。
{ "name": "venv example", "archives": [ "oss://testBucketname/venv.tar.gz#PY3" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }または
説明Python 環境の圧縮パッケージが大きい場合は、次の構成を使用します。
{ "name": "venv example", "conf": { "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3", "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }パラメーター:
archives: 圧縮された Python 環境パッケージの OSS パス。この例では、
venv.tar.gzパッケージの OSS パスです。spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES: Spark Executor ノード用の圧縮された Python 環境パッケージの OSS パスを指定します。
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: Spark Driver ノード用の圧縮された Python 環境パッケージの OSS パスを指定します。
spark.pyspark.python: 使用する Python インタープリターのローカルパスを指定します。
その他のパラメーターの詳細については、「パラメーター」をご参照ください。