DataWorks で ODPS Spark ノードを使用して、Spark on MaxCompute タスクのスケジューリングと実行を行います。Spark on MaxCompute タスクは、ローカルモードまたはクラスターモードで実行できます。このトピックでは、DataWorks でコードを準備し、ノードのパラメーターを設定し、クラスターモードでタスクを実行する方法について説明します。
前提条件
開始する前に、以下が完了していることを確認してください:
-
DataWorks で ODPS Spark ノードが作成されていること。詳細については、「ODPS ノードの作成と管理」をご参照ください。
制限事項
Spark 3.x を使用する ODPS Spark ノードをコミットした際にエラーが報告された場合は、チケットを送信してテクニカルサポートにご連絡ください。サポートチームが、ノードの実行に使用される専用スケジューリングリソースグループのバージョンを更新します。
コードの準備
Spark on MaxCompute タスクで使用する言語を選択します。準備の手順は、Java/Scala と Python のどちらを使用するかによって異なります。
Java または Scala
ノードを設定する前に、ローカルマシンで以下の手順を完了してください:
-
開発環境のセットアップ。 ご利用のオペレーティングシステムに基づいて開発環境を準備します:
-
Linux: Linux 開発環境のセットアップ
-
Windows: Windows 開発環境のセットアップ
-
-
コードの開発。 Spark on MaxCompute アプリケーションのコードを記述します。サンプルプロジェクトテンプレートから始めると、正しい Spark と MaxCompute の依存関係が設定済みのプロジェクト構造を取得できます。
-
JAR のパッケージ化とアップロード。 コードを JAR としてパッケージ化し、MaxCompute リソースとして DataWorks にアップロードします。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。
Python (デフォルト環境)
PySpark コードを Python リソースとして DataWorks に直接記述し、コミットします。ローカルでのセットアップは不要です。例と手順については、「MaxCompute リソースの作成と使用」および「PySpark を使用した Spark on MaxCompute アプリケーションの開発」をご参照ください。
デフォルトの Python 環境にタスクで必要なサードパーティパッケージが含まれていない場合は、カスタム Python 環境を準備する (下記参照) か、より広範な Python ライブラリをサポートする PyODPS 2 ノードまたは PyODPS 3 ノードを使用してください。
Python (カスタム環境)
デフォルトの Python 環境が要件を満たさない場合:
-
ローカルマシンでカスタム Python 環境を準備します。 依存関係の要件に基づいて環境を設定するには、「PySpark の Python バージョンとサポートされる依存関係」をご参照ください。
-
環境のパッケージ化とアップロード。 Python 環境を ZIP ファイルとしてパッケージ化し、MaxCompute リソースとして DataWorks にアップロードします。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。
ノードの設定
クラスターモードでは、ノードはエントリーポイントとして Main メソッドを呼び出してアプリケーションを実行します。タスクは、Main が Success または Fail の状態に達した時点で完了と見なされます。
spark-defaults.confファイルはアップロードしないでください。代わりに、spark-defaults.confの各設定項目を、ノードの [設定項目] フィールドに個別に追加してください。
次の表に各パラメーターを示します。自動設定とマークされたパラメーターは、ご利用の MaxCompute プロジェクトの設定から事前に入力されます。タスクで異なる値が必要な場合にのみ、[設定項目] でオーバーライドしてください。
| パラメーター | 必須 | 説明 | 同等の spark-submit オプション |
|---|---|---|---|
| Spark バージョン | はい | 使用する Spark のバージョン。オプション: [Spark1.x]、[Spark2.x]、[Spark3.x]。 | — |
| 言語 | はい | プログラミング言語。オプション: [Java/Scala]、[Python]。 | — |
| メイン JAR リソース | はい | MaxCompute リソースとしてアップロードされたメイン JAR ファイル (Java/Scala) または Python スクリプト。このフィールドを設定する前に、リソースをアップロードしてコミットしてください。「MaxCompute リソースの作成と使用」をご参照ください。 | app jar または Python ファイル |
| 設定項目 | 条件付き | spark-submit の --conf と同等の Spark 設定プロパティを 1 行に 1 つずつ追加します。エグゼキュータの数、メモリサイズ、spark.hadoop.odps.runtime.end.point などの項目を必要に応じて追加します。 |
--conf PROP=VALUE |
| メインクラス | Java/Scala のみ | メインクラスの完全修飾名。Python タスクでは不要です。 | --class CLASS_NAME |
| パラメーター | いいえ | アプリケーションに渡される引数をスペースで区切って指定します。スケジューリングパラメーターには ${変数名} のフォーマットを使用し、[プロパティ] タブの [スケジューリングパラメーター] セクションで値を割り当てます。サポートされているフォーマットについては、「スケジューリングパラメーターでサポートされているフォーマット」をご参照ください。 |
[app arguments] |
| その他のリソース | いいえ | タスクで必要な追加のリソースファイル。サポートされているタイプと適用可能な言語: Jar リソース (Java/Scala のみ)、Python リソース (Python のみ)、ファイルリソース (すべて)、アーカイブリソース (すべて、圧縮ファイルのみ)。最初にリソースをアップロードしてコミットしてください。 | --jars、--py-files、--files、--archives |
自動設定項目 — 次の設定プロパティは、ご利用の MaxCompute プロジェクトの値に合わせて自動的に設定されます。タスクで異なる値が必要な場合にのみ、[設定項目] でオーバーライドしてください:
-
spark.hadoop.odps.access.id -
spark.hadoop.odps.access.key -
spark.hadoop.odps.end.point
例:文字列から数値への変換チェックの実行
この例では、文字列が数値に変換できるかどうかをチェックする PySpark タスクを作成します。
ステップ 1:Python リソースの作成とコミット
-
DataWorks コンソールで DataStudio に移動し、spark_is_number.py という名前の Python リソースを作成します。リソースの作成に関する詳細については、「MaxCompute リソースの作成と使用」をご参照ください。次のコードをリソースに貼り付けます:
# -*- coding: utf-8 -*- import sys from pyspark.sql import SparkSession try: # Python 2 のみ reload(sys) sys.setdefaultencoding('utf8') except: # Python 3 では不要 pass if __name__ == '__main__': spark = SparkSession.builder\ .appName("spark sql")\ .config("spark.sql.broadcastTimeout", 20 * 60)\ .config("spark.sql.crossJoin.enabled", True)\ .config("odps.exec.dynamic.partition.mode", "nonstrict")\ .config("spark.sql.catalogImplementation", "odps")\ .getOrCreate() def is_number(s): try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False print(is_number('foo')) print(is_number('1')) print(is_number('1.3')) print(is_number('-1.37')) print(is_number('1e3')) -
リソースを保存してコミットします。
ステップ 2:ODPS Spark ノードの設定
ODPS Spark ノードで、次のパラメーターを設定します:
| パラメーター | 値 |
|---|---|
| Spark バージョン | Spark2.x |
| 言語 | Python |
| メイン Python リソース | spark_is_number.py (作成したリソース) |
ノードを保存してコミットします。
ステップ 3:オペレーションセンターでのノードの実行
ODPS Spark ノードは DataStudio から実行できません。ノードを実行するには、開発環境のオペレーションセンターに移動します。
オペレーションセンターで、ODPS Spark ノードのバックフィルをトリガーします。詳細については、「データバックフィルとデータバックフィルインスタンスの表示 (新バージョン)」をご参照ください。
ステップ 4:結果の表示
バックフィルインスタンスが正常に完了したら、実行ログの [tracking URL] をクリックして出力を表示します:
False
True
True
True
True
その他の例
その他の Spark on MaxCompute 開発シナリオ:
次のステップ
Spark on MaxCompute タスクを開発して実行した後、次の操作を実行できます:
-
スケジューリングプロパティの設定:ノードの定期的なスケジューリング (再実行設定やスケジューリングの依存関係を含む) を設定し、システムがタスクを自動的に実行するようにします。詳細については、「概要」をご参照ください。
-
ノードのデバッグ:本番環境に移行する前に、ノードのコードをテストしてロジックが期待どおりに機能することを確認します。詳細については、「デバッグ手順」をご参照ください。
-
ノードのデプロイ:ノードをデプロイしてスケジューリングを有効にします。デプロイ後、システムは設定したスケジューリングプロパティに基づいてノードを自動的にスケジュールし、実行します。詳細については、「ノードのデプロイ」をご参照ください。
-
タスクの問題の診断:Logview ツールと Spark Web UI を使用してログを検査し、タスクが期待どおりに送信され、実行されていることを確認します。詳細については、「Spark タスクを診断するシステムの有効化」をご参照ください。