All Products
Search
Document Center

E-MapReduce:Java UDF

Last Updated:Mar 26, 2026

Fungsi yang didefinisikan pengguna (UDF) berbasis Java memungkinkan Anda memperluas StarRocks dengan logika kustom yang tidak dapat diekspresikan oleh fungsi bawaan. EMR Serverless StarRocks mendukung empat jenis UDF:

TypeWhat it does
Scalar UDFMenerima satu baris sebagai input dan mengembalikan satu nilai. Setara dengan fungsi bawaan seperti UPPER atau ROUND.
UDAF (user-defined aggregate function)Menerima beberapa baris sebagai input dan mengembalikan satu nilai per kelompok. Setara dengan fungsi bawaan seperti SUM atau COUNT.
UDWF (user-defined window function)Beroperasi pada jendela baris yang ditentukan oleh klausa OVER dan mengembalikan satu nilai per baris.
UDTF (user-defined table-valued function)Menerima satu baris sebagai input dan mengembalikan beberapa baris dalam satu kolom. Umumnya digunakan untuk konversi baris ke kolom.

StarRocks 2.2.0 dan versi lebih baru mendukung Java UDF. StarRocks 3.0 dan versi lebih baru mendukung global UDF — tambahkan kata kunci GLOBAL pada pernyataan CREATE, SHOW, dan DROP agar UDF dapat diakses di semua database tanpa awalan catalog atau database.

Prasyarat

Sebelum memulai, pastikan Anda telah:

  • Menginstal Apache Maven (untuk membangun proyek Java)

  • Menginstal Java Development Kit (JDK) 1.8 di server

  • Mengaktifkan fitur UDF: pada tab Instance Configuration di halaman detail instans EMR Serverless StarRocks Anda, buka bagian FE, atur enable_udf ke TRUE, lalu restart instans

Pemetaan tipe data

Semua tipe parameter dan tipe kembalian dalam kelas Java Anda harus dipetakan ke tipe SQL yang didukung. Tabel berikut menunjukkan pemetaan yang didukung.

SQL typeJava type
BOOLEANjava.lang.Boolean
TINYINTjava.lang.Byte
SMALLINTjava.lang.Short
INTjava.lang.Integer
BIGINTjava.lang.Long
FLOATjava.lang.Float
DOUBLEjava.lang.Double
STRING/VARCHARjava.lang.String

Kembangkan dan deploy UDF

Alur kerja terdiri dari tujuh langkah: membuat proyek Maven, menambahkan dependensi, mengimplementasikan kelas UDF, mengemas JAR, mengunggahnya ke OSS, mendaftarkan UDF di StarRocks, dan memanggilnya dalam kueri.

Langkah 1: Buat proyek Maven

Buat proyek Maven dengan struktur direktori berikut:

project
|--pom.xml
|--src
|  |--main
|  |  |--java
|  |  |--resources
|  |--test
|--target

Langkah 2: Tambahkan dependensi

Tambahkan konten berikut ke pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>udf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Langkah 3: Implementasikan kelas UDF

Scalar UDF

Scalar UDF harus mengimplementasikan metode evaluate sebagai metode anggota public. Signature metode ini menentukan tipe parameter dan tipe kembalian SQL — tipe tersebut harus sesuai dengan tipe yang Anda deklarasikan dalam pernyataan CREATE FUNCTION (lihat Pemetaan tipe data).

MethodDescription
TYPE1 evaluate(TYPE2, ...)Titik masuk pemanggilan. Harus merupakan metode anggota public.

Contoh berikut mengimplementasikan MY_UDF_JSON_GET, yang mengekstraksi nilai JSON bersarang menggunakan ekspresi path bertitik. Fungsi ini menggantikan pola bersarang GET_JSON_STRING(GET_JSON_STRING(...)) dengan satu panggilan: MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0").

package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;

public class UDFJsonGet {
    public final String evaluate(String jsonObj, String key) {
        if (jsonObj == null || key == null) return null;
        try {
            // JSONPath.read sepenuhnya mengekspansi string JSON bersarang
            return JSONPath.read(jsonObj, key).toString();
        } catch (Exception e) {
            return null;
        }
    }
}

UDAF

UDAF mengagregasi beberapa baris per kelompok menjadi satu hasil. Fungsi ini menggunakan kelas dalam State untuk menyimpan hasil antara, yang diserialisasi dan dideserialisasi oleh StarRocks saat mentransmisikan data antar node eksekusi.

Metode wajib — implementasikan keenam metode berikut untuk setiap UDAF:

MethodRequiredDescription
State create()SelaluMengalokasikan objek State baru.
void destroy(State)SelaluMelepaskan sumber daya yang dipegang oleh State.
void update(State, ...)SelaluMengakumulasi satu baris input ke dalam State. Parameter pertama adalah State; parameter sisanya adalah input fungsi yang dideklarasikan.
void serialize(State, ByteBuffer)SelaluMenulis State ke buffer untuk transmisi antar node.
void merge(State, ByteBuffer)SelaluMenggabungkan dan mendeserialisasi State dari buffer.
TYPE finalize(State)SelaluMengekstraksi hasil agregasi akhir dari State.

Buffer state antara — gunakan java.nio.ByteBuffer untuk menyimpan hasil antara:

ItemDescription
java.nio.ByteBufferMenyimpan State yang telah diserialisasi selama transmisi antar node.
serializeLength()Mengembalikan panjang byte dari State yang diserialisasi (tipe data: INT). Harus persis sama dengan jumlah byte yang Anda tulis di serialize. Untuk penghitung int, kembalikan 4; untuk penghitung long, kembalikan 8.
Peringatan

Jangan memanggil remaining() pada ByteBuffer untuk mendeserialisasi State, dan jangan memanggil clear() padanya. Jika serializeLength tidak sesuai dengan byte yang sebenarnya ditulis di serialize, agregasi akan menghasilkan hasil yang salah.

Contoh berikut mengimplementasikan MY_SUM_INT, penjumlahan INT-in/INT-out (berbeda dengan SUM bawaan yang selalu mengembalikan BIGINT):

package com.starrocks.udf.sample;

public class SumInt {
    public static class State {
        int counter = 0;
        public int serializeLength() { return 4; } // INT = 4 byte
    }

    public State create() {
        return new State();
    }

    public void destroy(State state) {
    }

    public final void update(State state, Integer val) {
        if (val != null) {
            state.counter += val;
        }
    }

    public void serialize(State state, java.nio.ByteBuffer buff) {
        buff.putInt(state.counter);
    }

    public void merge(State state, java.nio.ByteBuffer buffer) {
        int val = buffer.getInt();
        state.counter += val;
    }

    public Integer finalize(State state) {
        return state.counter;
    }
}

UDWF

UDWF (user-defined window function) adalah UDAF khusus yang mengembalikan satu hasil per baris input, bukan satu hasil per kelompok. Fungsi ini menggunakan klausa OVER untuk menentukan partisi dan frame jendela, serta menambahkan metode windowUpdate ke antarmuka UDAF standar.

Implementasikan keenam metode UDAF ditambah windowUpdate:

MethodDescription
void reset(State state)Mereset State ketika frame jendela berubah.
void windowUpdate(State state, int peer_group_start, int peer_group_end, int frame_start, int frame_end, TYPE[] inputs)Memperbarui State untuk frame jendela baris saat ini.

Parameter `windowUpdate`:

ParameterDescription
peer_group_startIndeks awal partisi saat ini (baris yang memiliki kunci PARTITION BY yang sama).
peer_group_endIndeks akhir partisi saat ini.
frame_startIndeks awal frame jendela saat ini (misalnya, ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING).
frame_endIndeks akhir frame jendela saat ini.
inputsNilai kolom input untuk jendela dalam bentuk array kelas pembungkus. Gunakan Integer[] untuk input INT.

Contoh berikut mengimplementasikan MY_WINDOW_SUM_INT, penjumlahan jendela INT:

package com.starrocks.udf.sample;

public class WindowSumInt {
    public static class State {
        int counter = 0;
        public int serializeLength() { return 4; }
    }

    public State create() {
        return new State();
    }

    public void destroy(State state) {
    }

    public void update(State state, Integer val) {
        if (val != null) {
            state.counter += val;
        }
    }

    public void serialize(State state, java.nio.ByteBuffer buff) {
        buff.putInt(state.counter);
    }

    public void merge(State state, java.nio.ByteBuffer buffer) {
        int val = buffer.getInt();
        state.counter += val;
    }

    public Integer finalize(State state) {
        return state.counter;
    }

    public void reset(State state) {
        state.counter = 0;
    }

    public void windowUpdate(State state,
                            int peer_group_start, int peer_group_end,
                            int frame_start, int frame_end,
                            Integer[] inputs) {
        for (int i = (int)frame_start; i < (int)frame_end; ++i) {
            state.counter += inputs[i];
        }
    }
}

Untuk informasi lebih lanjut tentang sintaks fungsi jendela, lihat Window functions.

UDTF

UDTF membaca satu baris input dan mengembalikan beberapa baris, semuanya dalam satu kolom. Fungsi ini harus mengimplementasikan metode process, yang mengembalikan array.

Catatan

UDTF hanya mendukung pengembalian beberapa baris dalam satu kolom.

MethodDescription
TYPE[] process()Titik masuk pemanggilan. Mengembalikan array — setiap elemen menjadi baris output terpisah.

Contoh berikut mengimplementasikan MY_UDF_SPLIT, yang memisahkan string berdasarkan spasi:

package com.starrocks.udf.sample;

public class UDFSplit{
    public String[] process(String in) {
        if (in == null) return null;
        return in.split(" ");
    }
}

Langkah 4: Kemas proyek

Jalankan perintah berikut untuk membangun file JAR:

mvn package

Perintah ini menghasilkan dua file di direktori target:

  • udf-1.0-SNAPSHOT.jar

  • udf-1.0-SNAPSHOT-jar-with-dependencies.jar

Langkah 5: Unggah JAR ke OSS

Unggah udf-1.0-SNAPSHOT-jar-with-dependencies.jar ke bucket Object Storage Service (OSS) dan atur ACL bucket agar memungkinkan pembacaan publik. Untuk instruksi pengunggahan, lihat Simple upload dan Bucket ACLs.

Catatan

Node frontend (FE) memverifikasi JAR dan menghitung checksum-nya. Node backend (BE) mengunduh dan menjalankan JAR. Properti file pada Langkah 6 harus menggunakan URL titik akhir internal OSS.

Langkah 6: Daftarkan UDF di StarRocks

StarRocks mendukung dua namespace UDF: global dan tingkat database.

  • Global UDF: Dapat dipanggil berdasarkan nama dari database mana pun tanpa awalan catalog.database. Gunakan ini untuk fungsi utilitas bersama.

  • Database-level UDF: Dapat dipanggil berdasarkan nama dalam database asalnya. Dari database lain, gunakan format catalog.database.function_name. Gunakan ini jika Anda memerlukan nama fungsi yang sama di beberapa database.

Izin yang diperlukan:

  • Membuat global UDF: izin sistem CREATE GLOBAL FUNCTION

  • Membuat database-level UDF: izin tingkat database CREATE FUNCTIONGRANT

  • Memanggil UDF: izin USAGE pada UDF

Untuk pengaturan izin, lihat GRANT.

Sintaks

CREATE [GLOBAL] [AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]

Parameter

ParameterRequiredDescription
GLOBALTidakMembuat global UDF. Didukung di StarRocks 3.0 dan versi lebih baru.
AGGREGATETidakWajib untuk UDAF dan UDWF.
TABLETidakWajib untuk UDTF.
function_nameYaNama fungsi. Sertakan nama database untuk membuat UDF di database tertentu (misalnya, db1.my_func). Fungsi dengan nama dan tipe parameter identik tidak dapat dibuat dua kali di database yang sama; tipe parameter berbeda diperbolehkan.
arg_typeYaTipe parameter. Lihat Pemetaan tipe data.
return_typeYaTipe kembalian. Lihat Pemetaan tipe data.
PROPERTIESYaProperti fungsi. Lihat sub-bagian di bawah.

Parameter PROPERTIES

PropertyRequiredDescription
symbolYaNama kelas lengkap dalam format <package_name>.<class_name>.
typeYaAtur ke StarrocksJar untuk UDF berbasis Java.
fileYaURL HTTP file JAR menggunakan titik akhir internal OSS: http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/<jar_package_name>
analyticTidakAtur ke true untuk UDWF. Tidak diperlukan untuk jenis UDF lainnya.

Buat scalar UDF

CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string)
RETURNS string
PROPERTIES (
    "symbol" = "com.starrocks.udf.sample.UDFJsonGet",
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

Buat UDAF

CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT)
RETURNS INT
PROPERTIES (
    "symbol" = "com.starrocks.udf.sample.SumInt",
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

Buat UDWF

CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
PROPERTIES (
    "analytic" = "true",
    "symbol" = "com.starrocks.udf.sample.WindowSumInt",
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

Buat UDTF

CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
PROPERTIES (
    "symbol" = "com.starrocks.udf.sample.UDFSplit",
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

Langkah 7: Panggil UDF

Scalar UDF

SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');

UDAF

SELECT MY_SUM_INT(col1);

UDWF

SELECT MY_WINDOW_SUM_INT(intcol)
    OVER (PARTITION BY intcol2
          ORDER BY intcol3
          ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;

UDTF

-- Asumsikan tabel t1 memiliki kolom a, b, dan c1
SELECT t1.a, t1.b, t1.c1 FROM t1;
-- Output:
-- 1, 2.1, "hello world"
-- 2, 2.2, "hello UDTF."

-- Pisahkan c1 menjadi satu kata per baris
SELECT t1.a, t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
-- Output:
-- 1, 2.1, "hello"
-- 1, 2.1, "world"
-- 2, 2.2, "hello"
-- 2, 2.2, "UDTF."
Catatan

MY_UDF_SPLIT dalam daftar SELECT adalah alias kolom yang dihasilkan saat Anda memanggil fungsi tersebut. Anda tidak dapat menggunakan AS t2(f1) untuk memberikan alias tabel atau alias kolom pada hasil UDTF.

Lihat UDF

SHOW [GLOBAL] FUNCTIONS;

Hapus UDF

DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);

FAQ

Apakah saya dapat menggunakan variabel statis dalam UDF? Apakah variabel statis dari UDF yang berbeda saling memengaruhi?

Ya. Variabel statis diisolasi per kelas UDF — variabel tersebut tidak saling mengganggu variabel statis dari kelas UDF lain, bahkan jika dua kelas memiliki nama yang sama.