このトピックでは、バッチタスクまたはストリーミングタスクを開発するための設定項目と手順について説明します。
前提条件
ワークスペースが作成されていること。詳細については、「ワークスペースの管理」をご参照ください。
手順
データ開発ページに移動します。
E-MapReduce コンソールにログインします。
左側のナビゲーションウィンドウで、 を選択します。
Spark ページで、対象のワークスペースの名前をクリックします。
EMR Serverless Spark ページで、左側のナビゲーションウィンドウの Data Development をクリックします。
タスクを作成します。
開発 タブで、
(作成) アイコンをクリックします。表示されたダイアログボックスで、Name を入力し、要件に応じてタイプとして Batch Job または Streaming Job を選択し、OK をクリックします。
右上隅でキューを選択します。
キューの追加方法の詳細については、「リソースキューの管理」をご参照ください。
新しいタスクのエディターで、パラメーターを編集します。
JAR
パラメーター
説明
Main JAR Resource
タスクの実行に必要なプライマリ JAR パッケージです。
Workspace:Files ページで事前にアップロードしたファイルです。
OSS:Alibaba Cloud OSS に保存されているファイルです。
Engine Version
Spark のバージョンです。詳細については、「エンジンバージョンの概要」をご参照ください。
メインクラス
Spark タスクの送信時に指定するメインクラスです。
Execution Parameters
タスクのランタイム中に必要な設定項目、またはメインクラスに渡されるカスタムパラメーターです。複数のパラメーターはスペースで区切ります。
Timeout
このタスクの完了を許可する最大時間です。タスクの実行時間がこのしきい値を超えると、システムは自動的にタスクを停止します。デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。
Network Connection
Mount Integrated File Directory
この機能はデフォルトで無効になっています。この機能を使用するには、まず Files ページの Integrated File Directory タブでファイルディレクトリを追加する必要があります。詳細については、「統合ファイルディレクトリ」をご参照ください。
有効にすると、システムは管理対象のファイルディレクトリをタスクにマウントし、タスク内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。
マウント操作は、ドライバーの計算リソースを一定量消費します。消費されるリソースは、次の 2 つの値のうち大きい方になります:
固定リソース:0.3 CPU コア + 1 GB メモリ。
動的リソース:
spark.driverリソースの 10% (つまり、0.1 × spark.driverコアとメモリ)。
たとえば、
spark.driverが 4 CPU コアと 8 GB のメモリで設定されている場合、動的リソースは 0.4 CPU コア + 0.8 GB メモリです。この場合、実際に消費されるリソースはmax(0.3 Core + 1GB, 0.4 Core + 0.8GB)であり、0.4 CPU コア + 1 GB メモリとなります。説明マウントを有効にすると、デフォルトではディレクトリはドライバーにのみマウントされます。エグゼキュータにもディレクトリをマウントするには、エグゼキュータへのマウント を有効にしてください。
重要統合 NAS ファイルディレクトリをマウントした後、ネットワーク接続を設定する必要があります。ネットワーク接続の VPC は、NAS マウントポイントが存在する VPC と同じである必要があります。
エグゼキュータへのマウント
有効にすると、システムは管理対象のファイルディレクトリをタスクのエグゼキュータにマウントし、タスクのエグゼキュータ内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。
マウントはエグゼキュータのリソースを消費します。具体的なリソース消費量は、ファイルの使用状況に応じて変動します。
File Resources
タスクを送信すると、
--filesパラメーターで指定されたファイルがエグゼキュータの作業ディレクトリにコピーされます。これにより、Spark タスクはランタイム時にこれらのファイルにアクセスできます。リソースタイプを指定する際、必要に応じて Workspace または OSS を選択できます。
Archive Resources
ジョブを送信すると、
--archivesパラメーターで指定されたアーカイブファイルが展開され、エグゼキュータに配布されます。リソースタイプを指定する際、Workspace または OSS を選択できます。
JAR Resources
タスクを送信する際に、
--jarsパラメーターを使用して必要な JAR 依存ファイルを指定します。リソースタイプを指定する際、必要に応じて Workspace または OSS を選択できます。
spark.driver.cores
Spark アプリケーションのドライバーが使用する CPU コアの数です。
spark.driver.memory
Spark アプリケーションのドライバーが利用できるメモリ量です。
spark.executor.cores
Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数です。
spark.executor.memory
Spark アプリケーションの各エグゼキュータが利用できるメモリ量です。
spark.executor.instances
Spark によって割り当てられるエグゼキュータの数です。
Dynamic Resource Allocation
この機能はデフォルトで無効になっています。この機能を有効にした後、次のパラメーターを設定します:
Minimum Number of Executors:デフォルト値は 2 です。
Maximum Number of Executors:spark.executor.instances が設定されていない場合、デフォルト値は 10 です。
その他のメモリ設定
spark.driver.memoryOverhead:各ドライバーが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式
max(384 MB, 10% × spark.driver.memory)に基づいて自動的に値を割り当てます。spark.executor.memoryOverhead:各エグゼキュータが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式
max(384 MB, 10% × spark.executor.memory)に基づいて自動的に値を割り当てます。spark.memory.offHeap.size:Spark が利用できるオフヒープメモリのサイズです。デフォルト値は 1 GB です。
このパラメーターは、
spark.memory.offHeap.enabledがtrueに設定されている場合にのみ有効です。デフォルトでは、Fusion Engine を使用する場合、この機能は有効になり、非ヒープメモリは 1 GB に設定されます。
Spark 設定
Spark の設定情報を入力します。キーと値のペアをスペースで区切ります。例:
key value。Tags
タグのキーと値のペアを入力します。タグを使用すると、タスクをより便利かつ正確に管理できます。
PySpark
パラメーター
説明
Main Python Resources
タスクの実行に必要なプライマリ Python ファイルです。
Workspace:Files ページで事前にアップロードしたファイルです。
OSS:Alibaba Cloud OSS に保存されているファイルです。
Engine Version
Spark のバージョンです。詳細については、「エンジンバージョンの概要」をご参照ください。
Execution Parameters
タスクのランタイム中に必要な設定項目、またはメインクラスに渡されるカスタムパラメーターです。
Timeout
このタスクの完了を許可する最大時間です。タスクの実行時間がこのしきい値を超えると、システムは自動的にタスクを停止します。デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。
Runtime Environment
タスクの実行に必要なリソースは、選択した環境に基づいて事前に設定されます。
Network Connection
Mount Integrated File Directory
この機能はデフォルトで無効になっています。この機能を使用するには、まず Files ページの Integrated File Directory タブでファイルディレクトリを追加する必要があります。詳細については、「統合ファイルディレクトリ」をご参照ください。
有効にすると、システムは管理対象のファイルディレクトリをタスクにマウントし、タスク内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。
マウント操作は、ドライバーの計算リソースを一定量消費します。消費されるリソースは、次の 2 つの値のうち大きい方になります:
固定リソース:0.3 CPU コア + 1 GB メモリ。
動的リソース:
spark.driverリソースの 10% (つまり、0.1 × spark.driverコアとメモリ)。
たとえば、
spark.driverが 4 CPU コアと 8 GB のメモリで設定されている場合、動的リソースは 0.4 CPU コア + 0.8 GB メモリです。この場合、実際に消費されるリソースはmax(0.3 Core + 1GB, 0.4 Core + 0.8GB)であり、0.4 CPU コア + 1 GB メモリとなります。説明マウントを有効にすると、デフォルトではディレクトリはドライバーにのみマウントされます。エグゼキュータにもディレクトリをマウントするには、エグゼキュータへのマウント を有効にしてください。
重要統合 NAS ファイルディレクトリをマウントした後、ネットワーク接続を設定する必要があります。ネットワーク接続の VPC は、NAS マウントポイントが存在する VPC と同じである必要があります。
エグゼキュータへのマウント
有効にすると、システムは管理対象のファイルディレクトリをタスクのエグゼキュータにマウントし、タスクのエグゼキュータ内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。
マウントはエグゼキュータのリソースを消費します。具体的なリソース消費量は、ファイルの使用状況に応じて変動します。
File Resources
クラスター内のすべてのエグゼキュータノードに配布されるファイルのリストです。
リソースタイプを指定する際、必要に応じて Workspace または OSS を選択できます。
Pyfiles Resources
タスクを送信すると、
--py-filesパラメーターで指定されたファイルが Python 依存ファイルとして配布されます。リソースタイプを指定する際、Workspace または OSS を選択できます。
Archive Resources
タスクが送信されると、
--archivesパラメーターで指定されたファイルは展開され、アーカイブ済みオブジェクトとしてエグゼキュータに配布されます。必要に応じて Workspace または OSS を選択できます。
JAR Resources
タスクを送信する際に、
--jarsパラメーターを使用して必要な JAR 依存ファイルを指定します。リソースタイプを指定する際、Workspace または OSS を選択できます。
spark.driver.cores
Spark アプリケーションのドライバーが使用する CPU コアの数です。
spark.driver.memory
Spark アプリケーションのドライバーが利用できるメモリ量です。
spark.executor.cores
Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数です。
spark.executor.memory
Spark アプリケーションの各エグゼキュータが利用できるメモリ量です。
spark.executor.instances
Spark によって割り当てられるエグゼキュータの数です。
Dynamic Resource Allocation
この機能はデフォルトで無効になっています。この機能を有効にした後、次のパラメーターを設定します:
Minimum Number of Executors:デフォルト値は 2 です。
Maximum Number of Executors:spark.executor.instances が設定されていない場合、デフォルト値は 10 です。
その他のメモリ設定
spark.driver.memoryOverhead:各ドライバーが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式
max(384 MB, 10% × spark.driver.memory)に基づいて自動的に値を割り当てます。spark.executor.memoryOverhead:各エグゼキュータが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式
max(384 MB, 10% × spark.executor.memory)に基づいて自動的に値を割り当てます。spark.memory.offHeap.size:Spark が利用できるオフヒープメモリのサイズです。デフォルト値は 1 GB です。
このパラメーターは、
spark.memory.offHeap.enabledがtrueに設定されている場合にのみ有効です。デフォルトでは、Fusion Engine を使用する場合、この機能は有効になり、非ヒープメモリは 1 GB に設定されます。
Spark 設定
Spark の設定情報を入力します。キーと値のペアをスペースで区切ります。例:
key value。Tags
タグのキーと値のペアを入力します。タグを使用すると、タスクをより便利かつ正確に管理できます。
SQL
パラメーター
説明
SQL File
タスクを送信する際に必要なファイルです。
Workspace:ジョブを送信する前に Files ページでアップロードしたファイルです。
OSS:Alibaba Cloud OSS に保存されているファイルです。
Engine Version
Spark のバージョンです。詳細については、「エンジンバージョンの概要」をご参照ください。
Timeout
このタスクの完了を許可する最大時間です。タスクの実行時間がこのしきい値を超えると、システムは自動的にタスクを停止します。デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。
Network Connection
Mount Integrated File Directory
この機能はデフォルトで無効になっています。この機能を使用するには、まず Files ページの Integrated File Directory タブでファイルディレクトリを追加する必要があります。詳細については、「統合ファイルディレクトリ」をご参照ください。
有効にすると、システムは管理対象のファイルディレクトリをタスクにマウントし、タスク内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。
マウント操作は、ドライバーの計算リソースを一定量消費します。消費されるリソースは、次の 2 つの値のうち大きい方になります:
固定リソース:0.3 CPU コア + 1 GB メモリ。
動的リソース:
spark.driverリソースの 10% (つまり、0.1 × spark.driverコアとメモリ)。
たとえば、
spark.driverが 4 CPU コアと 8 GB のメモリで設定されている場合、動的リソースは 0.4 CPU コア + 0.8 GB メモリです。この場合、実際に消費されるリソースはmax(0.3 Core + 1GB, 0.4 Core + 0.8GB)であり、0.4 CPU コア + 1 GB メモリとなります。説明マウントを有効にすると、デフォルトではディレクトリはドライバーにのみマウントされます。エグゼキュータにもディレクトリをマウントするには、エグゼキュータへのマウント を有効にしてください。
重要統合 NAS ファイルディレクトリをマウントした後、ネットワーク接続を設定する必要があります。ネットワーク接続の VPC は、NAS マウントポイントが存在する VPC と同じである必要があります。
エグゼキュータへのマウント
有効にすると、システムは管理対象のファイルディレクトリをタスクのエグゼキュータにマウントし、タスクのエグゼキュータ内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。
マウントはエグゼキュータのリソースを消費します。具体的なリソース消費量は、ファイルの使用状況に応じて変動します。
spark.driver.cores
Spark アプリケーションのドライバーが使用する CPU コアの数です。
spark.driver.memory
Spark アプリケーションのドライバーが利用できるメモリ量です。
spark.executor.cores
Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数です。
spark.executor.memory
Spark アプリケーションの各エグゼキュータが利用できるメモリ量です。
spark.executor.instances
Spark によって割り当てられるエグゼキュータの数です。
Dynamic Resource Allocation
この機能はデフォルトで無効になっています。この機能を有効にした後、次のパラメーターを設定します:
Minimum Number of Executors:デフォルト値は 2 です。
Maximum Number of Executors:spark.executor.instances が設定されていない場合、デフォルト値は 10 です。
その他のメモリ設定
spark.driver.memoryOverhead:各ドライバーが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式
max(384 MB, 10% × spark.driver.memory)に基づいて自動的に値を割り当てます。spark.executor.memoryOverhead:各エグゼキュータが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式
max(384 MB, 10% × spark.executor.memory)に基づいて自動的に値を割り当てます。spark.memory.offHeap.size:Spark が利用できるオフヒープメモリのサイズです。デフォルト値は 1 GB です。
このパラメーターは、
spark.memory.offHeap.enabledがtrueに設定されている場合にのみ有効です。デフォルトでは、Fusion Engine を使用する場合、この機能は有効になり、非ヒープメモリは 1 GB に設定されます。
Spark 設定
Spark の設定情報を入力します。キーと値のペアをスペースで区切ります。例:
key value。Tags
タグのキーと値のペアを入力します。タグを使用すると、タスクをより便利かつ正確に管理できます。
Spark Submit
パラメーター
説明
Engine Version
Spark のバージョンです。詳細については、「エンジンバージョンの概要」をご参照ください。
Script
Spark Submit スクリプトを入力します。
次のコードは例です:
--class org.apache.spark.examples.SparkPi \ --conf spark.executor.memory=2g \ oss://<YourBucket>/spark-examples_2.12-3.5.2.jarTimeout
このタスクの完了を許可する最大時間です。タスクの実行時間がこのしきい値を超えると、システムは自動的にタスクを停止します。デフォルト値は空で、タイムアウト制限が設定されていないことを意味します。
Network Connection
Mount Integrated File Directory
この機能はデフォルトで無効になっています。この機能を使用するには、まず Files ページの Integrated File Directory タブでファイルディレクトリを追加する必要があります。詳細については、「統合ファイルディレクトリ」をご参照ください。
有効にすると、システムは管理対象のファイルディレクトリをタスクにマウントし、タスク内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。
マウント操作は、ドライバーの計算リソースを一定量消費します。消費されるリソースは、次の 2 つの値のうち大きい方になります:
固定リソース:0.3 CPU コア + 1 GB メモリ。
動的リソース:
spark.driverリソースの 10% (つまり、0.1 × spark.driverコアとメモリ)。
たとえば、
spark.driverが 4 CPU コアと 8 GB のメモリで設定されている場合、動的リソースは 0.4 CPU コア + 0.8 GB メモリです。この場合、実際に消費されるリソースはmax(0.3 Core + 1GB, 0.4 Core + 0.8GB)であり、0.4 CPU コア + 1 GB メモリとなります。説明マウントを有効にすると、デフォルトではディレクトリはドライバーにのみマウントされます。エグゼキュータにもディレクトリをマウントするには、エグゼキュータへのマウント を有効にしてください。
重要統合 NAS ファイルディレクトリをマウントした後、ネットワーク接続を設定する必要があります。ネットワーク接続の VPC は、NAS マウントポイントが存在する VPC と同じである必要があります。
エグゼキュータへのマウント
有効にすると、システムは管理対象のファイルディレクトリをタスクのエグゼキュータにマウントし、タスクのエグゼキュータ内でそのディレクトリ内のファイルへの直接の読み取りおよび書き込みアクセスを許可します。
マウントはエグゼキュータのリソースを消費します。具体的なリソース消費量は、ファイルの使用状況に応じて変動します。
spark.driver.cores
Spark アプリケーションのドライバーが使用する CPU コアの数です。
spark.driver.memory
Spark アプリケーションのドライバーが利用できるメモリ量です。
spark.executor.cores
Spark アプリケーションの各エグゼキュータが使用する仮想 CPU コアの数です。
spark.executor.memory
Spark アプリケーションの各エグゼキュータが利用できるメモリ量です。
spark.executor.instances
Spark によって割り当てられるエグゼキュータの数です。
Dynamic Resource Allocation
この機能はデフォルトで無効になっています。この機能を有効にした後、次のパラメーターを設定します:
Minimum Number of Executors:デフォルト値は 2 です。
Maximum Number of Executors:spark.executor.instances が設定されていない場合、デフォルト値は 10 です。
その他のメモリ設定
spark.driver.memoryOverhead:各ドライバーが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式
max(384 MB, 10% × spark.driver.memory)に基づいて自動的に値を割り当てます。spark.executor.memoryOverhead:各エグゼキュータが利用できる非ヒープメモリです。このパラメーターが設定されていない場合、Spark はデフォルトの数式
max(384 MB, 10% × spark.executor.memory)に基づいて自動的に値を割り当てます。spark.memory.offHeap.size:Spark が利用できるオフヒープメモリのサイズです。デフォルト値は 1 GB です。
このパラメーターは、
spark.memory.offHeap.enabledがtrueに設定されている場合にのみ有効です。デフォルトでは、Fusion Engine を使用する場合、この機能は有効になり、非ヒープメモリは 1 GB に設定されます。
Spark 設定
Spark の設定情報を入力します。キーと値のペアをスペースで区切ります。例:
key value。Tags
タグのキーと値のペアを入力します。タグを使用すると、タスクをより便利かつ正確に管理できます。
(任意) タスク開発ページの右側で、Version Information タブをクリックしてバージョン情報を表示したり、バージョンを比較したりします。
タスクの実行と公開
Run をクリックします。
タスクが実行された後、ページ下部の Execution Records エリアに移動し、[操作] 列の Details をクリックします。Overview ページにリダイレクトされ、タスクの詳細を表示できます。
右上隅で Publish をクリックします。
Publish ダイアログボックスで、Remarks を入力し、OK をクリックします。
関連ドキュメント
JAR タスク開発プロセスの完全な例については、「JAR 開発のクイックスタート」をご参照ください。
Spark Submit タスク開発プロセスの完全な例については、「Spark Submit 開発のクイックスタート」をご参照ください。
SQL タスクの開発とオーケストレーションプロセスの完全な例については、「SparkSQL 開発のクイックスタート」をご参照ください。
PySpark バッチタスク開発プロセスの完全な例については、「PySpark 開発のクイックスタート」をご参照ください。
PySpark ストリーミングタスク開発プロセスの完全な例については、「Serverless Spark を使用した PySpark ストリーミングタスクの送信」をご参照ください。
よくある質問
Q1:失敗したタスクの自動リトライポリシーを設定するにはどうすればよいですか?
ストリーミングタスクのフォールトトレランスを向上させるために、次の 2 つの Spark 設定項目を使用して自動リトライポリシーを設定できます:
spark.emr.serverless.streaming.fail.retry.interval 60 # リトライ間隔:60 秒
spark.emr.serverless.streaming.fail.retry.time 3 # 最大リトライ回数:3