Data Science Workshop (DSW) は、多言語開発のためにPlatform for AI (PAI) によって提供されるクラウド統合開発環境 (IDE) です。 DSWインスタンスからAnalyticDB for MySQLクラスターに接続し、統合されたノートブックおよびターミナル開発環境でPySparkスクリプトを記述してSparkアプリケーションを送信できます。 このトピックでは、DSWインスタンスを使用してSparkアプリケーションを送信する方法について説明します。
前提条件
AnalyticDB for MySQLData Lakehouse Editionクラスターが作成されます。
AnalyticDB for MySQLクラスターのジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。
AnalyticDB for MySQLクラスター用のデータベースアカウントが作成されます。
Alibaba Cloudアカウントを使用する場合は、特権アカウントを作成するだけで済みます。 詳細については、「データベースアカウントの作成」トピックの「特権アカウントの作成」セクションをご参照ください。
RAM (Resource Access Management) ユーザーを使用する場合は、特権アカウントと標準アカウントを作成し、標準アカウントをRAMユーザーに関連付ける必要があります。 詳細については、「データベースアカウントの作成」および「データベースアカウントの関連付けまたは関連付けの解除」をご参照ください。
AnalyticDB for MySQLは、AliyunADBSparkProcessingDataRoleロールを引き受けて他のクラウドリソースにアクセスする権限があります。 詳細については、「権限付与の実行」をご参照ください。
Sparkアプリケーションのログストレージパスは、AnalyticDB for MySQLクラスターに設定されています。
説明AnalyticDB for MySQL コンソールにログインします。 管理するクラスターを確認し、クラスター ID をクリックします。 左側のナビゲーションウィンドウで、 を選択します。 ログ設定 をクリックします。 表示されるダイアログボックスで、デフォルトパスを選択するか、カスタムストレージパスを指定します。 カスタムストレージパスをOSSのルートディレクトリに設定することはできません。 カスタムストレージパスに少なくとも1つのレイヤーのフォルダーが含まれていることを確認します。
手順1: PAI DSWインスタンスの作成と設定
PAIを有効にしてワークスペースを作成します。 詳細については、「PAIの有効化」および「ワークスペースの作成」をご参照ください。
AnalyticDB for MySQLクラスターと同じリージョンでPAIを有効化する必要があります。
DSWインスタンスを作成します。 詳細については、「DSWインスタンスの作成」をご参照ください。
ImageパラメーターをImage Addressに設定し、AnalyticDB for MySQL Spark Livyイメージアドレス
registry.cn-hangzhou.aliyuncs.com/adb-public-image/adb-spark-public-image:livy.0.5.pre
を入力する必要があります。 ビジネス要件に基づいて他のパラメーターを設定できます。DSWインスタンスにアクセスします。 詳細については、「DSWインスタンスへのアクセス」トピックの「コンソールでのDSWインスタンスへのアクセス」セクションをご参照ください。
上部のナビゲーションバーで、[ターミナル] をクリックします。 次のコマンドを実行して、Apache Livyプロキシを起動します。
cd /root/proxy python app.py --db <ClusterID> --rg <Resource Group Name> --e <URL> -i <AK> -k <SK> -t <STS> &
下表に、各パラメーターを説明します。
パラメーター
必須 / 任意
説明
ClusterID
対象
AnalyticDB for MySQLクラスターのID。
リソースグループ名
対象
AnalyticDB for MySQLクラスター内のジョブリソースグループの名前。
URL
対象
AnalyticDB for MySQLクラスターのエンドポイント。
AnalyticDB for MySQLクラスターのエンドポイントの詳細については、「エンドポイント」をご参照ください。
AKとSK
特定の条件が満たされればはい
アクセスに使用されるAlibaba CloudアカウントまたはRAMユーザーのAccessKey IDとAccessKeyシークレットAnalyticDB for MySQL。
AccessKey IDとAccessKeyシークレットの取得方法については、「アカウントと権限」をご参照ください。
説明Alibaba CloudアカウントまたはRAMユーザーを使用する場合にのみ、AKおよびSKパラメーターを設定する必要があります。
STS
特定の条件が満たされればはい
RAMロールの一時的なID資格情報であるSecurity Token Service (STS) トークン。
許可されたRAMユーザーは、AccessKeyペアを使用してAssumeRole操作を呼び出すことができます。 これにより、RAMユーザーはRAMロールのSTSトークンを取得し、STSトークンを使用してAlibaba Cloudリソースにアクセスできます。
説明RAMユーザーを使用する場合にのみ、STSパラメーターを設定する必要があります。
次のコマンド出力は、Apache Livyプロキシが起動されたことを示します。
2024-11-15 11:04:52,125-ADB-INFO: ADB Client Init 2024-11-15 11:04:52,125-ADB-INFO: Aliyun ADB Proxy is ready
プロセスがポート5000でリッスンするかどうかを確認します。
手順4の操作が完了したら、
netstat -anlp | grep 5000
コマンドを実行して、プロセスがポート5000でリッスンしているかどうかを確認できます。
ステップ2: PySpark開発の実行
DSWインスタンスにアクセスします。 詳細については、「DSWインスタンスへのアクセス」トピックの「コンソールでのDSWインスタンスへのアクセス」セクションをご参照ください。
上部のナビゲーションバーで、[ノートブック] をクリックします。
ノートブックページの上部のナビゲーションバーで、
を選択します。 [カーネルの選択] ダイアログボックスで、[Python 3 (ipykernel)] を選択し、[選択] をクリックします。次のコマンドを実行してsparkmagicをインストールし、ロードします。
!pip install sparkmagic %load_ext sparkmagic.magics
% manage_spark
コマンドを実行します。コマンドを実行すると、[セッションの作成] タブが表示されます。
[セッションの作成] タブで、[言語] パラメーターをpythonに設定し、[セッションの作成] をクリックします。
重要[セッションの作成] を1回だけクリックします。
[セッションの作成] をクリックすると、[ノートブック] ページの下部に [ビジー] ステータスが表示されます。 セッションが作成されると、ステータスが [アイドル] に変わり、セッションIDが [セッションの管理] タブに表示されます。
PySparkスクリプトを実行します。
PySparkスクリプトを実行するときは、ビジネスコードの前に
% % spark
コマンドを追加して、リモートSparkを指定する必要があります。%%spark from pyspark.ml.regression import LinearRegression from pyspark.ml.linalg import Vectors from pyspark.ml.feature import VectorAssembler data = [(1, 1), (2, 3), (3, 5), (4, 7), (5, 9)] columns = ["feature", "label"] df = spark.createDataFrame(data, columns) assembler = VectorAssembler(inputCols=["feature"], outputCol="feature_vector") df_transformed = assembler.transform(df) lr = LinearRegression(featuresCol="feature_vector", labelCol="label") train_data = df_transformed model = lr.fit(train_data) print("Model coefficients", model.coefficients) print("Model intercept", model.intercept) input_data = [[1]] input_df = spark.createDataFrame(input_data, ["feature"]) input_df = assembler.transform(input_df) prediction = model.transform(input_df) print("Prediction result", prediction.select("prediction").collect()[0][0])