このトピックでは、Dataflowクラスタに関するよくある質問への回答を提供します。
クラスタの使用とO&Mに関する質問:
ジョブに関する質問:
DataflowクラスタでFlinkジョブを実行して、パスワードなしでOSSからデータを読み書きするときにエラーが報告された場合はどうすればよいですか?
flink runコマンドを実行してジョブを開始するときに、Flinkジョブのパラメータが有効にならないのはなぜですか?
次のエラーメッセージが返された場合はどうすればよいですか?クラスパスで見つかった「...」を実装する識別子「...」の複数のファクトリ
次のエラーメッセージが返された場合はどうすればよいですか?java.lang.OutOfMemoryError: GCオーバーヘッド制限を超えました
次のエラーメッセージが返された場合はどうすればよいですか?スレッド「main」の例外java.lang.NoSuchFieldError: DEPLOYMENT_MODE
クラスタのログはどこに保存されますか?ログを表示するにはどうすればよいですか?
JobManagerの状態に基づいて、次の方法でクラスタのログを表示できます。
FlinkクラスタのJobManagerが停止している場合は、クラスタのノードで yarn logs -applicationId application_xxxx_yy コマンドを実行して、ログをオンプレミスマシンにプルすることで、ログを表示できます。また、YARN Web UIで完了したジョブのログリンクにアクセスして、Webページでログを表示することもできます。
FlinkクラスタのJobManagerが実行されている場合は、次のいずれかの方法でログを表示できます。
FlinkジョブのWeb UIでログを表示します。
yarn logs -applicationId application_xxxx_yy -am ALL -logFiles jobmanager.log コマンドを実行して、JobManagerログを表示します。yarn logs -applicationId application_xxxx_yy -containerId container_xxxx_yy_aa_bb -logFiles taskmanager.log コマンドを実行して、TaskManagerログを表示します。
ジョブのJARパッケージがクラスタのFlinkのJARパッケージと競合する場合はどうすればよいですか?
ほとんどの場合、この問題が発生した場合、NoSuchFieldError、NoSuchMethodError、またはClassNotFoundException などのエラーがジョブログに記録されます。次の手順を実行して、問題のトラブルシューティングと解決を行うことができます。
競合の原因となっている依存クラスを特定します。エラーログを確認して、競合の原因となっているクラスを見つけます。クラスが存在するJARパッケージを見つけます。次に、ジョブの pom.xml ファイルが保存されているディレクトリで mvn dependency:tree コマンドを実行して、JAR依存関係ツリーを表示します。
競合の原因となっている依存クラスを除外します。
pom.xml ファイルでJARパッケージのscopeパラメータが誤って設定されている場合は、scopeパラメータの値をprovidedに変更して、JARパッケージを除外できます。
競合の原因となっているクラスが存在するJARパッケージを使用する必要がある場合は、依存関係でクラスを除外できます。
競合の原因となっているクラスをクラスタ内の対応するバージョンのクラスに置き換えることができない場合は、Maven Shade Pluginを使用してクラスをシェードできます。
さらに、クラスパスでJARパッケージの複数のバージョンが指定されている場合、ジョブで使用されるクラスのバージョンは、クラスのロード順序によって異なります。クラスがロードされるJARパッケージを確認するには、flink-conf.yamlファイルでJava仮想マシン(JVM)パラメータ env.java.opts: -verbose:class を指定するか、動的パラメータ -Denv.java.opts="-verbose:class" を指定します。このようにして、システムはロードされたクラスと、クラスがロードされたJARパッケージを記録します。
説明JobManagerまたはTaskManagerの場合、前述の情報は
jobmanager.outファイルまたはtaskmanager.outファイルに記録されます。
Dataflow クラスタにデプロイされていないクライアントを使用して、Flink ジョブを Dataflow クラスタに送信するにはどうすればよいですか?
Dataflow クラスタにデプロイされていないクライアントを使用して、Flink ジョブを Dataflow クラスタに送信するには、次の手順を実行します。
Dataflow クラスタがクライアントに接続されていることを確認します。
クライアントの Hadoop YARN 環境を構成します。
Dataflow クラスタでは、Hadoop YARN ソフトウェアは /opt/apps/YARN/yarn-current ディレクトリにインストールされ、構成ファイルは /etc/taihao-apps/hadoop-conf/ ディレクトリに保存されます。 yarn-current ディレクトリと hadoop-conf ディレクトリにあるファイルをダウンロードし、Flink ジョブの送信に使用するクライアントに保存する必要があります。
次に、クライアントで次のコマンドを実行して、環境変数を構成します。
export HADOOP_HOME=/path/to/yarn-current && \ export PATH=${HADOOP_HOME}/bin/:$PATH && \ export HADOOP_CLASSPATH=$(hadoop classpath) && \ export HADOOP_CONF_DIR=/path/to/hadoop-conf重要yarn-site.xml などの Hadoop 構成ファイルでは、ResourceManager などのコンポーネントは、サービスアドレスとして完全修飾ドメイン名(FQDN)を使用します。例: master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com。したがって、Dataflow クラスタにデプロイされていないクライアントを使用して Flink ジョブを送信する場合は、これらの FQDN を解決できることを確認するか、FQDN を IP アドレスに変更してください。
上記の構成を完了したら、Dataflow クラスタにデプロイされていないクライアントで Flink ジョブを開始します。たとえば、
flink run -d -t yarn-per-job -ynm flink-test $FLINK_HOME/examples/streaming/TopSpeedWindowing.jarコマンドを実行して、Flink ジョブを開始できます。その後、Dataflow クラスタの YARN Web UI で Flink ジョブを表示できます。
Dataflow クラスタにデプロイされていないクライアントを使用して Flink ジョブを送信する場合、クライアントの Dataflow クラスタの構成ファイルで指定されているホスト名をどのように解決すればよいですか?
次のいずれかの方法を使用して、Dataflow クラスタにデプロイされていないクライアントの Dataflow クラスタの構成ファイルで指定されているホスト名を解決します。
クライアントの /etc/hosts ファイルを変更して、ホスト名と IP アドレス間のマッピングを追加します。
Alibaba Cloud DNS PrivateZone を使用します。
また、次の JVM パラメータを指定して、独自の DNS サービスを使用することもできます。
env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"
Flink ジョブの状態を表示するにはどうすればよいですか?
E-MapReduce(EMR)コンソールを使用します。
EMR は Knox をサポートしており、インターネット経由で YARN や Flink などのサービスの Web UI にアクセスできます。YARN Web UI から Apache Flink ダッシュボードページに移動して、Flink ジョブの状態を表示できます。詳細については、「基本的な使い方」をご参照ください。
SSH トンネルを使用します。詳細については、「オープンソースコンポーネントの Web UI にアクセスするための SSH トンネルの作成」をご参照ください。
YARN の RESTful API を呼び出します。
curl --compressed -v -H "Accept: application/json" -X GET "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"説明YARN の RESTful API への呼び出しを許可するには、セキュリティグループでポート 8443 と 8088 が有効になっていることを確認してください。または、Dataflow クラスタとアクセスするノードが同じ仮想プライベートクラウド(VPC)にあることを確認してください。
Flink ジョブのログを表示するにはどうすればよいですか?
Flink ジョブが実行されている場合は、その Web UI で Flink ジョブのログを表示できます。
Flink ジョブが完了している場合は、Flink HistoryServer で、または
yarn logs -applicationId application_xxxx_yyyyコマンドを実行することで、Flink ジョブの統計情報を表示できます。デフォルトでは、完了した Flink ジョブのログは、Hadoop 分散ファイルシステム(HDFS)クラスタの hdfs:///tmp/logs/$USERNAME/logs/ ディレクトリに保存されます。
Dataflow クラスタの Flink HistoryServer にアクセスするにはどうすればよいですか?
デフォルトでは、Flink HistoryServer は、Dataflow クラスタのマスターサーバーグループの最初のサーバーである master-1-1 ノードのポート 18082 で起動されます。Flink HistoryServer は、完了した Flink ジョブの統計情報を収集します。Flink HistoryServer にアクセスするには、次の手順を実行します。
セキュリティグループルールを構成して、master-1-1 ノードのポート 18082 へのアクセスを有効にします。
http://$master-1-1-ip:18082 にアクセスします。
Flink HistoryServer は、完了した Flink ジョブのログを保存しません。ログを表示するには、YARN API 操作を実行するか、YARN Web UI にアクセスします。
Dataflow クラスタでサポートされている商用コネクタを使用するにはどうすればよいですか?
Dataflow クラスタは、Hologres、Log Service、MaxCompute、DataHub、Elasticsearch、ClickHouse コネクタなど、さまざまな商用コネクタを提供します。Flink ジョブでは、オープンソースコネクタまたは商用コネクタを使用できます。この例では、Hologres コネクタを使用します。
コネクタをインストールする
Hologres コネクタの JAR パッケージをダウンロードし、オンプレミスマシンで Hologres コネクタを Maven にインストールします。Hologres コネクタの JAR パッケージは、Dataflow クラスタの /opt/apps/FLINK/flink-current/opt/connectors ディレクトリに保存されます。
mvn install:install-file -Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.13-vvr-4.0.7 -Dpackaging=jar次の依存関係を pom.xml ファイルに追加します。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-hologres</artifactId> <version>1.13-vvr-4.0.7</version> <scope>provided</scope> </dependency>
ジョブを実行する
方法 1:
Hologres コネクタの JAR パッケージを別のディレクトリにコピーします。
hdfs mkdir hdfs:///flink-current/opt/connectors/hologres/ hdfs cp hdfs:///flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar hdfs:///flink-current/opt/connectors/hologres/ververica-connector-hologres-1.13-vvr-4.0.7.jarジョブの送信に使用するコマンドに次の情報を追加します。
-D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/
方法 2:
Hologres コネクタの JAR パッケージを、Flink ジョブの送信に使用するクライアントの /opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar ディレクトリにコピーします。このディレクトリは、Dataflow クラスタの Hologres コネクタの JAR パッケージを保存するディレクトリと同じ構造になっています。
ジョブの送信に使用するコマンドに次の情報を追加します。
-C file:///opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar
方法 3: Hologres コネクタの JAR パッケージを実行するジョブの JAR パッケージに追加します。
GeminiStateBackend を使用するにはどうすればよいですか?
Dataflow クラスタは、エンタープライズ版のステートバックエンドである GeminiStateBackend を提供します。GeminiStateBackend のパフォーマンスは、オープンソースのステートバックエンドの 3 ~ 5 倍です。デフォルトでは、Dataflow クラスタの構成ファイルで GeminiStateBackend が使用されます。GeminiStateBackend の詳細設定については、「GeminiStateBackend の構成」をご参照ください。
オープンソースのステートバックエンドを使用するにはどうすればよいですか?
デフォルトでは、Dataflow クラスタの構成ファイルで GeminiStateBackend が使用されます。ジョブに RocksDB などのオープンソースのステートバックエンドを使用する場合は、-D パラメータを使用してステートバックエンドを指定できます。例:
flink run-application -t yarn-application -D state.backend=rocksdb /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar後続のジョブで上記の構成を有効にするには、E-MapReduce(EMR)コンソールで [state.backend] パラメータの値を使用するステートバックエンドに変更します。たとえば、ステートバックエンドを RocksDB に変更できます。[保存] をクリックし、[クライアント構成のデプロイ] をクリックします。
クライアントログはどこに保存されますか?クライアントログを表示するにはどうすればよいですか?
EMR クラスタでは、FLINK_LOG_DIR 環境変数は、Flink クライアントのログが保存されるディレクトリを指定します。デフォルトのディレクトリは /var/log/taihao-apps/flink です。V3.43.0 より前の EMR バージョンでは、デフォルトのディレクトリは /mnt/disk1/log/flink です。SQL Client などのクライアントの完全なログは、このディレクトリの対応するファイルで表示できます。
flink run コマンドを実行してジョブを開始するときに、Flink ジョブのパラメータが有効にならないのはなぜですか?
コマンドを実行して Flink ジョブを開始する場合は、Flink ジョブのパラメータを Flink ジョブの JAR パッケージの後ろに配置する必要があります。例: flink run -d -t yarn-per-job test.jar arg1 arg2。
次のエラーメッセージが返された場合はどうすればよいですか?クラスパスで見つかった「...」を実装する識別子「...」の複数のファクトリ
原因
このエラーメッセージは、コネクタの複数の実装がクラスパスで見つかったことを示しています。ほとんどの場合、コネクタの依存関係がジョブの JAR 依存関係に追加されているにもかかわらず、コネクタのパッケージが $FLINK_HOME/ib ディレクトリに手動で配置されているために、依存関係の競合が発生します。
解決策
重複する依存関係を削除します。詳細については、「ジョブの JAR パッケージがクラスタの Flink の JAR パッケージと競合する場合はどうすればよいですか?」をご参照ください。
Flink ジョブの安定性を向上させるために、JobManager の高可用性を有効にするにはどうすればよいですか?
Dataflow クラスタでは、Flink ジョブは YARN にデプロイされて実行されます。Apache Flink コミュニティが提供するドキュメントの「構成」セクションに記載されている手順に従って、JobManager の高可用性を有効にして、Flink ジョブの安定した実行を実現できます。構成例:
high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recoveryデフォルトでは、高可用性が有効になった後、障害が発生した場合、JobManager は 1 回だけ再起動できます。JobManager を複数回再起動できるようにするには、YARN の yarn.resourcemanager.am.max-attempts パラメータと、Flink の yarn.application-attempts パラメータを構成する必要があります。詳細については、「Apache Flink 公式ドキュメント」をご参照ください。JobManager が繰り返し再起動されないようにするには、yarn.application-attempt-failures-validity-interval パラメータの値を増やします。このパラメータのデフォルト値は 10000 です。単位: ミリ秒。デフォルト値は 10 秒を示します。このパラメータの値を 300000(5 分に相当)に増やすことができます。
Flink ジョブのメトリックを表示するにはどうすればよいですか?
EMRコンソールにログオンし、管理するクラスターの[モニタリング] タブに移動します。[モニタリング] タブで、[メトリックモニタリング] をクリックします。
[ダッシュボード] ドロップダウンリストから [FLINK] を選択します。
アプリケーション ID とジョブ ID を選択します。Flink ジョブのメトリックが表示されます。
説明アプリケーション ID とジョブ ID は、既存の Flink ジョブがクラスタで実行されている場合にのみ使用できます。
sourceIdleTime メトリックなど、一部のメトリックの出力情報は、メトリックのソースとシンクが構成されている場合にのみ使用できます。
アップストリームおよびダウンストリームストレージに関連する問題のトラブルシューティングを行うにはどうすればよいですか?
詳細については、「アップストリームおよびダウンストリームストレージ」をご参照ください。
Dataflow クラスタで Flink ジョブを実行して、パスワードなしで OSS からデータを読み書きするときにエラーが報告された場合はどうすればよいですか?
具体的なエラーメッセージに基づいてエラーを解決します。
エラーメッセージ:
java.lang.UnsupportedOperationException: Hadoop での回復可能なライターは HDFS でのみサポートされています原因: Dataflow クラスタは、組み込みの JindoSDK を使用して、Object Storage Service(OSS)へのパスワードなしアクセスを実装し、StreamingFileSink などの API をサポートしています。コミュニティドキュメントに記載されている追加の構成を実行する必要はありません。そうしないと、依存関係の競合が原因でこのエラーが発生する可能性があります。
解決策: クラスタのジョブの送信に使用するノードの $FLINK_HOME/plugins ディレクトリに oss-fs-hadoop ディレクトリが存在するかどうかを確認します。 oss-fs-hadoop ディレクトリが存在する場合は、ディレクトリを削除して、ジョブを再送信します。
エラーメッセージ:
スキーム 'oss' のファイルシステム実装が見つかりませんでした。このスキームは、次のプラグインを通じて Flink によって直接サポートされています: flink-oss-fs-hadoop。....原因: EMR V3.40 以前のバージョンのクラスタでは、マスターサーバーグループの master-1-1 以外のノードで、Jindo に関連する JAR パッケージが欠落している可能性があります。
解決策:
EMR V3.40.0 のクラスタ、または EMR V3.40.0 より前のマイナーバージョンのクラスタでは、jindo-flink-4.0.0-full.jar などの Jindo に関連する JAR パッケージが、クラスタのジョブの送信に使用するノードの $FLINK_HOME/lib ディレクトリに存在するかどうかを確認します。Jindo に関連する JAR パッケージが存在しない場合は、クラスタで次のコマンドを実行して、Jindo に関連する JAR パッケージを $FLINK_HOME/lib ディレクトリにコピーしてから、ジョブを再送信します。
cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/libEMR V3.40.0 より後のマイナーバージョン
YARN モードの Flink: OSS アクセスのメカニズムが最適化されています。Jindo に関連する JAR パッケージが $FLINK_HOME/lib ディレクトリに存在しない場合でも、OSS からデータを読み書きするために使用されるジョブは正常に実行できます。
その他のデプロイモード: jindo-flink-4.0.0-full.jar などの Jindo に関連する JAR パッケージが、クラスタのジョブの送信に使用するノードの $FLINK_HOME/lib ディレクトリに存在するかどうかを確認します。Jindo に関連する JAR パッケージが存在しない場合は、クラスタで次のコマンドを実行して、Jindo に関連する JAR パッケージを $FLINK_HOME/lib ディレクトリにコピーしてから、ジョブを再送信します。
cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/lib
次のエラーメッセージが返された場合はどうすればよいですか?java.util.concurrent.TimeoutException: ID を持つ TaskManager のハートビートがタイムアウトしました
原因
直接の原因は、TaskManager のハートビートがタイムアウトしたことです。TaskManager ログの具体的なエラーを表示して、原因を特定できます。また、TaskManager のヒープメモリサイズが制限されているか、ジョブコードにメモリリークがあるために、メモリ不足(OOM)エラーが発生する可能性があります。詳細については、「次のエラーメッセージが返された場合はどうすればよいですか?java.lang.OutOfMemoryError: GC オーバーヘッド制限を超えました」をご参照ください。
解決策
OOM エラーが発生した場合は、メモリサイズを増やすか、ジョブのメモリ使用量を分析して、原因をさらに特定します。
次のエラーメッセージが返された場合はどうすればよいですか?java.lang.OutOfMemoryError: GC オーバーヘッド制限を超えました
原因
このエラーメッセージは、ジョブに構成されているメモリが不足しているために、ガベージコレクション(GC)がタイムアウトしたことを示しています。一般的な原因は、ユーザー定義関数(UDF)などのジョブコードがメモリリークを引き起こしているか、メモリサイズがビジネス要件を満たしていないことです。
解決策
ジョブを再送信してエラーを再現するときは、-D パラメータを使用して、OOM エラー時にヒープダンプを作成するための JVM パラメータを指定します。例:
-D env.java.opts="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof"。OOM エラー時にヒープダンプを作成するには、
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprofパラメータを flink-conf.yaml ファイルに追加します。
エラーが再現されたら、HeapDumpPath パラメータで指定されたパスにあるヒープダンプファイルを分析します。たとえば、Memory Analyzer Tool(MAT)または Java VisualVM を使用してファイルを分析し、エラーの根本原因を特定できます。
Flink Web UI でジョブに 1 つのオペレータのみが表示され、「受信レコード」メトリックの値が 0 なのはなぜですか?
これは正常な状態です。Flink の「受信レコード」メトリックは、異なるオペレータ間のデータ通信を示すために使用されます。ジョブが 1 つのオペレータのみを持つように最適化されている場合、このメトリックの値は常に 0 になります。
Flink ジョブのフレームグラフを有効にするにはどうすればよいですか?
フレームグラフは、プロセス内の各メソッドの CPU 負荷を視覚化します。これは、Flink ジョブのパフォーマンスボトルネックの問題を解決するのに役立ちます。フレームグラフ機能は、Flink 1.13 以降でサポートされています。ただし、本番環境のジョブに対するフレームグラフの影響を回避するために、フレームグラフ機能はデフォルトで無効になっています。 Flink ジョブのパフォーマンスを分析するためにフレームグラフ機能を使用する場合は、次の操作を実行します。EMR コンソールにログインし、Flink サービスページの [構成] タブに移動します。[構成] タブで、[flink-conf.yaml] をクリックします。 [flink-conf.yaml] タブで、rest.flamegraph.enabled 構成項目を追加し、値を true に設定します。構成項目の追加方法の詳細については、「構成項目の管理」をご参照ください。
フレームグラフの詳細については、「フレームグラフ」をご参照ください。
次のエラーメッセージが返された場合はどうすればよいですか?スレッド「main」の例外 java.lang.NoSuchFieldError: DEPLOYMENT_MODE
原因
ジョブの JAR パッケージに、クラスタの Flink のバージョンと互換性のない flink-core 依存関係が含まれています。
解決策
次の依存関係を pom.xml ファイルに追加します。この依存関係では、
scopeパラメータはprovidedに設定されています。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <!-- Change to your own flink version --> <version>1.16.1</version> <scope>provided</scope> </dependency>説明上記の依存関係の
versionを、使用する Flink のバージョンに変更します。互換性のない依存関係がどのように導入されるかの詳細については、「ジョブの JAR パッケージがクラスタの Flink の JAR パッケージと競合する場合はどうすればよいですか?」をご参照ください。