オンプレミス環境で Realtime Compute for Apache Flink コネクタを使用するプログラムをデバッグできます。これにより、コードの正確性を迅速に検証し、問題を特定して解決することで、クラウド移行のコストを節約できます。
背景情報
IntelliJ IDEA で Realtime Compute for Apache Flink コネクタを使用する Flink プログラムをデバッグすると、コネクタ関連のクラスが見つからないという問題が発生する場合があります。たとえば、MaxCompute コネクタを使用するプログラムをデバッグすると、次の例外が発生します。
Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.odps.newsource.split.OdpsSourceSplitSerializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)このエラーは、デフォルトのコネクタ JAR に特定のランタイムクラスがないために発生します。
不足しているクラスを追加するには、次の手順を実行します。
手順
ステップ 1: 依存関係を追加する
Maven 中央リポジトリからランタイムクラスを含むコネクタ uber JAR パッケージをダウンロードします。
たとえば、MaxCompute コネクタの ververica-connector-odps 依存関係のバージョンが 1.17-vvr-8.0.11-1 の場合、Maven 中央リポジトリのディレクトリから
ververica-connector-odps-1.17-vvr-8.0.11-1-uber.jarパッケージをオンプレミスのディレクトリにダウンロードする必要があります。実行環境を取得するときに、
pipeline.classpathsパラメーターを uber JAR パッケージのパスに設定します。複数のコネクタ依存関係が存在する場合、パッケージのパスをセミコロン (;) で区切ります。たとえば、このパラメーターを
file:///path/to/a-uber.jar;file:///path/to/b-uber.jarに設定できます。Windows では、file:///D:/path/to/a-uber.jar;file:///E:/path/to/b-uber/jarのように、関連するディスクの名前をパスに追加する必要があります。次のサンプルコードは、DataStream プログラムの構成を示しています。Configuration conf = new Configuration(); conf.setString("pipeline.classpaths", "file://" + "uber JAR パッケージの絶対パス"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);次のサンプルコードは、Table API プログラムの構成を示しています。
Configuration conf = new Configuration(); conf.setString("pipeline.classpaths", "file://" + "uber JAR パッケージの絶対パス"); EnvironmentSettings envSettings = EnvironmentSettings.newInstance().withConfiguration(conf).build(); TableEnvironment tEnv = TableEnvironment.create(envSettings);
コンパイルされた JAR を Realtime Compute for Apache Flink にアップロードする前に、
pipeline.classpaths構成を削除する必要があります。オンプレミスで MaxCompute コネクタを使用し、
1.17-vvr-8.0.11-1より古いフレームワークバージョンで実行される Flink プログラムをデバッグするには、1.17-vvr-8.0.11-1コネクタの uber JAR を使用する必要があります。クラウドデプロイメント用にプログラムの JAR をビルドする場合、古いバージョンのコネクタ uber JAR を含めることができますが、新しいフレームワークバージョンでのみサポートされるコネクタオプションは必ず削除してください。MySQL コネクタを使用するプログラムをデバッグするには、「オンプレミス環境で MySQL コネクタを含む DataStream プログラムをデバッグする」の説明に従って、Maven で依存関係を構成する必要もあります。
オンプレミスでデバッグを行う際は、Flink アプリケーションとアップストリーム/ダウンストリーム サービス間のネットワーク接続性を確保する必要があります。次の 2 つのオプションがあります。
Flink と同じネットワーク上でアップストリーム/ダウンストリーム サービスをローカルに実行します。
クラウドサービスをデータソースまたはデスティネーションとして使用するには、Flink がインターネット経由でアクセスできることを確認します。また、デバイスのパブリック IP アドレスをクラウドサービスのホワイトリストに追加します。
ステップ 2: ジョブの実行に必要な ClassLoader JAR パッケージを構成する
Flink がコネクタのランタイムクラスをロードできるようにするには、ClassLoader JAR パッケージを構成に追加します。
VVR バージョンに基づいて ClassLoader JAR パッケージをダウンロードするには、クリックします。
IntelliJ IDEA でプログラムファイルを開きます。
エントリクラスの左側にある緑色のアイコンをクリックして、メニューを展開します。

[実行構成の変更...] を選択します。

表示されたウィンドウで、[オプションの変更] をクリックします。

[実行オプションの追加] ドロップダウンリストの [java] セクションで、[クラスパスの変更] を選択します。
[クラスパスの変更] セクションがウィンドウに表示されます。
[クラスパスの変更] セクションで、[+] アイコンをクリックし、[含める] を選択して、ダウンロードした ClassLoader JAR を選択します。
構成を保存します。
説明一般的な Flink クラスが見つからないことを示すエラーメッセージが表示された場合は、[オプションの変更] をクリックし、["provided" スコープで依存関係をクラスパスに追加する] を選択します。
Table API ジョブのデバッグ
VVR 11.1 以降、Realtime Compute for Apache Flink コネクタは Apache Flink の flink-table-common パッケージと完全な互換性がなくなりました。Table API を使用するジョブを実行すると、以下のエラーが発生する場合があります。
java.lang.ClassNotFoundException: org.apache.flink.table.factories.OptionUpgradabaleTableFactory解決策: pom.xml ファイルを更新します。org.apache.flink:flink-table-common を com.alibaba.ververica:flink-table-common に置き換え、正しいバージョンを使用していることを確認してください。
関連ドキュメント
DataStream モードでデータを読み書きする場合は、DataStream コネクタの関連タイプを使用して Realtime Compute for Apache Flink に接続する必要があります。 DataStream コネクタの使用方法と DataStream コネクタを使用する際の注意事項については、詳細については、「JAR ドラフトを開発する」をご参照ください。
Python API ドラフトの開発とデバッグ方法については、詳細については、「Python API ドラフトを開発する」をご参照ください。