當 Spark SQL 內建函數無法滿足您的特定業務需求時,您可以建立自訂函數(UDF)來擴充 Spark 的功能。本文將引導您完成 Python UDF 與 Java/Scala UDF 的完整使用流程。
支援版本
僅以下引擎版本支援本文樣本:
esr-5.x:esr-5.0.0及之後版本。
esr-4.x:esr-4.6.0及之後版本。
esr-3.x:esr-3.5.0及之後版本。
esr-2.x:esr-2.9.0及之後版本。
Python UDF
樣本將示範一個主函數 my_adds.my_add,它通過匯入兩個獨立的 Python 模組 module1 和 module2,分別執行加 0.5 與加 0.3 的操作,並將結果合并返回。
步驟一:下載並上傳檔案
為便於快速測試與驗證,我們提供樣本所需的代碼檔案。請按以下步驟完成檔案擷取與上傳。
下載樣本檔案。單擊以下連結,下載測試所需的三個檔案:
module1.tgz:封裝了將輸入整數加
0.5並返回浮點數的邏輯。module2.tgz:封裝了將輸入整數加
0.3並返回浮點數的功能。my_adds.py:主函數檔案,調用
PythonUDF.your_add(a)和PythonUDF2.add2(a),並將兩個結果相加後返回最終值。
上傳檔案。將以下檔案上傳至您已授權訪問的Object Storage Service Bucket 中。詳情請參見簡單上傳。
相依模組包:
module1.tgz相依模組包:
module2.tgz主邏輯檔案:
my_adds.py
如果您的自訂函數(UDF)需要引用自訂包,則需將自訂依賴包解壓到Spark的Python環境中。以下是my_adds.py的程式碼範例:
### my_adds.py內容
import sys
def my_add(a: int) -> float:
# 確保依賴路徑已添加到 sys.path
if not sys.path.__contains__("./module1.tgz"):
sys.path.insert(0, "./module1.tgz/")
from module1 import PythonUDF
if not sys.path.__contains__("./module2.tgz"):
sys.path.insert(0, "./module2.tgz/")
from module2 import PythonUDF2
b = PythonUDF.your_add(a) + PythonUDF2.add2(a)
return b步驟二:註冊 UDF
在 Spark SQL 中註冊函數,使其可以像內建函數一樣被調用。您可以根據需要選擇註冊為永久函數或臨時函數。
進入您的工作空間,建立SparkSQL任務,詳情請參見SparkSQL開發快速入門。
在新增的Spark SQL頁簽中,使用
CREATE FUNCTION語句註冊 UDF。註冊為永久函數:函數資訊將持久化儲存在資料目錄中,可被所有 SQL 會話複用。推薦用於生產環境,便於共用和管理。
-- AS 後的格式為 "[python檔案名稱].[函數名]" -- 推薦使用公用讀取或已授權訪問的OSS Bucket CREATE OR REPLACE FUNCTION adds AS "my_adds.my_add" USING FILE "oss://<bucket>/demo/udf/my_adds.py", FILE "oss://<bucket>/demo/udf/module1.tgz", FILE "oss://<bucket>/demo/udf/module2.tgz";註冊為臨時函數:函數僅在當前 SQL 會話中生效,會話結束後自動失效。推薦用於開發與測試階段。
-- 使用 TEMPORARY 關鍵字建立臨時函數 CREATE TEMPORARY FUNCTION adds AS "my_adds.my_add" USING FILE "oss://<bucket>/demo/udf/my_adds.py", FILE "oss://<bucket>/demo/udf/module1.tgz", FILE "oss://<bucket>/demo/udf/module2.tgz";
步驟三:使用 UDF
註冊成功後,您可以在 SQL 陳述式中調用該函數。
在Spark SQL頁簽中執行以下 SQL 陳述式進行測試。
-- 準備測試資料 CREATE TABLE IF NOT EXISTS test_tbl (id INT, name STRING); TRUNCATE TABLE test_tbl; INSERT INTO test_tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'), (4, 'David'); -- 調用登入的 Python UDF SELECT id, adds(id) AS result FROM test_tbl;查看返回結果。

Java/Scala UDF
本樣本提供了一個先行編譯的 Java UDF 樣本 JAR 包,您無需自行開發代碼或進行專案構建,即可完成函數的註冊與調用。Scala UDF 的流程與本文相似。
步驟一:下載並上傳檔案
為便於快速測試與驗證,我們提供樣本所需的JAR包。請按以下步驟完成檔案擷取與上傳。
下載樣本檔案。單擊以下連結,下載測試所需的檔案:
udf-1.0-SNAPSHOT.jar:封裝了將輸入字串追加
":HelloWorld"並返回的邏輯。上傳檔案。將下載好的檔案上傳至您已授權訪問的Object Storage Service Bucket 中。詳情請參見簡單上傳。
步驟二:註冊 UDF
在 Spark SQL 中,必須先註冊 UDF 才能調用。您可以根據使用情境選擇註冊為永久函數或臨時函數。
進入您的工作空間,建立SparkSQL任務,詳情請參見SparkSQL開發快速入門。
在新增的Spark SQL頁簽中,使用
CREATE FUNCTION語句註冊 UDF,並通過USING JAR指定 JAR 檔案的 OSS 路徑。註冊為永久函數:函數資訊將持久化儲存在資料目錄中,可被所有 SQL 會話複用。推薦用於生產環境,便於共用和管理。
# 註冊到DLF/DLF1.0/HMS中 # AS 後面是UDF中建立的類 CREATE FUNCTION myfunc AS "org.example.MyUDF" USING jar "oss://path/to/udf-1.0-SNAPSHOT.jar";註冊為臨時函數:函數僅在當前 SQL 會話中生效,會話結束後自動失效。推薦用於開發與測試階段。
# 使用temporary function僅在當前session生效 CREATE TEMPORARY FUNCTION myfunc AS "org.example.MyUDF" USING jar "oss://path/to/udf-1.0-SNAPSHOT.jar";
步驟三:使用 UDF
註冊成功後,即可在 SQL 中調用。
在任務編輯器中執行以下 SQL 陳述式進行測試。
SELECT myfunc("abc");查看返回結果。
