すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:バッチタスクまたはストリーミングタスクの開発

最終更新日:Feb 12, 2026

このトピックでは、バッチタスクまたはストリーミングタスクを開発するための設定項目と手順について説明します。

前提条件

ワークスペースが作成されていること。詳細については、「ワークスペースの管理」をご参照ください。

手順

  1. データ開発ページに移動します。

    1. E-MapReduce コンソールにログインします

    2. 左側のナビゲーションウィンドウで、EMR Serverless > Spark を選択します。

    3. Spark ページで、対象のワークスペースの名前をクリックします。

    4. EMR Serverless Spark ページで、左側のナビゲーションウィンドウの Data Development をクリックします。

  2. タスクを作成します。

    1. 開発 タブで、image (作成) アイコンをクリックします。

    2. 表示されたダイアログボックスで、Name を入力し、要件に応じてタイプとして Batch Job または Streaming Job を選択し、OK をクリックします。

    3. 右上隅でキューを選択します。

      キューの追加方法の詳細については、「リソースキューの管理」をご参照ください。

    4. 新しいタスクのエディターで、パラメーターを編集します。

      JAR

      パラメーター

      説明

      Main JAR Resource

      タスクの実行に必要なプライマリ JAR パッケージです。

      • WorkspaceFiles ページで事前にアップロードしたファイルです。

      • OSS:Alibaba Cloud OSS に保存されているファイルです。

      Engine Version

      Spark のバージョンです。詳細については、「エンジンバージョンの概要」をご参照ください。

      メインクラス

      Spark タスクの送信時に指定するメインクラスです。

      Execution Parameters

      タスクのランタイム中に必要な設定項目、またはメインクラスに渡されるカスタムパラメーターです。複数のパラメーターはスペースで区切ります。

      Timeout

      このタスクの完了を許可する最大時間です。タスクの実行時間がこのしきい値を超えると、システムは自動的にタスクを停止します。デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。

      Network Connection

      Mount Integrated File Directory

      この機能はデフォルトで無効になっています。この機能を使用するには、まず Files ページの Integrated File Directory タブでファイルディレクトリを追加する必要があります。詳細については、「統合ファイルディレクトリ」をご参照ください。

      有効にすると、システムは管理対象のファイルディレクトリをタスクにマウントし、タスク内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。

      マウント操作は、ドライバーの計算リソースを一定量消費します。消費されるリソースは、次の 2 つの値のうち大きい方になります:

      • 固定リソース:0.3 CPU コア + 1 GB メモリ。

      • 動的リソース:spark.driver リソースの 10% (つまり、0.1 × spark.driver コアとメモリ)。

      たとえば、spark.driver が 4 CPU コアと 8 GB のメモリで設定されている場合、動的リソースは 0.4 CPU コア + 0.8 GB メモリです。この場合、実際に消費されるリソースは max(0.3 Core + 1GB, 0.4 Core + 0.8GB) であり、0.4 CPU コア + 1 GB メモリとなります。

      説明

      マウントを有効にすると、デフォルトではディレクトリはドライバーにのみマウントされます。エグゼキュータにもディレクトリをマウントするには、エグゼキュータへのマウント を有効にしてください。

      重要

      統合 NAS ファイルディレクトリをマウントした後、ネットワーク接続を設定する必要があります。ネットワーク接続の VPC は、NAS マウントポイントが存在する VPC と同じである必要があります。

      エグゼキュータへのマウント

      有効にすると、システムは管理対象のファイルディレクトリをタスクのエグゼキュータにマウントし、タスクのエグゼキュータ内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。

      マウントはエグゼキュータのリソースを消費します。具体的なリソース消費量は、ファイルの使用状況に応じて変動します。

      File Resources

      タスクを送信すると、--files パラメーターで指定されたファイルがエグゼキュータの作業ディレクトリにコピーされます。これにより、Spark タスクはランタイム時にこれらのファイルにアクセスできます。

      リソースタイプを指定する際、必要に応じて Workspace または OSS を選択できます。

      Archive Resources

      ジョブを送信すると、--archives パラメーターで指定されたアーカイブファイルが展開され、エグゼキュータに配布されます。

      リソースタイプを指定する際、Workspace または OSS を選択できます。

      JAR Resources

      タスクを送信する際に、--jars パラメーターを使用して必要な JAR 依存ファイルを指定します。

      リソースタイプを指定する際、必要に応じて Workspace または OSS を選択できます。

      spark.driver.cores

      Spark アプリケーションのドライバーが使用する CPU コアの数です。

      spark.driver.memory

      Spark アプリケーションのドライバーが利用できるメモリ量です。

      spark.executor.cores

      Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数です。

      spark.executor.memory

      Spark アプリケーションの各エグゼキュータが利用できるメモリ量です。

      spark.executor.instances

      Spark によって割り当てられるエグゼキュータの数です。

      Dynamic Resource Allocation

      この機能はデフォルトで無効になっています。この機能を有効にした後、次のパラメーターを設定します:

      • Minimum Number of Executors:デフォルト値は 2 です。

      • Maximum Number of Executorsspark.executor.instances が設定されていない場合、デフォルト値は 10 です。

      その他のメモリ設定

      • spark.driver.memoryOverhead:各ドライバーが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式 max(384 MB, 10% × spark.driver.memory) に基づいて自動的に値を割り当てます。

      • spark.executor.memoryOverhead:各エグゼキュータが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式 max(384 MB, 10% × spark.executor.memory) に基づいて自動的に値を割り当てます。

      • spark.memory.offHeap.size:Spark が利用できるオフヒープメモリのサイズです。デフォルト値は 1 GB です。

        このパラメーターは、spark.memory.offHeap.enabledtrue に設定されている場合にのみ有効です。デフォルトでは、Fusion Engine を使用する場合、この機能は有効になり、非ヒープメモリは 1 GB に設定されます。

      Spark 設定

      Spark の設定情報を入力します。キーと値のペアをスペースで区切ります。例:key value

      Tags

      タグのキーと値のペアを入力します。タグを使用すると、タスクをより便利かつ正確に管理できます。

      PySpark

      パラメーター

      説明

      Main Python Resources

      タスクの実行に必要なプライマリ Python ファイルです。

      • WorkspaceFiles ページで事前にアップロードしたファイルです。

      • OSS:Alibaba Cloud OSS に保存されているファイルです。

      Engine Version

      Spark のバージョンです。詳細については、「エンジンバージョンの概要」をご参照ください。

      Execution Parameters

      タスクのランタイム中に必要な設定項目、またはメインクラスに渡されるカスタムパラメーターです。

      Timeout

      このタスクの完了を許可する最大時間です。タスクの実行時間がこのしきい値を超えると、システムは自動的にタスクを停止します。デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。

      Runtime Environment

      タスクの実行に必要なリソースは、選択した環境に基づいて事前に設定されます。

      Network Connection

      Mount Integrated File Directory

      この機能はデフォルトで無効になっています。この機能を使用するには、まず Files ページの Integrated File Directory タブでファイルディレクトリを追加する必要があります。詳細については、「統合ファイルディレクトリ」をご参照ください。

      有効にすると、システムは管理対象のファイルディレクトリをタスクにマウントし、タスク内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。

      マウント操作は、ドライバーの計算リソースを一定量消費します。消費されるリソースは、次の 2 つの値のうち大きい方になります:

      • 固定リソース:0.3 CPU コア + 1 GB メモリ。

      • 動的リソース:spark.driver リソースの 10% (つまり、0.1 × spark.driver コアとメモリ)。

      たとえば、spark.driver が 4 CPU コアと 8 GB のメモリで設定されている場合、動的リソースは 0.4 CPU コア + 0.8 GB メモリです。この場合、実際に消費されるリソースは max(0.3 Core + 1GB, 0.4 Core + 0.8GB) であり、0.4 CPU コア + 1 GB メモリとなります。

      説明

      マウントを有効にすると、デフォルトではディレクトリはドライバーにのみマウントされます。エグゼキュータにもディレクトリをマウントするには、エグゼキュータへのマウント を有効にしてください。

      重要

      統合 NAS ファイルディレクトリをマウントした後、ネットワーク接続を設定する必要があります。ネットワーク接続の VPC は、NAS マウントポイントが存在する VPC と同じである必要があります。

      エグゼキュータへのマウント

      有効にすると、システムは管理対象のファイルディレクトリをタスクのエグゼキュータにマウントし、タスクのエグゼキュータ内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。

      マウントはエグゼキュータのリソースを消費します。具体的なリソース消費量は、ファイルの使用状況に応じて変動します。

      File Resources

      クラスター内のすべてのエグゼキュータノードに配布されるファイルのリストです。

      リソースタイプを指定する際、必要に応じて Workspace または OSS を選択できます。

      Pyfiles Resources

      タスクを送信すると、--py-files パラメーターで指定されたファイルが Python 依存ファイルとして配布されます。

      リソースタイプを指定する際、Workspace または OSS を選択できます。

      Archive Resources

      タスクが送信されると、--archives パラメーターで指定されたファイルは展開され、アーカイブ済みオブジェクトとしてエグゼキュータに配布されます。

      必要に応じて Workspace または OSS を選択できます。

      JAR Resources

      タスクを送信する際に、--jars パラメーターを使用して必要な JAR 依存ファイルを指定します。

      リソースタイプを指定する際、Workspace または OSS を選択できます。

      spark.driver.cores

      Spark アプリケーションのドライバーが使用する CPU コアの数です。

      spark.driver.memory

      Spark アプリケーションのドライバーが利用できるメモリ量です。

      spark.executor.cores

      Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数です。

      spark.executor.memory

      Spark アプリケーションの各エグゼキュータが利用できるメモリ量です。

      spark.executor.instances

      Spark によって割り当てられるエグゼキュータの数です。

      Dynamic Resource Allocation

      この機能はデフォルトで無効になっています。この機能を有効にした後、次のパラメーターを設定します:

      • Minimum Number of Executors:デフォルト値は 2 です。

      • Maximum Number of Executorsspark.executor.instances が設定されていない場合、デフォルト値は 10 です。

      その他のメモリ設定

      • spark.driver.memoryOverhead:各ドライバーが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式 max(384 MB, 10% × spark.driver.memory) に基づいて自動的に値を割り当てます。

      • spark.executor.memoryOverhead:各エグゼキュータが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式 max(384 MB, 10% × spark.executor.memory) に基づいて自動的に値を割り当てます。

      • spark.memory.offHeap.size:Spark が利用できるオフヒープメモリのサイズです。デフォルト値は 1 GB です。

        このパラメーターは、spark.memory.offHeap.enabledtrue に設定されている場合にのみ有効です。デフォルトでは、Fusion Engine を使用する場合、この機能は有効になり、非ヒープメモリは 1 GB に設定されます。

      Spark 設定

      Spark の設定情報を入力します。キーと値のペアをスペースで区切ります。例:key value

      Tags

      タグのキーと値のペアを入力します。タグを使用すると、タスクをより便利かつ正確に管理できます。

      SQL

      パラメーター

      説明

      SQL File

      タスクを送信する際に必要なファイルです。

      • Workspace:ジョブを送信する前に Files ページでアップロードしたファイルです。

      • OSS:Alibaba Cloud OSS に保存されているファイルです。

      Engine Version

      Spark のバージョンです。詳細については、「エンジンバージョンの概要」をご参照ください。

      Timeout

      このタスクの完了を許可する最大時間です。タスクの実行時間がこのしきい値を超えると、システムは自動的にタスクを停止します。デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。

      Network Connection

      Mount Integrated File Directory

      この機能はデフォルトで無効になっています。この機能を使用するには、まず Files ページの Integrated File Directory タブでファイルディレクトリを追加する必要があります。詳細については、「統合ファイルディレクトリ」をご参照ください。

      有効にすると、システムは管理対象のファイルディレクトリをタスクにマウントし、タスク内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。

      マウント操作は、ドライバーの計算リソースを一定量消費します。消費されるリソースは、次の 2 つの値のうち大きい方になります:

      • 固定リソース:0.3 CPU コア + 1 GB メモリ。

      • 動的リソース:spark.driver リソースの 10% (つまり、0.1 × spark.driver コアとメモリ)。

      たとえば、spark.driver が 4 CPU コアと 8 GB のメモリで設定されている場合、動的リソースは 0.4 CPU コア + 0.8 GB メモリです。この場合、実際に消費されるリソースは max(0.3 Core + 1GB, 0.4 Core + 0.8GB) であり、0.4 CPU コア + 1 GB メモリとなります。

      説明

      マウントを有効にすると、デフォルトではディレクトリはドライバーにのみマウントされます。エグゼキュータにもディレクトリをマウントするには、エグゼキュータへのマウント を有効にしてください。

      重要

      統合 NAS ファイルディレクトリをマウントした後、ネットワーク接続を設定する必要があります。ネットワーク接続の VPC は、NAS マウントポイントが存在する VPC と同じである必要があります。

      エグゼキュータへのマウント

      有効にすると、システムは管理対象のファイルディレクトリをタスクのエグゼキュータにマウントし、タスクのエグゼキュータ内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。

      マウントはエグゼキュータのリソースを消費します。具体的なリソース消費量は、ファイルの使用状況に応じて変動します。

      spark.driver.cores

      Spark アプリケーションのドライバーが使用する CPU コアの数です。

      spark.driver.memory

      Spark アプリケーションのドライバーが利用できるメモリ量です。

      spark.executor.cores

      Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数です。

      spark.executor.memory

      Spark アプリケーションの各エグゼキュータが利用できるメモリ量です。

      spark.executor.instances

      Spark によって割り当てられるエグゼキュータの数です。

      Dynamic Resource Allocation

      この機能はデフォルトで無効になっています。この機能を有効にした後、次のパラメーターを設定します:

      • Minimum Number of Executors:デフォルト値は 2 です。

      • Maximum Number of Executorsspark.executor.instances が設定されていない場合、デフォルト値は 10 です。

      その他のメモリ設定

      • spark.driver.memoryOverhead:各ドライバーが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式 max(384 MB, 10% × spark.driver.memory) に基づいて自動的に値を割り当てます。

      • spark.executor.memoryOverhead:各エグゼキュータが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式 max(384 MB, 10% × spark.executor.memory) に基づいて自動的に値を割り当てます。

      • spark.memory.offHeap.size:Spark が利用できるオフヒープメモリのサイズです。デフォルト値は 1 GB です。

        このパラメーターは、spark.memory.offHeap.enabledtrue に設定されている場合にのみ有効です。デフォルトでは、Fusion Engine を使用する場合、この機能は有効になり、非ヒープメモリは 1 GB に設定されます。

      Spark 設定

      Spark の設定情報を入力します。キーと値のペアをスペースで区切ります。例:key value

      Tags

      タグのキーと値のペアを入力します。タグを使用すると、タスクをより便利かつ正確に管理できます。

      Spark Submit

      パラメーター

      説明

      Engine Version

      Spark のバージョンです。詳細については、「エンジンバージョンの概要」をご参照ください。

      Script

      Spark Submit スクリプトを入力します。

      次のコードは例です:

      --class org.apache.spark.examples.SparkPi \
      --conf spark.executor.memory=2g \
      oss://<YourBucket>/spark-examples_2.12-3.5.2.jar

      Timeout

      このタスクの完了を許可する最大時間です。タスクの実行時間がこのしきい値を超えると、システムは自動的にタスクを停止します。デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。

      Network Connection

      Mount Integrated File Directory

      この機能はデフォルトで無効になっています。この機能を使用するには、まず Files ページの Integrated File Directory タブでファイルディレクトリを追加する必要があります。詳細については、「統合ファイルディレクトリ」をご参照ください。

      有効にすると、システムは管理対象のファイルディレクトリをタスクにマウントし、タスク内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。

      マウント操作は、ドライバーの計算リソースを一定量消費します。消費されるリソースは、次の 2 つの値のうち大きい方になります:

      • 固定リソース:0.3 CPU コア + 1 GB メモリ。

      • 動的リソース:spark.driver リソースの 10% (つまり、0.1 × spark.driver コアとメモリ)。

      たとえば、spark.driver が 4 CPU コアと 8 GB のメモリで設定されている場合、動的リソースは 0.4 CPU コア + 0.8 GB メモリです。この場合、実際に消費されるリソースは max(0.3 Core + 1GB, 0.4 Core + 0.8GB) であり、0.4 CPU コア + 1 GB メモリとなります。

      説明

      マウントを有効にすると、デフォルトではディレクトリはドライバーにのみマウントされます。エグゼキュータにもディレクトリをマウントするには、エグゼキュータへのマウント を有効にしてください。

      重要

      統合 NAS ファイルディレクトリをマウントした後、ネットワーク接続を設定する必要があります。ネットワーク接続の VPC は、NAS マウントポイントが存在する VPC と同じである必要があります。

      エグゼキュータへのマウント

      有効にすると、システムは管理対象のファイルディレクトリをタスクのエグゼキュータにマウントし、タスクのエグゼキュータ内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。

      マウントはエグゼキュータのリソースを消費します。具体的なリソース消費量は、ファイルの使用状況に応じて変動します。

      spark.driver.cores

      Spark アプリケーションのドライバーが使用する CPU コアの数です。

      spark.driver.memory

      Spark アプリケーションのドライバーが利用できるメモリ量です。

      spark.executor.cores

      Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数です。

      spark.executor.memory

      Spark アプリケーションの各エグゼキュータが利用できるメモリ量です。

      spark.executor.instances

      Spark によって割り当てられるエグゼキュータの数です。

      Dynamic Resource Allocation

      この機能はデフォルトで無効になっています。この機能を有効にした後、次のパラメーターを設定します:

      • Minimum Number of Executors:デフォルト値は 2 です。

      • Maximum Number of Executorsspark.executor.instances が設定されていない場合、デフォルト値は 10 です。

      その他のメモリ設定

      • spark.driver.memoryOverhead:各ドライバーが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式 max(384 MB, 10% × spark.driver.memory) に基づいて自動的に値を割り当てます。

      • spark.executor.memoryOverhead:各エグゼキュータが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式 max(384 MB, 10% × spark.executor.memory) に基づいて自動的に値を割り当てます。

      • spark.memory.offHeap.size:Spark が利用できるオフヒープメモリのサイズです。デフォルト値は 1 GB です。

        このパラメーターは、spark.memory.offHeap.enabledtrue に設定されている場合にのみ有効です。デフォルトでは、Fusion Engine を使用する場合、この機能は有効になり、非ヒープメモリは 1 GB に設定されます。

      Spark 設定

      Spark の設定情報を入力します。キーと値のペアをスペースで区切ります。例:key value

      Tags

      タグのキーと値のペアを入力します。タグを使用すると、タスクをより便利かつ正確に管理できます。

    5. (任意) タスク開発ページの右側で、Version Information タブをクリックしてバージョン情報を表示したり、バージョンを比較したりします。

  3. タスクの実行と公開

    1. Run をクリックします。

      タスクが実行された後、ページ下部の Execution Records エリアに移動し、[操作] 列の Details をクリックします。Overview ページにリダイレクトされ、タスクの詳細を表示できます。

    2. 右上隅で Publish をクリックします。

    3. Publish ダイアログボックスで、Remarks を入力し、OK をクリックします。

関連ドキュメント

よくある質問

Q1:失敗したタスクの自動リトライポリシーを設定するにはどうすればよいですか?

ストリーミングタスクのフォールトトレランスを向上させるために、次の 2 つの Spark 設定項目を使用して自動リトライポリシーを設定できます:

spark.emr.serverless.streaming.fail.retry.interval 60    # リトライ間隔:60 秒
spark.emr.serverless.streaming.fail.retry.time 3        # 最大リトライ回数:3