E-MapReduce: Basic usage

Last Updated: Aug 14, 2023

This topic describes how to submit a Flink job and view the job status in the E-MapReduce (EMR) console.

Background information

In a Dataflow cluster, Flink is deployed on YARN. You can use SSH to log on to your Dataflow cluster and run a command in the CLI to submit a Flink job.

The following information describes the deployment modes that are supported by Flink on YARN in a Dataflow cluster.

  • Session mode: In this mode, a Flink cluster is created based on the resource parameters that you specify, and all jobs are submitted to this Flink cluster. The Flink cluster is not automatically released after all jobs are complete.

    If an exception occurs on a job and a TaskManager is stopped, all other jobs that are running on that TaskManager fail. In addition, only one JobManager is deployed in the cluster. As a result, the load on the JobManager rises as the number of jobs increases.

    Advantage: Resources are allocated to submitted jobs faster than in the other modes.

    Disadvantage: All jobs run on the same Flink cluster. As a result, the jobs compete for resources and affect each other.

    This mode is suitable for jobs that require a short start time and have a short runtime.

  • Per-job cluster mode: In this mode, each time you submit a Flink job, YARN starts a dedicated Flink cluster to run the job. After the job is complete or canceled, the Flink cluster is released.

    Advantage: Resources that are occupied by jobs are isolated. If an exception occurs on a job, the other jobs are not affected. In addition, each JobManager runs only one job, which prevents the JobManager from being overloaded by multiple jobs.

    Disadvantage: Each time you run a job, YARN starts a dedicated Flink cluster. This results in high overheads.

    This mode is suitable for jobs that have a long runtime.

  • Application mode: In this mode, each time you submit a Flink application, YARN starts a dedicated Flink cluster to run the application. An application contains one or more jobs. After the application is complete or canceled, the Flink cluster that runs the application is released.

    Different from the per-job cluster mode, the main() method in the JAR package of the application is executed by the JobManager of the cluster. If the JAR package contains multiple jobs, all of the jobs run on that cluster.

    Advantage: This mode reduces the workload on the client that submits the jobs.

    Disadvantage: Each time you run an application, YARN starts a dedicated Flink cluster. This is a time-consuming operation.

Prerequisites

A Dataflow cluster that is in Flink mode is created. For more information, see Create a cluster.

Submit a job and view the job status

You can select one of the following modes to submit a job and view its status based on your business requirements:

Session mode

  1. Log on to your cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to start a YARN session (a sketch that passes explicit resource parameters is provided after these steps):

    yarn-session.sh --detached
  3. Run the following command to submit a job:

    flink run /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar
    Note

    The TopSpeedWindowing example in Flink is used in this topic. TopSpeedWindowing is a streaming job that runs for a long period of time.

    After the job is submitted, the output contains the YARN application ID of the Flink job.

  4. Run the following command to view the job status:

    flink list -t yarn-session -Dyarn.application.id=<application_XXXX_YY>

    You can also view the job status on the web UI of Flink. For more information, see View the job status on the web UI of Flink.

  5. Run the following command to stop a job:

    flink cancel -t yarn-session -Dyarn.application.id=<application_XXXX_YY> <jobId>
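
The sketch below, referenced in step 2, shows how you might pass explicit resource parameters when you start the YARN session. The parameter values are examples only and are not part of this procedure; adjust them to your cluster.

    # Start a detached YARN session with example resource settings.
    # -nm sets the YARN application name, -jm and -tm set the JobManager
    # and TaskManager memory, and -s sets the number of slots per TaskManager.
    yarn-session.sh --detached -nm flink-session-example -jm 1024m -tm 2048m -s 2

Jobs that you submit with the flink run command in step 3 then run in this session.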

Per-job cluster mode

  1. Log on to your cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to submit a job (a sketch that adds -D options is provided after these steps):

    flink run -t yarn-per-job --detached /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

    After the job is submitted, the output contains the YARN application ID of the Flink job.

  3. Run the following command to view the job status:

    flink list -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY>
    Note

    In this example, <application_XXXX_YY> is the YARN application ID that is returned when you submit the job in the previous step.

    You can also view the job status on the web UI of Flink. For more information, see View the job status on the web UI of Flink.

  4. Run the following command to stop a job:

    flink cancel -t yarn-per-job -Dyarn.application.id=<application_XXXX_YY> <jobId>
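
The sketch below, referenced in step 2, shows how you might add standard Flink configuration options to the submission by using -D options. The configuration keys are regular Flink options; the values are examples only.

    # Submit the example job in per-job cluster mode and pass example
    # resource settings as dynamic properties (values are illustrative):
    flink run -t yarn-per-job --detached \
      -Djobmanager.memory.process.size=1600m \
      -Dtaskmanager.memory.process.size=2048m \
      -Dtaskmanager.numberOfTaskSlots=2 \
      /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar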

Application mode

  1. Log on to your cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following command to submit a job (a sketch that reads the JAR file from HDFS is provided after these steps):

    flink run-application -t yarn-application /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

    After the job is submitted, the output contains the YARN application ID of the Flink job.

  3. Run the following command to view the job status:

    flink list -t yarn-application -Dyarn.application.id=<application_XXXX_YY>
    Note

    In this example, <application_XXXX_YY> is the YARN application ID that is returned when you submit the job in the previous step.

    You can also view the job status on the web UI of Flink. For more information, see View the job status on the web UI of Flink.

  4. Run the following command to stop a job:

    flink cancel -t yarn-application -Dyarn.application.id=<application_XXXX_YY> <jobId>
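
Because the main() method runs on the JobManager in application mode, the job JAR and the Flink libraries can also be read directly from HDFS instead of being uploaded by the client. The sketch below, referenced in step 2, shows this pattern; both HDFS paths are placeholders and must be replaced with paths that exist in your environment.

    # Submit an application whose Flink libraries and job JAR are read from HDFS
    # (both HDFS paths are placeholders):
    flink run-application -t yarn-application \
      -Dyarn.provided.lib.dirs="hdfs:///flink/libs" \
      hdfs:///flink/jobs/TopSpeedWindowing.jar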

Configure a job

Flink allows you to use one of the following methods to configure a job:

  • Method 1: Specify the values of parameters in the code of the job. For more information, see Configuration.

  • Method 2: When you use the flink run or flink run-application command to submit a job, you can use -D options to specify parameter values for the job. Example: flink run-application -t yarn-application -D state.backend=rocksdb.... A fuller sketch is provided below.

  • Method 3: Specify the values of parameters in the flink-conf.yaml file that is stored in the /etc/taihao-apps/flink-conf/ directory.

If you do not specify a parameter value by using any of the preceding methods, the default value is used. For more information, see Apache Flink Documentation.
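
For reference, the following sketch expands the Method 2 example with a few more -D options. The configuration keys are standard Flink options, and the values (including the checkpoint directory) are examples only; the same keys can instead be set in flink-conf.yaml as described in Method 3.

    # Submit the example job in application mode with several configuration
    # options passed on the command line (values and the HDFS path are illustrative):
    flink run-application -t yarn-application \
      -Dstate.backend=rocksdb \
      -Dstate.checkpoints.dir=hdfs:///flink/checkpoints \
      -Dparallelism.default=2 \
      /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar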

View the job status on the web UI of Flink

  1. Access the web UI of Flink.

    1. Log on to the EMR console. In the left-side navigation pane, click EMR on ECS.

    2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.

    3. On the EMR on ECS page, find the desired cluster and click the name of the cluster in the Cluster ID/Name column.

    4. On the page that appears, click Access Links and Ports.

    5. On the Access Links and Ports tab, click the link for YARN UI.

      For more information about how to access the web UI, see Access the web UIs of open source components.

  2. Click the ID of an application.

  3. Click the link next to Tracking URL.

    The Apache Flink Dashboard page appears. You can view the status of jobs on this page.
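
If you prefer the command line, you can also find running Flink applications and their tracking URLs by using the YARN CLI. This is only a sketch and is not part of the console procedure.

    # List running YARN applications; the Tracking-URL column points to the
    # Apache Flink Dashboard of the corresponding Flink cluster.
    yarn application -list -appStates RUNNING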

References

For more information about Flink on YARN, see Apache Hadoop YARN.