Spark是一個通用的巨量資料分析引擎,以其高效能、易用性和廣泛的適用性而著稱。它支援複雜的記憶體計算,非常適合構建大規模且低延遲的資料分析應用。DataWorks平台提供EMR Spark節點,使您能夠在 DataWorks上便捷地開發和周期性調度Spark任務。本文將向您介紹如何配置和使用EMR Spark節點,並通過具體的應用樣本來展示EMR Spark節點的功能。
前提條件
開始進行節點開發前,若您需要定製組件環境,此時即可基於官方鏡像
dataworks_emr_base_task_pod建立自訂鏡像,並在資料開發中使用鏡像。例如:在建立自訂鏡像時替換Spark Jar包或是依賴特定的
庫、檔案或jar包。已建立阿里雲EMR叢集,並註冊EMR叢集至DataWorks。操作詳情請參見新版資料開發:綁定EMR計算資源。
(可選,RAM帳號需要)進行任務開發的RAM帳號已被添加至對應工作空間中,並具有開發或空間管理員(許可權較大,謹慎添加)角色許可權,新增成員的操作詳情請參見為工作空間增加空間成員。
如果您使用的是主帳號,則可忽略該添加操作。
如果您在開發工作單位時,需要特定的開發環境支援,可使用DataWorks提供的自訂鏡像功能,定製化構建任務執行所需的組件鏡像。更多資訊,請參見自訂鏡像。
使用限制
僅支援使用Serverless資源群組(推薦)或獨享調度資源群組運行該類型任務,若在資料開發時需使用鏡像,則必須使用Serverless資源群組。
DataLake或自訂叢集若要在DataWorks管理中繼資料,需先在叢集側配置EMR-HOOK。詳情請參見配置Spark SQL的EMR-HOOK。
說明若未在叢集側配置EMR-HOOK,則無法在DataWorks中即時展示中繼資料、產生審計日誌、展示血緣關係、開展EMR相關治理任務。
EMR on ACK類型的Spark叢集不支援查看血緣,EMR Serverless Spark叢集支援查看血緣。
EMR on ACK 類型的Spark叢集及EMR Serverless Spark叢集僅支援通過OSS REF的方式直接引用OSS資源、上傳資源到OSS,不支援上傳資源到HDFS。
DataLake叢集、自訂叢集支援通過OSS REF的方式直接引用OSS資源、上傳資源到OSS及上傳資源到HDFS。
注意事項
若您已為當前工作空間綁定的EMR叢集中的Spark開啟Ranger許可權控制。
使用預設鏡像運行Spark任務時,該功能將預設可用。
如需使用自訂鏡像運行Spark任務,請提交工單聯絡技術支援人員,對鏡像進行升級以支援該功能。
準備工作:開發Spark任務並擷取JAR包
在使用DataWorks調度EMR Spark任務前,您需要先在EMR中開發Spark任務代碼並完成任務代碼的編譯,產生編譯後的任務JAR包,EMR Spark任務的開發指導詳情請參見Spark概述。
後續您需要將任務JAR包上傳至DataWorks,在DataWorks中周期性調度EMR Spark任務。
操作步驟
在EMR Spark節點編輯頁面,執行如下開發操作。
開發Spark任務
您可以根據不同情境需求選擇適合您的操作方案:
方案一:先上傳資源後引用EMR JAR資源
DataWorks也支援您從本地先上傳資源至Data Studio,再引用資源。EMR Spark任務編譯完成後,您需擷取編譯後的JAR包,建議根據JAR包大小選擇不同方式儲存JAR包資源。
上傳JAR包資源,建立為DataWorks的EMR資源並提交,或直接儲存在EMR的HDFS儲存中(EMR on ACK 類型的Spark叢集及EMR Serverless Spark叢集不支援上傳資源到HDFS)。
JAR包小於500MB時
建立EMR JAR資源。
JAR包小於500MB時,可將JAR包通過本地上傳的方式上傳為DataWorks的EMR JAR資源,便於後續在DataWorks控制台進行可視化管理,建立完成資源後需進行提交,操作詳情請參見建立和使用EMR資源。
將JAR包通過本地上傳的方式上傳到JAR資源的存放目錄下,詳情請參見資源管理。
單擊點擊上傳按鈕,上傳JAR資源。
選擇儲存路徑、資料來源及資源群組。
單擊儲存按鈕進行儲存。

引用EMR JAR資源。
開啟建立的EMR Spark節點,停留在代碼編輯頁面。
在左側導覽列的資源管理中找到待引用資源,右鍵選擇引用資源。
選擇引用後,當EMR Spark節點的代碼編輯頁面出現如下引用成功提示時,表明已成功引用代碼資源。
##@resource_reference{"spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar"} spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar如果成功自動添加上述引用代碼,表明資源引用成功。其中,
spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar為您實際上傳的EMR JAR資源名稱。改寫EMR Spark節點代碼,補充spark submit命令,改寫後的樣本如下。
說明EMR Spark節點編輯代碼時不支援備註陳述式,請務必參考如下樣本改寫任務代碼,不要隨意添加註釋,否則後續運行節點時會報錯。
##@resource_reference{"spark-examples_2.11-2.4.0.jar"} spark-submit --class org.apache.spark.examples.SparkPi --master yarn spark-examples_2.11-2.4.0.jar 100說明org.apache.spark.examples.SparkPi:為您實際編譯的JAR包中的任務主Class。spark-examples_2.11-2.4.0.jar:為您實際上傳的EMR JAR資源名稱。其他參數可參考以上樣本不做修改,您也可執行
spark-submit --help命令查看spark submit的使用協助,根據需要修改spark submit命令。若您需要在Spark節點中使用Spark-submit命令簡化的參數,您需要在代碼中自行添加,例如,
--executor-memory 2G。Spark節點僅支援使用Yarn的Cluster提交作業。
spark-submit方式提交的任務,deploy-mode推薦使用cluster模式,不建議使用client模式。
JAR包大於等於500MB時
建立EMR JAR資源。
JAR包大於等於500MB時,無法通過本地上傳的方式直接上傳為DataWorks的資源。建議直接將JAR包儲存在EMR的HDFS中,並記錄下JAR包的儲存路徑,以便於後續在DataWorks調度Spark任務時引用該路徑。
將JAR包通過本地上傳的方式上傳到JAR資源的存放目錄下,詳情請參見資源管理。
單擊點擊上傳按鈕,上傳JAR資源。
選擇儲存路徑、資料來源及資源群組。
單擊儲存按鈕進行儲存。

引用EMR JAR資源。
JAR包儲存在HDFS時,您可以直接在EMR Spark節點中通過代碼指定JAR包路徑的方式引用JAR包。
雙擊建立的EMR Spark節點,開啟EMR Spark 節點的代碼編輯頁面。
編寫spark submit命令,樣本如下。
spark-submit --master yarn --deploy-mode cluster --name SparkPi --driver-memory 4G --driver-cores 1 --num-executors 5 --executor-memory 4G --executor-cores 1 --class org.apache.spark.examples.JavaSparkPi hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100說明hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar:為JAR包實際在HDFS中的路徑。
org.apache.spark.examples.JavaSparkPi:為您實際編譯的JAR包中的任務主class。
其他參數為實際EMR叢集的參數,需根據實際情況進行修改配置。您也可以執行
spark-submit --help命令查看spark submit的使用協助,根據需要修改spark submit命令。若您需要在Spark節點中使用Spark-submit命令簡化的參數,您需要在代碼中自行添加,例如,
--executor-memory 2G。Spark節點僅支援使用Yarn的Cluster提交作業。
spark-submit方式提交的任務,deploy-mode推薦使用cluster模式,不建議使用client模式。
方案二:直接引用OSS資源
當前節點可直接通過OSS REF的方式引用OSS資源,在運行EMR節點時,DataWorks會自動載入代碼中的OSS資源至本地使用。該方式常用於“需要在EMR任務中運行JAR依賴”、“EMR任務需依賴指令碼”等情境。
開發JAR資源。
代碼依賴準備。
您可前往EMR叢集,在叢集master節點的
/usr/lib/emr/spark-current/jars/路徑下查看您所需的代碼依賴。下面以Spark3.4.2版本為例,您需在開啟已建立的IDEA專案添加pom依賴並引用相關外掛程式。添加pom依賴
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.4.2</version> </dependency> <!-- Apache Spark SQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.4.2</version> </dependency> </dependencies>引用相關外掛程式
<build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <configuration> <recompileMode>incremental</recompileMode> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> </plugins> </build>程式碼範例。
package com.aliyun.emr.example.spark import org.apache.spark.sql.SparkSession object SparkMaxComputeDemo { def main(args: Array[String]): Unit = { // 建立 SparkSession val spark = SparkSession.builder() .appName("HelloDataWorks") .getOrCreate() // 列印Spark版本 println(s"Spark version: ${spark.version}") } }編輯完上述Scala代碼後將該代碼產生JAR包。
樣本產生的JAR包為
SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar。
上傳JAR資源。
完成代碼開發後,您需登入OSS管理主控台,單擊所在地區左側導覽列的Bucket列表。
單擊目標Bucket名稱,進入檔案管理頁面。
本文樣本使用的Bucket為
onaliyun-bucket-2。單擊建立目錄,建立JAR資源的存放目錄。
配置目錄名為
emr/jars,建立JAR資源的存放目錄。上傳JAR資源至JAR資源的存放目錄。
進入存放目錄,單擊上傳檔案,在待上傳檔案地區單擊掃描檔案,添加
SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar檔案至Bucket,單擊上傳檔案。
引用JAR資源。
編輯引用JAR資原始碼。
在已建立的EMR Spark節點編輯頁面,編輯引用JAR資原始碼。
spark-submit --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn ossref://onaliyun-bucket-2/emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar引用參數說明:
參數
參數說明
class啟動並執行主類全名稱。
masterSpark應用程式的運行模式。
ossref檔案路徑格式為
ossref://{endpoint}/{bucket}/{object}endpoint:OSS對外服務的訪問網域名稱。Endpoint為空白時,僅支援使用與當前訪問的EMR叢集同地區的OSS,即OSS的Bucket需要與EMR叢集所在地區相同。
Bucket:OSS用於儲存物件的容器,每一個Bucket有唯一的名稱,登入OSS管理主控台,可查看當前登入帳號下所有Bucket。
object:儲存在Bucket中的一個具體的對象(檔案名稱或路徑)。
運行EMR Spark節點任務。
編輯完成後您可單擊
表徵圖,選擇您所建立的Serverless資源群組運行EMR Spark節點。待任務執行完成後,記錄控制台列印的applicationIds,例如application_1730367929285_xxxx。結果查看。
建立EMR Shell節點並在EMR Shell節點執行
yarn logs -applicationId application_1730367929285_xxxx命令查看運行結果:
(可選)配置進階參數
您可在節點右側調度配置的中配置下表特有屬性參數。
說明不同類型EMR叢集可配置的進階參數存在部分差異,具體如下表。
更多開源Spark屬性參數,可在節點右側調度配置的中進行配置。
DataLake叢集/自訂叢集:EMR on ECS
進階參數
配置說明
queue
提交作業的調度隊列,預設為default隊列。
如果您在註冊EMR叢集至DataWorks工作空間時,配置了工作空間級的YARN資源隊列:
如果勾選了全域配置是否優先為是,則實際Spark任務運行時,隊列以註冊EMR叢集時的配置結果為準。
如果未勾選,則實際Spark任務運行時,隊列以EMR Spark節點的配置結果為準。
關於EMR YARN說明,詳情請參見隊列基礎配置,註冊EMR叢集時的隊列配置詳情請參見設定全域YARN資源隊列。
priority
優先順序,預設為1。
FLOW_SKIP_SQL_ANALYZE
SQL語句執行方式。取值如下:
true:表示每次執行多條SQL語句。false(預設值):表示每次執行一條SQL語句。
說明該參數僅支援用於資料開發環境測試回合流程。
其他
您可以直接在進階配置裡追加自訂SPARK參數。例如,
spark.eventLog.enabled : false,DataWorks會自動在最終下發EMR叢集的代碼中進行補全,格式為:--conf key=value。還支援配置全域Spark參數,詳情請參見設定全域Spark參數。
說明如需啟用Ranger許可權控制,請在設定全域Spark參數中添加配置
spark.hadoop.fs.oss.authorization.method=ranger,以確保Ranger許可權控制生效。
Spark叢集:EMR ON ACK
進階參數
配置說明
FLOW_SKIP_SQL_ANALYZE
SQL語句執行方式。取值如下:
true:表示每次執行多條SQL語句。false:表示每次執行一條SQL語句。
說明該參數僅支援用於資料開發環境測試回合流程。
其他
您可以直接在進階配置裡追加自訂SPARK參數。例如,
spark.eventLog.enabled : false,DataWorks會自動在最終下發EMR叢集的代碼中進行補全,格式為:--conf key=value。還支援配置全域Spark參數,詳情請參見設定全域Spark參數。
Hadoop叢集:EMR on ECS
進階參數
配置說明
queue
提交作業的調度隊列,預設為default隊列。
如果您在註冊EMR叢集至DataWorks工作空間時,配置了工作空間級的YARN資源隊列:
如果勾選了全域配置是否優先為是,則實際Spark任務運行時,隊列以註冊EMR叢集時的配置結果為準。
如果未勾選,則實際Spark任務運行時,隊列以EMR Spark節點的配置結果為準。
關於EMR YARN說明,詳情請參見隊列基礎配置,註冊EMR叢集時的隊列配置詳情請參見設定全域YARN資源隊列。
priority
優先順序,預設為1。
FLOW_SKIP_SQL_ANALYZE
SQL語句執行方式。取值如下:
true:表示每次執行多條SQL語句。false:表示每次執行一條SQL語句。
說明該參數僅支援用於資料開發環境測試回合流程。
USE_GATEWAY
設定本節點提交作業時,是否通過Gateway叢集提交。取值如下:
true:通過Gateway叢集提交。false:不通過Gateway叢集提交,預設提交到header節點。
說明如果本節點所在的叢集未關聯Gateway叢集,此處手動設定參數取值為
true時,後續提交EMR作業時會失敗。其他
您可以直接在進階配置裡追加自訂SPARK參數。例如,
spark.eventLog.enabled : false,DataWorks會自動在最終下發EMR叢集的代碼中進行補全,格式為:--conf key=value。還支援配置全域Spark參數,詳情請參見設定全域Spark參數。
說明如需啟用Ranger許可權控制,請在設定全域Spark參數中添加配置
spark.hadoop.fs.oss.authorization.method=ranger,以確保Ranger許可權控制生效。
EMR Serverless Spark叢集
相關參數設定請參見提交Spark任務參數設定。
進階參數
配置說明
queue
提交作業的調度隊列,預設為dev_queue隊列。
priority
優先順序,預設為1。
FLOW_SKIP_SQL_ANALYZE
SQL語句執行方式。取值如下:
true:表示每次執行多條SQL語句。false:表示每次執行一條SQL語句。
說明該參數僅支援用於資料開發環境測試回合流程。
SERVERLESS_RELEASE_VERSION
Spark引擎版本,預設使用管理中心的叢集管理中叢集配置的預設引擎版本。如需為不同任務設定不同的引擎版本,您可在此進行設定。
SERVERLESS_QUEUE_NAME
指定資源隊列,預設使用管理中心的叢集管理中叢集配置的預設資源隊列。如有資源隔離和管理需求,可通過添加隊列實現。詳情請參見管理資源隊列。
其他
您可以直接在進階配置裡追加自訂SPARK參數。例如,
spark.eventLog.enabled : false,DataWorks會自動在最終下發EMR叢集的代碼中進行補全,格式為:--conf key=value。還支援配置全域Spark參數,詳情請參見設定全域Spark參數。
執行Spark任務
在回合組態的計算資源中,選擇配置計算資源和DataWorks資源群組。
說明您還可以根據任務執行所需的資源情況來調度 CU。預設CU為
0.25。訪問公用網路或VPC網路環境的資料來源需要使用與資料來源測試連通性成功的調度資源群組。詳情請參見網路連通方案。
在工具列的參數對話方塊中選擇對應的資料來源,單擊運行Spark任務。
如需定期執行節點任務,請根據業務需求配置調度資訊。配置詳情請參見節點調度配置。
節點任務配置完成後,需對節點進行發布。詳情請參見節點/工作流程發布。
任務發布後,您可以在營運中心查看周期任務的運行情況。詳情請參見營運中心入門。
常見問題
Q:節點運行出現連線逾時?
A:請確保資源群組和叢集的網路聯通。請進入計算資源的列表頁,單擊資源初始化,在彈窗中單擊再次初始化,確保成功初始化。

