このトピックでは、E-MapReduce(EMR)コンソールで Flink ジョブを送信し、ジョブの状態を表示する方法について説明します。
背景情報
Dataflow クラスタでは、Flink は YARN 上にデプロイされます。 SSH を使用して Dataflow クラスタにログオンし、CLI でコマンドを実行して Flink ジョブを送信できます。
次の表に、Dataflow クラスタの YARN 上で Flink によってサポートされているデプロイメントモードを示します。
モード | 説明 | 機能 |
セッションモード | このモードでは、指定したリソースパラメータに基づいて Flink クラスタが作成され、すべてのジョブが Flink クラスタに送信されます。すべてのジョブの実行が完了した後、Flink クラスタは自動的に解放されません。 ジョブで例外が発生し、TaskManager が停止した場合、TaskManager 上で実行されている他のすべてのジョブは失敗します。また、クラスタには 1 つの JobManager のみがデプロイされます。その結果、ジョブの数が増えるにつれて、JobManager の負荷が高くなります。 |
このモードは、起動時間と実行時間が短いジョブに適しています。 |
ジョブごとのクラスタモード | このモードでは、Flink ジョブを送信するたびに、YARN はジョブを実行するための Flink クラスタを起動します。ジョブの実行が完了した後、またはジョブがキャンセルされた後、Flink クラスタは解放されます。 |
このモードは、実行時間が長いジョブに適しています。 |
アプリケーションモード | このモードでは、Flink アプリケーションを送信するたびに、YARN はアプリケーションを実行するための Flink クラスタを起動します。アプリケーションには、1 つ以上のジョブが含まれています。アプリケーションの実行が完了した後、またはアプリケーションがキャンセルされた後、アプリケーションを実行する Flink クラスタは解放されます。 ジョブごとのクラスタモードとは異なり、アプリケーションの JAR パッケージ内の JAR パッケージに複数のジョブが含まれている場合、すべてのジョブがクラスタ上で実行されます。 |
|
前提条件
Flink モードの Dataflow クラスタが作成されていること。詳細については、「クラスタの作成」をご参照ください。
ジョブを送信してジョブの状態を表示する
ビジネス要件に基づいて、ジョブを送信し、ジョブの状態を表示するために、次のモードのいずれかを選択できます。
セッションモード
SSH モードでクラスタにログオンします。詳細については、「クラスタへのログオン」をご参照ください。
次のコマンドを実行して、YARN セッションを開始します。
yarn-session.sh --detached
次のコマンドを実行して、ジョブを送信します。
flink run /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar
説明このトピックでは、Flink の TopSpeedWindowing の例を使用しています。 TopSpeedWindowing は、長時間実行されるストリーミングジョブです。
ジョブが送信されると、次のような出力が返されます。これには、Flink ジョブの YARN アプリケーション ID が含まれています。
次のコマンドを実行して、ジョブの状態を表示します。
flink list -t yarn-session -Dyarn.application.id=<application_XXXX_YY>
Flink の Web UI でジョブの状態を表示することもできます。詳細については、「Flink の Web UI でジョブの状態を表示する」をご参照ください。
次のコマンドを実行して、ジョブを停止します。
flink cancel -t yarn-session -Dyarn.application.id=<application_XXXX_YY> <jobId>
ジョブごとのクラスタモード
SSH モードでクラスタにログオンします。詳細については、「クラスタへのログオン」をご参照ください。
次のコマンドを実行して、ジョブを送信します。
flink run -t yarn-per-job --detached /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar
ジョブが送信されると、次のような出力が返されます。これには、Flink ジョブの YARN アプリケーション ID が含まれています。
次のコマンドを実行して、ジョブの状態を表示します。
flink list -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY>
説明この例では、
<application_XXXX_YY>
は、ジョブの実行が完了した後に返されるアプリケーション ID です。Flink の Web UI でジョブの状態を表示することもできます。詳細については、「Flink の Web UI でジョブの状態を表示する」をご参照ください。
次のコマンドを実行して、ジョブを停止します。
flink cancel -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY> <jobId>
アプリケーションモード
SSH モードでクラスタにログオンします。詳細については、「クラスタへのログオン」をご参照ください。
次のコマンドを実行して、ジョブを送信します。
flink run-application -t yarn-application /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar
ジョブが送信されると、次のような出力が返されます。これには、Flink ジョブの YARN アプリケーション ID が含まれています。
次のコマンドを実行して、ジョブの状態を表示します。
flink list -t yarn-application -Dyarn.application.id=<application_XXXX_YY>
説明この例では、
<application_XXXX_YY>
は、ジョブの実行が完了した後に返されるアプリケーション ID です。Flink の Web UI でジョブの状態を表示することもできます。詳細については、「Flink の Web UI でジョブの状態を表示する」をご参照ください。
次のコマンドを実行して、ジョブを停止します。
flink cancel -t yarn-application -Dyarn.application.id=<application_XXXX_YY> <jobId>
ジョブを構成する
Flink では、次のいずれかの方法を使用してジョブを構成できます。
方法 1:ジョブのコードでパラメータの値を指定します。詳細については、「構成」をご参照ください。
方法 2:flink run コマンドを実行してジョブを送信するときに、-D 引数を使用してジョブのパラメータの値を指定できます。例:
flink run-application -t yarn-application -D state.backend=rocksdb...
。方法 3:/etc/taihao-apps/flink-conf/ ディレクトリに保存されている flink-conf.yaml ファイルにパラメータの値を指定します。
上記の 3 つの方法でパラメータ値を指定しない場合は、デフォルト値が使用されます。詳細については、「Apache Flink ドキュメント」をご参照ください。
Flink の Web UI でジョブの状態を表示する
Flink の Web UI にアクセスします。
EMR コンソールにログオンします。左側のナビゲーションペインで、[EMR on ECS] をクリックします。
上部のナビゲーションバーで、クラスタが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します。
[EMR On ECS] ページで、目的のクラスタを見つけ、[クラスタ ID/名前] 列のクラスタ名をクリックします。
表示されたページで、[アクセスリンクとポート] をクリックします。
[アクセスリンクとポート] タブで、YARN UI のリンクをクリックします。
Web UI へのアクセス方法の詳細については、「オープンソースコンポーネントの Web UI にアクセスする」をご参照ください。
アプリケーションの ID をクリックします。
[追跡 URL] のリンクをクリックします。
Apache Flink ダッシュボードページが表示されます。このページでジョブの状態を表示できます。
関連情報
YARN 上の Flink の詳細については、「Apache Hadoop YARN」をご参照ください。