SQL デプロイメントで使用する前に、ユーザー定義関数 (UDF) を登録します。本トピックでは、Realtime Compute for Apache Flink における UDF の登録、更新、および削除方法について説明します。
前提条件
作業を開始する前に、以下の要件を満たしていることを確認してください。
RAM ユーザーまたは RAM ロールに名前空間のアクセス権限が付与されていること。詳細については、「名前空間の権限付与」をご参照ください。
アップロード可能な UDF JAR ファイル(Java UDF の場合)または Python ファイル(Python UDF の場合)を準備済みであること。
JAR 依存関係の競合を回避する:
UDF JAR ファイルをビルドする際は、実行時に依存関係の競合を防ぐために、以下のルールに従ってください。
UDF プロジェクトで使用する Flink のバージョンを、SQL エディターで選択したエンジンバージョンと一致させること。
POM ファイル内で Flink 関連の依存関係を
<scope>provided</scope>としてマークすること。サードパーティ製の依存関係を JAR にパッケージングする場合は、Apache Maven Shade Plugin を使用すること。
トラブルシューティング手順については、「Flink の依存関係競合をトラブルシューティングする方法」をご参照ください。
Python UDF の要件(デプロイメントレベルのみ):
Ververica Runtime (VVR) 8.0.3 以降が必要です。
[設定] タブの [パラメーター] セクションで、依存ファイル(
python.filesまたはpython.archivesなど)を設定します。Python UDF の下書きに対して構文チェックはサポートされていません。デプロイ前に構文チェックをスキップしてください。
カタログ UDF とデプロイメントレベル UDF の選択
Realtime Compute for Apache Flink では、2 種類の UDF スコープがサポートされています。ユースケースに応じて適切な方を選択してください。
| 項目 | カタログ UDF | デプロイメントレベル UDF |
|---|---|---|
| 範囲 | ワークスペース内のすべてのデプロイメントで利用可能 | 単一のデプロイメントに紐付けられる |
| 登録方法 | SQL エディター > UDF タブ > UDF アーティファクトの登録 | アーティファクトをアップロード > [追加依存関係] から選択 > SQL 内で CREATE TEMPORARY FUNCTION を実行 |
| エンジンバージョン | デフォルトで Realtime Compute for Apache Flink の最新バージョンがデータ解析に使用される — 古いデプロイメントと競合する可能性あり | デプロイメントのエンジンバージョンおよび依存関係セットに紐付けられる |
カタログ UDF がご利用のデプロイメントの古いエンジンバージョンと互換性の問題を引き起こす場合は、代わりにデプロイメントレベル UDF を使用してください。
カタログ UDF の登録
Realtime Compute for Apache Flink コンソール にログインします。
ワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[開発] > [ETL] をクリックします。
SQL エディター画面の左側で、[UDF] タブをクリックし、次に [UDF アーティファクトの登録] をクリックします。
[UDF アーティファクトの登録] ダイアログボックスで、UDF JAR ファイルをアップロードします。

以下のいずれかのアップロード方法を選択します。
[ファイルをアップロード]: [ファイルを選択] の横にある [クリックして選択] をクリックして JAR ファイルをアップロードします。依存ファイルを追加するには、[依存関係] の横にある [クリックして選択] をクリックします。
説明JAR ファイルは、ワークスペースに関連付けられた OSS バケットの
sql-artifactsディレクトリに保存されます。Java UDF の場合、依存関係を JAR にパッケージングするか、別途アップロードできます。Python UDF の場合は、依存関係を別ファイルとしてアップロードしてください。
[外部 URL]: 外部ホスト上の既存の UDF JAR ファイルの URL を入力します。
説明以下の外部 URL のみがサポートされています。
ワークスペース購入時に指定した Object Storage Service (OSS) バケットのエンドポイント。このエンドポイントは、Realtime Compute for Apache Flink コンソールの [ワークスペースの詳細] で確認できます。
Realtime Compute for Apache Flink からアクセス可能な外部ストレージシステムのエンドポイントで、公開読み取りのアクセス制御リスト (ACL) が設定されている、または Flink へのアクセスが明示的に許可されているもの。
[確認] をクリックします。
[関数の管理] ダイアログボックスの [利用可能な関数] セクションで、登録する UDF を選択し、[関数の作成] をクリックします。Flink は JAR ファイルを解析し、UDF、ユーザー定義集計関数 (UDAF)、またはユーザー定義テーブル関数 (UDTF) インターフェイスを実装するクラスを自動検出します。クラス名は自動的に [関数名] 列に入力されます。登録後、UDF は SQL エディター左側の [UDF] ペインに表示され、各関数名の先頭に黄色でハイライトされた fx 文字が付与されます。
説明カタログ UDF を登録する際は、デフォルトで Realtime Compute for Apache Flink の最新バージョンがデータ解析に使用されます。ご利用のデプロイメントが古いエンジンバージョンを使用している場合([設定] タブの [基本] セクションで設定)、互換性の問題が発生する可能性があります。その場合は、代わりにデプロイメントレベル UDF を使用してください。「デプロイメントレベル UDF の登録」をご参照ください。

デプロイメントレベル UDF の登録
デプロイメントレベル UDF は単一のデプロイメントにスコープされます。デプロイメントが古いエンジンバージョンで実行されている場合や、Python UDF のサポートが必要な場合にこの方法を使用します。
JAR または Python ファイルをアップロードします。左側のナビゲーションウィンドウで、[アーティファクト] をクリックします。アーティファクトページで、[アーティファクトのアップロード] をクリックし、JAR または Python ファイルをアップロードします。
デプロイメントの依存関係としてファイルを追加します。SQL エディター画面の [設定] タブで、[追加依存関係] から JAR または Python ファイルを選択します。
SQL 下書き内で以下のいずれかの文を使用して UDF を登録します。
Java UDF:
CREATE TEMPORARY FUNCTION yourfunctionname;Python UDF:
CREATE TEMPORARY FUNCTION yourfunctionname LANGUAGE Python;
UDF の更新
既存の JAR ファイル内の UDF コードを変更したり、関数を追加したりした場合は、変更内容を適用するためにアーティファクトを更新します。
新しい JAR ファイルには、変更した関数だけでなく、これまで登録されていたすべての UDF クラスを含める必要があります。
更新されたコードは、デプロイメントの下書きを再起動するか、新しい下書きを公開した場合にのみ有効になります。実行中のジョブは引き続き元の JAR ファイルを使用します。
Realtime Compute for Apache Flink コンソール にログインします。
ワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[開発] > [ETL] をクリックします。
SQL エディター画面の左側で、[UDF] タブをクリックします。
[UDF] ペインで、UDF 名にマウスカーソルを合わせ、
アイコンをクリックします。[UDF アーティファクトの登録] ダイアログボックスで、新しい JAR ファイルをアップロードします。

[更新] をクリックします。
UDF の削除
UDF JAR ファイルを削除する前に、そのファイルから登録された UDF をどのデプロイメントまたは SQL ファイルも参照していないことを確認してください。
Realtime Compute for Apache Flink コンソール にログインします。
ワークスペースを見つけて、[操作] 列で [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[開発] > [ETL] をクリックします。
SQL エディター画面の左側で、[UDF] タブをクリックします。
[UDF] ペインで、UDF JAR ファイル名にマウスカーソルを合わせ、
アイコンをクリックします。[関連付けられたファイルの削除] を選択して、ファイルから登録済みの UDF をすべて削除します。これにより、ワークスペースにダーティデータが残ることを防ぎます。
[確認] をクリックします。
次のステップ
Java UDF の開発(UDF の分類、パラメーターの渡し方、および使用方法を含む)については、「Java UDF 開発ガイド」をご参照ください。ユーザー定義集約関数については、「UDAF」をご参照ください。
Python UDF の開発(分類、依存関係、デバッグ、および使用量を含む)については、「Python UDF 開発ガイド」をご参照ください。ユーザー定義集約関数については、「UDAFs」をご参照ください。
UDAF を使用してデータをソートおよび集計する方法については、「UDAF を使用したデータのソートと集計」をご参照ください。