すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:基本的な使い方

最終更新日:Jan 11, 2025

このトピックでは、E-MapReduce(EMR)コンソールで Flink ジョブを送信し、ジョブの状態を表示する方法について説明します。

背景情報

Dataflow クラスタでは、Flink は YARN 上にデプロイされます。 SSH を使用して Dataflow クラスタにログオンし、CLI でコマンドを実行して Flink ジョブを送信できます。

次の表に、Dataflow クラスタの YARN 上で Flink によってサポートされているデプロイメントモードを示します。

モード説明機能
セッションモードこのモードでは、指定したリソースパラメータに基づいて Flink クラスタが作成され、すべてのジョブが Flink クラスタに送信されます。すべてのジョブの実行が完了した後、Flink クラスタは自動的に解放されません。

ジョブで例外が発生し、TaskManager が停止した場合、TaskManager 上で実行されている他のすべてのジョブは失敗します。また、クラスタには 1 つの JobManager のみがデプロイされます。その結果、ジョブの数が増えるにつれて、JobManager の負荷が高くなります。

  • 利点:送信されたジョブにリソースを割り当てるために必要な時間は、他のモードで必要な時間よりも短くなります。
  • 欠点:すべてのジョブが Flink クラスタ上で実行されます。その結果、ジョブはリソースを奪い合い、互いに影響を及ぼします。

このモードは、起動時間と実行時間が短いジョブに適しています。

ジョブごとのクラスタモードこのモードでは、Flink ジョブを送信するたびに、YARN はジョブを実行するための Flink クラスタを起動します。ジョブの実行が完了した後、またはジョブがキャンセルされた後、Flink クラスタは解放されます。
  • 利点:ジョブによって占有されているリソースは分離されています。 1 つのジョブで例外が発生しても、他のジョブは影響を受けません。

    各 JobManager は 1 つのジョブを実行します。これにより、JobManager が複数のジョブによって過負荷になるのを防ぎます。

  • 欠点:ジョブを実行するたびに、YARN は専用の Flink クラスタを起動します。この操作は、オーバーヘッドが高くなります。

このモードは、実行時間が長いジョブに適しています。

アプリケーションモードこのモードでは、Flink アプリケーションを送信するたびに、YARN はアプリケーションを実行するための Flink クラスタを起動します。アプリケーションには、1 つ以上のジョブが含まれています。アプリケーションの実行が完了した後、またはアプリケーションがキャンセルされた後、アプリケーションを実行する Flink クラスタは解放されます。

ジョブごとのクラスタモードとは異なり、アプリケーションの JAR パッケージ内の main() メソッドは、クラスタの JobManager によって実装されます。

JAR パッケージに複数のジョブが含まれている場合、すべてのジョブがクラスタ上で実行されます。

  • 利点:このモードは、クライアントがジョブを送信するときのクライアントのワークロードを削減するのに役立ちます。
  • 欠点:アプリケーションを実行するたびに、YARN は専用の Flink クラスタを起動します。これは時間のかかる操作です。

前提条件

Flink モードの Dataflow クラスタが作成されていること。詳細については、「クラスタの作成」をご参照ください。

ジョブを送信してジョブの状態を表示する

ビジネス要件に基づいて、ジョブを送信し、ジョブの状態を表示するために、次のモードのいずれかを選択できます。

セッションモード

  1. SSH モードでクラスタにログオンします。詳細については、「クラスタへのログオン」をご参照ください。

  2. 次のコマンドを実行して、YARN セッションを開始します。

    yarn-session.sh --detached
  3. 次のコマンドを実行して、ジョブを送信します。

    flink run /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar
    説明

    このトピックでは、Flink の TopSpeedWindowing の例を使用しています。 TopSpeedWindowing は、長時間実行されるストリーミングジョブです。

    ジョブが送信されると、次のような出力が返されます。これには、Flink ジョブの YARN アプリケーション ID が含まれています。Session

  4. 次のコマンドを実行して、ジョブの状態を表示します。

    flink list -t yarn-session -Dyarn.application.id=<application_XXXX_YY>

    Flink の Web UI でジョブの状態を表示することもできます。詳細については、「Flink の Web UI でジョブの状態を表示する」をご参照ください。

  5. 次のコマンドを実行して、ジョブを停止します。

    flink cancel -t yarn-session -Dyarn.application.id=<application_XXXX_YY> <jobId>

ジョブごとのクラスタモード

  1. SSH モードでクラスタにログオンします。詳細については、「クラスタへのログオン」をご参照ください。

  2. 次のコマンドを実行して、ジョブを送信します。

    flink run -t yarn-per-job --detached /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

    ジョブが送信されると、次のような出力が返されます。これには、Flink ジョブの YARN アプリケーション ID が含まれています。 Per-Job Cluster

  3. 次のコマンドを実行して、ジョブの状態を表示します。

    flink list -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY>
    説明

    この例では、<application_XXXX_YY> は、ジョブの実行が完了した後に返されるアプリケーション ID です。

    job status

    Flink の Web UI でジョブの状態を表示することもできます。詳細については、「Flink の Web UI でジョブの状態を表示する」をご参照ください。

  4. 次のコマンドを実行して、ジョブを停止します。

    flink cancel -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY> <jobId>

アプリケーションモード

  1. SSH モードでクラスタにログオンします。詳細については、「クラスタへのログオン」をご参照ください。

  2. 次のコマンドを実行して、ジョブを送信します。

    flink run-application -t yarn-application /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

    ジョブが送信されると、次のような出力が返されます。これには、Flink ジョブの YARN アプリケーション ID が含まれています。Application

  3. 次のコマンドを実行して、ジョブの状態を表示します。

    flink list -t yarn-application -Dyarn.application.id=<application_XXXX_YY>
    説明

    この例では、<application_XXXX_YY> は、ジョブの実行が完了した後に返されるアプリケーション ID です。

    Flink の Web UI でジョブの状態を表示することもできます。詳細については、「Flink の Web UI でジョブの状態を表示する」をご参照ください。

  4. 次のコマンドを実行して、ジョブを停止します。

    flink cancel -t yarn-application -Dyarn.application.id=<application_XXXX_YY> <jobId>

ジョブを構成する

Flink では、次のいずれかの方法を使用してジョブを構成できます。

  • 方法 1:ジョブのコードでパラメータの値を指定します。詳細については、「構成」をご参照ください。

  • 方法 2:flink run コマンドを実行してジョブを送信するときに、-D 引数を使用してジョブのパラメータの値を指定できます。例:flink run-application -t yarn-application -D state.backend=rocksdb...

  • 方法 3:/etc/taihao-apps/flink-conf/ ディレクトリに保存されている flink-conf.yaml ファイルにパラメータの値を指定します。

上記の 3 つの方法でパラメータ値を指定しない場合は、デフォルト値が使用されます。詳細については、「Apache Flink ドキュメント」をご参照ください。

Flink の Web UI でジョブの状態を表示する

  1. Flink の Web UI にアクセスします。

    1. EMR コンソールにログオンします。左側のナビゲーションペインで、[EMR on ECS] をクリックします。

    2. 上部のナビゲーションバーで、クラスタが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します

    3. [EMR On ECS] ページで、目的のクラスタを見つけ、[クラスタ ID/名前] 列のクラスタ名をクリックします。

    4. 表示されたページで、[アクセスリンクとポート] をクリックします。

    5. [アクセスリンクとポート] タブで、YARN UI のリンクをクリックします。

      Web UI へのアクセス方法の詳細については、「オープンソースコンポーネントの Web UI にアクセスする」をご参照ください。

  2. アプリケーションの ID をクリックします。

    Application ID

  3. [追跡 URL] のリンクをクリックします。

    application information

    Apache Flink ダッシュボードページが表示されます。このページでジョブの状態を表示できます。Apache Flink Dashboard

関連情報

YARN 上の Flink の詳細については、「Apache Hadoop YARN」をご参照ください。