このトピックでは、ローカル開発環境で Realtime Compute for Apache Flink のコネクタを含むジョブを実行およびデバッグする方法について説明します。このアプローチにより、コードの正確性を迅速に検証し、問題を特定して解決し、クラウドコストを削減できます。
背景情報
Realtime Compute for Apache Flink の商用コネクタへの依存関係を含む Flink ジョブを IntelliJ IDEA で実行またはデバッグすると、コネクタ関連のクラスが見つからないことを示す実行時エラーが発生することがあります。たとえば、MaxCompute コネクタを含むジョブを実行すると、次の例外が発生します。
Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.odps.newsource.split.OdpsSourceSplitSerializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)この例外は、一部のランタイムクラスがデフォルトのコネクタ JAR パッケージに含まれていないために発生します。以下の手順に従って、これらの不足しているクラスを追加し、IntelliJ IDEA でジョブを正常に実行またはデバッグできます。
ローカルデバッグ問題の回避策
ステップ1:ジョブ構成への依存関係の追加
まず、Maven Central Repository から、必要なランタイムクラスを含む uber JAR パッケージをダウンロードします。 たとえば、MaxCompute で使用される ververica-connector-odps 依存関係のバージョンが 1.17-vvr-8.0.11-1 の場合、Maven リポジトリの対応するディレクトリで -uber.jar サフィックスが付いた ververica-connector-odps-1.17-vvr-8.0.11-1-uber.jar ファイルを見つけ、ローカルディレクトリにダウンロードします。
コードで環境を作成する際に、pipeline.classpaths 構成を追加し、その値を uber JAR のパスに設定します。複数のコネクタ依存関係がある場合は、パスをセミコロン (;) で区切ります。例:file:///path/to/a-uber.jar;file:///path/to/b-uber.jar。Windows では、ディスクパーティションを含める必要があります。例:file:///D:/path/to/a-uber.jar;file:///E:/path/to/b-uber.jar。DataStream API ジョブの場合は、次のコードを使用して構成を追加します。
Configuration conf = new Configuration();
conf.setString("pipeline.classpaths", "file://" + "absolute path of the uber jar");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);Table API ジョブの場合は、次のコードを使用して構成を追加します。
Configuration conf = new Configuration();
conf.setString("pipeline.classpaths", "file://" + "absolute path of the uber jar");
EnvironmentSettings envSettings =
EnvironmentSettings.newInstance().withConfiguration(conf).build();
TableEnvironment tEnv = TableEnvironment.create(envSettings);ジョブをパッケージ化して Realtime Compute for Apache Flink にアップロードする前に、
pipeline.classpaths構成を削除してください。バージョンの違いにより、古いバージョンの `ververica-connector-odps` を使用している場合は、ローカルデバッグ用に
1.17-vvr-8.0.11-1の uber パッケージをダウンロードしてください。ジョブをパッケージ化する際には、古い JAR バージョンを引き続き使用できますが、新しいバージョンで導入されたパラメーターの使用は避けてください。MySQL コネクタをデバッグするには、関連する Maven 依存関係も構成する必要があります。詳細については、「MySQL コネクタを使用する DataStream ジョブのデバッグ」をご参照ください。
ローカルデバッグでは、アップストリームおよびダウンストリームのストレージに対するネットワークの可用性を確保してください。ローカルストレージ、またはパブリックネットワークアクセスが有効になっているクラウドサービスを使用できます。また、ローカルマシンのパブリック IP アドレスを、対応するアップストリームおよびダウンストリームサービスのホワイトリストに追加してください。
ステップ2:ランタイムに必要な ClassLoader JAR パッケージの構成
Flink がコネクタのランタイムクラスをロードできるようにするには、ClassLoader JAR パッケージも追加する必要があります。まず、ご利用の Ververica Runtime (VVR) データベースエンジンのバージョンに基づいて、適切な ClassLoader JAR をダウンロードします。ダウンロードリンクは次のとおりです。
例として、IntelliJ IDEA でジョブのローカル実行構成を変更します。エントリクラスの左側にある緑色のアイコンをクリックしてメニューバーを展開し、[実行構成の変更...] を選択します。


開いた実行構成ウィンドウで、[オプションの変更] をクリックし、[クラスパスの変更] を選択します。ウィンドウの下部に [クラスパスの変更] セクションが表示されます。`+` アイコンをクリックし、ダウンロードした ClassLoader JAR パッケージを選択して、実行構成を保存します。`org.apache.flink.configuration.Configuration` などの一般的な Flink クラスが見つからないというエラーメッセージが表示され、ジョブの実行に失敗した場合は、[オプションの変更] で [provided スコープの依存関係をクラスパスに追加] を選択します。

VVR 11.1 以降、VVR エンジンは JDK 11 を使用します。次の図に示すように、[JVM オプション] に `--add-opens java.base/jdk.internal.loader=ALL-UNNAMED` オプションを追加する必要があります。

Table API ジョブのローカルデバッグのソリューション
VVR 11.1 以降、Ververica コネクタは `flink-table-common` パッケージのコミュニティバージョンと完全な互換性がなくなりました。次のような実行時エラーが発生する可能性がありますが、これらに限定されません。
java.lang.ClassNotFoundException: org.apache.flink.table.factories.OptionUpgradabaleTableFactoryこの問題を解決するには、`pom.xml` ファイルを更新します。`org.apache.flink:flink-table-common` 依存関係を、対応するバージョンの `com.alibaba.ververica:flink-table-common` に置き換えます。
参照
MySQL コネクタをローカルでデバッグする方法については、「MySQL コネクタを使用する DataStream ジョブのデバッグ」をご参照ください。
DataStream を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Realtime Compute for Apache Flink に接続します。DataStream コネクタの使用方法と注意事項の詳細については、「JAR ジョブの開発」をご参照ください。
Python ジョブの開発とデバッグについては、「Python ジョブの開発」をご参照ください。