このトピックでは、E-MapReduce (EMR) コンソールで Flink ジョブを送信し、ジョブのステータスを表示する方法について説明します。
背景情報
Dataflow クラスターでは、Flink は YARN 上にデプロイされます。SSH を使用して Dataflow クラスターにログインし、CLI でコマンドを実行して Flink ジョブを送信できます。
YARN モードでデプロイされた Dataflow クラスターは、Session モード、Per-Job Cluster モード、および Application モードでの Flink ジョブの送信をサポートしています。
モード | 説明 | 特徴 |
Session モード | Session モードでは、指定したリソースパラメーターに基づいて Flink クラスターが作成されます。すべてのジョブはこのクラスターに送信されます。ジョブが完了しても、クラスターは自動的にリリースされません。 たとえば、ジョブで例外が発生して Task Manager がシャットダウンすると、この Task Manager で実行されている他のすべてのジョブは失敗します。さらに、クラスターには Job Manager が 1 つしかないため、ジョブの数が増えるにつれて Job Manager への負荷が増加します。 |
これらの特徴に基づき、このモードは、起動時間が短く、実行時間が比較的短いジョブのデプロイに適しています。 |
Per-Job Cluster モード | Per-Job Cluster モードを使用する場合、YARN は送信する Flink ジョブごとに新しい Flink クラスターを起動し、そのジョブを実行します。ジョブが完了またはキャンセルされると、このジョブの Flink クラスターはリリースされます。 |
これらの特徴に基づき、このモードは通常、実行時間が長いジョブに適しています。 |
Application モード | Application モードを使用する場合、YARN は送信する Flink アプリケーション (アプリケーションには 1 つ以上のジョブを含めることができます) ごとに新しい Flink クラスターを起動します。アプリケーションが完了またはキャンセルされると、このアプリケーションの Flink クラスターはリリースされます。 このモードと Per-Job モードの違いは、アプリケーションの JAR パッケージ内の 送信された JAR パッケージに複数のジョブが含まれている場合、これらのジョブはすべて、そのアプリケーションに属するクラスターで実行されます。 |
|
前提条件
Flink モードの Dataflow クラスターが作成されていること。詳細については、「クラスターの作成」をご参照ください。
ジョブの送信とジョブステータスの表示
このトピックでは、Flink の TopSpeedWindowing の例を使用します。TopSpeedWindowing は、長時間実行されるストリーミングジョブです。
ビジネス要件に基づいて、次のいずれかのモードを選択してジョブを送信し、ジョブのステータスを表示できます:
Session モード
SSH モードでクラスターのマスターノードにログインします。詳細については、「クラスターのマスターノードへのログイン」をご参照ください。
次のコマンドを実行して YARN セッションを開始します。
yarn-session.sh --detachedコマンドが正常に実行されると、システムはアプリケーション ID を返します。たとえば、
application_1750137174986_0001です。以降のセクションでは、<application_XXXX_YY>を使用してこの ID を表します。
次のコマンドを実行してジョブを送信します。
flink run --detached /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jarジョブが送信されると、次のような出力情報が返されます。

この出力では、
3785db18d371326758d7843dd2a1****がジョブ ID です。以降のセクションでは、<jobId>を使用してこの ID を表します。次のコマンドを実行してジョブのステータスを表示します。
flink list -t yarn-session -Dyarn.application.id=<application_XXXX_YY>次のような出力情報が返されます。
------------------ Running/Restarting Jobs ------------------- 16.06.2025 18:20:55 : 3785db18d371326758d7843dd2a1**** : CarTopSpeedWindowingExample (RUNNING)Flink の Web UI でジョブのステータスを表示することもできます。詳細については、「Flink の Web UI でジョブのステータスを表示する」をご参照ください。
次のコマンドを実行してジョブを停止します。flink cancel -t yarn-session -Dyarn.application.id=<application_XXXX_YY> <jobId>
flink cancel -t yarn-session -Dyarn.application.id=<application_XXXX_YY> <jobId>
Per-job cluster モード
SSH モードでクラスターのマスターノードにログインします。詳細については、「クラスターのマスターノードへのログイン」をご参照ください。
次のコマンドを実行してジョブを送信します。
flink run -t yarn-per-job --detached /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jarジョブが送信されると、次のような出力情報が返されます。

この出力では、
application_1750125819948_****がアプリケーション ID です。以降のセクションでは、<application_XXXX_YY>を使用してこの ID を表します。f5f980ac631192b02548235f1bbe****はジョブ ID です。以降のセクションでは、<jobId>を使用してこの ID を表します。次のコマンドを実行してジョブのステータスを表示します。
flink list -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY>Flink の Web UI でジョブのステータスを表示することもできます。詳細については、「Flink の Web UI でジョブのステータスを表示する」をご参照ください。
次のコマンドを実行してジョブを停止します。
flink cancel -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY> <jobId>
Application モード
SSH モードでクラスターのマスターノードにログインします。詳細については、「クラスターのマスターノードへのログイン」をご参照ください。
次のコマンドを実行してジョブを送信します。
flink run-application -t yarn-application /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jarジョブが送信されると、次のような出力情報が返されます。

この出力では、
application_1750125819948_0004は送信された Flink ジョブの YARN アプリケーション ID です。以降のセクションでは、<application_XXXX_YY>を使用してこの ID を表します。次のコマンドを実行してジョブのステータスを表示します。
flink list -t yarn-application -Dyarn.application.id=<application_XXXX_YY>次のような出力情報が返されます。この出力では、
4db32b5339e6d64de2a1096c4762****がジョブ ID であり、以降のセクションでは<jobId>で表されます。------------------ Running/Restarting Jobs ------------------- 16.06.2025 18:20:55 : 4db32b5339e6d64de2a1096c4762**** : CarTopSpeedWindowingExample (RUNNING)Flink の Web UI でジョブのステータスを表示することもできます。詳細については、「Flink の Web UI でジョブのステータスを表示する」をご参照ください。
次のコマンドを実行してジョブを停止します。
flink cancel -t yarn-application -Dyarn.application.id=<application_XXXX_YY> <jobId>
ジョブの設定
Flink では、次のいずれかの方法を使用してジョブを設定できます:
方法 1: ジョブのコードでパラメーターの値を指定します。詳細については、「設定」をご参照ください。
方法 2:
flink runコマンドを実行してジョブを送信するときに、-D 引数を使用してジョブのパラメーターの値を指定できます。例:flink run-application -t yarn-application -D state.backend=rocksdb...。方法 3:
/etc/taihao-apps/flink-conf/flink-conf.yamlファイルでパラメーターの値を指定します。
前述の 3 つの方法でパラメーター値を指定しない場合、デフォルト値が使用されます。詳細については、「Apache Flink ドキュメント」をご参照ください。
Flink の Web UI でジョブのステータスを表示する
Flink の Web UI にアクセスします。
EMR コンソールにログインします。左側のナビゲーションウィンドウで、[EMR on ECS] をクリックします。
左側のナビゲーションウィンドウで、[EMR On ECS] を選択します。
上部のナビゲーションバーで、クラスターが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します。
[EMR on ECS] ページで、ターゲットクラスターの [クラスター ID] をクリックします。
[アクセスリンクとポート] タブをクリックします。
[アクセスリンクとポート] ページで、YARN UI のリンクをクリックします。
Web UI へのアクセス方法の詳細については、「オープンソースコンポーネントの Web UI へのアクセス」をご参照ください。
アプリケーション ID をクリックします。

[Tracking URL] のリンクをクリックします。

Apache Flink ダッシュボードページが表示されます。このページでジョブのステータスを表示できます。
リファレンス
YARN 上の Flink の詳細については、「Apache Hadoop YARN」をご参照ください。