すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:基本的な使用方法

最終更新日:Nov 09, 2025

このトピックでは、E-MapReduce (EMR) コンソールで Flink ジョブを送信し、ジョブのステータスを表示する方法について説明します。

背景情報

Dataflow クラスターでは、Flink は YARN 上にデプロイされます。SSH を使用して Dataflow クラスターにログインし、CLI でコマンドを実行して Flink ジョブを送信できます。

YARN モードでデプロイされた Dataflow クラスターは、Session モード、Per-Job Cluster モード、および Application モードでの Flink ジョブの送信をサポートしています。

モード

説明

特徴

Session モード

Session モードでは、指定したリソースパラメーターに基づいて Flink クラスターが作成されます。すべてのジョブはこのクラスターに送信されます。ジョブが完了しても、クラスターは自動的にリリースされません。

たとえば、ジョブで例外が発生して Task Manager がシャットダウンすると、この Task Manager で実行されている他のすべてのジョブは失敗します。さらに、クラスターには Job Manager が 1 つしかないため、ジョブの数が増えるにつれて Job Manager への負荷が増加します。

  • 利点: ジョブ送信時のリソース割り当てによる時間的オーバーヘッドが他のモードに比べて小さいです。

  • 欠点: すべてのジョブが同じクラスターで実行されるため、リソースの競合が発生し、ジョブが互いに影響し合う可能性があります。

これらの特徴に基づき、このモードは、起動時間が短く、実行時間が比較的短いジョブのデプロイに適しています。

Per-Job Cluster モード

Per-Job Cluster モードを使用する場合、YARN は送信する Flink ジョブごとに新しい Flink クラスターを起動し、そのジョブを実行します。ジョブが完了またはキャンセルされると、このジョブの Flink クラスターはリリースされます。

  • 利点: ジョブ間でリソースが分離されているため、1 つのジョブの異常な動作が他のジョブに影響を与えることはありません。

    各ジョブは 1 つの Job Manager に対応するため、複数のジョブを実行することによる Job Manager の過負荷の問題は発生しません。

  • 欠点: ジョブごとに専用の Flink クラスターが起動されるため、ジョブの起動オーバーヘッドが増加します。

これらの特徴に基づき、このモードは通常、実行時間が長いジョブに適しています。

Application モード

Application モードを使用する場合、YARN は送信する Flink アプリケーション (アプリケーションには 1 つ以上のジョブを含めることができます) ごとに新しい Flink クラスターを起動します。アプリケーションが完了またはキャンセルされると、このアプリケーションの Flink クラスターはリリースされます。

このモードと Per-Job モードの違いは、アプリケーションの JAR パッケージ内の main() メソッドがクラスター内の Job Manager で実行される点です。

送信された JAR パッケージに複数のジョブが含まれている場合、これらのジョブはすべて、そのアプリケーションに属するクラスターで実行されます。

  • 利点: ジョブ送信時のクライアントの負担を軽減します。

  • 欠点: Flink アプリケーションごとに専用の Flink クラスターが起動されるため、アプリケーションの起動にかかる時間的オーバーヘッドが増加します。

前提条件

Flink モードの Dataflow クラスターが作成されていること。詳細については、「クラスターの作成」をご参照ください。

ジョブの送信とジョブステータスの表示

説明

このトピックでは、Flink の TopSpeedWindowing の例を使用します。TopSpeedWindowing は、長時間実行されるストリーミングジョブです。

ビジネス要件に基づいて、次のいずれかのモードを選択してジョブを送信し、ジョブのステータスを表示できます:

Session モード

  1. SSH モードでクラスターのマスターノードにログインします。詳細については、「クラスターのマスターノードへのログイン」をご参照ください。

  2. 次のコマンドを実行して YARN セッションを開始します。

    yarn-session.sh --detached

    コマンドが正常に実行されると、システムはアプリケーション ID を返します。たとえば、application_1750137174986_0001 です。以降のセクションでは、<application_XXXX_YY> を使用してこの ID を表します。

    image

  3. 次のコマンドを実行してジョブを送信します。

    flink run --detached /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

    ジョブが送信されると、次のような出力情報が返されます。

    image

    この出力では、3785db18d371326758d7843dd2a1**** がジョブ ID です。以降のセクションでは、<jobId> を使用してこの ID を表します。

  4. 次のコマンドを実行してジョブのステータスを表示します。

    flink list -t yarn-session -Dyarn.application.id=<application_XXXX_YY>

    次のような出力情報が返されます。

    ------------------ Running/Restarting Jobs -------------------
    16.06.2025 18:20:55 : 3785db18d371326758d7843dd2a1**** : CarTopSpeedWindowingExample (RUNNING)

    Flink の Web UI でジョブのステータスを表示することもできます。詳細については、「Flink の Web UI でジョブのステータスを表示する」をご参照ください。

  5. 次のコマンドを実行してジョブを停止します。flink cancel -t yarn-session -Dyarn.application.id=<application_XXXX_YY> <jobId>

    flink cancel -t yarn-session -Dyarn.application.id=<application_XXXX_YY> <jobId>

Per-job cluster モード

  1. SSH モードでクラスターのマスターノードにログインします。詳細については、「クラスターのマスターノードへのログイン」をご参照ください。

  2. 次のコマンドを実行してジョブを送信します。

    flink run -t yarn-per-job --detached /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

    ジョブが送信されると、次のような出力情報が返されます。

    image

    この出力では、application_1750125819948_**** がアプリケーション ID です。以降のセクションでは、<application_XXXX_YY> を使用してこの ID を表します。f5f980ac631192b02548235f1bbe**** はジョブ ID です。以降のセクションでは、<jobId> を使用してこの ID を表します。

  3. 次のコマンドを実行してジョブのステータスを表示します。

    flink list -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY>

    Flink の Web UI でジョブのステータスを表示することもできます。詳細については、「Flink の Web UI でジョブのステータスを表示する」をご参照ください。

  4. 次のコマンドを実行してジョブを停止します。

    flink cancel -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY> <jobId>

Application モード

  1. SSH モードでクラスターのマスターノードにログインします。詳細については、「クラスターのマスターノードへのログイン」をご参照ください。

  2. 次のコマンドを実行してジョブを送信します。

    flink run-application -t yarn-application /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

    ジョブが送信されると、次のような出力情報が返されます。

    image

    この出力では、application_1750125819948_0004 は送信された Flink ジョブの YARN アプリケーション ID です。以降のセクションでは、<application_XXXX_YY> を使用してこの ID を表します。

  3. 次のコマンドを実行してジョブのステータスを表示します。

    flink list -t yarn-application -Dyarn.application.id=<application_XXXX_YY>

    次のような出力情報が返されます。この出力では、4db32b5339e6d64de2a1096c4762**** がジョブ ID であり、以降のセクションでは <jobId> で表されます。

    ------------------ Running/Restarting Jobs -------------------
    16.06.2025 18:20:55 : 4db32b5339e6d64de2a1096c4762**** : CarTopSpeedWindowingExample (RUNNING)

    Flink の Web UI でジョブのステータスを表示することもできます。詳細については、「Flink の Web UI でジョブのステータスを表示する」をご参照ください。

  4. 次のコマンドを実行してジョブを停止します。

    flink cancel -t yarn-application -Dyarn.application.id=<application_XXXX_YY> <jobId>

ジョブの設定

Flink では、次のいずれかの方法を使用してジョブを設定できます:

  • 方法 1: ジョブのコードでパラメーターの値を指定します。詳細については、「設定」をご参照ください。

  • 方法 2: flink run コマンドを実行してジョブを送信するときに、-D 引数を使用してジョブのパラメーターの値を指定できます。例: flink run-application -t yarn-application -D state.backend=rocksdb...

  • 方法 3: /etc/taihao-apps/flink-conf/flink-conf.yaml ファイルでパラメーターの値を指定します。

前述の 3 つの方法でパラメーター値を指定しない場合、デフォルト値が使用されます。詳細については、「Apache Flink ドキュメント」をご参照ください。

Flink の Web UI でジョブのステータスを表示する

  1. Flink の Web UI にアクセスします。

    1. EMR コンソールにログインします。左側のナビゲーションウィンドウで、[EMR on ECS] をクリックします。

    2. 左側のナビゲーションウィンドウで、[EMR On ECS] を選択します。

    3. 上部のナビゲーションバーで、クラスターが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します

    4. [EMR on ECS] ページで、ターゲットクラスターの [クラスター ID] をクリックします。

    5. [アクセスリンクとポート] タブをクリックします。

    6. [アクセスリンクとポート] ページで、YARN UI のリンクをクリックします。

      Web UI へのアクセス方法の詳細については、「オープンソースコンポーネントの Web UI へのアクセス」をご参照ください。

  2. アプリケーション ID をクリックします。

    Application ID

  3. [Tracking URL] のリンクをクリックします。

    application information

    Apache Flink ダッシュボードページが表示されます。このページでジョブのステータスを表示できます。Apache Flink Dashboard

リファレンス

YARN 上の Flink の詳細については、「Apache Hadoop YARN」をご参照ください。