全部產品
Search
文件中心

E-MapReduce:使用UDF函數

更新時間:Nov 26, 2025

當 Spark SQL 內建函數無法滿足您的特定業務需求時,您可以建立自訂函數(UDF)來擴充 Spark 的功能。本文將引導您完成 Python UDF 與 Java/Scala UDF 的完整使用流程。

支援版本

僅以下引擎版本支援本文樣本:

  • esr-5.x:esr-5.0.0及之後版本。

  • esr-4.x:esr-4.6.0及之後版本。

  • esr-3.x:esr-3.5.0及之後版本。

  • esr-2.x:esr-2.9.0及之後版本。

Python UDF

樣本將示範一個主函數 my_adds.my_add,它通過匯入兩個獨立的 Python 模組 module1 和 module2,分別執行加 0.5 與加 0.3 的操作,並將結果合并返回。

步驟一:下載並上傳檔案

為便於快速測試與驗證,我們提供樣本所需的代碼檔案。請按以下步驟完成檔案擷取與上傳。

  1. 下載樣本檔案。單擊以下連結,下載測試所需的三個檔案:

    • module1.tgz:封裝了將輸入整數加 0.5 並返回浮點數的邏輯。

    • module2.tgz:封裝了將輸入整數加 0.3 並返回浮點數的功能。

    • my_adds.py:主函數檔案,調用 PythonUDF.your_add(a) 和 PythonUDF2.add2(a),並將兩個結果相加後返回最終值。

  2. 上傳檔案。將以下檔案上傳至您已授權訪問的Object Storage Service Bucket 中。詳情請參見簡單上傳

    • 相依模組包:module1.tgz

    • 相依模組包:module2.tgz

    • 主邏輯檔案:my_adds.py

說明

如果您的自訂函數(UDF)需要引用自訂包,則需將自訂依賴包解壓到Spark的Python環境中。以下是my_adds.py的程式碼範例:

### my_adds.py內容
import sys

def my_add(a: int) -> float:
    # 確保依賴路徑已添加到 sys.path
    if not sys.path.__contains__("./module1.tgz"):
        sys.path.insert(0, "./module1.tgz/")
        from module1 import PythonUDF
    if not sys.path.__contains__("./module2.tgz"):
        sys.path.insert(0, "./module2.tgz/")
        from module2 import PythonUDF2

    b = PythonUDF.your_add(a) + PythonUDF2.add2(a)
    return b

步驟二:註冊 UDF

在 Spark SQL 中註冊函數,使其可以像內建函數一樣被調用。您可以根據需要選擇註冊為永久函數或臨時函數。

  1. 進入您的工作空間,建立SparkSQL任務,詳情請參見SparkSQL開發快速入門

  2. 在新增的Spark SQL頁簽中,使用 CREATE FUNCTION 語句註冊 UDF。

    • 註冊為永久函數:函數資訊將持久化儲存在資料目錄中,可被所有 SQL 會話複用。推薦用於生產環境,便於共用和管理。

      -- AS 後的格式為 "[python檔案名稱].[函數名]"
      -- 推薦使用公用讀取或已授權訪問的OSS Bucket
      CREATE OR REPLACE FUNCTION adds AS "my_adds.my_add"
      USING FILE "oss://<bucket>/demo/udf/my_adds.py",
            FILE "oss://<bucket>/demo/udf/module1.tgz",
            FILE "oss://<bucket>/demo/udf/module2.tgz";
    • 註冊為臨時函數:函數僅在當前 SQL 會話中生效,會話結束後自動失效。推薦用於開發與測試階段。

      -- 使用 TEMPORARY 關鍵字建立臨時函數
      CREATE TEMPORARY FUNCTION adds AS "my_adds.my_add"
      USING FILE "oss://<bucket>/demo/udf/my_adds.py",
            FILE "oss://<bucket>/demo/udf/module1.tgz",
            FILE "oss://<bucket>/demo/udf/module2.tgz";

步驟三:使用 UDF

註冊成功後,您可以在 SQL 陳述式中調用該函數。

  1. 在Spark SQL頁簽中執行以下 SQL 陳述式進行測試。

    -- 準備測試資料
    CREATE TABLE IF NOT EXISTS test_tbl (id INT, name STRING);
    TRUNCATE TABLE test_tbl;
    INSERT INTO test_tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'), (4, 'David');
    
    -- 調用登入的 Python UDF
    SELECT id, adds(id) AS result FROM test_tbl;
  2. 查看返回結果。image

Java/Scala UDF

本樣本提供了一個先行編譯的 Java UDF 樣本 JAR 包,您無需自行開發代碼或進行專案構建,即可完成函數的註冊與調用。Scala UDF 的流程與本文相似。

步驟一:下載並上傳檔案

為便於快速測試與驗證,我們提供樣本所需的JAR包。請按以下步驟完成檔案擷取與上傳。

  1. 下載樣本檔案。單擊以下連結,下載測試所需的檔案:

    udf-1.0-SNAPSHOT.jar:封裝了將輸入字串追加 ":HelloWorld" 並返回的邏輯。

  2. 上傳檔案。將下載好的檔案上傳至您已授權訪問的Object Storage Service Bucket 中。詳情請參見簡單上傳

步驟二:註冊 UDF

在 Spark SQL 中,必須先註冊 UDF 才能調用。您可以根據使用情境選擇註冊為永久函數臨時函數

  1. 進入您的工作空間,建立SparkSQL任務,詳情請參見SparkSQL開發快速入門

  2. 在新增的Spark SQL頁簽中,使用 CREATE FUNCTION 語句註冊 UDF,並通過 USING JAR 指定 JAR 檔案的 OSS 路徑。

    • 註冊為永久函數:函數資訊將持久化儲存在資料目錄中,可被所有 SQL 會話複用。推薦用於生產環境,便於共用和管理。

      # 註冊到DLF/DLF1.0/HMS中
      # AS 後面是UDF中建立的類
      CREATE FUNCTION myfunc AS "org.example.MyUDF" 
      USING jar "oss://path/to/udf-1.0-SNAPSHOT.jar";
    • 註冊為臨時函數:函數僅在當前 SQL 會話中生效,會話結束後自動失效。推薦用於開發與測試階段。

      # 使用temporary function僅在當前session生效
      CREATE TEMPORARY FUNCTION myfunc AS "org.example.MyUDF" 
      USING jar "oss://path/to/udf-1.0-SNAPSHOT.jar";

步驟三:使用 UDF

註冊成功後,即可在 SQL 中調用。

  1. 任務編輯器中執行以下 SQL 陳述式進行測試。

    SELECT myfunc("abc");
  2. 查看返回結果。image