E-MapReduce:Configure a VVR-based Flink job

Last Updated: Dec 20, 2023

E-MapReduce (EMR) V3.27.X and earlier versions use the open source version of Flink. Versions later than EMR V3.27.X use Ververica Runtime (VVR), an enterprise-grade computing engine. VVR is fully compatible with Flink. This topic describes how to configure a VVR-based Flink job.

Background information

The enterprise edition of Flink is officially released by the founding team of Apache Flink and is maintained under a globally unified brand.

VVR provides an enterprise-edition state backend whose performance is three to five times higher than that of open source Flink. You can use the VVR engine together with the EMR data development feature to submit jobs in an EMR Hadoop cluster. VVR is compatible with open source Flink 1.10 and provides the enterprise-grade GeminiStateBackend by default, which offers the following benefits:

  • Uses a new data structure to increase the random query speed and reduce frequent disk I/O operations.

  • Optimizes the cache policy. If memory is sufficient, hot data is not stored on disk, and cache entries do not expire after compaction.

  • Uses Java to implement GeminiStateBackend, which eliminates Java Native Interface (JNI) overheads that are caused by RocksDB.

  • Uses off-heap memory and implements an efficient memory allocator based on GeminiDB to eliminate the impact of garbage collection for Java Virtual Machines (JVMs).

  • Supports asynchronous incremental checkpoints. This ensures that only in-memory indexes are copied during data synchronization. Unlike RocksDB, GeminiStateBackend avoids jitters that are caused by I/O operations.

  • Supports local recovery and local storage of timers.

Note

To use GeminiStateBackend, do not explicitly specify a state backend type in your code. In addition, the TaskManager must have at least 1,728 MiB of memory to start the Flink component with GeminiStateBackend.

The basic configurations of the checkpoint and state backend in Flink also apply to GeminiStateBackend. For more information, see Configuration.

You can configure the following GeminiStateBackend-specific parameters based on your requirements. A configuration sketch follows the list.

  • state.backend.gemini.memory.managed: Specifies whether to calculate the memory size of each state backend based on the values of the Managed Memory and Task Slot parameters. Valid values: true and false. Default value: true.

  • state.backend.gemini.offheap.size: Specifies the memory size of each state backend when the state.backend.gemini.memory.managed parameter is set to false. Unit: GiB. Default value: 2.

  • state.backend.gemini.local.dir: Specifies the directory that stores the local data files of GeminiDB.

  • state.backend.gemini.timer-service.factory: Specifies the storage location of the timer-service state. Valid values: HEAP and GEMINI. Default value: HEAP.
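The following sketch shows how these parameters might be set in the flink-conf.yaml file of the cluster. The parameter names come from the list above; the example values, the local directory, and the checkpoint directory are assumptions for illustration rather than recommendations:

    # Sketch of flink-conf.yaml entries. Example values are assumptions.
    state.backend.gemini.memory.managed: false          # size each state backend manually
    state.backend.gemini.offheap.size: 4                # GiB per state backend; takes effect only when managed is false
    state.backend.gemini.local.dir: /mnt/disk1/gemini   # hypothetical directory for local GeminiDB data files
    state.backend.gemini.timer-service.factory: GEMINI  # store the timer-service state in GeminiDB
    state.checkpoints.dir: hdfs:///flink/checkpoints    # standard Flink checkpoint option; hypothetical path

You can also pass the same key-value pairs for a single job when you submit it. For an example, see the end of this topic.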

Prerequisites

  • An EMR Hadoop cluster is created.

  • A project is created.

  • Resources that are required by the job are obtained, such as the JAR packages, the names of the data files to be processed, and the storage paths of the packages and files.

    Note
    • We recommend that you use Object Storage Service (OSS) to store the JAR packages that you want to run.

    • If you specify a local file path, use an absolute path.

Procedure

  1. Go to the Data Platform tab.
    1. Log on to the Alibaba Cloud EMR console by using your Alibaba Cloud account.
    2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.
    3. Click the Data Platform tab.
  2. In the Projects section of the page that appears, find the project that you want to manage and click Edit Job in the Actions column.
  3. Create a Flink job.

    1. In the Edit Job pane on the left, right-click the folder on which you want to perform operations and select Create Job.
    2. In the Create Job dialog box, specify Name and Description, and select Flink from the Job Type drop-down list.

    3. Click OK.
  4. Edit job content.

    1. In the Content field, configure the command-line parameters that are required to submit the job. A combined example that also passes GeminiStateBackend parameters follows this procedure.

      • You can configure a Flink DataStream, Table, or SQL job that is packaged as a JAR file. In the following example, -m yarn-cluster submits the job to YARN in per-job mode, -yjm and -ytm set the JobManager and TaskManager memory in MiB, and the ossref:// scheme references a JAR package that is stored in OSS. Example:

        run -m yarn-cluster -yjm 1024 -ytm 2048 ossref://path/to/oss/of/WordCount.jar --input oss://path/to/oss/to/data --output oss://path/to/oss/to/result
      • In EMR V3.28.2 and later minor versions, you can configure a PyFlink job. Example:

        run -m yarn-cluster -yjm 1024 -ytm 2048 -py ossref://path/to/oss/of/word_count.py

        For more information about the parameters of PyFlink jobs, see the official Apache Flink documentation.

    2. Click Save.

      Note

      You can access the web UI of Flink based on the version of your cluster:

      • Versions earlier than EMR V3.29.0: use an SSH tunnel.

      • EMR V3.29.0 and later: use the EMR console (recommended) or an SSH tunnel.
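
To tie the configuration and submission halves of this topic together, the following sketch passes GeminiStateBackend parameters as YARN dynamic properties by using the -yD option of the Flink CLI when the WordCount job from step 4 is submitted. The paths are the placeholder paths from the examples above, the property values are illustrative assumptions rather than recommendations, and the command is wrapped across lines only for readability:

    run -m yarn-cluster -yjm 1024 -ytm 2048 \
        -yD state.backend.gemini.timer-service.factory=GEMINI \
        -yD state.backend.gemini.local.dir=/mnt/disk1/gemini \
        ossref://path/to/oss/of/WordCount.jar \
        --input oss://path/to/oss/to/data --output oss://path/to/oss/to/result

Each -yD entry sets one Flink configuration key for this job only, so the cluster-wide configuration in flink-conf.yaml remains unchanged.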