全部產品
Search
文件中心

E-MapReduce:使用DLF

更新時間:Aug 26, 2025

本文介紹了如何在EMR Serverless Spark中開發並運行一個基於資料湖構建(DLF)的Paimon表寫入任務。通過上傳測試檔案、建立任務並運行,最終可以通過日誌探查或控制台查看結果,驗證資料寫入和查詢的正確性。

前提條件

操作流程

步驟一:準備測試檔案

建立一個簡單的DataFrame,並將該DataFrame寫入名為pyspark_test的表中,儲存格式為Paimon。隨後,通過Spark SQL對錶資料進行查詢,以驗證寫入結果的正確性。

Java

Java程式碼範例如下。單擊SparkExample-1.0-SNAPSHOT.jar,直接下載測試JAR包。

Maven依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.5.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.5.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.5.2</version>
    <scope>provided</scope>
</dependency>

程式碼範例

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

public class DlfAccess {
    public static void main(String[] args) {
        // 建立 SparkSession 執行個體
        SparkSession spark = SparkSession.builder()
                .appName("DLF Example")
                .enableHiveSupport()
                .getOrCreate();

        // 構造 DataFrame
        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob"),
                RowFactory.create(3, "Charlie")
        );

        StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("id",   DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType,  false)
        });
        // 建立 DataFrame 並寫表

        Dataset<Row> df = spark.createDataFrame(data, schema);

        spark.sql("drop table if exists pyspark_test");
        df.write().format("paimon").mode("overwrite").saveAsTable("pyspark_test");

        // 執行查詢,擷取前 10 條資料
        Dataset<Row> tableDf = spark.sql("select * from pyspark_test limit 10");

        tableDf.show();

        spark.stop();
    }
}

Python

如果您的工作空間綁定的是DLF 1.0(舊版),需要在Spark配置中添加spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions配置。

程式碼範例

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DLF test") \
    .enableHiveSupport() \
    .getOrCreate()

# 測試資料
data = [
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
]

spark.sql("drop table if exists pyspark_test")
# 建立 DataFrame 並寫表
df = spark.createDataFrame(data, schema='id int, name string')
df.write.format('paimon').mode("overwrite").saveAsTable("pyspark_test")

# 查表驗證
spark.sql("select * from pyspark_test").show()

步驟二:上傳測試檔案

  1. 進入資源上傳頁面。

    1. 登入E-MapReduce控制台

    2. 在左側導覽列,選擇EMR Serverless > Spark

    3. Spark頁面,單擊目標工作空間名稱。

    4. 在EMR Serverless Spark頁面,單擊左側導覽列中的檔案管理

  2. 檔案管理頁面,單擊上傳檔案

  3. 上傳檔案對話方塊中,單擊待上傳檔案地區選取項目Python檔案或者JAR包,或直接拖拽Python或檔案者JAR包到待上傳檔案地區。

步驟三:建立批任務並運行

  1. 在EMR Serverless Spark頁面,單擊左側導覽列中的資料開發

  2. 開發目錄頁簽下,單擊image(建立)表徵圖。

  3. 在彈出的對話方塊中,輸入名稱,根據實際需求在批任務中選擇PySpark或者JAR類型,然後單擊確定

  4. 在右上方選擇目標隊列。

    添加隊列的具體操作,請參見管理資源隊列

  5. 在建立的開發頁簽中,配置以下資訊,其餘參數無需配置,然後單擊運行

    JAR

    參數

    說明

    主jar資源

    選擇上傳步驟一中編譯的JAR包。

    Main Class

    提交Spark任務時所指定的主類。本文樣本填寫為org.example.DlfAccess

    PySpark

    參數

    說明

    主Python資源

    選擇工作空間資源,然後選擇在資源上傳頁面上傳的Python檔案。

步驟四:查看結果

任務運行完成後,您可以選擇以下方式查看結果。

方式一:通過日誌探查直接查看

  1. 運行任務後,在下方的運行記錄地區,單擊任務操作列的日誌探查

  2. 日誌探查頁簽,您可以查看相關的日誌資訊。

    image

方式二:通過資料湖構建控制台查看

  1. 登入資料湖構建控制台

  2. 進入對應的Catalog和資料庫,即可看到建立的測試表。

方式三:通過SQL開發查看

資料開發中建立SQL開發,執行查詢語句,以驗證資料寫入的正確性,詳情請參見SparkSQL開發

image