すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:PAI DSWを使用したSparkアプリケーションの開発

最終更新日:Dec 24, 2024

Data Science Workshop (DSW) は、多言語開発のためにPlatform for AI (PAI) によって提供されるクラウド統合開発環境 (IDE) です。 DSWインスタンスからAnalyticDB for MySQLクラスターに接続し、統合されたノートブックおよびターミナル開発環境でPySparkスクリプトを記述してSparkアプリケーションを送信できます。 このトピックでは、DSWインスタンスを使用してSparkアプリケーションを送信する方法について説明します。

前提条件

  • AnalyticDB for MySQLData Lakehouse Editionクラスターが作成されます。

  • AnalyticDB for MySQLクラスターのジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。

  • AnalyticDB for MySQLクラスター用のデータベースアカウントが作成されます。

  • AnalyticDB for MySQLは、AliyunADBSparkProcessingDataRoleロールを引き受けて他のクラウドリソースにアクセスする権限があります。 詳細については、「権限付与の実行」をご参照ください。

  • Sparkアプリケーションのログストレージパスは、AnalyticDB for MySQLクラスターに設定されています。

    説明

    AnalyticDB for MySQL コンソールにログインします。 管理するクラスターを確認し、クラスター ID をクリックします。 左側のナビゲーションウィンドウで、ジョブを開発する > Spark Jar 開発 を選択します。 ログ設定 をクリックします。 表示されるダイアログボックスで、デフォルトパスを選択するか、カスタムストレージパスを指定します。 カスタムストレージパスをOSSのルートディレクトリに設定することはできません。 カスタムストレージパスに少なくとも1つのレイヤーのフォルダーが含まれていることを確認します。

手順1: PAI DSWインスタンスの作成と設定

  1. PAIを有効にしてワークスペースを作成します。 詳細については、「PAIの有効化」および「ワークスペースの作成」をご参照ください。

    AnalyticDB for MySQLクラスターと同じリージョンでPAIを有効化する必要があります。

  2. DSWインスタンスを作成します。 詳細については、「DSWインスタンスの作成」をご参照ください。

    ImageパラメーターをImage Addressに設定し、AnalyticDB for MySQL Spark Livyイメージアドレスregistry.cn-hangzhou.aliyuncs.com/adb-public-image/adb-spark-public-image:livy.0.5.preを入力する必要があります。 ビジネス要件に基づいて他のパラメーターを設定できます。

  3. DSWインスタンスにアクセスします。 詳細については、「DSWインスタンスへのアクセス」トピックの「コンソールでのDSWインスタンスへのアクセス」セクションをご参照ください。

  4. 上部のナビゲーションバーで、[ターミナル] をクリックします。 次のコマンドを実行して、Apache Livyプロキシを起動します。

    cd /root/proxy
    python app.py --db <ClusterID> --rg <Resource Group Name> --e <URL> -i <AK> -k <SK> -t <STS> & 

    下表に、各パラメーターを説明します。

    パラメーター

    必須 / 任意

    説明

    ClusterID

    対象

    AnalyticDB for MySQLクラスターのID。

    リソースグループ名

    対象

    AnalyticDB for MySQLクラスター内のジョブリソースグループの名前。

    URL

    対象

    AnalyticDB for MySQLクラスターのエンドポイント。

    AnalyticDB for MySQLクラスターのエンドポイントの詳細については、「エンドポイント」をご参照ください。

    AKとSK

    特定の条件が満たされればはい

    アクセスに使用されるAlibaba CloudアカウントまたはRAMユーザーのAccessKey IDとAccessKeyシークレットAnalyticDB for MySQL

    AccessKey IDとAccessKeyシークレットの取得方法については、「アカウントと権限」をご参照ください。

    説明

    Alibaba CloudアカウントまたはRAMユーザーを使用する場合にのみ、AKおよびSKパラメーターを設定する必要があります。

    STS

    特定の条件が満たされればはい

    RAMロールの一時的なID資格情報であるSecurity Token Service (STS) トークン。

    許可されたRAMユーザーは、AccessKeyペアを使用してAssumeRole操作を呼び出すことができます。 これにより、RAMユーザーはRAMロールのSTSトークンを取得し、STSトークンを使用してAlibaba Cloudリソースにアクセスできます。

    説明

    RAMユーザーを使用する場合にのみ、STSパラメーターを設定する必要があります。

    次のコマンド出力は、Apache Livyプロキシが起動されたことを示します。

    2024-11-15 11:04:52,125-ADB-INFO: ADB Client Init
    2024-11-15 11:04:52,125-ADB-INFO: Aliyun ADB Proxy is ready
  5. プロセスがポート5000でリッスンするかどうかを確認します。

    手順4の操作が完了したら、netstat -anlp | grep 5000コマンドを実行して、プロセスがポート5000でリッスンしているかどうかを確認できます。

ステップ2: PySpark開発の実行

  1. DSWインスタンスにアクセスします。 詳細については、「DSWインスタンスへのアクセス」トピックの「コンソールでのDSWインスタンスへのアクセス」セクションをご参照ください。

  2. 上部のナビゲーションバーで、[ノートブック] をクリックします。

  3. ノートブックページの上部のナビゲーションバーで、[ファイル] > [新規作成] > [ノートブック] を選択します。 [カーネルの選択] ダイアログボックスで、[Python 3 (ipykernel)] を選択し、[選択] をクリックします。

  4. 次のコマンドを実行してsparkmagicをインストールし、ロードします。

    !pip install sparkmagic
    %load_ext sparkmagic.magics
  5. % manage_sparkコマンドを実行します。

    コマンドを実行すると、[セッションの作成] タブが表示されます。

  6. [セッションの作成] タブで、[言語] パラメーターをpythonに設定し、[セッションの作成] をクリックします。

    重要

    [セッションの作成] を1回だけクリックします。

    [セッションの作成] をクリックすると、[ノートブック] ページの下部に [ビジー] ステータスが表示されます。 セッションが作成されると、ステータスが [アイドル] に変わり、セッションIDが [セッションの管理] タブに表示されます。

    image

  7. PySparkスクリプトを実行します。

    PySparkスクリプトを実行するときは、ビジネスコードの前に % % sparkコマンドを追加して、リモートSparkを指定する必要があります。

    %%spark
    from pyspark.ml.regression import LinearRegression
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.feature import VectorAssembler
    data = [(1, 1), (2, 3), (3, 5), (4, 7), (5, 9)]
    columns = ["feature", "label"]
    df = spark.createDataFrame(data, columns)
    
    assembler = VectorAssembler(inputCols=["feature"], outputCol="feature_vector")
    df_transformed = assembler.transform(df)
    lr = LinearRegression(featuresCol="feature_vector", labelCol="label")
    
    train_data = df_transformed
    model = lr.fit(train_data)
    print("Model coefficients", model.coefficients)
    print("Model intercept", model.intercept)
    
    input_data = [[1]]
    input_df = spark.createDataFrame(input_data, ["feature"])
    input_df = assembler.transform(input_df)
    prediction = model.transform(input_df)
    print("Prediction result", prediction.select("prediction").collect()[0][0])