すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:バッチタスクまたはストリーミングタスクを開発する

最終更新日:Jan 30, 2026

このトピックでは、バッチタスクまたはストリーミングタスクを開発するための設定項目と手順について説明します。

前提条件

ワークスペースが作成されていること。 詳細については、「ワークスペースの管理」をご参照ください。

手順

  1. データ開発ページに移動します。

    1. E-MapReduce コンソールにログインします

    2. 左側のナビゲーションウィンドウで、EMR Serverless > Spark を選択します。

    3. [Spark] ページで、対象のワークスペースの名前をクリックします。

    4. [EMR Serverless Spark] ページで、左側のナビゲーションウィンドウにある [データ開発] をクリックします。

  2. タスクを作成します。

    1. [開発フォルダー] タブで、image (作成) アイコンをクリックします。

    2. 表示されるダイアログボックスで、[名前] を入力し、タイプとして [バッチタスク] または [ストリーミングタスク] を選択して、[OK] をクリックします。

    3. 右上隅で、キューを選択します。

      キューの追加方法の詳細については、「リソースキューの管理」をご参照ください。

    4. 新しいタスクのエディターで、パラメーターを編集します。

      JAR

      パラメーター

      説明

      メイン JAR リソース

      タスクの実行に必要なプライマリ JAR パッケージ。

      • ワークスペースリソース: [リソースアップロード] ページでアップロードしたファイル。

      • OSS リソース: Alibaba Cloud Object Storage Service (OSS) に保存されているファイル。

      エンジンバージョン

      Spark バージョン。 詳細については、「エンジンバージョン」をご参照ください。

      メインクラス

      Spark タスクの送信時に指定されるメインクラス。

      実行パラメーター

      タスクのランタイム中に必要な設定項目、またはメインクラスに渡されるカスタムパラメーター。 複数のパラメーターはスペースで区切ります。

      タイムアウト

      このタスクの完了が許可される最大時間。 タスクの実行時間がこのしきい値を超えると、システムはタスクを自動的に停止します。 デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。

      ネットワーク接続

      ファイルリソース

      タスクを送信すると、--files パラメーターで指定されたファイルがエグゼキュータの作業ディレクトリにコピーされます。 これにより、Spark タスクはランタイムにこれらのファイルにアクセスできます。

      必要に応じて [ワークスペースリソース] または [OSS リソース] を選択できます。

      アーカイブリソース

      タスクを送信すると、--archives パラメーターで指定されたファイルが解凍され、エグゼキュータのアーカイブ済みオブジェクトに配布されます。

      必要に応じて [ワークスペースリソース] または [OSS リソース] を選択できます。

      JAR リソース

      タスクを送信するときに、--jars パラメーターを使用して必要な JAR 依存関係ファイルを指定します。

      必要に応じて [ワークスペースリソース] または [OSS リソース] を選択できます。

      spark.driver.cores

      Spark アプリケーションでドライバーが使用する CPU コアの数。

      spark.driver.memory

      Spark アプリケーションでドライバーが使用できるメモリ量。

      spark.executor.cores

      Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数。

      spark.executor.memory

      Spark アプリケーションの各エグゼキュータが使用できるメモリ量。

      spark.executor.instances

      Spark によって割り当てられるエグゼキュータの数。

      動的リソース割り当て

      デフォルトでは、この機能は無効になっています。 この機能を有効にした後、次のパラメーターを設定する必要があります:

      • エグゼキュータの最小数: デフォルト値: 2。

      • エグゼキュータの最大数: spark.executor.instances パラメーターを設定しない場合、デフォルト値 10 が使用されます。

      その他のメモリ設定

      • spark.driver.memoryOverhead: 各ドライバーが使用できる非ヒープメモリのサイズ。 このパラメーターを空のままにすると、Spark は次の数式に基づいてこのパラメーターに値を自動的に割り当てます: max(384 MB, 10% × spark.driver.memory)

      • spark.executor.memoryOverhead: 各エグゼキュータが使用できる非ヒープメモリのサイズ。 このパラメーターを空のままにすると、Spark は次の数式に基づいてこのパラメーターに値を自動的に割り当てます: max(384 MB, 10% × spark.executor.memory)

      • spark.memory.offHeap.size: Spark アプリケーションが使用できるオフヒープメモリのサイズ。 デフォルト値: 1 GB。

        このパラメーターは、spark.memory.offHeap.enabled パラメーターを true に設定した場合にのみ有効です。 デフォルトでは、Fusion エンジンを使用する場合、spark.memory.offHeap.enabled パラメーターは true に設定され、spark.memory.offHeap.size パラメーターは 1 GB に設定されます。

      Spark 設定

      Spark 設定情報を入力します。 キーと値のペアはスペースで区切ります。 例: key value

      タグ

      タグのキーと値のペアを入力します。 タグを使用すると、タスクをより便利かつ正確に管理できます。

      PySpark

      パラメーター

      説明

      メイン Python リソース

      タスクの実行に必要なプライマリ Python ファイル。

      • ワークスペースリソース: [リソースアップロード] ページでアップロードしたファイル。

      • OSS リソース: OSS に保存されているファイル。

      エンジンバージョン

      Spark バージョン。 詳細については、「エンジンバージョン」をご参照ください。

      実行パラメーター

      タスクのランタイム中に必要な設定項目、またはメインクラスに渡されるカスタムパラメーター。

      タイムアウト

      このタスクの完了が許可される最大時間。 タスクの実行時間がこのしきい値を超えると、システムはタスクを自動的に停止します。 デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。

      ランタイム環境

      タスクの実行に必要なリソースは、選択した環境に基づいて事前設定されます。

      ネットワーク接続

      ファイルリソース

      クラスター内のすべてのエグゼキュータノードに配布されるファイルのリスト。

      必要に応じて [ワークスペースリソース] または [OSS リソース] を選択できます。

      Pyfiles リソース

      タスクを送信するときに、--py-files パラメーターで指定されたファイルが Python 依存関係ファイルとして配布されます。

      必要に応じて [ワークスペースリソース] または [OSS リソース] を選択できます。

      アーカイブリソース

      タスクを送信すると、--archives パラメーターで指定されたファイルが解凍され、エグゼキュータのアーカイブ済みオブジェクトに配布されます。

      必要に応じて [ワークスペースリソース] または [OSS リソース] を選択できます。

      JAR リソース

      タスクを送信するときに、--jars パラメーターを使用して必要な JAR 依存関係ファイルを指定します。

      必要に応じて [ワークスペースリソース] または [OSS リソース] を選択できます。

      spark.driver.cores

      Spark アプリケーションでドライバーが使用する CPU コアの数。

      spark.driver.memory

      Spark アプリケーションでドライバーが使用できるメモリ量。

      spark.executor.cores

      Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数。

      spark.executor.memory

      Spark アプリケーションの各エグゼキュータが使用できるメモリ量。

      spark.executor.instances

      Spark によって割り当てられるエグゼキュータの数。

      動的リソース割り当て

      デフォルトでは、この機能は無効になっています。 この機能を有効にした後、次のパラメーターを設定する必要があります:

      • エグゼキュータの最小数: デフォルト値: 2。

      • エグゼキュータの最大数: spark.executor.instances パラメーターを設定しない場合、デフォルト値 10 が使用されます。

      その他のメモリ設定

      • spark.driver.memoryOverhead: 各ドライバーが使用できる非ヒープメモリのサイズ。 このパラメーターを空のままにすると、Spark は次の数式に基づいてこのパラメーターに値を自動的に割り当てます: max(384 MB, 10% × spark.driver.memory)

      • spark.executor.memoryOverhead: 各エグゼキュータが使用できる非ヒープメモリのサイズ。 このパラメーターを空のままにすると、Spark は次の数式に基づいてこのパラメーターに値を自動的に割り当てます: max(384 MB, 10% × spark.executor.memory)

      • spark.memory.offHeap.size: Spark アプリケーションが使用できるオフヒープメモリのサイズ。 デフォルト値: 1 GB。

        このパラメーターは、spark.memory.offHeap.enabled パラメーターを true に設定した場合にのみ有効です。 デフォルトでは、Fusion エンジンを使用する場合、spark.memory.offHeap.enabled パラメーターは true に設定され、spark.memory.offHeap.size パラメーターは 1 GB に設定されます。

      Spark 設定

      Spark 設定情報を入力します。 キーと値のペアはスペースで区切ります。 例: key value

      タグ

      タグのキーと値のペアを入力します。 タグを使用すると、タスクをより便利かつ正確に管理できます。

      SQL

      パラメーター

      説明

      SQL ファイル

      タスクの送信時に必要なファイル。

      • ワークスペースリソース: [リソースアップロード] ページでアップロードしたファイル。

      • OSS リソース: OSS に保存されているファイル。

      エンジンバージョン

      Spark バージョン。 詳細については、「エンジンバージョン」をご参照ください。

      タイムアウト

      このタスクの完了が許可される最大時間。 タスクの実行時間がこのしきい値を超えると、システムはタスクを自動的に停止します。 デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。

      ネットワーク接続

      spark.driver.cores

      Spark アプリケーションでドライバーが使用する CPU コアの数。

      spark.driver.memory

      Spark アプリケーションでドライバーが使用できるメモリ量。

      spark.executor.cores

      Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数。

      spark.executor.memory

      Spark アプリケーションの各エグゼキュータが使用できるメモリ量。

      spark.executor.instances

      Spark によって割り当てられるエグゼキュータの数。

      動的リソース割り当て

      デフォルトでは、この機能は無効になっています。 この機能を有効にした後、次のパラメーターを設定する必要があります:

      • エグゼキュータの最小数: デフォルト値: 2。

      • エグゼキュータの最大数: spark.executor.instances パラメーターを設定しない場合、デフォルト値 10 が使用されます。

      その他のメモリ設定

      • spark.driver.memoryOverhead: 各ドライバーが使用できる非ヒープメモリのサイズ。 このパラメーターを空のままにすると、Spark は次の数式に基づいてこのパラメーターに値を自動的に割り当てます: max(384 MB, 10% × spark.driver.memory)

      • spark.executor.memoryOverhead: 各エグゼキュータが使用できる非ヒープメモリのサイズ。 このパラメーターを空のままにすると、Spark は次の数式に基づいてこのパラメーターに値を自動的に割り当てます: max(384 MB, 10% × spark.executor.memory)

      • spark.memory.offHeap.size: Spark アプリケーションが使用できるオフヒープメモリのサイズ。 デフォルト値: 1 GB。

        このパラメーターは、spark.memory.offHeap.enabled パラメーターを true に設定した場合にのみ有効です。 デフォルトでは、Fusion エンジンを使用する場合、spark.memory.offHeap.enabled パラメーターは true に設定され、spark.memory.offHeap.size パラメーターは 1 GB に設定されます。

      Spark 設定

      Spark 設定情報を入力します。 キーと値のペアはスペースで区切ります。 例: key value

      タグ

      タグのキーと値のペアを入力します。 タグを使用すると、タスクをより便利かつ正確に管理できます。

      Spark Submit

      パラメーター

      説明

      エンジンバージョン

      Spark バージョン。 詳細については、「エンジンバージョン」をご参照ください。

      スクリプト

      Spark Submit スクリプトを入力します。

      次のコードは例です:

      --class org.apache.spark.examples.SparkPi \
      --conf spark.executor.memory=2g \
      oss://<YourBucket>/spark-examples_2.12-3.3.1.jar

      タイムアウト

      このタスクの完了が許可される最大時間。 タスクの実行時間がこのしきい値を超えると、システムはタスクを自動的に停止します。 デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。

      ネットワーク接続

      spark.driver.cores

      Spark アプリケーションでドライバーが使用する CPU コアの数。

      spark.driver.memory

      Spark アプリケーションでドライバーが使用できるメモリ量。

      spark.executor.cores

      Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数。

      spark.executor.memory

      Spark アプリケーションの各エグゼキュータが使用できるメモリ量。

      spark.executor.instances

      Spark によって割り当てられるエグゼキュータの数。

      動的リソース割り当て

      デフォルトでは、この機能は無効になっています。 この機能を有効にした後、次のパラメーターを設定する必要があります:

      • エグゼキュータの最小数: デフォルト値: 2。

      • エグゼキュータの最大数: spark.executor.instances パラメーターを設定しない場合、デフォルト値 10 が使用されます。

      その他のメモリ設定

      • spark.driver.memoryOverhead: 各ドライバーが使用できる非ヒープメモリのサイズ。 このパラメーターを空のままにすると、Spark は次の数式に基づいてこのパラメーターに値を自動的に割り当てます: max(384 MB, 10% × spark.driver.memory)

      • spark.executor.memoryOverhead: 各エグゼキュータが使用できる非ヒープメモリのサイズ。 このパラメーターを空のままにすると、Spark は次の数式に基づいてこのパラメーターに値を自動的に割り当てます: max(384 MB, 10% × spark.executor.memory)

      • spark.memory.offHeap.size: Spark アプリケーションが使用できるオフヒープメモリのサイズ。 デフォルト値: 1 GB。

        このパラメーターは、spark.memory.offHeap.enabled パラメーターを true に設定した場合にのみ有効です。 デフォルトでは、Fusion エンジンを使用する場合、spark.memory.offHeap.enabled パラメーターは true に設定され、spark.memory.offHeap.size パラメーターは 1 GB に設定されます。

      Spark 設定

      Spark 設定情報を入力します。 キーと値のペアはスペースで区切ります。 例: key value

      タグ

      タグのキーと値のペアを入力します。 タグを使用すると、タスクをより便利かつ正確に管理できます。

    5. (オプション) タスク開発ページの右側で、[バージョン情報] タブをクリックして、バージョン情報を表示したり、バージョンを比較したりします。

  3. タスクを実行して発行します。

    1. [実行] をクリックします。

      タスクが実行された後、下部の [実行記録] エリアで、[アクション] 列の [詳細] をクリックします。 タスクの詳細を表示できる [概要] ページにリダイレクトされます。

    2. 右上隅で、[発行] をクリックします。

    3. [発行] ダイアログボックスで、[リリースノート] を入力し、[OK] をクリックします。

関連ドキュメント

よくある質問

Q1: 失敗したタスクの自動リトライポリシーを設定するにはどうすればよいですか?

ストリーミングタスクのフォールトトレランスを向上させるために、次の 2 つの Spark 設定項目を使用して自動リトライポリシーを設定できます:

spark.emr.serverless.streaming.fail.retry.interval 60    # リトライ間隔: 60 秒
spark.emr.serverless.streaming.fail.retry.time 3        # 最大リトライ回数: 3