Spark は、その高いパフォーマンス、使いやすさ、幅広い適用性で知られる汎用的なビッグデータ分析エンジンです。複雑なインメモリコンピューティングをサポートしており、大規模で低レイテンシーのデータ分析アプリケーションの構築に最適です。DataWorks は、Spark ジョブを開発およびスケジュールできる EMR Spark ノードを提供します。このトピックでは、EMR Spark ノードの設定方法と使用方法を説明し、その機能の例を示します。
前提条件
ノードのコンポーネント環境をカスタマイズするには、公式の
dataworks_emr_base_task_podイメージに基づいてカスタムイメージを作成します。詳細については、「カスタムイメージ」および「データ開発でイメージを使用する」をご参照ください。たとえば、カスタムイメージを作成する際に、Spark JAR パッケージを置き換えたり、特定の
ライブラリ、ファイル、またはJAR パッケージへの依存関係を追加したりできます。Alibaba Cloud E-MapReduce (EMR) クラスターを作成し、DataWorks に登録済みであること。詳細については、「Data Studio: EMR コンピューティングリソースの関連付け」をご参照ください。
(オプション。RAM ユーザーの場合は必須) タスク開発を担当するリソースアクセス管理 (RAM) ユーザーをワークスペースに追加し、[開発者] または [ワークスペース管理者] ロールを割り当てます。ワークスペース管理者ロールには広範な権限があるため、注意して付与してください。メンバーの追加方法の詳細については、「ワークスペースにメンバーを追加する」をご参照ください。
Alibaba Cloud アカウントを使用している場合は、このステップをスキップできます。
ジョブに特定の開発環境が必要な場合は、DataWorks のカスタムイメージ機能を使用して、必要なコンポーネントを含むイメージを構築します。詳細については、「カスタムイメージ」をご参照ください。
制限事項
このタスクタイプは、サーバーレスリソースグループ (推奨) または専用スケジューリングリソースグループでのみ実行されます。データ開発でイメージを使用する場合は、サーバーレスリソースグループを使用する必要があります。
DataLake またはカスタムクラスターのメタデータを DataWorks で管理するには、まずクラスターで EMR-HOOK を設定する必要があります。詳細については、「Spark SQL 用に EMR-HOOK を設定する」をご参照ください。
説明クラスターで EMR-HOOK が設定されていない場合、DataWorks でメタデータのリアルタイム表示、監査ログの生成、データリネージの表示、または EMR 関連のデータガバナンス タスクを実行することはできません。
E-MapReduce on Container Service for Kubernetes (EMR on ACK) にデプロイされた Spark クラスターのデータリネージは表示できません。EMR Serverless Spark クラスターのデータリネージは表示できます。
EMR on ACK および EMR Serverless Spark クラスターでは、OSS REF を使用して Object Storage Service (OSS) のリソースのみを参照でき、リソースは OSS にのみアップロードできます。Hadoop 分散ファイルシステム (HDFS) へのリソースのアップロードはサポートされていません。
DataLake およびカスタムクラスターは、OSS REF を使用した OSS リソースの参照、OSS へのリソースのアップロード、および HDFS へのリソースのアップロードをサポートしています。
注意事項
現在のワークスペースにバインドされている EMR クラスターで Spark の Ranger アクセス制御を有効にした場合:
この機能は、デフォルトのイメージを使用する Spark タスクを実行する場合、デフォルトで利用可能です。
カスタムイメージを使用する Spark タスクを実行するには、チケットを送信してイメージをアップグレードし、この機能をサポートする必要があります。
Spark ジョブの開発とパッケージ化
DataWorks で EMR Spark ジョブをスケジュールする前に、まず EMR でジョブコードを開発し、コンパイルして JAR パッケージを生成する必要があります。EMR Spark ジョブの開発方法の詳細については、「概要」をご参照ください。
EMR Spark ジョブをスケジュールするには、JAR パッケージを DataWorks にアップロードする必要があります。
操作手順
EMR Spark ノードの編集ページで、以下の手順に従ってジョブを設定します。
Spark ジョブの開発
ユースケースに応じて、次のいずれかのオプションを選択します。
オプション 1: EMR JAR のアップロードと参照
Data Studio では、ローカルマシンからリソースをアップロードして参照できます。EMR Spark ジョブをコンパイルした後、JAR パッケージを取得します。JAR パッケージのサイズに基づいてストレージ方法を選択します。
JAR パッケージをアップロードして DataWorks で EMR リソースを作成してサブミットするか、EMR の HDFS に直接保存します。EMR on ACK および EMR Serverless Spark クラスターは、HDFS へのリソースのアップロードをサポートしていません。
JAR が 500 MB 未満の場合
EMR JAR リソースを作成します。
JAR パッケージが 500 MB 未満の場合、ローカルマシンからアップロードして DataWorks で EMR JAR リソースを作成できます。これにより、DataWorks コンソールでの視覚的な管理が可能になります。リソースを作成した後、それをサブミットする必要があります。詳細については、「EMR リソースの作成と使用」をご参照ください。
ローカルマシンから [ローカル] の JAR パッケージを、JAR リソースが格納されているディレクトリにアップロードします。詳細については、「リソース管理」をご参照ください。
JAR リソースをアップロードするには、[アップロード] をクリックします。
[ストレージパス]、[データソース]、および[リソースグループ] を選択します。
[保存] をクリックします。

EMR JAR リソースを参照します。
作成した [EMR Spark] ノードを開いて、コードエディタに移動します。
左側のナビゲーションウィンドウで、参照するリソースを見つけ、右クリックして、[参照リソース] を選択します。
リソースを選択すると、そのリファレンスが自動的に[EMR Spark]ノードのコードエディタに追加されます。
##@resource_reference{"spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar"} spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jarこのコードは参照を確認します。このコードでは、
spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jarはアップロードした EMR JAR リソースの名前です。EMR Spark ノードのコードに `spark-submit` コマンドを追加します。次のコードは例です。
説明ジョブコードにコメントを追加しないでください。ノード実行時にエラーが発生する原因となります。次の例に基づいてコードを修正してください。
##@resource_reference{"spark-examples_2.11-2.4.0.jar"} spark-submit --class org.apache.spark.examples.SparkPi --master yarn spark-examples_2.11-2.4.0.jar 100説明org.apache.spark.examples.SparkPi: コンパイルされた JAR パッケージ内のジョブのメインクラスです。spark-examples_2.11-2.4.0.jar: アップロードした EMR JAR リソースの名前です。他のパラメーターは例のように使用するか、`spark-submit --help` コマンドを実行してヘルプドキュメントを表示し、必要に応じてコマンドを修正できます。
Spark ノードで spark-submit コマンドの簡略化されたパラメーターを使用する必要がある場合は、それらをコードに追加する必要があります。たとえば、`--executor-memory 2G` を追加します。
Spark ノードは、クラスターモードで YARN を使用したジョブのサブミットのみをサポートします。
`spark-submit` を使用してサブミットされたジョブの場合、`deploy-mode` を `client mode` ではなく `cluster mode` に設定します。
JAR が 500 MB 以上の場合
EMR JAR リソースを作成します。
JAR パッケージが 500 MB 以上の場合は、ローカルマシンからアップロードして DataWorks リソースを作成することはできません。代わりに、JAR パッケージを EMR の HDFS に保存し、そのストレージパスを記録します。これにより、DataWorks で Spark ジョブをスケジュールする際にパスを参照できます。
JAR パッケージを[ローカル]マシンから、JAR リソースが保存されているディレクトリにアップロードします。詳細については、「リソース管理」をご参照ください。
「アップロード」をクリックして、JAR リソースをアップロードします。
[ストレージパス]、[データソース]、および[リソースグループ]を選択します。
[保存] をクリックします。

EMR JAR リソースを参照します。
JAR パッケージが HDFS に保存されている場合は、EMR Spark ノードのコードでそのパスを指定して参照します。
作成した[EMR Spark] ノードをダブルクリックして、コードエディタを開きます。
`spark-submit` コマンドを記述します。次のコードは例です。
spark-submit --master yarn --deploy-mode cluster --name SparkPi --driver-memory 4G --driver-cores 1 --num-executors 5 --executor-memory 4G --executor-cores 1 --class org.apache.spark.examples.JavaSparkPi hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100説明hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar: HDFS 内の JAR パッケージの実際のパスです。org.apache.spark.examples.JavaSparkPi: コンパイルされた JAR パッケージ内のジョブの `メインクラス` です。他のパラメーターは EMR クラスター用であり、実際のクラスター設定に基づいて設定する必要があります。また、`spark-submit --help` コマンドを実行してヘルプドキュメントを表示し、必要に応じてコマンドを修正することもできます。
Spark ノードで `spark-submit` コマンドの簡略化されたパラメーターを使用する必要がある場合は、それらをコードに追加する必要があります。たとえば、`--executor-memory 2G` を追加します。
Spark ノードは、クラスターモードで YARN を使用したジョブのサブミットのみをサポートします。
spark-submitを使用してサブミットされたジョブの場合、deploy-modeをclient modeではなくcluster modeに設定します。
オプション 2: OSS リソースの直接参照
OSS REF を使用して、ノード内の OSS リソースを直接参照できます。EMR ノードが実行されると、DataWorks は参照された OSS リソースを自動的にロードしてジョブが使用できるようにします。この方法は、EMR ジョブで JAR 依存関係を実行する場合や、EMR ジョブがスクリプトに依存する場合などのシナリオでよく使用されます。
JAR リソースを開発します。
コードの依存関係を準備します。
必要なコードの依存関係は、EMR クラスターのマスターノードの `/usr/lib/emr/spark-current/jars/` パスにあります。次の例では Spark 3.4.2 を使用します。IDEA プロジェクトで、指定された pom 依存関係を追加し、関連するプラグインを参照します。
pom 依存関係の追加
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.4.2</version> </dependency> <!-- Apache Spark SQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.4.2</version> </dependency> </dependencies>プラグインの参照
<build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <configuration> <recompileMode>incremental</recompileMode> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> </plugins> </build>次のコードは例です。
package com.aliyun.emr.example.spark import org.apache.spark.sql.SparkSession object SparkMaxComputeDemo { def main(args: Array[String]): Unit = { // SparkSession を作成します。 val spark = SparkSession.builder() .appName("HelloDataWorks") .getOrCreate() // Spark のバージョンを出力します。 println(s"Spark version: ${spark.version}") } }Scala コードを編集した後、JAR パッケージを生成します。
この例で生成される JAR パッケージは `SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar` です。
JAR リソースをアップロードします。
コードの開発が完了したら、OSS コンソール にログインします。左側のナビゲーションウィンドウで、[バケット一覧] をクリックします。
宛先バケットの名前をクリックし、[ファイル管理] ページに移動します。
この例では `onaliyun-bucket-2` バケットを使用します。
[ディレクトリの作成] をクリックして、JARリソース用のディレクトリを作成します。
[ディレクトリ名] を
emr/jarsに設定して、ディレクトリを作成します。JAR リソースをディレクトリにアップロードします。
ディレクトリに移動し、[ファイルのアップロード] をクリックします。[アップロードするファイル] セクションで [ファイルのスキャン] をクリックし、
SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jarファイルを選択してから、[ファイルのアップロード] をクリックします。
JAR リソースを参照します。
コードを編集して JAR リソースを参照します。
作成した EMR Spark ノードの編集ページで、コードを編集して JAR リソースを参照します。
spark-submit --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn ossref://onaliyun-bucket-2/emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar次の表にパラメーターを説明します。
パラメーター
説明
class実行するメインクラスの完全名です。
masterSpark アプリケーションが実行されるモードです。
ossrefファイルパスフォーマットは `ossref://{endpoint}/{bucket}/{object}` です。
エンドポイント: Object Storage Service (OSS) のパブリックエンドポイントです。空欄のままにした場合、OSSバケットはEMRクラスターと同じリージョン内にある必要があります。
[バケット]: オブジェクトを格納するために使用される OSS コンテナーです。各 バケットには一意の名前があります。現在のアカウントで利用可能なすべてのバケットを表示するには、OSS コンソールにログインしてください。
オブジェクト: [バケット] に格納されている特定のオブジェクト (ファイル名またはパス) 。
EMR Spark ノードジョブを実行します。
編集後、
アイコンをクリックし、作成したサーバーレスリソースグループを選択して EMR Spark ノードを実行します。ジョブが完了したら、コンソールに出力された `applicationId` (例: `application_1730367929285_xxxx`) を記録します。結果を表示します。
EMR Shell ノードを作成し、そのノードで `yarn logs -applicationId application_1730367929285_xxxx` コマンドを実行して結果を表示します。

(任意) 詳細パラメーターの設定
以下の表に記載されているパラメーターは、ノードの右側にあるペインの[EMRノードパラメーター]および[DataWorksパラメーター]セクションで設定できます。
説明設定できる詳細パラメーターは、次の表に示すように、EMR クラスターのタイプによって異なります。
ペインの [EMR ノードパラメーター] および [Spark パラメーター] セクションで、オープンソースの Spark プロパティをさらに設定できます。
DataLake およびカスタム (ECS)
詳細パラメーター
説明
queue
ジョブをサブミットするためのスケジューリングキューです。デフォルトは `default` キューです。
EMR クラスターを DataWorks ワークスペースに登録する際に、ワークスペースレベルの YARN リソースキュー を設定した場合、以下のロジックが適用されます:
「[グローバル設定を優先]」が「[はい]」に設定されている場合、DataWorks は EMR クラスターの登録時に設定されたキューを使用して Spark ジョブを実行します。
このオプションが選択されていない場合、DataWorks は EMR Spark ノードで設定されたキューを使用して Spark ジョブを実行します。
EMR YARN の詳細については、「基本的なキュー設定」をご参照ください。EMR クラスター登録時のキュー設定の詳細については、「グローバル YARN キューの設定」をご参照ください。
priority
ジョブの優先度です。デフォルト値は 1 です。
FLOW_SKIP_SQL_ANALYZE
SQL 文の実行モードです。有効値:
true: 複数の SQL 文を一度に実行します。false(デフォルト): 一度に 1 つの SQL 文を実行します。
説明このパラメーターは、データ開発環境でのテスト実行でのみサポートされます。
その他
詳細設定でカスタム Spark パラメーターを追加できます。たとえば、`spark.eventLog.enabled : false` を追加できます。DataWorks は、EMR クラスターに送信する前に、パラメーターを自動的に `--conf key=value` 形式にフォーマットします。
グローバル Spark パラメーターも設定できます。詳細については、「グローバル Spark パラメーターの設定」をご参照ください。
説明Ranger 権限制御を有効にするには、「グローバル Spark パラメーターの設定」で `spark.hadoop.fs.oss.authorization.method=ranger` の設定を追加して、それが有効になるようにしてください。
Spark (ACK)
詳細パラメーター
説明
FLOW_SKIP_SQL_ANALYZE
SQL 文の実行モードです。有効値:
true: 複数の SQL 文を一度に実行します。false: 一度に 1 つの SQL 文を実行します。
説明このパラメーターは、データ開発環境でのテスト実行でのみサポートされます。
その他
詳細設定でカスタム Spark パラメーターを追加できます。たとえば、`spark.eventLog.enabled : false` を追加できます。DataWorks は、EMR クラスターに送信する前に、パラメーターを自動的に `--conf key=value` 形式にフォーマットします。
グローバル Spark パラメーターも設定できます。詳細については、「グローバル Spark パラメーターの設定」をご参照ください。
Hadoop (ECS)
詳細パラメーター
説明
queue
ジョブをサブミットするためのスケジューリングキューです。デフォルトのキューは `default` です。
DataWorks ワークスペースに EMR クラスターを登録する際に、ワークスペースレベルの YARN リソースキュー を設定した場合:
「グローバル設定を優先」を有効にすると、EMR クラスター登録時に設定されたキューが Spark タスクで使用されます。
このオプションを有効にしない場合、キューは Spark タスク実行時の EMR Spark ノードの設定によって決まります。
EMR YARN の詳細については、「基本的なキュー設定」をご参照ください。EMR クラスター登録時のキュー設定の詳細については、「グローバル YARN リソースキューの設定」をご参照ください。
priority
ジョブの優先度です。デフォルト値は 1 です。
FLOW_SKIP_SQL_ANALYZE
SQL 文の実行モードです。有効値:
true: 複数の SQL 文を一度に実行します。false: 一度に 1 つの SQL 文を実行します。
説明このパラメーターは、データ開発環境でのテスト実行でのみサポートされます。
USE_GATEWAY
ノードのジョブをゲートウェイクラスター経由でサブミットするかどうかを指定します。有効値:
true: ジョブをゲートウェイクラスター経由でサブミットします。false: ジョブをゲートウェイクラスター経由でサブミットしません。ジョブはデフォルトでヘッダーノードにサブミットされます。
説明ノードのクラスターがゲートウェイクラスターに関連付けられていない場合、このパラメーターを `true` に設定するとジョブのサブミットが失敗します。
その他
詳細設定でカスタム Spark パラメーターを追加できます。たとえば、`spark.eventLog.enabled : false` を追加できます。DataWorks は、EMR クラスターに送信する前に、パラメーターを自動的に `--conf key=value` 形式にフォーマットします。
グローバル Spark パラメーターも設定できます。詳細については、「グローバル Spark パラメーターの設定」をご参照ください。
説明Ranger 権限制御を有効にするには、「グローバル Spark パラメーターの設定」で `spark.hadoop.fs.oss.authorization.method=ranger` の設定を追加して、それが有効になるようにしてください。
EMR Serverless Spark
関連パラメーターの設定方法については、「Spark ジョブサブミット時のパラメーター設定」をご参照ください。
詳細パラメーター
説明
queue
ジョブをサブミットするためのスケジューリングキューです。デフォルトは `dev_queue` キューです。
priority
ジョブの優先度です。デフォルト値は 1 です。
FLOW_SKIP_SQL_ANALYZE
SQL 文の実行モードです。有効値:
`true`: 複数の SQL 文を一度に実行します。
`false`: 一度に 1 つの SQL 文を実行します。
説明このパラメーターは、データ開発環境でのテスト実行でのみサポートされます。
SERVERLESS_RELEASE_VERSION
Spark エンジンのバージョン。デフォルトでは、システムは、[デフォルト エンジン バージョン] を使用します。このバージョンは、[クラスター管理] で設定されたクラスターのものであり、[管理センター] 内にあります。特定のジョブで異なるエンジン バージョンを使用する場合は、ここでデフォルトをオーバーライドできます。
SERVERLESS_QUEUE_NAME
リソースキューを指定します。デフォルトでは、システムは [デフォルト リソースキュー] を使用します。これは、[クラスター管理] でクラスターに対して設定されたキューであり、[管理センター] 内にあります。リソースの分離または管理が必要な場合は、ここで別のキューを指定できます。詳細については、「リソースキューの管理」をご参照ください。
その他
詳細設定でカスタム Spark パラメーターを追加できます。たとえば、`spark.eventLog.enabled : false` を追加できます。DataWorks は、EMR クラスターに送信する前に、パラメーターを自動的に `--conf key=value` 形式にフォーマットします。
グローバル Spark パラメーターも設定できます。詳細については、「グローバル Spark パラメーターの設定」をご参照ください。
Spark ジョブの実行
[Run Configuration] [コンピューティングリソース] セクションで、[コンピューティングリソース] および [DataWorks リソースグループ] を設定します。
説明また、ジョブの実行に必要なリソースに基づいて、[Scheduling CU] を設定することもできます。デフォルトの CU は
0.25です。パブリックネットワークまたは VPC 内のデータソースにアクセスするには、データソースへの接続が確立されているスケジューリングリソースグループを使用する必要があります。詳細については、「ネットワーク接続ソリューション」をご参照ください。
ツールバーのパラメーター ダイアログボックスで、対応するデータソースを選択し、[実行] をクリックして Spark ジョブを実行します。
ノードジョブを定期的に実行するには、ビジネス要件に基づいてスケジューリング情報を設定します。詳細については、「ノードのスケジューリング設定」をご参照ください。
説明コンポーネント環境をカスタマイズするには、公式イメージに基づいてカスタムの
dataworks_emr_base_task_podを作成し、「カスタムイメージ」および「データ開発でイメージを使用する」をご参照ください。たとえば、カスタムイメージを作成する際に、Spark JAR パッケージを置き換えたり、特定の `ライブラリ`、`ファイル`、または `jar パッケージ` への依存関係を追加したりできます。
ノードを設定した後、それを公開する必要があります。詳細については、「ノードとワークフローのデプロイメント」をご参照ください。
ノードを公開した後、オペレーションセンターでその定期タスクのステータスを表示できます。詳細については、「オペレーションセンター入門」をご参照ください。
よくある質問
Q: ノード実行時に接続タイムアウトエラーが発生するのはなぜですか?
A: [リソースグループ] と [クラスタ] のネットワーク接続を確認します。コンピューティングリソースページに移動し、リソースを見つけ、[リソースの初期化] をクリックします。表示されるダイアログボックスで、[再初期化] をクリックし、初期化が正常に完了したことを確認します。

