Lindorm コンピュートエンジンは、Spark Python ジョブを送信するための RESTful API を提供しています。この API を使用して、ストリーミングタスク、バッチタスク、機械学習タスク、およびグラフコンピューティングタスクを実行できます。このトピックでは、Lindorm コンピュートエンジン向けの Python ジョブを開発および送信する方法について説明します。
前提条件
Lindorm コンピュートエンジンを有効化しました。詳細については、「サービスの有効化」をご参照ください。
Spark Python ジョブ開発プロセス
ステップ 1: Python ベースの Spark ジョブの定義
「[サンプル Spark ジョブ]」をクリックして、サンプルパッケージをダウンロードします。
ダウンロードしたパッケージを展開します。展開されたフォルダの名前は
lindorm-spark-examplesです。「lindorm-spark-examples/python」ディレクトリに移動し、Python のディレクトリ構造を確認します。このセクションでは、
your_projectをプロジェクトのルートディレクトリと仮定した場合のプロジェクトのディレクトリ構造について説明します。your_projectディレクトリに、__init__.pyという名前の空のファイルを作成します。エントリファイルを変更します。
your_projectディレクトリのパスをsys.pathに追加します。詳細については、「lindorm-spark-examples/python/your_project/main.py」の Notice1 セクションをご参照ください。# Notice1: You need to do the following step to complete the code modification: # Step1: Please add a "__init__.py" to your project directory, your project will act as a module of launcher.py # Step2: Please add current dir to sys.path, you should add the following code to your main file current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("current dir in your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))エントリーロジックを
main(argv)メソッドにカプセル化します。詳細については、「lindorm-spark-examples/python/your_project/main.py」の Notice2 セクションをご参照ください。# Notice2: Move the code in `if __name__ == "__main__":` branch to a new defined main(argv) function, # so that launcher.py in parent directory just call main(sys.argv) def main(argv): print("Receive arguments: %s \n" % str(argv)) print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__))) # Write your code here if __name__ == "__main__": main(sys.argv)
エントリーファイルを作成して、
main(argv)メソッドを呼び出します。ルートディレクトリyour_projectで、launcher.pyという名前のファイルを作成します。lindorm-spark-examples/python/launcher.pyからコードをコピーできます。
ステップ 2: Python ベースの Spark ジョブのパッケージ化
プロジェクトが依存する Python ランタイム環境とサードパーティライブラリをパッケージ化します。これらの依存関係を tar ファイルにパッケージ化するには、Conda または Virtualenv を使用することを推奨します。詳細については、「Python パッケージ管理」をご参照ください。
重要`spark.archives` パラメーターを使用して、Conda または Virtualenv によって作成された tar ファイルを渡します。`spark.archives` でサポートされているすべてのフォーマットが有効です。詳細については、「spark.archives」をご参照ください。
Lindorm コンピュートエンジンが Python バイナリファイルを認識できるように、このステップは Linux で実行してください。
プロジェクトファイルをパッケージ化します。
your_projectディレクトリを.zipまたは.eggファイルに圧縮します。次のコマンドを実行して、
.zipファイルを作成します。zip -r project.zip your_project「
.egg」ファイルを作成するには、「Eggs のビルド」をご参照ください。
ステップ 3: Python ベースの Spark ジョブファイルのアップロード
次のファイルを OSS にアップロードします。詳細については、「シンプルアップロード」をご参照ください。
ステップ 4: Python ベースの Spark ジョブの送信
Lindorm コンピュートエンジンは、ジョブを送信および管理するための 2 つの方法をサポートしています。
Lindorm コンソールでジョブを送信します。詳細については、「コンソールでのジョブ管理」をご参照ください。
DMS を使用してジョブを送信します。詳細については、「DMS を使用したジョブ管理」をご参照ください。
リクエストパラメーターは、次の 2 つの部分で構成されます。
Python ジョブランタイム環境のパラメーター。例:
{"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}プロジェクトファイル(
.zip、.egg、または.py)を送信する際は、spark.submit.pyFiles を configs パラメーターに設定します。Python ランタイム環境とサードパーティライブラリを含む tar ファイルを送信する場合、`configs` パラメーターで `spark.archives` と `spark.kubernetes.driverEnv.PYSPARK_PYTHON` を設定します。
spark.archives パラメーターでは、番号記号 (#) を使用して targetDir を指定します。
`spark.kubernetes.driverEnv.PYSPARK_PYTHON` を Python 実行可能ファイルのパスに設定します。
OSS にファイルをアップロードする場合は、configs パラメーターに以下のパラメーターを設定してください。
表 1. 設定パラメーター
パラメーター
例
説明
spark.hadoop.fs.oss.endpoint
oss-cn-beijing-internal.aliyuncs.com
Python ファイルを保存する OSS バケットのエンドポイント。
spark.hadoop.fs.oss.accessKeyId
testAccessKey ID
Alibaba Cloud マネジメントコンソールで作成する AccessKey ID と AccessKey Secret。詳細については、「AccessKey ペアの作成」をご参照ください。
spark.hadoop.fs.oss.accessKeySecret
testAccessKey Secret
spark.hadoop.fs.oss.impl
org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
OSS にアクセスするために使用されるクラス。
説明その他のパラメーターについては、「パラメーター」をご参照ください。
Python ジョブ開発例
「サンプル Spark ジョブ」をクリックして、ファイルをダウンロードして展開します。
「your_project/main.py」ファイルを開き、エントリーポイントを変更します。
your_project ディレクトリを sys.path に追加します。
current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("current dir in your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))エントリーロジックを main.py ファイルに追加します。以下の例では、SparkSession を初期化します。
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("PythonImportTest") \ .getOrCreate() print(spark.conf) spark.stop()
Python ディレクトリで、your_project ディレクトリを ZIP ファイルに圧縮します。
zip -r your_project.zip your_projectLinux で、Conda を使用して Python ランタイム環境をパッケージ化します。
conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack conda activate pyspark_conda_env conda pack -f -o pyspark_conda_env.tar.gz「your_project.zip」、「pyspark_conda_env.tar.gz」、および「launcher.py」を OSS にアップロードします。
次のいずれかの方法でジョブを送信します。
Lindorm コンソールでジョブを送信します。詳細については、「コンソールでのジョブ管理」をご参照ください。
DMS を使用してジョブを送信します。詳細については、「DMS を使用したジョブ管理」をご参照ください。
ジョブ診断
Python ジョブを送信した後、Jobs ページでそのステータスと Spark UI アドレスを確認できます。 詳細については、「ジョブの表示」をご参照ください。 送信中に問題が発生した場合は、チケットを送信してください。 サポートスタッフにジョブ ID と Spark UI アドレスを提供してください。