Hologres V3.1版本起支援遠程調用Function Compute服務(Function Compute,簡稱FC)中的自訂函數,用於計算處理複雜的商務邏輯或進行進階資料操作。本文為您介紹如何在Hologres中使用自訂函數。
功能概述
Remote Function允許開發人員通過將外部函數整合到Hologres中,擴充資料處理和分析的能力。在Hologres V3.1版本中,Remote Function支援調用阿里雲的Function Compute服務FC。該功能的引入,使使用者能夠在查詢Hologres資料的同時,動態調用阿里雲Function Compute處理複雜的商務邏輯或進行進階資料操作。
通過Remote Function,使用者可以實現以下三種應用情境:
即時資料處理:在查詢資料時,即時調用外部函數進行資料清洗、格式轉換或複雜計算。
整合第三方服務:通過Function Compute,將Hologres資料與其他阿里雲服務或第三方API進行互動。
進階分析:利用Function Compute實現進階分析演算法,如模型推斷、機器學習等,並將結果直接寫回Hologres。
前提條件
已開通Hologres執行個體,且該執行個體版本需為V3.1及以上。詳情請參見購買Hologres。
已開通Function Compute服務,且Function ComputeFC執行個體為V3.0版本。具體操作請登入Function Compute控制台。
已授權服務關聯角色
AliyunServiceRoleForHologresRemoteUDF。登入Hologres管理主控台,單擊左側導覽列授權服務關聯角色。
勾選
AliyunServiceRoleForHologresRemoteUDF,單擊一鍵授權完成。如圖所示:
Hologre執行個體和Function Compute服務需在同一地區下。
開通阿里雲Function Compute服務,並開發、部署供Hologres調用的函數,詳情請參見代碼開發。
注意事項
僅支援標量UDF和UDTF,暫不支援UDAF。
僅支援如下資料類型和對應的數群組類型:
BOOLEAN、INTEGER、BIGINT、REAL、DOUBLE PRECISION和TEXT。Remote Function不支援常量入參。
該功能在Function ComputeFC中會產生一定費用。詳情請參見產品計費。
該功能在Hologres中不會產生額外費用。
自訂函數管理
建立擴充
首次使用前需要建立擴充,命令如下:
CREATE EXTENSION [ IF NOT EXISTS ] function_compute;建立函數
文法
CREATE [ OR REPLACE ] FUNCTION <function_name>
( [ [ argmode ] [ argname ] <argtype> [ { DEFAULT | = } default_expr ] [, ...] ] )
[ RETURNS [SETOF] rettype | RETURNS TABLE ( column_name column_type) ]
LANGUAGE function_compute
AS '<fc_endpoint>/<func_name>'
{
{ CALLED ON NULL INPUT | RETURNS NULL ON NULL INPUT | STRICT }
| SET function_compute.qualifier TO <qualifier>
| SET function_compute.compression TO <compression>
| SET function_compute.max_batch_size TO <max_batch_size>
} ...參數說明
參數類別 | 參數 | 是否必填 | 說明 |
函數名稱 | function_name | 是 | 函數名稱。
|
函數參數 | argtype | 是 | 參數資料類型。 |
argmode | 否 | 支援IN,OUT和INOUT,預設為IN,為避免歧義不建議使用INOUT。另外,只能包含一個OUT或者1個INOUT的參數。 | |
argname | 否 | 參數名稱。
| |
default_expr | 否 | 預設值,僅限入參,若某參數指定預設值,其後所有參數必須均有預設值。 | |
FC Endpoint | LANGUAGE function_compute | 是 | 固定值,聲明使用Function Compute服務。 |
fc_endpoint | 是 | FC的內網服務接入地址。詳情請參見服務接入地址。 | |
func_name | 是 | 目標函數名稱(需在FC控制台預先建立)。 | |
傳回型別 | rettype | 否 | 傳回值類型。 當存在OUT或INOUT參數時,可以省略RETURNS子句。如果存在該子句,則必須與輸出參數隱含的結果類型一致。 |
column_name | 否 | 在RETURNS TABLE文法中,輸出資料行的名稱。 聲明命名OUT參數的另一種方式,不同之處在於RETURNS TABLE還隱含RETURNS SETOF(即返回集合)。 | |
column_type | 否 | 在RETURNS TABLE文法中,輸出資料行的資料類型。 | |
空值處理策略 | CALLED ON NULL INPUT | 否 | 預設行為,允許輸入含NULL,函數需自行處理空值邏輯。 |
RETURNS NULL ON NULL INPUT | 否 | 任意輸入為NULL時直接返回NULL。 | |
STRICT | 否 | 為RETURNS NULL ON NULL INPUT的別名。 | |
函數版本控制 | qualifier | 否 | 指定調用的函數版本或別名,預設值為 |
請求壓縮演算法 | compression | 否 | 指定調用函數的Request和Response中所用的壓縮演算法,有效值為:
|
單次請求最大批處理行數 | max_batch_size | 否 | 用於指定每批次發送至FC的最大行數。 核心目的是為存在記憶體限制或其他約束的FC函數設定批量處理的行數上限。若未顯式指定此參數,Hologres將自動計算並採用最優批量大小,通常無需手動幹預。 |
使用樣本
建立標量UDF。
CREATE OR REPLACE FUNCTION rf_add ( a INTEGER, b INTEGER DEFAULT 1 ) RETURNS BIGINT LANGUAGE function_compute AS 'xxxxxxxxxxxxx.cn-shanghai-internal.fc.aliyuncs.com/add' ;建立UDTF。
-- 形式 1 CREATE OR REPLACE FUNCTION rf_unnest(TEXT []) RETURNS TABLE (item TEXT ) LANGUAGE function_compute AS 'xxxxxxxxxxxxxx.cn-hangzhou-internal.fc.aliyuncs.com/unnest' STRICT SET function_compute.max_batch_size TO 1024; -- 形式 2 CREATE OR REPLACE FUNCTION rf_unnest(TEXT []) RETURNS SETOF TEXT LANGUAGE function_compute AS 'xxxxxxxxxxxxxx.cn-hangzhou-internal.fc.aliyuncs.com/unnest' STRICT SET function_compute.max_batch_size TO 1024;
查看函數
查看已建立的Remote Function。
SELECT
CASE
WHEN p.proretset = 'f' THEN 'scalar UDF'
WHEN p.proretset = 't' THEN 'UDTF'
END AS function_type,
n.nspname AS schema_name,
p.proname AS function_name,
pg_get_function_arguments(p.oid) AS arguments,
pg_get_function_result(p.oid) AS return_type,
p.proisstrict AS is_strict,
p.proconfig AS config,
pg_get_functiondef(p.oid) AS definition
FROM
pg_proc p
JOIN pg_language l ON p.prolang = l.oid
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE
l.lanname = 'function_compute'
AND p.prokind != 'p'
ORDER BY
function_type,
schema_name,
function_name;刪除函數
文法
DROP FUNCTION [ IF EXISTS ] <function_name> [ ( [ [ argmode ] [ argname ] <argtype> [, ...] ] ) ] [, ...]參數說明
參數 | 是否必填 | 說明 |
function_name | 是 | 要刪除的函數名。 |
argtype | 是 | 參數資料類型。 |
argmode | 否 | 支援IN,OUT和INOUT,預設為IN,為避免歧義不建議使用INOUT。另外,只能包含一個OUT或者1個INOUT的參數。 |
argname | 否 | 參數名稱。對於入參僅作為文檔說明,對於出參,決定返回結果集的列名,若省略則由系統產生預設名稱。 |
使用樣本
DROP FUNCTION rf_add(INTEGER, INTEGER);資料互動格式
Hologres發送到FC的請求格式
Hologres通過POST調用FC的InvokeFunction介面。請求體為JSON格式,其頂層為對象。目前僅包含名為data鍵,其值為二維數組,其中的每個元素對應一批次中的一行資料,其中包含調用函數所需的參數。
資料的序列化規則
BOOLEAN序列化為JSON BOOLEAN類型。INTEGER、BIGINT序列化為JSON NUMBER類型。REAL、DOUBLE PRECISION序列化為JSON NUMBER類型。TEXT序列化為JSON STRING類型。NULL序列化為JSON的NULL。
樣本
以下是一個針對具有簽名rf_demo(TEXT, INTEGER, BOOLEAN)的Remote Function的序列化請求樣本。
{
"data": [
["foo", 100, true],
[null, null, false],
["bar", 200, false]
]
} Hologres接收到FC的響應格式
當FC完成對某批次資料的處理時,需按照以下規範,以JSON的形式將結果資料返回給Hologres:
純量涵式Scalar UDF。
頂層必須包含
results欄位,其值為一個數組,每個元素對應輸入資料的一行結果。每行結果必須為一個數組,且嚴格遵循輸入資料的順序(即第N個元素對應輸入資料的第N行)。
以下是一個大小為4的批次的返回資料樣本:
{ "results": [ ["北京"], ["上海"], ["深圳"], ["廣州"] ] }UDTF:UTDF支援單行輸入產生多行輸出,需通過行號(row_num)標識原始輸入行的關聯關係。
頂層必須包含
results欄位,其值為一個數組,每個元素對應輸出的一行資料。每行結果必須是包含兩個元素的數組:
row_num(首元素):原始輸入資料的行號(0起始索引),用於關聯輸入與輸出。row_num不可亂序,必須以遞增的形式返回。
result(次元素):處理後的一行傳回值。
樣本如下:
{ "results": [ [0, "北京"], [1, "上海"], [3, "深圳"], [3, "廣州"], ] }
自訂函數使用樣本
本樣本以unnest函數為例。
開通Function Compute。
登入Function Compute控制台。您還可以根據介面提示領取一定額度的免費資源套件。詳情請參見試用額度。
建立FC事件函數。
單擊左側導覽列函數,切換地區,保持與Hologres執行個體在同一地區。
在函數頁面,單擊建立函數,進入建立函數頁面。
選擇事件函數,配置如下參數,其他參數保持不變。詳情請參見建立事件函數。
參數
說明
函數名稱
自訂設定。例如
unnest。運行環境
選擇內建運行時/Python/Python 3.10。
代碼上傳方式
選擇通過ZIP包上傳代碼。
程式碼封裝
上傳已編寫並打包好的程式碼封裝。
將如下代碼儲存至unnest.py,並壓縮成unnest.zip。
import json def unnest(event, context): evt = json.loads(event) data = evt.get('data', None) if data is None: raise ValueError('no "data" key in event.') if not isinstance(data, list): raise ValueError('data is not a list.') res = list() for i in range(len(data)): if len(data[i]) != 1 or not isinstance(data[i], list): raise ValueError('the item in data is not a list.') for item in data[i][0]: res.append([i, item]) return json.dumps({'results': res})請求處理常式
填寫
unnest.unnest。
建立Hologres Remote Function。
說明您需要在HoloWeb平台完成以下步驟,具體操作請參見串連HoloWeb。
CREATE EXTENSION IF NOT EXISTS function_compute; CREATE OR REPLACE FUNCTION rf_unnest(INTEGER []) RETURNS SETOF INTEGER STRICT LANGUAGE function_compute AS 'xxxxxxxxxxxxxxxxx.cn-shanghai-internal.fc.aliyuncs.com/unnest';準備測試資料。
CREATE TABLE test_array ( numbers INTEGER[] ); INSERT INTO test_array (numbers) VALUES (ARRAY[1, 3]), (ARRAY[2, 4]), ('{}'), (ARRAY[]::INTEGER[]), (NULL);調用Remote Function。
SELECT numbers, rf_unnest(numbers) FROM test_array; numbers | rf_unnest ---------+----------- {2,4} | 2 {2,4} | 4 {1,3} | 1 {1,3} | 3 (4 rows)