すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:PAI DSW を使用した Spark アプリケーションの開発

最終更新日:Nov 09, 2025

Data Science Workshop (DSW) は、PAI が提供する機械学習向けのクラウドベースの統合開発環境 (IDE) です。複数の言語と開発環境をサポートしています。DSW インスタンスから AnalyticDB for MySQL クラスターに接続し、Notebook や Terminal などの IDE を使用して PySpark スクリプトを記述し、Spark ジョブを送信できます。このトピックでは、DSW インスタンスから Spark ジョブを送信する方法について説明します。

前提条件

  • AnalyticDB for MySQL Enterprise Edition、Basic Edition、または Data Lakehouse Edition クラスターが作成されていること。

  • AnalyticDB for MySQL クラスターにジョブリソースグループが作成されており、リソースグループに Spark パラメーター spark.adb.version=3.2 が設定されていること。

  • AnalyticDB for MySQL クラスターにデータベースアカウントが作成されていること。

  • AnalyticDB for MySQL に、AliyunADBSparkProcessingDataRole ロールを偽装して他のクラウドリソースにアクセスする権限が付与されていること。

  • AnalyticDB for MySQL クラスターに Spark アプリケーションのログストレージパスが設定されていること。

    説明

    AnalyticDB for MySQL コンソールにログインします。管理するクラスターを見つけ、クラスター ID をクリックします。左側のナビゲーションウィンドウで、ジョブを開発する > Spark Jar 開発 を選択します。ログ設定 をクリックします。表示されるダイアログボックスで、デフォルトパスを選択するか、カスタムストレージパスを指定します。カスタムストレージパスを OSS のルートディレクトリに設定することはできません。カスタムストレージパスに少なくとも 1 つのレイヤーのフォルダーが含まれていることを確認してください。

ステップ 1: PAI DSW インスタンスの作成と設定

  1. PAI をアクティブ化し、ワークスペースを作成します。詳細については、「PAI のアクティブ化」および「ワークスペースの作成と管理」をご参照ください。

    PAI は AnalyticDB for MySQL と同じリージョンにある必要があります。

  2. DSW インスタンスを作成します。

    次のいずれかの方法で DSW インスタンスを作成できます。

    • コンソールで DSW インスタンスを作成できます。詳細については、「DSW インスタンスの作成」をご参照ください。

      [イメージ][イメージ URL] に設定し、AnalyticDB for MySQL Spark の Livy イメージ URL (registry.cn-hangzhou.aliyuncs.com/adb-public-image/adb-spark-public-image:livy.0.5.pre) を入力する必要があります。必要に応じて他のパラメーターを設定できます。

    • チュートリアルで、[DSW で開く] をクリックし、要件を満たす DSW インスタンスを選択するか、新しいインスタンスを作成します。詳細については、「DSW インスタンスの作成」をご参照ください。

      DSW インスタンス作成ページでは、イメージ URL と DSW インスタンスタイプが事前に入力されています。[インスタンス名] を入力し、[OK] をクリックするだけで DSW インスタンスを作成できます。

  3. DSW インスタンスにアクセスします。詳細については、「コンソールからのアクセス」をご参照ください。

  4. 上部のメニューバーで、[Terminal] をクリックし、次の文を実行して Apache Livy プロキシを開始します。

    cd /root/proxy
    python app.py --db <ClusterID> --rg <Resource Group Name> --e <URL> -i <AK> -k <SK> -t <STS> & 

    パラメーター:

    パラメーター

    必須

    説明

    ClusterID

    はい

    AnalyticDB for MySQL クラスターの ID。

    Resource Group Name

    はい

    AnalyticDB for MySQL クラスター内のジョブリソースグループの名前。

    URL

    はい

    AnalyticDB for MySQL クラスターのサービスエンドポイント。

    AnalyticDB for MySQL クラスターのサービスエンドポイントの表示方法については、「サービスエンドポイント」をご参照ください。

    AK, SK

    条件付きで必須

    AnalyticDB for MySQL にアクセスする権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret。

    AccessKey ID と AccessKey Secret の取得方法については、「アカウントと権限」をご参照ください。

    説明

    Alibaba Cloud アカウントまたは RAM ユーザーを使用する場合にのみ、AKSK を指定する必要があります。

    STS

    特定の条件下で必須

    RAM ロールの一時的な ID 認証情報であるセキュリティトークンサービス (STS) トークン。

    承認された RAM ユーザーは、AccessKey ペアを使用して AssumeRole 操作を呼び出すことができます。これにより、RAM ユーザーは RAM ロールの STS トークンを取得し、その STS トークンを使用して Alibaba Cloud リソースにアクセスできます。

    説明

    RAM ロールを使用する場合にのみ STS を指定する必要があります。

    次の情報が返された場合、プロキシは正常に開始されています。

    2024-11-15 11:04:52,125-ADB-INFO: ADB Client Init
    2024-11-15 11:04:52,125-ADB-INFO: Aliyun ADB Proxy is ready
  5. プロセスがポート 5000 をリッスンしているかどうかを確認します。

    ステップ 4 が完了したら、netstat -anlp | grep 5000 文を実行して、プロセスがポート 5000 をリッスンしているかどうかを確認できます。

ステップ 2: PySpark アプリケーションの開発

  1. DSW インスタンスにアクセスします。詳細については、「コンソールからのアクセス」をご参照ください。

  2. 上部のナビゲーションバーで、[Notebook] をクリックして [Notebook] ページを開きます。

  3. 上部のメニューバーで、[File] > [New] > [Notebook] を選択します。[Select Kernel] ダイアログボックスで、[Python 3 (ipykernel)] を選択し、[Select] をクリックします。

  4. 次の文を順番に実行して、sparkmagic をインストールしてロードします。

    !pip install sparkmagic
    %load_ext sparkmagic.magics
  5. %manage_spark 文を実行します。

    文を実行すると、[Create Session] タブが表示されます。

  6. [Create Session] タブで、[Language][Python] に設定し、[Create Session] をクリックします。

    重要

    [Create Session] は一度だけクリックしてください。繰り返しクリックしないでください。

    [Create Session] をクリックすると、Notebook ページの下部にあるステータスが [Busy] に変わります。ステータスが [Idle] に変わり、セッション ID が [Manage Session] タブに表示されると、セッションが作成されます。

    image

  7. PySpark スクリプトを実行します。

    PySpark スクリプトを実行するときは、サービスコードの前に %%spark コマンドを必ず追加して、リモート Spark インスタンスが使用されるように指定する必要があります。

    %%spark
    db_sql = """
    CREATE DATABASE IF NOT exists test_db comment 'デモ DB' 
    location 'oss://testBucketName/test'  
    WITH dbproperties(k1='v1', k2='v2')
    """
    
    tb_sql = """
    CREATE TABLE IF NOT exists test_db.test_tbl(id int, name string, age int) 
    using parquet 
    location 'oss://testBucketName/test/test_tbl/' 
    tblproperties ('parquet.compress'='SNAPPY');
    """
    
    insert_sql = """
    INSERT INTO test_db.test_tbl VALUES(1, 'adb', 10);
    """
    
    select_sql = """
    SELECT * FROM test_db.test_tbl;
    """
    
    spark.sql(db_sql).show()
    spark.sql(tb_sql).show()
    spark.sql(insert_sql).show()
    spark.sql(select_sql).show()