MaxComputeタスクでSparkを ローカルモードまたはクラスタモード で実行できます。また、DataWorksでオフラインのSpark on MaxComputeタスクをクラスタモードで実行して、タスクを他のタイプのノードと統合してスケジューリングすることもできます。このトピックでは、DataWorksでSpark on MaxComputeタスクを設定およびスケジューリングする方法について説明します。
前提条件
ODPS Sparkノードが作成されていること。詳細については、「ODPSノードの作成と管理」をご参照ください。
制限事項
Spark 3.Xバージョンを使用するODPS Sparkノードをコミットしたときにエラーが報告された場合は、サーバーレスリソースグループを購入してください。詳細については、「サーバーレスリソースグループの作成と使用」をご参照ください。
背景情報
Spark on MaxCompute は、MaxComputeによって提供される、オープンソースのSparkと互換性のあるコンピューティングサービスです。Spark on MaxComputeは、統合コンピューティングリソースとデータセット権限システムに基づくSparkコンピューティングフレームワークを提供します。Spark on MaxComputeを使用すると、好みの開発方法を使用してSparkタスクを送信および実行できます。Spark on MaxComputeは、多様なデータ処理と分析の要件を満たすことができます。DataWorksでは、ODPS Sparkノードを使用して、Spark on MaxComputeタスクをスケジューリングおよび実行し、Spark on MaxComputeタスクを他のタイプのタスクと統合できます。
Spark on MaxComputeでは、Java、Scala、またはPythonを使用してタスクを開発し、ローカルモードまたはクラスタモードでタスクを実行できます。また、Spark on MaxComputeでは、DataWorksでオフラインのSpark on MaxComputeタスクをクラスタモードで実行することもできます。Spark on MaxComputeタスクの実行モードの詳細については、「実行モード」をご参照ください。
準備
ODPS Sparkノードでは、Java、Scala、または Python を使用して、オフラインのSpark on MaxComputeタスクを開発および実行できます。オフラインのSpark on MaxComputeタスクの開発に必要な操作とパラメータは、使用するプログラミング言語によって異なります。ビジネス要件に基づいてプログラミング言語を選択できます。
Java/Scala
ODPS SparkノードでJavaまたはScalaコードを実行する前に、オンプレミスマシンでSpark on MaxComputeタスクのコードの開発を完了し、コードをMaxComputeリソースとしてDataWorksにアップロードする必要があります。次の手順を実行する必要があります。
開発環境を準備します。
使用するオペレーティングシステムに基づいて、Spark on MaxComputeタスクを実行する開発環境を準備する必要があります。詳細については、「Linux開発環境のセットアップ」または「Windows開発環境のセットアップ」をご参照ください。
JavaまたはScalaコードを開発します。
ODPS SparkノードでJavaまたはScalaコードを実行する前に、オンプレミスマシンまたは準備された開発環境でSpark on MaxComputeタスクのコードの開発を完了する必要があります。Spark on MaxComputeによって提供される サンプルプロジェクトテンプレート を使用することをお勧めします。
開発したコードをパッケージ化し、DataWorksにアップロードします。
コードが開発された後、コードをパッケージ化し、パッケージをMaxComputeリソースとしてDataWorksにアップロードする必要があります。詳細については、「MaxComputeリソースの作成と使用」をご参照ください。
プログラミング言語: Python(デフォルトのPython環境を使用)
DataWorksでは、DataWorksでオンラインでPythonリソースにコードを書き込み、ODPS Sparkノードを使用してコードをコミットおよび実行することにより、PySparkタスクを開発できます。DataWorksでPythonリソースを作成する方法、およびPySparkを使用してSpark on MaxComputeアプリケーションを開発する例については、「MaxComputeリソースの作成と使用」および「PySparkを使用したSpark on MaxComputeアプリケーションの開発」をご参照ください。
DataWorksが提供するデフォルトのPython環境を使用してコードを開発できます。デフォルトのPython環境でサポートされているサードパーティ製パッケージがPySparkタスクの要件を満たせない場合は、プログラミング言語: Python(カスタムPython環境を使用) を参照して、カスタムPython環境を準備できます。また、開発用にさらに多くのPythonリソースをサポートする PyODPS 2ノード または PyODPS 3ノード を使用することもできます。
プログラミング言語: Python(カスタムPython環境を使用)
デフォルトのPython環境がビジネス要件を満たせない場合は、次の手順を実行して、Spark on MaxComputeタスクを実行するためのカスタムPython環境を準備できます。
オンプレミスマシンでPython環境を準備します。
PySpark Pythonバージョンとサポートされている依存関係 を参照して、ビジネス要件に基づいてPython環境を設定できます。
Python環境のコードをパッケージ化し、パッケージをDataWorksにアップロードします。
Python環境のコードをZIP形式でパッケージ化し、パッケージをMaxComputeリソースとしてDataWorksにアップロードする必要があります。これにより、環境内でSpark on MaxComputeタスクを実行できます。詳細については、「MaxComputeリソースの作成と使用」をご参照ください。
パラメータの説明
DataWorksでオフラインのSpark on MaxComputeタスクをクラスタモードで実行できます。このモードでは、Main メソッドをカスタムアプリケーションのエントリポイントとして指定する必要があります。Main が Success または Fail 状態になると、Sparkタスクは終了します。spark-defaults.conf ファイルの設定項目をODPS Sparkノードの設定に追加する必要があります。たとえば、executors の数、メモリサイズ、spark.hadoop.odps.runtime.end.point などの設定項目を追加する必要があります。
spark-defaults.conf ファイルをアップロードする必要はありません。代わりに、spark-defaults.conf ファイルの設定項目をODPS Sparkノードの設定に1つずつ追加する必要があります。

パラメータ | 説明 | spark-submitコマンド |
Sparkバージョン | Sparkのバージョン。有効な値: Spark 1.x、Spark 2.x、Spark 3.x。 説明 Spark 3.Xバージョンを使用するODPS Sparkノードをコミットしたときにエラーが報告された場合は、サーバーレスリソースグループを購入してください。詳細については、「サーバーレスリソースグループの作成と使用」をご参照ください。 | なし |
言語 | プログラミング言語。有効な値: Java/Scala および Python。ビジネス要件に基づいてプログラミング言語を選択できます。 | なし |
メイン JAR リソース | メインのJARまたはPythonリソースファイル。 必要なリソースファイルをDataWorksにアップロードし、事前にコミットする必要があります。詳細については、「MaxComputeリソースの作成と使用」をご参照ください。 |
|
設定項目 | Spark on MaxComputeタスクを送信するために必要な設定項目。
|
|
メインクラス | メインクラスの名前。このパラメータは、言語 パラメータを |
|
パラメータ | ビジネス要件に基づいてパラメータを追加できます。複数のパラメータはスペースで区切ります。DataWorksでは、${変数名} 形式で スケジューリングパラメータ を追加できます。パラメータを追加した後、右側のナビゲーションペインの [プロパティ] タブをクリックし、[スケジューリングパラメータ] セクションで関連する変数に値を割り当てる必要があります。 説明 サポートされているスケジューリングパラメータの形式については、「サポートされているスケジューリングパラメータの形式」をご参照ください。 |
|
その他のリソース | 次のタイプのリソースもサポートされています。ビジネス要件に基づいて、次のタイプのリソースを選択できます。
必要なリソースファイルをDataWorksにアップロードし、事前にコミットする必要があります。詳細については、「MaxComputeリソースの作成と使用」をご参照ください。 | さまざまなタイプのリソースのコマンド:
|
簡単なコード編集例
このセクションでは、ODPS Sparkノードを使用してSpark on MaxComputeタスクを開発する方法の簡単な例を示します。この例では、文字列を数字に変換できるかどうかを判断するSpark on MaxComputeタスクが開発されています。
リソースを作成します。
DataWorksコンソールの [datastudio] ページで、spark_is_number.pyという名前のPythonリソースを作成します。詳細については、「MaxComputeリソースの作成と使用」をご参照ください。サンプルコード:
# -*- coding: utf-8 -*- import sys from pyspark.sql import SparkSession try: # python 2 の場合 reload(sys) sys.setdefaultencoding('utf8') except: # python 3 では不要 pass if __name__ == '__main__': spark = SparkSession.builder\ .appName("spark sql")\ .config("spark.sql.broadcastTimeout", 20 * 60)\ .config("spark.sql.crossJoin.enabled", True)\ .config("odps.exec.dynamic.partition.mode", "nonstrict")\ .config("spark.sql.catalogImplementation", "odps")\ .getOrCreate() def is_number(s): try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False print(is_number('foo')) print(is_number('1')) print(is_number('1.3')) print(is_number('-1.37')) print(is_number('1e3'))リソースを保存してコミットします。
作成したODPS Sparkノードで、このトピックの パラメータの説明 セクションを参照して、MaxCompute Sparkタスクのパラメータとスケジューリングプロパティを設定し、ノードを保存してコミットします。
パラメータ
説明
Sparkバージョン
Spark 2.xを選択します。
言語
Pythonを選択します。
メイン Python リソース
作成したPythonリソース spark_is_number.py。
開発環境のオペレーションセンターに移動し、ODPS Sparkノードのデータをバックフィルします。詳細については、「データのバックフィルとデータバックフィルインスタンスの表示(新バージョン)」をご参照ください。
説明DataWorksは、DataStudioでODPS Sparkノードを実行するためのエントリポイントを提供していません。開発環境のオペレーションセンターでODPS Sparkノードを実行する必要があります。
結果を表示します。
データバックフィルインスタンスが正常に実行された後、生成された実行ログの [トラッキング URL] をクリックして結果を表示します。次の情報が返されます。
False True True True True
高度なコード編集例
他のシナリオでのSpark on MaxComputeタスクの開発の詳細については、次のトピックを参照してください。
次のステップ
Spark on MaxComputeタスクの開発が完了したら、次の操作を実行できます。
スケジューリングプロパティの設定: ノードの定期的なスケジューリングのプロパティを設定できます。システムにタスクを定期的にスケジューリングおよび実行させる場合は、再実行設定やスケジューリングの依存関係など、ノードの項目を設定する必要があります。詳細については、「スケジュール」をご参照ください。
ノードのデバッグ: ノードのコードをデバッグおよびテストして、コードロジックが期待どおりに動作するかどうかを確認できます。詳細については、「デバッグ手順」をご参照ください。
ノードのデプロイ: すべての開発操作が完了したら、ノードをデプロイできます。ノードがデプロイされると、システムはノードのスケジューリングプロパティに基づいてノードを定期的にスケジューリングします。詳細については、「ノードのデプロイ」をご参照ください。
Sparkタスクを診断するためのシステムの有効化: MaxComputeは、LogviewツールとSpark Web UIを提供します。Sparkタスクのログを表示して、タスクが期待どおりに送信および実行されているかどうかを確認できます。