Fungsi yang didefinisikan pengguna (UDF) berbasis Java memungkinkan Anda memperluas StarRocks dengan logika kustom yang tidak dapat diekspresikan oleh fungsi bawaan. EMR Serverless StarRocks mendukung empat jenis UDF:
| Type | What it does |
|---|---|
| Scalar UDF | Menerima satu baris sebagai input dan mengembalikan satu nilai. Setara dengan fungsi bawaan seperti UPPER atau ROUND. |
| UDAF (user-defined aggregate function) | Menerima beberapa baris sebagai input dan mengembalikan satu nilai per kelompok. Setara dengan fungsi bawaan seperti SUM atau COUNT. |
| UDWF (user-defined window function) | Beroperasi pada jendela baris yang ditentukan oleh klausa OVER dan mengembalikan satu nilai per baris. |
| UDTF (user-defined table-valued function) | Menerima satu baris sebagai input dan mengembalikan beberapa baris dalam satu kolom. Umumnya digunakan untuk konversi baris ke kolom. |
StarRocks 2.2.0 dan versi lebih baru mendukung Java UDF. StarRocks 3.0 dan versi lebih baru mendukung global UDF — tambahkan kata kunci GLOBAL pada pernyataan CREATE, SHOW, dan DROP agar UDF dapat diakses di semua database tanpa awalan catalog atau database.
Prasyarat
Sebelum memulai, pastikan Anda telah:
Menginstal Apache Maven (untuk membangun proyek Java)
Menginstal Java Development Kit (JDK) 1.8 di server
Mengaktifkan fitur UDF: pada tab Instance Configuration di halaman detail instans EMR Serverless StarRocks Anda, buka bagian FE, atur
enable_udfkeTRUE, lalu restart instans
Pemetaan tipe data
Semua tipe parameter dan tipe kembalian dalam kelas Java Anda harus dipetakan ke tipe SQL yang didukung. Tabel berikut menunjukkan pemetaan yang didukung.
| SQL type | Java type |
|---|---|
| BOOLEAN | java.lang.Boolean |
| TINYINT | java.lang.Byte |
| SMALLINT | java.lang.Short |
| INT | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| STRING/VARCHAR | java.lang.String |
Kembangkan dan deploy UDF
Alur kerja terdiri dari tujuh langkah: membuat proyek Maven, menambahkan dependensi, mengimplementasikan kelas UDF, mengemas JAR, mengunggahnya ke OSS, mendaftarkan UDF di StarRocks, dan memanggilnya dalam kueri.
Langkah 1: Buat proyek Maven
Buat proyek Maven dengan struktur direktori berikut:
project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--targetLangkah 2: Tambahkan dependensi
Tambahkan konten berikut ke pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>Langkah 3: Implementasikan kelas UDF
Scalar UDF
Scalar UDF harus mengimplementasikan metode evaluate sebagai metode anggota public. Signature metode ini menentukan tipe parameter dan tipe kembalian SQL — tipe tersebut harus sesuai dengan tipe yang Anda deklarasikan dalam pernyataan CREATE FUNCTION (lihat Pemetaan tipe data).
| Method | Description |
|---|---|
TYPE1 evaluate(TYPE2, ...) | Titik masuk pemanggilan. Harus merupakan metode anggota public. |
Contoh berikut mengimplementasikan MY_UDF_JSON_GET, yang mengekstraksi nilai JSON bersarang menggunakan ekspresi path bertitik. Fungsi ini menggantikan pola bersarang GET_JSON_STRING(GET_JSON_STRING(...)) dengan satu panggilan: MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0").
package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;
public class UDFJsonGet {
public final String evaluate(String jsonObj, String key) {
if (jsonObj == null || key == null) return null;
try {
// JSONPath.read sepenuhnya mengekspansi string JSON bersarang
return JSONPath.read(jsonObj, key).toString();
} catch (Exception e) {
return null;
}
}
}UDAF
UDAF mengagregasi beberapa baris per kelompok menjadi satu hasil. Fungsi ini menggunakan kelas dalam State untuk menyimpan hasil antara, yang diserialisasi dan dideserialisasi oleh StarRocks saat mentransmisikan data antar node eksekusi.
Metode wajib — implementasikan keenam metode berikut untuk setiap UDAF:
| Method | Required | Description |
|---|---|---|
State create() | Selalu | Mengalokasikan objek State baru. |
void destroy(State) | Selalu | Melepaskan sumber daya yang dipegang oleh State. |
void update(State, ...) | Selalu | Mengakumulasi satu baris input ke dalam State. Parameter pertama adalah State; parameter sisanya adalah input fungsi yang dideklarasikan. |
void serialize(State, ByteBuffer) | Selalu | Menulis State ke buffer untuk transmisi antar node. |
void merge(State, ByteBuffer) | Selalu | Menggabungkan dan mendeserialisasi State dari buffer. |
TYPE finalize(State) | Selalu | Mengekstraksi hasil agregasi akhir dari State. |
Buffer state antara — gunakan java.nio.ByteBuffer untuk menyimpan hasil antara:
| Item | Description |
|---|---|
java.nio.ByteBuffer | Menyimpan State yang telah diserialisasi selama transmisi antar node. |
serializeLength() | Mengembalikan panjang byte dari State yang diserialisasi (tipe data: INT). Harus persis sama dengan jumlah byte yang Anda tulis di serialize. Untuk penghitung int, kembalikan 4; untuk penghitung long, kembalikan 8. |
Jangan memanggil remaining() pada ByteBuffer untuk mendeserialisasi State, dan jangan memanggil clear() padanya. Jika serializeLength tidak sesuai dengan byte yang sebenarnya ditulis di serialize, agregasi akan menghasilkan hasil yang salah.
Contoh berikut mengimplementasikan MY_SUM_INT, penjumlahan INT-in/INT-out (berbeda dengan SUM bawaan yang selalu mengembalikan BIGINT):
package com.starrocks.udf.sample;
public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; } // INT = 4 byte
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public final void update(State state, Integer val) {
if (val != null) {
state.counter += val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
}UDWF
UDWF (user-defined window function) adalah UDAF khusus yang mengembalikan satu hasil per baris input, bukan satu hasil per kelompok. Fungsi ini menggunakan klausa OVER untuk menentukan partisi dan frame jendela, serta menambahkan metode windowUpdate ke antarmuka UDAF standar.
Implementasikan keenam metode UDAF ditambah windowUpdate:
| Method | Description |
|---|---|
void reset(State state) | Mereset State ketika frame jendela berubah. |
void windowUpdate(State state, int peer_group_start, int peer_group_end, int frame_start, int frame_end, TYPE[] inputs) | Memperbarui State untuk frame jendela baris saat ini. |
Parameter `windowUpdate`:
| Parameter | Description |
|---|---|
peer_group_start | Indeks awal partisi saat ini (baris yang memiliki kunci PARTITION BY yang sama). |
peer_group_end | Indeks akhir partisi saat ini. |
frame_start | Indeks awal frame jendela saat ini (misalnya, ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING). |
frame_end | Indeks akhir frame jendela saat ini. |
inputs | Nilai kolom input untuk jendela dalam bentuk array kelas pembungkus. Gunakan Integer[] untuk input INT. |
Contoh berikut mengimplementasikan MY_WINDOW_SUM_INT, penjumlahan jendela INT:
package com.starrocks.udf.sample;
public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public void update(State state, Integer val) {
if (val != null) {
state.counter += val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
public void reset(State state) {
state.counter = 0;
}
public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}Untuk informasi lebih lanjut tentang sintaks fungsi jendela, lihat Window functions.
UDTF
UDTF membaca satu baris input dan mengembalikan beberapa baris, semuanya dalam satu kolom. Fungsi ini harus mengimplementasikan metode process, yang mengembalikan array.
UDTF hanya mendukung pengembalian beberapa baris dalam satu kolom.
| Method | Description |
|---|---|
TYPE[] process() | Titik masuk pemanggilan. Mengembalikan array — setiap elemen menjadi baris output terpisah. |
Contoh berikut mengimplementasikan MY_UDF_SPLIT, yang memisahkan string berdasarkan spasi:
package com.starrocks.udf.sample;
public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}Langkah 4: Kemas proyek
Jalankan perintah berikut untuk membangun file JAR:
mvn packagePerintah ini menghasilkan dua file di direktori target:
udf-1.0-SNAPSHOT.jarudf-1.0-SNAPSHOT-jar-with-dependencies.jar
Langkah 5: Unggah JAR ke OSS
Unggah udf-1.0-SNAPSHOT-jar-with-dependencies.jar ke bucket Object Storage Service (OSS) dan atur ACL bucket agar memungkinkan pembacaan publik. Untuk instruksi pengunggahan, lihat Simple upload dan Bucket ACLs.
Node frontend (FE) memverifikasi JAR dan menghitung checksum-nya. Node backend (BE) mengunduh dan menjalankan JAR. Properti file pada Langkah 6 harus menggunakan URL titik akhir internal OSS.
Langkah 6: Daftarkan UDF di StarRocks
StarRocks mendukung dua namespace UDF: global dan tingkat database.
Global UDF: Dapat dipanggil berdasarkan nama dari database mana pun tanpa awalan
catalog.database. Gunakan ini untuk fungsi utilitas bersama.Database-level UDF: Dapat dipanggil berdasarkan nama dalam database asalnya. Dari database lain, gunakan format
catalog.database.function_name. Gunakan ini jika Anda memerlukan nama fungsi yang sama di beberapa database.
Izin yang diperlukan:
Membuat global UDF: izin sistem
CREATE GLOBAL FUNCTIONMembuat database-level UDF: izin tingkat database
CREATE FUNCTIONGRANTMemanggil UDF: izin
USAGEpada UDF
Untuk pengaturan izin, lihat GRANT.
Sintaks
CREATE [GLOBAL] [AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]Parameter
| Parameter | Required | Description |
|---|---|---|
GLOBAL | Tidak | Membuat global UDF. Didukung di StarRocks 3.0 dan versi lebih baru. |
AGGREGATE | Tidak | Wajib untuk UDAF dan UDWF. |
TABLE | Tidak | Wajib untuk UDTF. |
function_name | Ya | Nama fungsi. Sertakan nama database untuk membuat UDF di database tertentu (misalnya, db1.my_func). Fungsi dengan nama dan tipe parameter identik tidak dapat dibuat dua kali di database yang sama; tipe parameter berbeda diperbolehkan. |
arg_type | Ya | Tipe parameter. Lihat Pemetaan tipe data. |
return_type | Ya | Tipe kembalian. Lihat Pemetaan tipe data. |
PROPERTIES | Ya | Properti fungsi. Lihat sub-bagian di bawah. |
Parameter PROPERTIES
| Property | Required | Description |
|---|---|---|
symbol | Ya | Nama kelas lengkap dalam format <package_name>.<class_name>. |
type | Ya | Atur ke StarrocksJar untuk UDF berbasis Java. |
file | Ya | URL HTTP file JAR menggunakan titik akhir internal OSS: http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/<jar_package_name> |
analytic | Tidak | Atur ke true untuk UDWF. Tidak diperlukan untuk jenis UDF lainnya. |
Buat scalar UDF
CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string)
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);Buat UDAF
CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT)
RETURNS INT
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);Buat UDWF
CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
PROPERTIES (
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);Buat UDTF
CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);Langkah 7: Panggil UDF
Scalar UDF
SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');UDAF
SELECT MY_SUM_INT(col1);UDWF
SELECT MY_WINDOW_SUM_INT(intcol)
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;UDTF
-- Asumsikan tabel t1 memiliki kolom a, b, dan c1
SELECT t1.a, t1.b, t1.c1 FROM t1;
-- Output:
-- 1, 2.1, "hello world"
-- 2, 2.2, "hello UDTF."
-- Pisahkan c1 menjadi satu kata per baris
SELECT t1.a, t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
-- Output:
-- 1, 2.1, "hello"
-- 1, 2.1, "world"
-- 2, 2.2, "hello"
-- 2, 2.2, "UDTF."MY_UDF_SPLIT dalam daftar SELECT adalah alias kolom yang dihasilkan saat Anda memanggil fungsi tersebut. Anda tidak dapat menggunakan AS t2(f1) untuk memberikan alias tabel atau alias kolom pada hasil UDTF.
Lihat UDF
SHOW [GLOBAL] FUNCTIONS;Hapus UDF
DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);FAQ
Apakah saya dapat menggunakan variabel statis dalam UDF? Apakah variabel statis dari UDF yang berbeda saling memengaruhi?
Ya. Variabel statis diisolasi per kelas UDF — variabel tersebut tidak saling mengganggu variabel statis dari kelas UDF lain, bahkan jika dua kelas memiliki nama yang sama.