Java ユーザー定義関数 (UDF) を使用すると、組み込み関数では表現できないカスタムロジックで StarRocks を拡張できます。EMR Serverless StarRocks は、次の 4 種類の UDF をサポートしています。
| タイプ | 機能 |
|---|---|
| スカラー UDF | 1行を入力として受け取り、1つの値を返します。 UPPER や ROUND のようなビルトイン関数と同等です。 |
| UDAF (ユーザー定義集計関数) | 複数行を入力として受け取り、グループごとに1つの値を返します。 SUM や COUNT のようなビルトイン関数と同等です。 |
| UDWF (ユーザー定義ウィンドウ関数) | OVER 句で定義された行のウィンドウに対して操作を行い、行ごとに1つの値を返します。 |
| UDTF (ユーザー定義テーブル値関数) | 1行を入力として受け取り、単一の列で複数行を返します。 行から列への変換によく使用されます。 |
StarRocks 2.2.0 以降では、Java UDF をサポートしています。StarRocks 3.0 以降では、グローバル UDF をサポートしています。GLOBAL キーワードを CREATE、SHOW、および DROP 文に追加することで、カタログまたはデータベースのプレフィックスを指定せずに、UDF をすべてのデータベースでアクセス可能にできます。
前提条件
開始する前に、次のものが揃っていることを確認してください。
Apache Maven がインストールされていること (Java プロジェクトのビルド用)
サーバーに Java 開発キット (JDK) 1.8 がインストールされていること
UDF 機能を有効化するには、EMR Serverless StarRocks インスタンスの詳細ページの [インスタンス設定] タブで、[FE] セクションに移動し、
enable_udfをTRUEに設定して、インスタンスを再起動します。
データ型のマッピング
Java クラスのすべてのパラメーターと戻り値の型は、サポートされている SQL 型にマッピングする必要があります。以下の表に、サポートされているマッピングを示します。
| SQL 型 | Java 型 |
|---|---|
| BOOLEAN | java.lang.Boolean |
| TINYINT | java.lang.Byte |
| SMALLINT | java.lang.Short |
| INT | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| STRING/VARCHAR | java.lang.String |
UDF の開発とデプロイ
ワークフローは、Maven プロジェクトの作成、依存関係の追加、UDF クラスの実装、JAR のパッケージ化、Object Storage Service (OSS) へのアップロード、StarRocks での UDF の登録、クエリでの呼び出しの 7 つのステップで構成されます。
ステップ 1:Maven プロジェクトの作成
次のディレクトリ構造で Maven プロジェクトを作成します。
project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--targetステップ 2:依存関係の追加
次の内容を pom.xml に追加します。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>ステップ 3:UDF クラスの実装
スカラー UDF
スカラ UDF は、evaluate メソッドを public メンバーメソッドとして実装する必要があります。メソッドシグネチャによって SQL パラメーターおよび戻り値の型が決まります。これらは CREATE FUNCTION 文で宣言する型と一致している必要があります(データ型マッピングをご参照ください)。
| メソッド | 説明 |
|---|---|
TYPE1 evaluate(TYPE2, ...) | 呼び出しのエントリポイントです。public メンバーメソッドである必要があります。 |
以下の例では、ドット区切りのパス式を使用してネストされた JSON 値を抽出する MY_UDF_JSON_GET を実装しています。これは、ネストされた GET_JSON_STRING(GET_JSON_STRING(...)) パターンを、単一の呼び出し MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0") に置き換えます。
package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;
public class UDFJsonGet {
public final String evaluate(String jsonObj, String key) {
if (jsonObj == null || key == null) return null;
try {
// JSONPath.read はネストされた JSON 文字列を完全に展開します
return JSONPath.read(jsonObj, key).toString();
} catch (Exception e) {
return null;
}
}
}UDAF
UDAF は、グループごとの複数の行を単一の結果に集約します。これは、中間結果を保持するための State 内部クラスを使用し、実行ノード間でデータを送信する際に StarRocks によってシリアル化および逆シリアル化されます。
必須メソッド — すべての UDAF で 6 つすべてを実装します:
| メソッド | 必須 | 説明 |
|---|---|---|
State create() | 常時 | 新しい State オブジェクトを割り当てます。 |
void destroy(State) | 常時 | State が保持しているリソースを解放します。 |
void update(State, ...) | 常時 | 1 つの入力行を State に蓄積します。最初のパラメーターは State で、残りのパラメーターは宣言された関数の入力です。 |
void serialize(State, ByteBuffer) | 常時 | ノード間転送のために State をバッファーに書き込みます。 |
void merge(State, ByteBuffer) | 常時 | バッファーから State をマージして逆シリアル化します。 |
TYPE finalize(State) | 常時 | State から最終的な集約結果を抽出します。 |
中間状態バッファー — java.nio.ByteBuffer を使用して中間結果を格納します:
| 項目 | 説明 |
|---|---|
java.nio.ByteBuffer | ノード間通信時のシリアル化された State を保持します。 |
serializeLength() | シリアル化された State のバイト長(データの型:INT)を返します。この値は、serialize メソッド内で書き込むバイト数と完全に一致する必要があります。たとえば、int 型のカウンターの場合は 4、long 型のカウンターの場合は 8 を返します。 |
State の逆シリアル化時に ByteBuffer で remaining() を呼び出さないでください。また、clear() も呼び出さないでください。 serializeLength が serialize で実際に書き込まれたバイト数と一致しない場合、集約処理の結果が正しくなりません。
以下の例では、MY_SUM_INT(INT 型の入力と出力を持つ合計関数)を実装しています(組み込みの SUM 関数とは異なり、常に BIGINT を返します):
package com.starrocks.udf.sample;
public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; } // INT = 4 バイト
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public final void update(State state, Integer val) {
if (val != null) {
state.counter += val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
}UDWF
ユーザー定義ウィンドウ関数 (UDWF) は、グループごとに 1 つの結果を返すのではなく、入力行ごとに 1 つの結果を返す特殊な UDAF です。これは、OVER 句を使用してパーティションとウィンドウフレームを定義し、標準の UDAF インターフェイスに windowUpdate メソッドを追加します。
6 つの UDAF メソッドすべてに加えて、windowUpdate を実装します:
| メソッド | 説明 |
|---|---|
void reset(State state) | ウィンドウフレームが変更されたときに State をリセットします。 |
void windowUpdate(State state, int peer_group_start, int peer_group_end, int frame_start, int frame_end, TYPE[] inputs) | 現在の行のウィンドウフレームに対して State を更新します。 |
`windowUpdate` パラメーター:
| パラメーター | 説明 |
|---|---|
peer_group_start | 現在のパーティション(同じ PARTITION BY キーを持つ行)の開始インデックスです。 |
peer_group_end | 現在のパーティションの終了インデックスです。 |
frame_start | 現在のウィンドウフレーム(例: ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)の開始インデックスです。 |
frame_end | 現在のウィンドウフレームの終了インデックスです。 |
inputs | ウィンドウ内の入力列値をラッパークラスの配列として保持します。INT 型の入力には Integer[] を使用します。 |
以下の例では、INT のウィンドウ合計である MY_WINDOW_SUM_INT を実装します:
package com.starrocks.udf.sample;
public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public void update(State state, Integer val) {
if (val != null) {
state.counter += val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
public void reset(State state) {
state.counter = 0;
}
public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}ウィンドウ関数の構文の詳細については、「ウィンドウ関数」をご参照ください。
UDTF
UDTF は 1 つの入力行を読み取り、単一の列に複数の行を返します。配列を返す process メソッドを実装する必要があります。
UDTF は、単一の列に複数の行を返すことのみをサポートしています。
| メソッド | 説明 |
|---|---|
TYPE[] process() | 呼び出しのエントリポイントです。配列を返します。各要素は個別の出力行になります。 |
以下の例では、文字列をスペースで分割する MY_UDF_SPLIT を実装します:
package com.starrocks.udf.sample;
public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}ステップ 4:プロジェクトのパッケージ化
次のコマンドを実行して JAR をビルドします。
mvn packageこれにより、target ディレクトリに2つのファイルが生成されます。
udf-1.0-SNAPSHOT.jarudf-1.0-SNAPSHOT-jar-with-dependencies.jar
ステップ 5:JAR を OSS にアップロード
udf-1.0-SNAPSHOT-jar-with-dependencies.jar を Object Storage Service (OSS) バケットにアップロードし、バケットの ACL を公開読み取りが許可されるように設定します。アップロード手順については、「簡単なアップロード」および「バケット ACL」をご参照ください。
フロントエンド (FE) ノードは JAR を検証し、そのチェックサムを計算します。 バックエンド (BE) ノードは JAR をダウンロードして実行します。 ステップ 6 の file プロパティは、OSS 内部エンドポイントの URL を使用する必要があります。
ステップ 6:StarRocks で UDF を登録
StarRocks は、グローバルとデータベースレベルの 2 つの UDF 名前空間をサポートしています。
グローバル UDF:
catalog.databaseプレフィックスなしで、任意のデータベースから名前で呼び出し可能です。共有ユーティリティ関数に使用します。データベースレベル UDF: 自身のデータベース内では、名前で呼び出すことができます。別のデータベースからは、
catalog.database.function_nameフォーマットを使用します。複数のデータベースで同じ関数名が必要な場合に使用します。
必要な権限:
グローバル UDF の作成:システムレベルの
CREATE GLOBAL FUNCTION権限データベースレベルの UDF を作成する: データベースレベルの
CREATE FUNCTION権限GRANTUDF の呼び出し:UDF に対する
USAGE権限
権限の設定については、「GRANT」をご参照ください。
構文
CREATE [GLOBAL] [AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]パラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
GLOBAL | いいえ | グローバル UDF を作成します。StarRocks 3.0 以降でサポートされています。 |
AGGREGATE | いいえ | UDAF および UDWF に必要です。 |
TABLE | いいえ | UDTF に必要です。 |
function_name | はい | db1.my_func関数名。データベース名を含めると、特定のデータベースに UDF を作成できます (例:`db1.my_func`)。同じ名前でパラメーターの型が同一の関数を同じデータベースに 2 回作成することはできませんが、パラメーターの型が異なれば許可されます。 |
arg_type | はい | パラメーターの型。詳細については、「データ型のマッピング」をご参照ください。 |
return_type | はい | 戻り値の型。詳細については、「データ型のマッピング」をご参照ください。 |
PROPERTIES | はい | 関数のプロパティ。以下のサブセクションをご参照ください。 |
PROPERTIES パラメーター
| プロパティ | 必須 | 説明 |
|---|---|---|
symbol | はい | 完全修飾クラス名(<package_name>.<class_name> 形式)。 |
type | はい | Java ベースの UDF の場合は StarrocksJar を指定します。 |
file | はい | OSS 内部エンドポイントを使用した JAR の HTTP URL:http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/<jar_package_name> |
analytic | いいえ | UDWF の場合は true を指定します。その他の UDF タイプでは不要です。 |
スカラー UDF の作成
CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string)
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);UDAF の作成
CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT)
RETURNS INT
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);UDWF の作成
CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
PROPERTIES (
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);UDTF の作成
CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);ステップ 7:UDF の呼び出し
スカラー UDF
SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');UDAF
SELECT MY_SUM_INT(col1);UDWF
SELECT MY_WINDOW_SUM_INT(intcol)
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;UDTF
-- テーブル t1 には a, b, c1 の列があると仮定します
SELECT t1.a, t1.b, t1.c1 FROM t1;
-- 出力:
-- 1, 2.1, "hello world"
-- 2, 2.2, "hello UDTF."
-- c1 を 1 行に 1 単語ずつ分割します
SELECT t1.a, t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
-- 出力:
-- 1, 2.1, "hello"
-- 1, 2.1, "world"
-- 2, 2.2, "hello"
-- 2, 2.2, "UDTF."MY_UDF_SPLIT は、SELECT 句内の関数呼び出し時に生成される列エイリアスです。UDTF の結果に対して、AS t2(f1) を使用してテーブル エイリアスまたは列エイリアスを割り当てることはできません。
UDF の表示
SHOW [GLOBAL] FUNCTIONS;UDF の削除
DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);よくある質問
UDF で静的変数を使用できますか? 異なる UDF の静的変数は互いに影響しますか?
はい。静的変数は UDF クラスごとに分離されているため、2 つのクラスが同じ名前を共有している場合でも、他の UDF クラスの静的変数と干渉することはありません。