Hologres V3.1版本起支援遠程調用Function Compute服務(Function Compute,簡稱FC)中的自訂函數,用於計算處理複雜的商務邏輯或進行進階資料操作。本文為您介紹如何在Hologres中使用自訂函數。
功能概述
Remote Function允許開發人員通過將外部函數整合到Hologres中,擴充資料處理和分析的能力。在Hologres V3.1版本中,Remote Function支援調用阿里雲的Function Compute服務FC。該功能的引入,使使用者能夠在查詢Hologres資料的同時,動態調用阿里雲Function Compute處理複雜的商務邏輯或進行進階資料操作。
通過Remote Function,使用者可以實現以下三種應用情境:
-
即時資料處理:在查詢資料時,即時調用外部函數進行資料清洗、格式轉換或複雜計算。
-
整合第三方服務:通過Function Compute,將Hologres資料與其他阿里雲服務或第三方API進行互動。
-
進階分析:利用Function Compute實現進階分析演算法,如模型推斷、機器學習等,並將結果直接寫回Hologres。
前提條件
-
已開通Hologres執行個體,且該執行個體版本需為V3.1及以上。詳情請參見購買Hologres執行個體。
-
已開通Function Compute服務,且Function ComputeFC執行個體為V3.0版本。具體操作請登入Function Compute控制台。
-
已授權服務關聯角色
AliyunServiceRoleForHologresRemoteUDF。-
登入Hologres管理主控台,單擊左側導覽列授權服務關聯角色。
-
勾選
AliyunServiceRoleForHologresRemoteUDF,單擊一鍵授權完成。如圖所示:
-
-
Hologre執行個體和Function Compute服務需在同一地區下。
-
開通阿里雲Function Compute服務,並開發、部署供Hologres調用的函數,詳情請參見代碼開發。
注意事項
-
僅支援標量UDF和UDTF,暫不支援UDAF。
-
僅支援如下資料類型和對應的數群組類型:
BOOLEAN、INTEGER、BIGINT、REAL、DOUBLE PRECISION和TEXT。 -
Remote Function不支援常量入參。
-
該功能在Function ComputeFC中會產生一定費用。詳情請參見產品計費。
-
該功能在Hologres中不會產生額外費用。
自訂函數管理
建立擴充
首次使用前需要建立擴充,命令如下:
CREATE EXTENSION [ IF NOT EXISTS ] function_compute;
建立函數
文法
CREATE [ OR REPLACE ] FUNCTION <function_name>
( [ [ argmode ] [ argname ] <argtype> [ { DEFAULT | = } default_expr ] [, ...] ] )
[ RETURNS [SETOF] rettype | RETURNS TABLE ( column_name column_type) ]
LANGUAGE function_compute
AS '<fc_endpoint>/<func_name>'
{
{ CALLED ON NULL INPUT | RETURNS NULL ON NULL INPUT | STRICT }
| SET function_compute.qualifier TO <qualifier>
| SET function_compute.compression TO <compression>
| SET function_compute.max_batch_size TO <max_batch_size>
} ...
參數說明
|
參數類別 |
參數 |
是否必填 |
說明 |
|
函數名稱 |
function_name |
是 |
函數名稱。
|
|
函數參數 |
argtype |
是 |
參數資料類型。 |
|
argmode |
否 |
支援IN,OUT和INOUT,預設為IN,為避免歧義不建議使用INOUT。另外,只能包含一個OUT或者1個INOUT的參數。 |
|
|
argname |
否 |
參數名稱。
|
|
|
default_expr |
否 |
預設值,僅限入參,若某參數指定預設值,其後所有參數必須均有預設值。 |
|
|
FC Endpoint |
LANGUAGE function_compute |
是 |
固定值,聲明使用Function Compute服務。 |
|
fc_endpoint |
是 |
FC的內網服務接入地址。詳情請參見服務接入地址。 |
|
|
func_name |
是 |
目標函數名稱(需在FC控制台預先建立)。 |
|
|
傳回型別 |
rettype |
否 |
傳回值類型。 當存在OUT或INOUT參數時,可以省略RETURNS子句。如果存在該子句,則必須與輸出參數隱含的結果類型一致。 |
|
column_name |
否 |
在RETURNS TABLE文法中,輸出資料行的名稱。 聲明命名OUT參數的另一種方式,不同之處在於RETURNS TABLE還隱含RETURNS SETOF(即返回集合)。 |
|
|
column_type |
否 |
在RETURNS TABLE文法中,輸出資料行的資料類型。 |
|
|
空值處理策略 |
CALLED ON NULL INPUT |
否 |
預設行為,允許輸入含NULL,函數需自行處理空值邏輯。 |
|
RETURNS NULL ON NULL INPUT |
否 |
任意輸入為NULL時直接返回NULL。 |
|
|
STRICT |
否 |
為RETURNS NULL ON NULL INPUT的別名。 |
|
|
函數版本控制 |
qualifier |
否 |
指定調用的函數版本或別名,預設值為 |
|
請求壓縮演算法 |
compression |
否 |
指定調用函數的Request和Response中所用的壓縮演算法,有效值為:
|
|
單次請求最大批處理行數 |
max_batch_size |
否 |
用於指定每批次發送至FC的最大行數。 核心目的是為存在記憶體限制或其他約束的FC函數設定批量處理的行數上限。若未顯式指定此參數,Hologres將自動計算並採用最優批量大小,通常無需手動幹預。 |
使用樣本
-
建立標量UDF。
CREATE OR REPLACE FUNCTION rf_add ( a INTEGER, b INTEGER DEFAULT 1 ) RETURNS BIGINT LANGUAGE function_compute AS 'xxxxxxxxxxxxx.cn-shanghai-internal.fc.aliyuncs.com/add' ; -
建立UDTF。
-- 形式 1 CREATE OR REPLACE FUNCTION rf_unnest(TEXT []) RETURNS TABLE (item TEXT ) LANGUAGE function_compute AS 'xxxxxxxxxxxxxx.cn-hangzhou-internal.fc.aliyuncs.com/unnest' STRICT SET function_compute.max_batch_size TO 1024; -- 形式 2 CREATE OR REPLACE FUNCTION rf_unnest(TEXT []) RETURNS SETOF TEXT LANGUAGE function_compute AS 'xxxxxxxxxxxxxx.cn-hangzhou-internal.fc.aliyuncs.com/unnest' STRICT SET function_compute.max_batch_size TO 1024;
查看函數
查看已建立的Remote Function。
SELECT
CASE
WHEN p.proretset = 'f' THEN 'scalar UDF'
WHEN p.proretset = 't' THEN 'UDTF'
END AS function_type,
n.nspname AS schema_name,
p.proname AS function_name,
pg_get_function_arguments(p.oid) AS arguments,
pg_get_function_result(p.oid) AS return_type,
p.proisstrict AS is_strict,
p.proconfig AS config,
pg_get_functiondef(p.oid) AS definition
FROM
pg_proc p
JOIN pg_language l ON p.prolang = l.oid
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE
l.lanname = 'function_compute'
AND p.prokind != 'p'
ORDER BY
function_type,
schema_name,
function_name;
刪除函數
文法
DROP FUNCTION [ IF EXISTS ] <function_name> [ ( [ [ argmode ] [ argname ] <argtype> [, ...] ] ) ] [, ...]
參數說明
|
參數 |
是否必填 |
說明 |
|
function_name |
是 |
要刪除的函數名。 |
|
argtype |
是 |
參數資料類型。 |
|
argmode |
否 |
支援IN,OUT和INOUT,預設為IN,為避免歧義不建議使用INOUT。另外,只能包含一個OUT或者1個INOUT的參數。 |
|
argname |
否 |
參數名稱。對於入參僅作為文檔說明,對於出參,決定返回結果集的列名,若省略則由系統產生預設名稱。 |
使用樣本
DROP FUNCTION rf_add(INTEGER, INTEGER);
資料互動格式
Hologres發送到FC的請求格式
Hologres通過POST調用FC的InvokeFunction介面。請求體為JSON格式,其頂層為對象。目前僅包含名為data鍵,其值為二維數組,其中的每個元素對應一批次中的一行資料,其中包含調用函數所需的參數。
資料的序列化規則
-
BOOLEAN序列化為JSON BOOLEAN類型。 -
INTEGER、BIGINT序列化為JSON NUMBER類型。 -
REAL、DOUBLE PRECISION序列化為JSON NUMBER類型。 -
TEXT序列化為JSON STRING類型。 -
NULL序列化為JSON的NULL。
樣本
以下是一個針對具有簽名rf_demo(TEXT, INTEGER, BOOLEAN)的Remote Function的序列化請求樣本。
{
"data": [
["foo", 100, true],
[null, null, false],
["bar", 200, false]
]
}
Hologres接收到FC的響應格式
當FC完成對某批次資料的處理時,需按照以下規範,以JSON的形式將結果資料返回給Hologres:
-
純量涵式Scalar UDF。
-
頂層必須包含
results欄位,其值為一個數組,每個元素對應輸入資料的一行結果。 -
每行結果必須為一個數組,且嚴格遵循輸入資料的順序(即第N個元素對應輸入資料的第N行)。
以下是一個大小為4的批次的返回資料樣本:
{ "results": [ ["北京"], ["上海"], ["深圳"], ["廣州"] ] } -
-
UDTF:UTDF支援單行輸入產生多行輸出,需通過行號(row_num)標識原始輸入行的關聯關係。
-
頂層必須包含
results欄位,其值為一個數組,每個元素對應輸出的一行資料。 -
每行結果必須是包含兩個元素的數組:
-
row_num(首元素):原始輸入資料的行號(0起始索引),用於關聯輸入與輸出。row_num不可亂序,必須以遞增的形式返回。
-
result(次元素):處理後的一行傳回值。
-
樣本如下:
{ "results": [ [0, "北京"], [1, "上海"], [3, "深圳"], [3, "廣州"], ] } -
自訂函數使用樣本
本樣本以unnest函數為例。
-
開通Function Compute。
登入Function Compute控制台。您還可以根據介面提示領取一定額度的免費資源套件。詳情請參見試用額度。
-
建立FC事件函數。
-
單擊左側導覽列函數,切換地區,保持與Hologres執行個體在同一地區。
-
在函數頁面,單擊建立函數,進入建立函數頁面。
-
選擇事件函數,配置如下參數,其他參數保持不變。詳情請參見建立事件函數。
參數
說明
函數名稱
自訂設定。例如
unnest。運行環境
選擇內建運行時/Python/Python 3.10。
代碼上傳方式
選擇通過ZIP包上傳代碼。
程式碼封裝
上傳已編寫並打包好的程式碼封裝。
將如下代碼儲存至unnest.py,並壓縮成unnest.zip。
import json def unnest(event, context): evt = json.loads(event) data = evt.get('data', None) if data is None: raise ValueError('no "data" key in event.') if not isinstance(data, list): raise ValueError('data is not a list.') res = list() for i in range(len(data)): if len(data[i]) != 1 or not isinstance(data[i], list): raise ValueError('the item in data is not a list.') for item in data[i][0]: res.append([i, item]) return json.dumps({'results': res})請求處理常式
填寫
unnest.unnest。
-
-
建立Hologres Remote Function。
說明您需要在HoloWeb平台完成以下步驟,具體操作請參見串連HoloWeb。
CREATE EXTENSION IF NOT EXISTS function_compute; CREATE OR REPLACE FUNCTION rf_unnest(INTEGER []) RETURNS SETOF INTEGER STRICT LANGUAGE function_compute AS 'xxxxxxxxxxxxxxxxx.cn-shanghai-internal.fc.aliyuncs.com/unnest'; -
準備測試資料。
CREATE TABLE test_array ( numbers INTEGER[] ); INSERT INTO test_array (numbers) VALUES (ARRAY[1, 3]), (ARRAY[2, 4]), ('{}'), (ARRAY[]::INTEGER[]), (NULL); -
調用Remote Function。
SELECT numbers, rf_unnest(numbers) FROM test_array; numbers | rf_unnest ---------+----------- {2,4} | 2 {2,4} | 4 {1,3} | 1 {1,3} | 3 (4 rows)