すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MaxCompute Spark タスクの開発

最終更新日:Jun 21, 2026

Spark on MaxCompute タスクは ローカルモードまたはクラスターモード で実行できます。また、DataWorks でオフラインの Spark on MaxCompute タスクをクラスターモードで実行し、他のタイプのノードと統合してスケジューリングすることもできます。このトピックでは、DataWorks で Spark on MaxCompute タスクを設定およびスケジューリングする方法について説明します。

概要

Spark on MaxCompute は、オープンソースの Spark と互換性のある MaxCompute のコンピューティングサービスです。統一されたコンピューティングリソースとデータセット権限システム上に Spark コンピューティングフレームワークを提供します。これにより、使い慣れた開発手法を用いて Spark タスクをサブミットして実行し、より広範なデータ処理および分析ニーズに対応できます。DataWorks では、MaxCompute Spark ノードを使用して Spark on MaxCompute タスクをスケジューリングおよび実行し、他のタスクと統合できます。

Spark on MaxCompute は、Java、Scala、Python での開発をサポートし、タスクをローカルモードまたはクラスターモードで実行します。DataWorks では、Spark on MaxCompute のオフラインタスクはクラスターモードで実行されます。実行モードの詳細については、「実行モード」をご参照ください。

制限

Spark 3.X バージョンを使用する ODPS Spark ノードをコミットする際にエラーが発生した場合は、サーバーレスリソースグループを購入してください。詳細については、「サーバーレスリソースグループの作成と使用」をご参照ください。

事前準備

MaxCompute Spark ノードを使用して、Java/Scala または Python で Spark on MaxCompute のオフラインタスクを実行できます。開発手順と設定プロセスは言語ごとに異なります。ビジネス要件に基づいて言語を選択してください。

Java/Scala

ODPS Spark ノードで Java または Scala コードを実行する前に、オンプレミスマシンで Spark on MaxCompute タスクのコード開発を完了し、そのコードを MaxCompute リソースとして DataWorks にアップロードする必要があります。次の手順を実行します。

  1. 開発環境の準備

    使用するオペレーティングシステムに基づいて、Spark on MaxCompute タスクを実行する開発環境を準備する必要があります。詳細については、「Linux 開発環境のセットアップ」または「Windows 開発環境のセットアップ」をご参照ください。

  2. Java または Scala コードの開発

    ODPS Spark ノードで Java または Scala コードを実行する前に、オンプレミスマシンまたは準備した開発環境で Spark on MaxCompute タスクのコード開発を完了する必要があります。Spark on MaxCompute が提供するサンプルプロジェクトテンプレートの使用を推奨します。

  3. 開発したコードのパッケージ化と DataWorks へのアップロード

    コードの開発後、コードをパッケージ化し、そのパッケージを MaxCompute リソースとして DataWorks にアップロードする必要があります。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。

プログラミング言語: Python (デフォルトの Python 環境を使用)

DataWorks では、Python リソースにオンラインでコードを記述し、ODPS Spark ノードを使用してコードをコミットして実行することで、PySpark タスクを開発できます。DataWorks で Python リソースを作成する方法、および PySpark を使用して Spark on MaxCompute アプリケーションを開発する例については、「MaxCompute リソースの作成と使用」および「PySpark を使用した Spark on MaxCompute アプリケーションの開発」をご参照ください。

説明

DataWorks が提供するデフォルトの Python 環境を使用してコードを開発できます。デフォルトの Python 環境でサポートされているサードパーティパッケージが PySpark タスクの要件を満たせない場合は、「プログラミング言語: Python (カスタム Python 環境を使用)」を参照して、カスタム Python 環境を準備できます。また、開発用により多くの Python リソースをサポートする PyODPS 2 ノードまたは PyODPS 3 ノードを使用することもできます。

プログラミング言語: Python (カスタム Python 環境を使用)

デフォルトの Python 環境がビジネス要件を満たせない場合は、次の手順を実行して、Spark on MaxCompute タスクを実行するためのカスタム Python 環境を準備できます。

  1. オンプレミスマシンでの Python 環境の準備

    PySpark の Python バージョンとサポートされる依存関係」を参照して、ビジネス要件に基づいて Python 環境を設定できます。

  2. Python 環境のパッケージ化と DataWorks へのアップロード

    Python 環境を ZIP 形式でパッケージ化し、そのパッケージを MaxCompute リソースとして DataWorks にアップロードする必要があります。これにより、その環境で Spark on MaxCompute タスクを実行できます。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。

パラメーターの説明

DataWorks は、Spark on MaxCompute のオフラインタスクをクラスターモードで実行します。このモードでは、カスタムアプリケーションのエントリポイントを main メソッドで指定する必要があります。Spark タスクは main メソッドの実行が完了すると、Success または Fail のステータスで終了します。さらに、spark-defaults.conf ファイルの設定を、MaxCompute Spark ノードの設定項目に個別に追加する必要があります。例としては、エグゼキューターインスタンス数、メモリサイズ、spark.hadoop.odps.runtime.end.point 設定などがあります。

説明

spark-defaults.conf ファイルをアップロードする必要はありません。代わりに、spark-defaults.conf ファイルの各設定を MaxCompute Spark ノードの設定項目として個別に追加します。

パラメーター

説明

spark-submit コマンド

[Spark Version]

Spark のバージョン。有効な値: [Spark1.x]、[Spark2.x]、[Spark3.x]。

説明

Spark 3.X バージョンを使用する ODPS Spark ノードをコミットする際にエラーが発生した場合は、サーバーレスリソースグループを購入してください。詳細については、「サーバーレスリソースグループの作成と使用」をご参照ください。

なし

[Language]

Spark on MaxCompute タスクの開発言語に基づいて [Java/Scala] または [Python] を選択します。

なし

メイン JAR リソース / メイン Python リソース

メインのリソースファイル。言語として Java/Scala を選択した場合は JAR リソース、Python を選択した場合は Python リソースを指定します。

必要なリソースファイルを DataWorks にアップロードし、あらかじめリソースファイルをコミットする必要があります。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。

app jar or Python file

[Configuration Item]

Spark on MaxCompute タスクをサブミットするために必要な設定項目。

  • spark.hadoop.odps.access.id (AccessKey ID)、spark.hadoop.odps.access.key (AccessKey Secret)、または spark.hadoop.odps.end.point (エンドポイント) を設定する必要はありません。プラットフォームは、デフォルトで現在の MaxCompute プロジェクトから値が自動的に設定されます。明示的に設定してデフォルト値を上書きすることもできます。

  • spark-defaults.conf ファイルをアップロードする必要はありません。代わりに、spark-defaults.conf ファイルの各設定を MaxCompute Spark ノードの設定項目として個別に追加します (エグゼキューター数、メモリサイズ、spark.hadoop.odps.runtime.end.point 設定など)。

--conf PROP=VALUE

[Main Class]

メインクラスの名前。このパラメーターは、[言語]Java/Scala に設定されている場合に必要です。

--class CLASS_NAME

[Parameter]

必要に応じて、アプリケーションの引数をスペースで区切って追加します。DataWorks は、${variable_name} 形式のスケジューリングパラメーターをサポートしています。Parameter フィールドで変数を設定した後、右側のナビゲーションペインの Scheduling Settings > Parameter で値を割り当てる必要があります。

説明

サポートされているスケジューリングパラメーターの形式については、「サポートされているスケジューリングパラメーターの形式」をご参照ください。

[app arguments]

その他のリソース

次のタイプのリソースもサポートされています。ビジネス要件に基づいて、次のタイプのリソースを選択できます。

  • JAR リソース: [言語]Java/Scala の場合にのみ使用できます。

  • Python リソース: [言語]Python の場合にのみ使用できます。

  • ファイルリソース

  • アーカイブ リソース: ZIP 形式の圧縮リソースのみ表示されます。

必要なリソースファイルを DataWorks にアップロードし、あらかじめリソースファイルをコミットする必要があります。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。

リソースタイプごとのコマンド:

  • --jars JARS

  • --py-files PY_FILES

  • --files FILES

  • --archives ARCHIVES

簡単なコード編集の例

このセクションでは、ODPS Spark ノードを使用して Spark on MaxCompute タスクを開発する方法を簡単な例で説明します。この例では、文字列が数字に変換できるかどうかを判断する Spark on MaxCompute タスクを開発します。

  1. リソースの作成

    1. [DataStudio] ページで、新しい Python リソースを作成し、spark_is_number.py という名前を付けます。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。次のコードを使用します:

      # -*- coding: utf-8 -*-
      import sys
      from pyspark.sql import SparkSession
      try:
          # Python 2の場合
          reload(sys)
          sys.setdefaultencoding('utf8')
      except:
          # Python 3では不要
          pass
      if __name__ == '__main__':
          spark = SparkSession.builder\
              .appName("spark sql")\
              .config("spark.sql.broadcastTimeout", 20 * 60)\
              .config("spark.sql.crossJoin.enabled", True)\
              .config("odps.exec.dynamic.partition.mode", "nonstrict")\
              .config("spark.sql.catalogImplementation", "odps")\
              .getOrCreate()
      def is_number(s):
          try:
              float(s)
              return True
          except ValueError:
              pass
          try:
              import unicodedata
              unicodedata.numeric(s)
              return True
          except (TypeError, ValueError):
              pass
          return False
      print(is_number('foo'))
      print(is_number('1'))
      print(is_number('1.3'))
      print(is_number('-1.37'))
      print(is_number('1e3'))
    2. リソースを保存してコミットします。

  2. 作成した ODPS Spark ノードで、このトピックの「パラメーターの説明」セクションを参照して、MaxCompute Spark タスクのパラメーターとスケジューリングプロパティを設定し、ノードを保存してコミットします。

    パラメーター

    説明

    Spark バージョン

    Spark2.x を選択します。

    言語

    Python を選択します。

    メイン Python リソース

    作成した Python リソース spark_is_number.py

  3. 開発環境のオペレーションセンターに移動して、ODPS Spark ノードのデータをバックフィルします。詳細については、「データバックフィルとデータバックフィルインスタンスの表示 (新バージョン)」をご参照ください。

    説明

    DataWorks は、DataStudio で ODPS Spark ノードを実行するためのエントリポイントを提供していません。ODPS Spark ノードは、開発環境のオペレーションセンターで実行する必要があります。

  4. 結果の表示

    データバックフィルインスタンスが正常に実行された後、生成された実行ログの [tracking URL] をクリックして結果を表示します。次の情報が返されます:

    False
    True
    True
    True
    True

高度なコード編集の例

他のシナリオでの Spark on MaxCompute タスクの開発に関する詳細については、次のトピックをご参照ください:

次のステップ

Spark on MaxCompute タスクの開発が完了したら、次の操作を実行できます:

  • スケジューリング設定:定期的に実行されるタスクに対して、再実行設定や依存関係などのスケジューリングプロパティを設定します。 詳細については、「タスクのスケジューリング設定の概要」をご参照ください。

  • タスクデバッグ:ノードコードをテストおよび実行して、そのロジックを検証します。 詳細については、「タスクデバッグプロセス」をご参照ください。

  • タスクデプロイメント:ノードをデプロイして、スケジューリング設定に基づいて定期的に実行します。 詳細については、「タスクのデプロイ」をご参照ください。