This topic describes how to run Flink jobs in a Dataflow cluster to process Object Storage Service (OSS) data.

Background information

The example in this topic uses the data development feature provided by E-MapReduce (EMR). The data development feature stopped being updated at 21:00 on February 21, 2022. We recommend that you do not use this feature.

Prerequisites

  • EMR and OSS are activated.
  • The Alibaba Cloud account or RAM user that you want to use is granted the required permissions. For more information, see Assign roles.

Procedure

  1. Step 1: Prepare the environment
  2. Step 2: Prepare test data
  3. Step 3: Create and run a Flink job
  4. Step 4: View a job log and job details (Optional)

Step 1: Prepare the environment

Create a Dataflow cluster in Flink mode in the EMR console. For more information, see Create a cluster.
Note: A Dataflow cluster of EMR V3.39.1 is used in this topic.

Step 2: Prepare test data

Before you create a Flink job, you must upload test data to an OSS bucket. In the example of this topic, a file named test.txt is uploaded. The file contains the following content: "Nothing is impossible for a willing heart. While there is a life, there is a hope~"

  1. Log on to the OSS console.
  2. Create an OSS bucket and upload the test data file to the bucket. For information about how to create an OSS bucket, see Create buckets. For information about how to upload a file, see Upload objects.
    In this example, the path of the uploaded file is oss://vvr-test/test.txt. Keep the path for later use.
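Alternatively, you can upload the file from the command line. The following is a minimal sketch that uses ossutil, the OSS command-line tool. It assumes that ossutil is installed and configured with your AccessKey pair, and that the bucket and file names follow the example in this topic:

  # Create the bucket. Skip this step if the bucket already exists.
  ossutil mb oss://vvr-test
  # Upload the local test file to the bucket.
  ossutil cp test.txt oss://vvr-test/test.txt
  # Verify that the file is uploaded.
  ossutil ls oss://vvr-test/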

Step 3: Create and run a Flink job

  1. Go to the Data Platform tab.
    1. Log on to the Alibaba Cloud EMR console by using your Alibaba Cloud account.
    2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
    3. Click the Data Platform tab.
  2. On the Data Platform page, create a project. For more information, see Manage projects.
  3. Create a Flink job.
    1. In the Edit Job pane on the left, right-click the folder on which you want to perform operations and select Create Job.
    2. In the Create Job dialog box, specify Name and Description, and select Flink from the Job Type drop-down list.
    3. Click OK.
  4. Edit job content.
    Example:
    run -m yarn-cluster -yjm 1024 -ytm 1024 -ynm flink-oss-sample /usr/lib/flink-current/examples/batch/WordCount.jar --input oss://vvr-test/test.txt
    Descriptions of key parameters in the preceding command:
    • -m yarn-cluster: submits the job to YARN in per-job mode.
    • -yjm 1024 and -ytm 1024: the amount of memory, in MB, that is allocated to the JobManager and each TaskManager.
    • -ynm flink-oss-sample: the name of the YARN application.
    • /usr/lib/flink-current/examples/batch/WordCount.jar: the built-in Flink WordCount job in the Dataflow cluster. For more information about the code, see the official code repository.
    • oss://vvr-test/test.txt: the path of the uploaded test data file.
    You can also submit the same command over SSH. For a sketch, see the example after this procedure.
  5. Click Run in the upper-right corner.
    In the Run Job dialog box, select the created Dataflow cluster for Target Cluster.
  6. Click OK.
    The Flink job is run in the EMR cluster to process OSS data. The following information is printed in the log:
    (a,3)
    (for,1)
    (heart,1)
    (hope,1)
    (impossible,1)
    (is,3)
    (life,1)
    (nothing,1)
    (there,2)
    (while,1)
    (willing,1)
    =================JOB OUTPUT END=================
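As referenced in the parameter descriptions above, you can also submit the same job from the command line. The following is a minimal sketch that assumes you are connected to the master node of the cluster over SSH and that the flink client shipped in /usr/lib/flink-current is on the PATH:

  # Submit the built-in WordCount example to YARN in per-job mode.
  # -yjm and -ytm set the JobManager and TaskManager memory in MB.
  # -ynm sets the YARN application name.
  flink run -m yarn-cluster -yjm 1024 -ytm 1024 -ynm flink-oss-sample \
    /usr/lib/flink-current/examples/batch/WordCount.jar \
    --input oss://vvr-test/test.txt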

Step 4: View a job log and job details (Optional)

You can click the Log tab of a job to identify the cause of a job failure, and the Records tab to view the details of the job.

  1. View a job log.
    You can view a job log in the EMR console or on an SSH client.
    • View a job log in the EMR console: After you submit a job, click the Records tab, find your job, and then click Details in the Action column. On the details page, click the Log tab to view the job log.
    • View a job log on an SSH client: Connect to the master node of the cluster in SSH mode and view the log of the job that you submitted. For a sketch of the commands, see the example after this procedure.

      By default, the log data of a Flink job is stored in /mnt/disk1/log/flink/flink-{user}-client-{hostname}.log, based on the log4j configuration. For more information about the configuration, see log4j-yarn-session.properties in the /etc/ecm/flink-conf/ directory.

      user indicates the account that you use to submit the Flink job. hostname indicates the name of the node on which the job is submitted. For example, if you submit a Flink job on the emr-header-1 node as the root user, the log path is /mnt/disk1/log/flink/flink-root-client-emr-header-1.cluster-126601.log.

  2. View job details.
    You can view the details of a Flink job on the web UI of YARN. You can access the web UI of YARN over an SSH tunnel or by using Knox. For information about how to use an SSH tunnel, see Create an SSH tunnel to access web UIs of open source components. For information about how to use Knox, see Knox and Access the web UIs of open source components. In this example, Knox is used to view the details of a job.
    1. On the Public Connect Strings page, click the URL for YARN UI.
    2. In the Hadoop console, click the ID of the job to view the details of the job.
    3. If you want to view the Flink jobs that are running, click the link next to Tracking URL on the job details page. The Flink Dashboard page displays the list of running Flink jobs.
    4. After the Flink job is completed, you can view the list and logs of all completed Flink jobs.
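If you prefer the command line, the following sketch inspects the job from the master node over SSH, as referenced in the log-viewing step above. It assumes the root user, the default log path described above, and a placeholder application ID:

  # Tail the Flink client log that is written when the job is submitted.
  tail -f /mnt/disk1/log/flink/flink-root-client-*.log
  # Inspect the log4j configuration that controls the log path.
  cat /etc/ecm/flink-conf/log4j-yarn-session.properties
  # List YARN applications to find the job, and then query its status.
  yarn application -list -appStates ALL
  yarn application -status <application-id>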