This topic describes how to run Flink jobs in a Hadoop cluster to process Object Storage Service (OSS) data.

Prerequisites

  • E-MapReduce (EMR) and OSS are activated.
  • The Alibaba Cloud account is authorized. For more information, see Authorize roles.

Step 1: Prepare the environment

Before you create a Flink job, you must install Maven and Java on your on-premises machine and create a Hadoop cluster in the EMR console. If you use Maven 3.0 or later, we recommend that you use Java 1.7 or later to ensure compatibility.

  1. Install Maven and Java on your on-premises machine.
  2. Create a Hadoop cluster in the EMR console and select Flink from Optional Services. For more information, see Create a cluster.
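A minimal check for step 1, as a sketch in a POSIX shell. It only verifies that the two required tools are on the PATH; it does not validate versions:

```shell
# Verify that Maven and Java are installed before you build the sample
# project; each tool is reported as found or missing.
for tool in mvn java; do
  if command -v "$tool" >/dev/null 2>&1; then
    result="found"
  else
    result="missing - install it before you continue"
  fi
  echo "$tool: $result"
done
```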

Step 2: Prepare test data

Before you create a Flink job, you must upload test data to an OSS bucket. In this example, a file named test.txt is uploaded. The file content is "Nothing is impossible for a willing heart. While there is a life, there is a hope~".

  1. Log on to the OSS console.
  2. Create an OSS bucket and upload the test data file to the bucket. For information about how to create an OSS bucket, see Create buckets. For information about how to upload a file, see Upload objects.
    In this example, the path of the uploaded file is oss://emr-logs2/test/test.txt. Keep the path for later use.
    Note After the file is uploaded, keep the OSS logon window open for later use.
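The upload in this step can also be scripted. The following sketch creates the sample file locally. The command for the actual upload is shown as a comment because it requires the ossutil CLI to be installed and configured with your credentials; the bucket path is the example value from this topic:

```shell
# Create test.txt locally with the sample content from this topic.
printf '%s\n' 'Nothing is impossible for a willing heart. While there is a life, there is a hope~' > test.txt
cat test.txt

# Upload the file with ossutil (uncomment after you configure ossutil):
# ossutil cp test.txt oss://emr-logs2/test/test.txt
```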

Step 3: Create a JAR package and upload it to an OSS bucket or a header node of the Hadoop cluster

In this example, the EMR code package aliyun-emapreduce-demo is downloaded and the code is compiled to create a JAR package. You can upload the JAR package to a header node of the Hadoop cluster or an OSS bucket. In this example, the JAR package is uploaded to an OSS bucket.

  1. Download and decompress the code package aliyun-emapreduce-demo to your on-premises machine.
  2. In IntelliJ IDEA, choose File > Open to open the decompressed aliyun-emapreduce-demo-master-2 folder.
  3. In the directory where pom.xml is stored, run the following command to create a JAR package:
    mvn clean package -DskipTests
  4. Access the OSS console.
  5. Upload the JAR package to a storage path of OSS.
    In this example, the path of the uploaded JAR package is oss://emr-logs2/test/examples-1.2.0.jar. Keep the path for later use.
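Before you upload the JAR package, you can confirm that the build produced it. A sketch, run from the directory that contains pom.xml; the artifact name examples-1.2.0.jar is the example value from this topic, so check the target/ directory for the actual version that your build produced:

```shell
# Check the Maven build output before uploading the artifact.
JAR=target/examples-1.2.0.jar
if [ -f "$JAR" ]; then
  echo "built: $JAR"
else
  echo "not built yet: $JAR - run mvn clean package -DskipTests first"
fi
```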

Step 4: Create and run a Flink job

  1. Log on to the EMR console.
  2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
  3. Click the Data Platform tab.
  4. On the page that appears, create a project. For more information, see Manage projects.
  5. Click Edit Job in the Actions column that corresponds to the created project. On the page that appears, create a job with Job Type set to Flink. For more information, see Configure a Flink job.
  6. Configure the job content.
    Example:
    run -m yarn-cluster -yjm 1024 -ytm 1024 -yn 4 -ys 4 -ynm flink-oss-sample -c com.aliyun.emr.example.flink.FlinkOSSSample ossref://emr-logs2/test/examples-1.2.0.jar --input oss://emr-logs2/test/test.txt
    Descriptions of key parameters in the preceding code:
    • -m yarn-cluster: submits the job to a YARN cluster.
    • -yjm and -ytm: the amount of memory, in MB, allocated to the JobManager and to each TaskManager.
    • -yn and -ys: the number of TaskManagers to start and the number of slots per TaskManager.
    • -ynm: the name under which the application appears on the web UI of YARN.
    • -c: the entry class of the job.
    • ossref://emr-logs2/test/examples-1.2.0.jar: the path of the uploaded JAR package. The ossref prefix instructs EMR to download the object from OSS before the job is submitted.
    • oss://emr-logs2/test/test.txt: the path of the uploaded test data.
  7. Click Run in the upper-right corner.
    In the Run Job dialog box, select the created Hadoop cluster from the Target Cluster drop-down list.
  8. Click OK.
    The Flink job is run in the EMR cluster to process OSS data.
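Alternatively, if you uploaded the JAR package to a header node instead of OSS, you can submit the same job from a shell on that node. The following sketch assembles the command from this topic's example values; the /tmp path is a hypothetical location for the copied JAR, and the flink client must be available on the node:

```shell
# Hypothetical local path on the header node; copy the JAR there first.
JAR=/tmp/examples-1.2.0.jar
INPUT=oss://emr-logs2/test/test.txt
CMD="flink run -m yarn-cluster -yjm 1024 -ytm 1024 -yn 4 -ys 4 -ynm flink-oss-sample -c com.aliyun.emr.example.flink.FlinkOSSSample $JAR --input $INPUT"
echo "$CMD"
# $CMD   # uncomment on a header node to actually submit the job
```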

Step 5: (Optional) View a job log and job details

You can click the Log tab of a job to identify the cause of a job failure. You can click the Records tab of a job to view detailed information about the job.

  1. View a job log.
    You can view a job log in the EMR console or on an SSH client.
    • In the EMR console: After you submit a job, click the Records tab, find your job, and then click Details in the Action column.
    • On an SSH client: Log on to the specific header node of the Hadoop cluster by using SSH and view the log of the job you submitted.

      By default, the log data of a Flink job is stored in /mnt/disk1/log/flink/flink-{user}-client-{hostname}.log based on the configurations in log4j. For more information about the configurations in log4j, see log4j-yarn-session.properties in the /etc/ecm/flink-conf/ directory.

      user indicates the account with which you submit the Flink job. hostname indicates the name of the node to which you submit the job. For example, if you submit a Flink job on the emr-header-1 node as the root user, the log path is /mnt/disk1/log/flink/flink-root-client-emr-header-1.cluster-126601.log.
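The path pattern above can be sketched as follows: given the submitting user and the node name, the client log file name follows /mnt/disk1/log/flink/flink-{user}-client-{hostname}.log. The values below are the example user and node name from this topic:

```shell
# Derive the expected Flink client log path from the documented pattern.
user=root
host=emr-header-1.cluster-126601
log_path="/mnt/disk1/log/flink/flink-${user}-client-${host}.log"
echo "$log_path"
```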

  2. View job details.
    You can view the details of a Flink job on the web UI of YARN. You can use an SSH tunnel or Knox to access the web UI of YARN. For information about how to use an SSH tunnel, see Create an SSH tunnel to access web UIs of open source components. For information about how to use Knox, see Knox and Access open-source components. In the following example, Knox is used to view the details of a job.
    1. On the Public Connect Strings page of the Hadoop cluster, click the link of YARN UI.
    2. In the Hadoop console, click the ID of the job.
      View the running details of a specific job.
    3. If you want to view a list of running Flink jobs, click the link next to Tracking URL on the job details page. The Flink Dashboard page that appears displays the list of running Flink jobs.
    4. If you need to view a list of completed jobs, visit http://emr-header-1:8082.