すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MaxCompute Spark ノード

最終更新日:Feb 28, 2025

ローカルモードまたはクラスタモードで MaxCompute タスク上で Spark を実行できます。また、DataWorks でクラスタモードでオフライン Spark on MaxCompute タスクを実行して、タスクを他のタイプのノードと統合してスケジューリングすることもできます。このトピックでは、DataWorks で Spark on MaxCompute タスクを構成およびスケジューリングする方法について説明します。

前提条件

  • (RAM ユーザーを使用してタスクを開発する場合に必要) 目的の RAM ユーザーがメンバーとして DataWorks ワークスペースに追加され、[開発] ロールまたは [ワークスペース管理者] ロールが割り当てられています。ワークスペース管理者ロールには広範な権限があります。必要な場合にのみ、ワークスペース管理者ロールをユーザーに割り当てることをお勧めします。メンバーを追加する方法の詳細については、「ワークスペースメンバーを追加し、ロールを割り当てる」をご参照ください。

    説明

    Alibaba Cloud アカウントを使用する場合は、この前提条件は無視してください。

  • MaxCompute Spark ノードが作成されます。詳細については、「ノードを作成する」をご参照ください。

制限事項

Spark 3.X バージョンを使用する MaxCompute Spark ノードをコミットしたときにエラーが報告された場合は、サーバーレス リソースグループを購入してください。詳細については、「サーバーレス リソースグループの作成と使用」をご参照ください。

背景情報

Spark on MaxCompute は、MaxCompute によって提供されるコンピューティング サービスであり、オープンソースの Spark と互換性があります。Spark on MaxCompute は、統合コンピューティング リソースとデータセット権限システムに基づく Spark コンピューティング フレームワークを提供します。Spark on MaxCompute を使用すると、好みの開発方法を使用して Spark タスクを送信および実行できます。Spark on MaxCompute は、多様なデータ処理と分析の要件を満たすことができます。DataWorks では、MaxCompute Spark ノードを使用して、Spark on MaxCompute タスクをスケジューリングおよび実行し、Spark on MaxCompute タスクを他のタイプのタスクと統合できます。

Spark on MaxCompute では、Java、Scala、または Python を使用してタスクを開発し、ローカルモードまたはクラスタモードでタスクを実行できます。また、Spark on MaxCompute では、DataWorks でクラスタモードでオフライン Spark on MaxCompute タスクを実行することもできます。Spark on MaxCompute タスクの実行モードの詳細については、「実行モード」をご参照ください。

準備

MaxCompute Spark ノードでは、Java、Scala、または Python を使用してオフライン Spark on MaxCompute タスクを開発および実行できます。オフライン Spark on MaxCompute タスクの開発に必要な操作とパラメーターは、使用するプログラミング言語によって異なります。ビジネス要件に基づいてプログラミング言語を選択できます。

Java/Scala

MaxCompute Spark ノードで Java コードまたは Scala コードを実行する前に、オンプレミス マシンで Spark on MaxCompute タスクのコードの開発を完了し、コードを MaxCompute リソースとして DataWorks にアップロードする必要があります。次の手順を実行する必要があります。

  1. 開発環境を準備します。

    使用するオペレーティング システムに基づいて、Spark on MaxCompute タスクを実行する開発環境を準備する必要があります。詳細については、「Linux 開発環境のセットアップ」または「Windows 開発環境のセットアップ」をご参照ください。

  2. Java コードまたは Scala コードを開発します。

    MaxCompute Spark ノードで Java コードまたは Scala コードを実行する前に、オンプレミス マシンまたは準備された開発環境で Spark on MaxCompute タスクのコードの開発を完了する必要があります。Spark on MaxCompute によって提供される サンプル プロジェクト テンプレート を使用することをお勧めします。

  3. 開発したコードをパッケージ化し、DataWorks にアップロードします。

    コードが開発されたら、コードをパッケージ化し、パッケージを MaxCompute リソースとして DataWorks にアップロードする必要があります。

プログラミング言語: Python (デフォルトの Python 環境を使用)

DataWorks では、DataWorks でオンラインで Python リソースにコードを書き込み、MaxCompute Spark ノードを使用してコードをコミットおよび実行することで、PySpark タスクを開発できます。DataWorks で Python リソースを作成する必要があります。PySpark を使用して Spark on MaxCompute アプリケーションを開発する例を表示する方法については、「PySpark を使用して Spark on MaxCompute アプリケーションを開発する」をご参照ください。

説明

DataWorks によって提供されるデフォルトの Python 環境を使用してコードを開発できます。デフォルトの Python 環境でサポートされているサードパーティ製パッケージが PySpark タスクの要件を満たせない場合は、プログラミング言語: Python (カスタム Python 環境を使用) を参照してカスタム Python 環境を準備できます。また、開発用にさらに多くの Python リソースをサポートする PyODPS 2 ノードまたは PyODPS 3 ノードを使用することもできます。

プログラミング言語: Python (カスタム Python 環境を使用)

デフォルトの Python 環境がビジネス要件を満たせない場合は、次の手順を実行して、Spark on MaxCompute タスクを実行するためのカスタム Python 環境を準備できます。

  1. オンプレミス マシンで Python 環境を準備します。

    PySpark の Python バージョンとサポートされている依存関係 を参照して、ビジネス要件に基づいて Python 環境を構成できます。

  2. Python 環境のコードをパッケージ化し、パッケージを DataWorks にアップロードします。

    Python 環境のコードを ZIP 形式でパッケージ化し、パッケージを MaxCompute リソースとして DataWorks にアップロードする必要があります。これにより、環境内で Spark on MaxCompute タスクを実行できます。

パラメーターの説明

DataWorks でクラスタモードでオフライン Spark on MaxCompute タスクを実行できます。このモードでは、Main メソッドをカスタム アプリケーションのエントリ ポイントとして指定する必要があります。Main成功 または 失敗 すると、Spark ジョブは終了します。spark-defaults.conf ファイルの設定項目を MaxCompute Spark ノードの構成に追加する必要があります。たとえば、executor の数、メモリ サイズ、spark.hadoop.odps.runtime.end.point などの設定項目を追加する必要があります。

説明

spark-defaults.conf ファイルをアップロードする必要はありません。代わりに、spark-defaults.conf ファイルの設定項目を MaxCompute Spark ノードの構成に 1 つずつ追加する必要があります。

Java/Scala のパラメーター

image

パラメーター

説明

spark-submit コマンド

[spark バージョン]

Spark のバージョン。有効な値: [spark1.x][spark2.x][spark3.x]

説明

Spark 3.X バージョンを使用する MaxCompute Spark ノードをコミットしたときにエラーが報告された場合は、サーバーレス リソースグループを購入してください。詳細については、「サーバーレス リソースグループの作成と使用」をご参照ください。

なし

[言語]

プログラミング言語。有効な値: [java/scala] および [python]。ビジネス要件に基づいてプログラミング言語を選択できます。

なし

[メイン JAR リソース]

メインの JAR ファイルまたは Python リソースファイル。

必要なリソースファイルを DataWorks にアップロードし、事前にコミットする必要があります。

app jar または Python ファイル

[設定項目]

Spark on MaxCompute タスクを送信するために必要な設定項目。

  • spark.hadoop.odps.access.idspark.hadoop.odps.access.keyspark.hadoop.odps.end.point を構成する必要はありません。デフォルトでは、これらの設定項目の値は MaxCompute プロジェクトの値と同じです。必要に応じて、これらの項目を明示的に構成してデフォルト値を上書きすることもできます。

  • spark-defaults.conf ファイルをアップロードする必要はありません。代わりに、spark-defaults.conf ファイルの設定項目を MaxCompute Spark ノードの構成に 1 つずつ追加する必要があります。たとえば、executor の数、メモリ サイズ、spark.hadoop.odps.runtime.end.point などの設定項目を追加する必要があります。

--conf PROP=VALUE

[メイン クラス]

メイン クラスの名前。言語 パラメーターを Java/Scala に設定した場合にのみ、このパラメーターは必須です。

--class CLASS_NAME

[パラメーター]

ビジネス要件に基づいてパラメーターを追加できます。複数のパラメーターはスペースで区切ります。DataWorks では、${変数名} 形式で [スケジューリング パラメーター] を追加できます。パラメーターを追加したら、右側のナビゲーション ウィンドウの [プロパティ] タブをクリックし、[スケジューリング パラメーター] セクションで関連する変数に値を割り当てる必要があります。

[app 引数]

[JAR リソース]

言語 パラメーターを Java/Scala に設定した場合にのみ、このタイプのリソースを選択できます。

必要なリソースファイルを DataWorks にアップロードし、事前にコミットする必要があります。

リソース コマンド:

--jars JARS

[ファイル リソース]

ファイル リソース。

--files FILES

[アーカイブ リソース]

圧縮リソースのみが表示されます。

--archives ARCHIVES

Python のパラメーター

image

パラメーター

説明

spark-submit コマンド

[spark バージョン]

Spark のバージョン。有効な値: [spark1.x][spark2.x][spark3.x]

説明

Spark 3.X バージョンを使用する MaxCompute Spark ノードをコミットしたときにエラーが報告された場合は、サーバーレス リソースグループを購入してください。詳細については、「サーバーレス リソースグループの作成と使用」をご参照ください。

なし

[言語]

ノードで使用されるプログラミング言語。このパラメーターを [python] に設定します。ビジネス要件に基づいてプログラミング言語を選択できます。

なし

[メイン Python リソース]

メインの JAR ファイルまたは Python リソースファイル。

必要なリソースファイルを DataWorks にアップロードし、事前にコミットする必要があります。

app jar または Python ファイル

[設定項目]

Spark on MaxCompute タスクを送信するために必要な設定項目。

  • spark.hadoop.odps.access.idspark.hadoop.odps.access.keyspark.hadoop.odps.end.point を構成する必要はありません。デフォルトでは、これらの設定項目の値は MaxCompute プロジェクトの値と同じです。必要に応じて、これらの項目を明示的に構成してデフォルト値を上書きすることもできます。

  • spark-defaults.conf ファイルをアップロードする必要はありません。代わりに、spark-defaults.conf ファイルの設定項目を MaxCompute Spark ノードの構成に 1 つずつ追加する必要があります。たとえば、executor の数、メモリ サイズ、spark.hadoop.odps.runtime.end.point などの設定項目を追加する必要があります。

--conf PROP=VALUE

[パラメーター]

ビジネス要件に基づいてパラメーターを追加できます。複数のパラメーターはスペースで区切ります。DataWorks では、${変数名} 形式で [スケジューリング パラメーター] を追加できます。パラメーターを追加したら、右側のナビゲーション ウィンドウの [プロパティ] タブをクリックし、[スケジューリング パラメーター] セクションで関連する変数に値を割り当てる必要があります。

[app 引数]

[python リソース]

言語 パラメーターを Python に設定した場合にのみ、このタイプのリソースを選択できます。

必要なリソースファイルを DataWorks にアップロードし、事前にコミットする必要があります。

--py-files PY_FILES

[ファイル リソース]

ファイル リソース。

--files FILES

[アーカイブ リソース]

圧縮リソースのみが表示されます。

--archives ARCHIVES

手順

  1. リソースを作成します。

    1. Data Studio ページの左側のナビゲーションウィンドウで、[リソース管理] をクリックします。[RESOURCE MANAGEMENT: ALL] ペインで、プラスアイコンをクリックし、[リソースの作成] > [MaxCompute Python] を選択します。表示されるポップオーバーで、Python リソースの名前として spark_is_number.py と入力します。サンプルコード:

      # -*- coding: utf-8 -*-
      import sys
      from pyspark.sql import SparkSession
      
      try:
          # for python 2
          reload(sys)
          sys.setdefaultencoding('utf8')
      except:
          # python 3 not needed
          pass
      
      if __name__ == '__main__':
          spark = SparkSession.builder\
              .appName("spark sql")\
              .config("spark.sql.broadcastTimeout", 20 * 60)\
              .config("spark.sql.crossJoin.enabled", True)\
              .config("odps.exec.dynamic.partition.mode", "nonstrict")\
              .config("spark.sql.catalogImplementation", "odps")\
              .getOrCreate()
      
      def is_number(s):
          try:
              float(s)
              return True
          except ValueError:
              pass
      
          try:
              import unicodedata
              unicodedata.numeric(s)
              return True
          except (TypeError, ValueError):
              pass
      
          return False
      
      print(is_number('foo'))
      print(is_number('1'))
      print(is_number('1.3'))
      print(is_number('-1.37'))
      print(is_number('1e3'))
    2. リソースを保存します。

  2. 作成した MaxCompute Spark ノードのパラメーターとスケジューリング プロパティを構成します。詳細については、このトピックの パラメーターの説明 セクションをご参照ください。

  3. ノードを定期的に実行する場合は、ビジネス要件に基づいてスケジューリング情報を構成します。

  4. ノードを構成したら、ノードをデプロイします。詳細については、「ノードのデプロイ」をご参照ください。

  5. ノードをデプロイしたら、オペレーションセンターでノードの実行ステータスを表示します。詳細については、「オペレーションセンターの概要」をご参照ください。

    説明
    • DataWorks は、データ スタジオで MaxCompute Spark ノードを実行するためのエントリ ポイントを提供していません。開発環境のオペレーションセンターで MaxCompute Spark ノードを実行する必要があります。

    • MaxCompute Spark ノードのデータ バックフィル インスタンスが正常に実行されたら、生成された実行ログのトラッキング URL をクリックして結果を表示します。

参考資料