すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:UDF の管理

最終更新日:Jan 07, 2025

SQL デプロイメントでユーザー定義関数 (UDF) を使用する前に、UDF を登録する必要があります。このトピックでは、UDF の登録、更新、および削除の方法について説明します。

注意事項

  • RAM ユーザーまたは RAM ロールを使用して、名前空間で UDF 管理などの操作を実行する場合は、RAM ユーザーまたは RAM ロールに名前空間へのアクセス権限が付与されていることを確認する必要があります。詳細については、「名前空間への権限の付与」をご参照ください。

  • JAR ファイルの依存関係間の競合を防ぐために、UDF を開発する際には、次の項目に注意してください。

    • SQL エディター ページで選択する Flink のバージョンが、POM 依存関係で指定されている Flink のバージョンと同じであることを確認してください。

    • <scope>provided</scope> を指定します。

    • 他のサードパーティの依存関係をパッケージ化するには、Shade プラグインを使用します。詳細については、「Apache Maven Shade プラグイン」をご参照ください。

    説明

    JAR ファイル間の Flink 依存関係の競合を処理する方法の詳細については、「Flink の依存関係の競合をトラブルシューティングするにはどうすればよいですか?」をご参照ください。

UDF の登録

カタログ UDF の登録

  1. [UDF アーティファクトの登録] ダイアログボックスに移動します。

    1. Realtime Compute for Apache Flink コンソール にログオンします。

    2. 管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションペインで、[開発] > [ETL] をクリックします。

    4. SQL エディターページの左側にある [UDF] タブをクリックします。

  2. [UDF アーティファクトの登録] をクリックします。

  3. [UDF アーティファクトの登録] ダイアログボックスで、UDF JAR ファイルをアップロードします。注册UDF

    次のいずれかの方法を使用して、UDF JAR ファイルをアップロードできます。

    • [ファイルのアップロード][ファイルを選択] の横にある [クリックして選択] をクリックして、アップロードする UDF JAR ファイルをアップロードします。依存関係ファイルをアップロードする場合は、[依存関係] の横にある [クリックして選択] をクリックして、UDF JAR ファイルが依存するファイルをアップロードします。

      説明
      • UDF JAR ファイルは、選択したオブジェクトストレージサービス (OSS) バケットの sql-artifacts ディレクトリにアップロードされて保存されます。

      • Java UDF の依存関係は、UDF JAR ファイルにパッケージ化することも、依存関係ファイルとして個別にアップロードすることもできます。Python UDF の場合は、依存関係を依存関係ファイルとして個別にアップロードすることをお勧めします。

    • [外部 URL]:外部 URL を入力します。別のサービスで使用されている UDF JAR ファイルを使用する場合は、外部 URL を使用して UDF ファイルを取得できます。

      説明

      次の 2 種類の外部 URL のみがサポートされています。

      • Realtime Compute for Apache Flink ワークスペースの購入時に指定したオブジェクトストレージサービス (OSS) バケットのエンドポイント。Realtime Compute for Apache Flink の管理コンソール[ワークスペースの詳細] メッセージで、指定した OSS バケットのエンドポイントを表示できます。

      • Realtime Compute for Apache Flink がアクセスできる別の外部ストレージシステムのエンドポイント。外部ストレージシステムのアクセス制御リスト (ACL) はパブリック読み取りであるか、Realtime Compute for Apache Flink に外部ストレージシステムへのアクセス権限が付与されています。

  4. [確認] をクリックします。

  5. [関数の管理] ダイアログボックスの [使用可能な関数] セクションで、登録する 1 つ以上の UDF を選択し、[関数の作成] をクリックします。

    フルマネージド Flink のコンソールでは、Flink は UDF JAR ファイルを解析し、ファイル内で Flink の UDF、ユーザー定義集計関数 (UDAF)、およびユーザー定義テーブル値関数 (UDTF) インターフェースのクラスが使用されているかどうかを確認します。次に、Flink はクラス名を自動的に抽出し、[関数の管理] ダイアログボックスの [関数名] 列にクラス名を入力します。UDF が登録されると、SQL エディターページの左側にある [UDF] ペインに登録したすべての UDF が表示されます。黄色で強調表示された fx 文字は、登録した各 UDF の名前の左側に表示されます。

    image.png

    説明

    デフォルトでは、カタログ UDF を登録するときに、データ解析に最新バージョンの Realtime Compute for Apache Flink が使用されます。デプロイメントでカタログ UDF を使用し、[構成] タブの [基本] セクションで以前のエンジンバージョンの Realtime Compute for Apache Flink を選択すると、互換性の問題が発生する可能性があります。この問題を解決するには、デプロイメントで必要なエンジンバージョンに基づいて UDF コードを実装し、デプロイメントレベルの UDF を使用できます。詳細については、「デプロイメントレベルの UDF の登録」をご参照ください。

デプロイメントレベルの UDF の登録

重要
  • Ververica Runtime (VVR) 8.0.3 以降を使用する Realtime Compute for Apache Flink のみが、デプロイメントレベルの Python UDF をサポートしています。

  • デプロイメントレベルの Python UDF を使用する場合は、[構成] タブの [パラメーター] セクションで python.files や python.archives などのパラメーターを構成して、関連する依存関係ファイルを指定できます。

  • デプロイメントレベルの Python UDF を使用する場合は、ドラフトの構文チェックを実行できません。ドラフトをデプロイメントとしてデプロイする前に、構文チェックをスキップする必要があります。

デプロイメントレベルの UDF を登録するには、次の手順を実行します。

  1. UDF の JAR または Python ファイルをアップロードします。

    左側のナビゲーションペインで、[アーティファクト] をクリックします。[アーティファクト] ページで、[アーティファクトのアップロード] をクリックして、UDF の JAR または Python ファイルをアップロードします。

  2. デプロイメントでデプロイメントレベルの UDF を指定します。

    SQL エディターページの [構成] タブで、[追加の依存関係] から UDF の JAR または Python ファイルを選択します。

  3. デプロイメントレベルの UDF を登録します。

    • Java UDF

      CREATE TEMPORARY FUNCTION yourfunctionname;
    • Python UDF

      CREATE TEMPORARY FUNCTION yourfunctionname LANGUAGE Python;

UDF の更新

UDF JAR ファイルに UDF を追加するか、ファイル内の登録済み UDF のコードを変更する場合は、次の操作を実行して UDF JAR ファイルを更新できます。

  1. [UDF アーティファクトの登録] ダイアログボックスに移動します。

    1. Realtime Compute for Apache Flink コンソール にログオンします。

    2. 管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションペインで、[開発] > [ETL] をクリックします。

    4. SQL エディターページの左側にある [UDF] タブをクリックします。

  2. SQL エディターページの左側にある [UDF] ペインで、JAR ファイルを更新する UDF の名前にポインターを移動し、更新JAR アイコンをクリックします。

  3. [UDF アーティファクトの登録] ダイアログボックスで、UDF JAR ファイルをアップロードします。更新JAR

    重要
    • アップロードする UDF JAR ファイルには、登録済み UDF のすべてのクラスが含まれている必要があります。

    • 新しい UDF JAR ファイルのコードは、デプロイメントのドラフトを再起動するか、新しいドラフトを公開したときにのみ有効になります。新しい UDF JAR ファイルのコードは、実行中のジョブには影響しません。実行中のジョブは、元の UDF JAR ファイルを引き続き使用します。

  4. [更新] をクリックします。

UDF の削除

説明

UDF JAR ファイルを削除する前に、UDF JAR ファイルを使用して登録された UDF が、デプロイメントまたは SQL ファイルによって参照されていないことを確認してください。

UDF JAR ファイルが不要になった場合は、次の操作を実行して UDF JAR ファイルを削除します。

  1. [UDF アーティファクトの登録] ダイアログボックスに移動します。

    1. Realtime Compute for Apache Flink コンソール にログオンします。

    2. 管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションペインで、[開発] > [ETL] をクリックします。

    4. SQL エディターページの左側にある [UDF] タブをクリックします。

  2. SQL エディターページの左側にある [UDF] ペインで、削除する UDF JAR ファイルの名前にポインターを移動し、删除JAR アイコンをクリックします。

  3. [関連ファイルの削除] を選択します。

    UDF JAR ファイルを削除する場合は、ダーティデータを回避するために、登録されているすべての UDF をファイルから削除する必要があります。

  4. [確認] をクリックします。

参照

  • Java UDF の分類、Realtime Compute for Apache Flink コンソールでの Java UDF パラメーターの渡し方、Java UDF の開発と使用方法の詳細については、「Java」および「UDAF」をご参照ください。

  • Python UDF の分類と依存関係、Python UDF のデバッグ、チューニング、開発、使用方法の詳細については、「Python」および「UDAF」をご参照ください。

  • UDAF を使用してデータをソートおよび集計する方法の詳細については、「UDAF を使用してデータをソートおよび集計する」をご参照ください。