本文介紹了如何在EMR Serverless Spark中開發並運行一個基於資料湖構建(DLF)的Paimon表寫入任務。通過上傳測試檔案、建立任務並運行,最終可以通過日誌探查或控制台查看結果,驗證資料寫入和查詢的正確性。
前提條件
操作流程
步驟一:準備測試檔案
建立一個簡單的DataFrame,並將該DataFrame寫入名為pyspark_test的表中,儲存格式為Paimon。隨後,通過Spark SQL對錶資料進行查詢,以驗證寫入結果的正確性。
Java
Java程式碼範例如下。單擊SparkExample-1.0-SNAPSHOT.jar,直接下載測試JAR包。
Maven依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.5.2</version>
<scope>provided</scope>
</dependency>程式碼範例
package org.example;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
public class DlfAccess {
public static void main(String[] args) {
// 建立 SparkSession 執行個體
SparkSession spark = SparkSession.builder()
.appName("DLF Example")
.enableHiveSupport()
.getOrCreate();
// 構造 DataFrame
List<Row> data = Arrays.asList(
RowFactory.create(1, "Alice"),
RowFactory.create(2, "Bob"),
RowFactory.create(3, "Charlie")
);
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false)
});
// 建立 DataFrame 並寫表
Dataset<Row> df = spark.createDataFrame(data, schema);
spark.sql("drop table if exists pyspark_test");
df.write().format("paimon").mode("overwrite").saveAsTable("pyspark_test");
// 執行查詢,擷取前 10 條資料
Dataset<Row> tableDf = spark.sql("select * from pyspark_test limit 10");
tableDf.show();
spark.stop();
}
}Python
如果您的工作空間綁定的是DLF 1.0(舊版),需要在Spark配置中添加spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions配置。
程式碼範例
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DLF test") \
.enableHiveSupport() \
.getOrCreate()
# 測試資料
data = [
(1, "Alice"),
(2, "Bob"),
(3, "Charlie")
]
spark.sql("drop table if exists pyspark_test")
# 建立 DataFrame 並寫表
df = spark.createDataFrame(data, schema='id int, name string')
df.write.format('paimon').mode("overwrite").saveAsTable("pyspark_test")
# 查表驗證
spark.sql("select * from pyspark_test").show()
步驟二:上傳測試檔案
進入資源上傳頁面。
在左側導覽列,選擇。
在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導覽列中的檔案管理。
在檔案管理頁面,單擊上傳檔案。
在上傳檔案對話方塊中,單擊待上傳檔案地區選取項目Python檔案或者JAR包,或直接拖拽Python或檔案者JAR包到待上傳檔案地區。
步驟三:建立批任務並運行
在EMR Serverless Spark頁面,單擊左側導覽列中的資料開發。
在開發目錄頁簽下,單擊
(建立)表徵圖。在彈出的對話方塊中,輸入名稱,根據實際需求在批任務中選擇PySpark或者JAR類型,然後單擊確定。
在右上方選擇目標隊列。
添加隊列的具體操作,請參見管理資源隊列。
在建立的開發頁簽中,配置以下資訊,其餘參數無需配置,然後單擊運行。
JAR
參數
說明
主jar資源
選擇上傳步驟一中編譯的JAR包。
Main Class
提交Spark任務時所指定的主類。本文樣本填寫為
org.example.DlfAccess。PySpark
參數
說明
主Python資源
選擇工作空間資源,然後選擇在資源上傳頁面上傳的Python檔案。
步驟四:查看結果
任務運行完成後,您可以選擇以下方式查看結果。
方式一:通過日誌探查直接查看
運行任務後,在下方的運行記錄地區,單擊任務操作列的日誌探查。
在日誌探查頁簽,您可以查看相關的日誌資訊。

方式二:通過資料湖構建控制台查看
登入資料湖構建控制台。
進入對應的Catalog和資料庫,即可看到建立的測試表。
方式三:通過SQL開發查看
在資料開發中建立SQL開發,執行查詢語句,以驗證資料寫入的正確性,詳情請參見SparkSQL開發。
