本ページでは、MaxCompute 上で Spark ジョブを開発・実行する際に生じる代表的な質問をトピック別にまとめています。
Spark を使用した開発
プロジェクトの自己診断方法
ジョブを送信する前に、以下の項目を確認してください。
pom.xml の依存関係 — すべての spark-xxxx_${scala.binary.version} 依存関係の範囲を provided に設定する必要があります:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
メインクラス内の spark.master — yarn-cluster モードで送信する際は、local[N] をハードコードしないでください。
val spark = SparkSession
.builder()
.appName("SparkPi")
// yarn-cluster モードで送信する場合は、.config("spark.master", "local[4]") を削除してください。
.getOrCreate()
Scala のメインクラス定義 — エントリポイントは object でなければならず、class ではありません。
object SparkPi { // object でなければならず、class では main 関数をロードできません。
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("SparkPi")
.getOrCreate()
ハードコードされた構成 — コード内で直接設定した構成は、yarn-cluster モードでは有効にならない場合があります。すべての構成項目は、代わりに spark-defaults.conf に追加してください。
// クラスターモードでは、以下のように MaxCompute の構成をハードコードしないでください。
val spark = SparkSession
.builder()
.appName("SparkPi")
.config("key1", "value1")
.config("key2", "value2")
.getOrCreate()
yarn-cluster モードでジョブを送信する場合は、すべての構成項目を spark-defaults.conf に追加してください。
DataWorks で ODPS Spark ノードを実行する手順
-
Spark コードをローカルで作成・パッケージ化します。Python 環境は Python 2.7 である必要があります。
-
リソースパッケージを DataWorks にアップロードします。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。
-
DataWorks で ODPS Spark ノードを作成します。詳細については、「ODPS Spark ノードの作成」をご参照ください。
-
ノードコードを作成して実行します。結果は DataWorks コンソールで確認できます。
MaxCompute 上の Spark をローカルでデバッグする方法
IntelliJ IDEA を使用してローカルデバッグを行います。詳細については、「Linux 開発環境のセットアップ」をご参照ください。
Spark から VPC 内のサービスにアクセスする方法
「Spark からの VPC タイプのインスタンスへのアクセス」をご参照ください。
JAR パッケージをリソースとして参照する方法
spark.hadoop.odps.cupid.resources パラメーターを使用します。リソースは複数のプロジェクト間で共有可能です。データセキュリティを確保するために、適切な権限を設定してください。
## spark-defaults.conf または DataWorks の設定項目に追加します。
spark.hadoop.odps.cupid.resources = projectname.xx0.jar,projectname.xx1.jar
Spark を使用してパラメーターを渡す方法
GitHub 上の「Spark on DataWorks」ガイドをご参照ください。
Spark を使用して DataHub のデータを MaxCompute にストリーミングする方法
サンプルコードについては、GitHub 上の「DataHub ストリーミングの例」をご参照ください。
オープンソースの Spark コードを MaxCompute 上の Spark に移行する方法
ジョブのデータアクセス要件に応じて、以下のいずれかを選択してください。
-
MaxCompute テーブルや OSS へのアクセスが不要な場合 — 既存の JAR パッケージをそのまま実行できます。Spark および Hadoop の依存関係を
providedに設定してください。詳細については、「Linux 開発環境のセットアップ」をご参照ください。 -
MaxCompute テーブルへのアクセスが必要な場合 — 必要な依存関係を追加し、再パッケージ化してください。詳細については、「Linux 開発環境のセットアップ」をご参照ください。
-
OSS へのアクセスが必要な場合 — 必要な OSS パッケージを取得し、「ネットワーク接続の確立」を行い、その後再コンパイルおよび再パッケージ化してください。詳細については、「Linux 開発環境のセットアップ」をご参照ください。
Spark を使用して MaxCompute テーブルのデータを処理する方法
MaxCompute 上の Spark は、ローカルモード、クラスターモード、DataWorks モードの 3 つの実行モードをサポートしています。モードによって構成が異なります。「実行モード」をご参照ください。
Spark のリソース並列度を設定する方法
並列度は、エグゼキュータ数 × 各エグゼキュータあたりの CPU コア数で決定されます。
最大並列タスク数 =spark.executor.instances×spark.executor.cores
| パラメーター | 説明 |
|---|---|
spark.executor.instances |
ジョブが要求するエグゼキュータ数 |
spark.executor.cores |
各エグゼキュータプロセスにおける CPU コア数。各コアは同時に 1 つのタスクを実行します。2~4 の範囲で設定してください。 |
メモリ不足 (OOM) 問題の解決方法
OOM エラーは以下の形式で表示されます。
-
java.lang.OutOfMemoryError: Java heap space -
java.lang.OutOfMemoryError: GC overhead limit exceeded -
Cannot allocate memory -
The job has been killed by "OOM Killer", please check your job's memory usage
OOM を解決するには、以下のパラメーターを調整してください。
| パラメーター | 説明 | 推奨事項 |
|---|---|---|
spark.executor.memory |
各エグゼキュータのヒープメモリ量 | 1:4 の比率で spark.executor.cores と連動させます。たとえば、spark.executor.cores=1 の場合、spark.executor.memory=4g と設定します。エグゼキュータで OutOfMemoryError が発生する場合は、値を増加させてください。 |
spark.executor.memoryOverhead |
各エグゼキュータのオフヒープメモリ量(JVM オーバーヘッド、文字列、NIO バッファーなど) | デフォルト値: spark.executor.memory × 0.1(最小値:384 MB)。エグゼキュータのログに Cannot allocate memory または OOM Killer エラーが表示される場合は、値を増加させてください。 |
spark.driver.memory |
ドライバーのヒープメモリ量 | 1:4 の比率で spark.driver.cores と連動させます。ドライバーが大量のデータを収集する場合や、OutOfMemoryError が発生する場合は、値を増加させてください。 |
spark.driver.memoryOverhead |
ドライバーのオフヒープメモリ量 | デフォルト値: spark.driver.memory × 0.1(最小値:384 MB)。ドライバーのログに Cannot allocate memory が表示される場合は、値を増加させてください。 |
ディスク領域不足問題の解決方法
No space left on device エラーは、ローカルディスク領域が枯渇したことを意味します。通常はエグゼキュータで発生し、そのプロセスが終了します。
この問題を解決するには、以下の 2 つの方法があります。
-
ディスクサイズの拡大 —
spark.hadoop.odps.cupid.disk.driver.device_sizeを設定します(デフォルト:20 GB、最大:100 GB)。このパラメーターはドライバーおよび各エグゼキュータに適用され、spark-defaults.confまたは DataWorks の設定項目でのみ有効です。 -
エグゼキュータ数の増加 — ディスクサイズを 100 GB まで増加させてもエラーが解消しない場合、単一のエグゼキュータのシャッフルデータが制限を超えている可能性があります。これはデータスキューが原因であることが多く、データの再パーティション分割が必要です。また、データ量が過大である場合も同様のエラーが発生します。この場合は、
spark.executor.instancesを増加させて負荷を分散してください。
MaxCompute プロジェクト内のリソースを参照する方法
Spark on MaxCompute は、2 つのメソッドをサポートしています。
方法 1:パラメーターによる参照(大規模リソースに推奨)
spark.hadoop.odps.cupid.resources を spark-defaults.conf または DataWorks の設定項目に設定します。書式は <projectname>.<resourcename>[:<newresourcename>] です。複数のリソースを指定する場合はカンマで区切ります。
リソースはドライバーおよび各エグゼキュータの作業ディレクトリにダウンロードされます。デフォルトのファイル名は <projectname>.<resourcename> です。リソースの名前を変更する場合は、<projectname>.<resourcename>:<newresourcename> を使用します。
## DataWorks の設定項目または spark-defaults.conf に追加します。
## 複数のリソースを参照します。
spark.hadoop.odps.cupid.resources=public.python-python-2.7-ucs4.zip,public.myjar.jar
## ダウンロード時にリソースの名前を変更します。
spark.hadoop.odps.cupid.resources=public.myjar.jar:myjar.jar
詳細については、「リソース操作」をご参照ください。
方法 2:DataWorks からの参照
DataWorks の [データ開発] ペインで、MaxCompute のリソースをビジネスフローに追加します。詳細については、「MaxCompute リソースの管理」をご参照ください。その後、ODPS Spark ノードで JAR、ファイル、およびアーカイブリソースを選択します。
方法 2 では、リソースはタスク実行時にアップロードされます。大規模なリソースには、方法 1 をご使用ください。
VPC へのアクセス方法
MaxCompute 上の Spark は、弾性ネットワークインターフェース(ENI)を使用して VPC に接続します。ENI 接続は、同一リージョン内の VPC に限定されます。複数の VPC にアクセスするには、対象の VPC を ENI 接続済みの VPC に接続してください。
ENI を使用した VPC アクセスを設定するには、以下の手順を実行します。
-
ENI を使用した専用回線接続を有効化します。「Spark からの VPC タイプのインスタンスへのアクセス」をご参照ください。
-
対象サービスで、ステップ 1 で作成した MaxCompute のセキュリティグループからのアクセスを許可するホワイトリストルールを追加します。たとえば、ApsaraDB RDS にアクセスする場合は、RDS のホワイトリストにステップ 1 のセキュリティグループを追加します。サービスが IP アドレスのみを受け付ける場合(セキュリティグループは不可)、ステップ 1 で使用した vSwitch CIDR ブロックを追加してください。
-
ジョブの ENI パラメーターを
spark-defaults.confまたは DataWorks の設定項目に設定します。[regionid]および[vpcid]は、実際の リージョン ID および VPC ID に置き換えてください。spark.hadoop.odps.cupid.eni.enable = true spark.hadoop.odps.cupid.eni.info = [regionid]:[vpcid]
インターネットへのアクセス方法
以下の 2 つの方法が利用可能です。
方法 1:ENI 接続
-
ENI を使用した専用回線接続を有効化します。「Spark からの VPC タイプのインスタンスへのアクセス」をご参照ください。
-
接続済みの VPC がインターネットに接続されていることを確認してください。「インターネット NAT ゲートウェイの SNAT 機能を使用したインターネットへのアクセス」をご参照ください。
-
ジョブを構成します。
[region]および[vpcid]は、実際の リージョン ID および VPC ID に置き換えてください。## DataWorks の設定項目または spark-defaults.conf に追加します。 spark.hadoop.odps.cupid.internet.access.list=aliyundoc.com:443 spark.hadoop.odps.cupid.eni.enable=true spark.hadoop.odps.cupid.eni.info=[region]:[vpcid]
方法 2:SmartNAT
SmartNAT は Spark 3.4 以降ではサポートされていません。
たとえば、https://aliyundoc.com:443 にアクセスする場合:
-
MaxCompute 開発者コミュニティ(DingTalk グループ ID:11782920)に参加し、サポートチームに
https://aliyundoc.com:443をodps.security.outbound.internetlistに追加するよう依頼してください。 -
spark-defaults.confまたは DataWorks の設定項目で SmartNAT を有効化します。spark.hadoop.odps.cupid.internet.access.list=aliyundoc.com:443 spark.hadoop.odps.cupid.smartnat.enable=true
OSS へのアクセス方法
MaxCompute 上の Spark は、Jindo SDK を介して Alibaba Cloud Object Storage Service (OSS) にアクセスします。
ステップ 1:Jindo SDK および OSS エンドポイントの構成
以下の内容を spark-defaults.conf または DataWorks の設定項目に追加します。
## Jindo SDK の JAR パッケージを参照します。
spark.hadoop.odps.cupid.resources=public.jindofs-sdk-3.7.2.jar
## OSS 実装クラスを設定します。
spark.hadoop.fs.AbstractFileSystem.oss.impl=com.aliyun.emr.fs.oss.OSS
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem
## OSS エンドポイントを設定します(内部エンドポイントのみ)。
spark.hadoop.fs.oss.endpoint=oss-[YourRegionId]-internal.aliyuncs.com
## (任意)ランタイムで接続に失敗した場合、信頼できるサービスのホワイトリストに追加します。
spark.hadoop.odps.cupid.trusted.services.access.list=[YourBucketName].oss-[YourRegionId]-internal.aliyuncs.com
クラスターモードでは、内部 OSS エンドポイントのみがサポートされています。パブリックエンドポイントはサポートされていません。「OSS のリージョンとエンドポイント」をご参照ください。
ステップ 2:認証の構成
以下の 2 つの認証方式がサポートされています。
-
AccessKey ペア — 認証情報を直接
SparkConfに設定します。val conf = new SparkConf() .setAppName("jindo-sdk-demo") .set("spark.hadoop.fs.oss.accessKeyId", "<YourAccessKeyId>") .set("spark.hadoop.fs.oss.accessKeySecret", "<YourAccessKeySecret>") -
Security Token Service (STS) トークン — MaxCompute プロジェクトと OSS バケットが同一の Alibaba Cloud アカウントに属している場合に使用します。
-
ワンクリック承認 をクリックして、STS トークンを使用した MaxCompute の OSS リソースへのアクセスを承認します。
-
spark-defaults.confまたは DataWorks の設定項目で、ローカル HTTP サービスを有効化します。spark.hadoop.odps.cupid.http.server.enable = true -
SparkConfで STS 認証情報を構成します。${aliyun-uid}は Alibaba Cloud アカウントの UID に、${role-name}はロール名にそれぞれ置き換えてください。val conf = new SparkConf() .setAppName("jindo-sdk-demo") .set("spark.hadoop.fs.jfs.cache.oss.credentials.provider", "com.aliyun.emr.fs.auth.CustomCredentialsProvider") .set("spark.hadoop.aliyun.oss.provider.url", "http://localhost:10011/sts-token-info?user_id=${aliyun-uid}&role=${role-name}")
-
サードパーティ製 Python ライブラリを参照する方法
PySpark ジョブで No module named 'xxx' エラーが発生した場合、必要なライブラリがデフォルトの Python 環境に存在しません。以下の 3 つの方法が利用可能です。
方法 1:MaxCompute のパブリック Python 環境の使用
以下の内容を spark-defaults.conf または DataWorks の設定項目に追加します。
-
Python 2.7:
spark.hadoop.odps.cupid.resources = public.python-2.7.13-ucs4.tar.gz spark.pyspark.python = ./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python含まれるサードパーティ製ライブラリ:
https://odps-repo.oss-cn-hangzhou.aliyuncs.com/pyspark/py27/py27-default_req.txt -
Python 3.7:
spark.hadoop.odps.cupid.resources = public.python-3.7.9-ucs4.tar.gz spark.pyspark.python = ./public.python-3.7.9-ucs4.tar.gz/python-3.7.9-ucs4/bin/python3含まれるサードパーティ製ライブラリ:
https://odps-repo.oss-cn-hangzhou.aliyuncs.com/pyspark/py37/py37-default_req.txt
方法 2:単一の WHL パッケージのアップロード
Python 依存関係が少数かつシンプルな場合に使用します。
## WHL パッケージを .zip ファイルにリネームします(例:pymysql.zip)。
## .zip ファイルをアーカイブリソースとしてアップロードします。
## ODPS Spark ノード(アーカイブタイプ)で参照します。
## 以下の内容を spark-defaults.conf または DataWorks の設定項目に追加します:
spark.executorEnv.PYTHONPATH=pymysql
spark.yarn.appMasterEnv.PYTHONPATH=pymysql
その後、コード内でパッケージをインポートします。
import pymysql
方法 3:カスタム Python 環境全体のアップロード
複雑な依存関係がある場合や、カスタムの Python バージョンが必要な場合に使用します。Docker コンテナで Python 環境をパッケージ化し、アップロードします。「依存関係のパッケージ化」をご参照ください。
JAR 依存関係の競合を解決する方法
実行時に NoClassDefFoundError または NoSuchMethodError が発生した場合、JAR 内のサードパーティ製依存関係のバージョンが Spark のバンドル済み依存関係と競合している可能性があります。
-
pom.xmlで、Spark コミュニティ版、Hadoop コミュニティ版、ODPS/Cupid のすべての依存関係をprovidedに設定します。 -
競合する依存関係を特定して除外します。
-
除外だけでは不十分な場合、
maven-shade-plugin relocationを使用して、競合するパッケージを分離します。
ローカルモードでのデバッグ方法
構成方法は Spark のバージョンによって異なります。
Spark 2.3.0
-
以下の内容を
spark-defaults.confに追加します。spark.hadoop.odps.project.name=<Yourprojectname> spark.hadoop.odps.access.id=<YourAccessKeyID> spark.hadoop.odps.access.key=<YourAccessKeySecret> spark.hadoop.odps.end.point=<endpoint> -
ローカルモードで実行します。
./bin/spark-submit --master local spark_sql.py
Spark 2.4.5 / Spark 3.1.1
-
以下の内容で
odps.confを作成します。odps.access.id=<YourAccessKeyID> odps.access.key=<YourAccessKeySecret> odps.end.point=<endpoint> odps.project.name=<Yourprojectname> -
環境変数をファイルを指すように設定します。
export ODPS_CONF_FILE=/path/to/odps.conf -
ローカルモードで実行します。
./bin/spark-submit --master local spark_sql.py
ローカルモードでの一般的なエラー
| エラー | 原因 | 解決方法 |
|---|---|---|
Incomplete config, no accessId or accessKey / Incomplete config, no odps.service.endpoint |
ローカルモードで EventLog が有効になっています | spark.eventLog.enabled=true を spark-defaults.conf |
Cannot create CupidSession with empty CupidConf |
Spark 2.4.5 または 3.1.1 が odps.access.id を spark-defaults.conf |
odps.conf を作成し、ODPS_CONF_FILE 環境変数を設定して、再度実行してください |
java.util.NoSuchElementException: odps.access.id |
Spark 2.3.0 が Access ID を見つけられません | spark.hadoop.odps.access.id および関連パラメーターを spark-defaults.conf |
ジョブエラー
「User signature does not match」が発生した場合の対処方法
com.aliyun.odps.OdpsException: ODPS-0410042:
Invalid signature value - User signature does not match
spark-defaults.conf 内の AccessKey ID または AccessKey Secret が正しくありません。Alibaba Cloud コンソールの「ユーザー情報管理」に表示される AccessKey ID および AccessKey Secret と照合し、誤りがあれば修正してください。
「You have NO privilege」が発生した場合の対処方法
com.aliyun.odps.OdpsException: ODPS-0420095:
Access Denied - Authorization Failed [4019], You have NO privilege 'odps:CreateResource' on {acs:odps:*:projects/*}
アカウントに必要な権限がありません。プロジェクト所有者に連絡し、リソースに対する読み取りおよび作成権限を付与してもらってください。「MaxCompute の権限」をご参照ください。
「Access Denied」が発生した場合の対処方法
タスクがリリース範囲内にありません: CUPID
原因を診断します。
-
spark-defaults.conf 内の AccessKey ID または AccessKey Secret は正しいですか? — Alibaba Cloud コンソールの「ユーザー情報管理」と照合してください。「Linux 開発環境のセットアップ」で正しい構成フォーマットをご確認ください。
-
お使いのリージョンで MaxCompute 上の Spark が利用可能ですか? — サービスがお使いのリージョンで有効になっていない可能性があります。リージョンの可用性を確認するか、DingTalk グループ 21969532(MaxCompute 上の Spark サポート)に参加してサポートに問い合わせてください。
「No space left on device」が発生した場合の対処方法
Spark はシャッフルデータおよび BlockManager のオーバーフローのためにローカルディスクを使用します。ディスクサイズは spark.hadoop.odps.cupid.disk.driver.device_size(デフォルト:20 GB、最大:100 GB)で制御されます。
ディスクサイズを 100 GB まで増加させてもエラーが解消しない場合、データスキューが原因である可能性が高く、シャッフルまたはキャッシュ処理中にデータが少数のブロックに集中しています。この場合、spark.executor.cores を減らし、spark.executor.instances を増加させて、データをより均等に分散してください。
「Table or view not found」が発生した場合の対処方法
Table or view not found: xxx
原因を診断します。
-
テーブルまたはビューは存在しますか? — ジョブを実行する前に、テーブルを作成してください。
-
Hive カタログのサポートは有効になっていますか? — テーブルが存在するにもかかわらず見つからない場合、セッションビルダーに
enableHiveSupport()が含まれているかどうかを確認してください。これを削除してください。# 変更前 spark = SparkSession.builder.appName(app_name).enableHiveSupport().getOrCreate() # 変更後 spark = SparkSession.builder.appName(app_name).getOrCreate()
「Shutdown hook called before final status was reported」が発生した場合の対処方法
App Status: SUCCEEDED, diagnostics: Shutdown hook called before final status was reported.
メインアプリケーションが ApplicationMaster(AM)を介してクラスターのリソースを要求していません。最も一般的な原因は、コード内で spark.master が local に設定されているか、SparkContext が一度も作成されていないことです。クラスターに送信する際は、コード内の spark.master=local の設定を削除してください。
JAR パッケージのバージョン競合エラーが発生した場合の対処方法
User class threw exception: java.lang.NoSuchMethodError
JAR パッケージのバージョン競合または誤ったクラスの読み込みが原因です。競合する依存関係を特定するには、以下の手順を実行してください。
-
$SPARK_HOME/jars内の JAR から問題のあるクラスを含むものを検索します。grep <AbnormalClassName> $SPARK_HOME/jars/*.jar -
すべてのプロジェクト依存関係を表示します。
mvn dependency:tree -
競合する依存関係を除外します。
maven dependency exclusions -
再コンパイルおよび再送信を行います。
「ClassNotFound」エラーが発生した場合の対処方法
java.lang.ClassNotFoundException: xxxx.xxx.xxxxx
提出された JAR にクラスが含まれていないか、依存関係の構成が誤っている可能性があります。
-
クラスが JAR に存在することを確認します。
jar -tf <JobJARFile> | grep <ClassName> -
pom.xml内の依存関係を確認します。 -
必要に応じて、Shade メソッドを使用して再パッケージ化および再送信を行います。
「The task is not in release range」が発生した場合の対処方法
The task is not in release range: CUPID
お使いのリージョンで MaxCompute 上の Spark が有効になっていません。サービスが利用可能なリージョンを選択してください。
「java.io.UTFDataFormatException」エラーが発生した場合の対処方法
java.io.UTFDataFormatException: encoded string too long: 2818545 bytes
spark.hadoop.odps.cupid.disk.driver.device_size の値を spark-defaults.conf で増加させます。デフォルト値は 20 GB、最大値は 100 GB です。
Spark 出力で中国語文字が文字化けする場合の対処方法
以下の内容を spark-defaults.conf または DataWorks の設定項目に追加します。
"--conf" "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"
"--conf" "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"
Spark がインターネット経由でサードパーティ製サービスを呼び出した際にエラーが発生した場合の対処方法
MaxCompute 上の Spark はインターネットへの直接接続を持たないため、アウトバウンドのインターネット呼び出しは失敗します。VPC 内に Nginx リバースプロキシを構築してトラフィックをルーティングし、Spark の ENI を使用した VPC アクセス機能でそのプロキシに到達してください。「Spark からの VPC タイプのインスタンスへのアクセス」をご参照ください。