すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:VVR ベースの Flink ジョブを構成する

最終更新日:Jan 11, 2025

E-MapReduce (EMR) V3.27.X 以前のバージョンは、オープンソースバージョンの Flink を使用します。EMR V3.27.X 以降のバージョンは、エンタープライズグレードのコンピューティングエンジンである Ververica Runtime (VVR) を使用します。VVR は Flink と完全に互換性があります。このトピックでは、VVR ベースの Flink ジョブを構成する方法について説明します。

背景情報

Apache Flink の創設チームによって正式にリリースされたエンタープライズ版 Flink は、グローバルに統一されたブランドを維持しています。

VVR は、オープンソースの Flink よりも 3 ~ 5 倍優れたパフォーマンスを持つエンタープライズ版の状態バックエンドを提供します。 VVR エンジンと EMR データ開発機能を使用して、EMR Hadoop クラスターでジョブを送信できます。 VVR はオープンソースの Flink 1.10 をサポートし、デフォルトでビジネス GeminiStateBackend を提供します。これにより、次の利点が得られます。

  • 新しいデータ構造を使用して、ランダムクエリの速度を向上させ、頻繁なディスク I/O 操作を削減します。

  • キャッシュポリシーを最適化します。メモリが十分な場合、ホットデータはディスクに保存されず、コンパクション後もキャッシュエントリは期限切れになりません。

  • Java を使用して GeminiStateBackend を実装します。これにより、RocksDB によって発生する Java Native Interface(JNI)のオーバーヘッドが解消されます。

  • オフヒープメモリを使用し、GeminiDB に基づく効率的なメモリアロケータを実装することで、Java 仮想マシン ( JVM ) のガベージコレクションの影響を排除します。

  • 非同期増分チェックポイントをサポートしています。これにより、データ同期中にメモリインデックスのみがコピーされるようになります。 RocksDB とは異なり、GeminiStateBackend は I/O 操作によって発生するジッターを回避します。

  • タイマーのローカルリカバリとストレージをサポートします。

説明

GeminiStateBackend を使用する場合は、コードで状態バックエンドのタイプを指定しないでください。 GeminiStateBackend を使用して Flink コンポーネントを起動するには、TaskManager に 1,728 MiB 以上のメモリが必要です。

Flink のチェックポイントと状態バックエンドの基本構成は、GeminiStateBackend にも適用されます。詳細については、「Configuration」をご参照ください。

要件に基づいてパラメーターを構成できます。次の表に、いくつかの特別なパラメーターについて説明します。

パラメーター

説明

state.backend.gemini.memory.managed

`true`、`false`。

  • true

  • false

state.backend.gemini.offheap.size

state.backend.gemini.memory.managed パラメーターが false に設定されている場合、各バックエンドのメモリを指定します。デフォルト値:2。単位:GiB。

state.backend.gemini.local.dir

GeminiDB のローカルデータファイルを格納するディレクトリを指定します。

state.backend.gemini.timer-service.factory

タイマーサービスの状態の保存場所を指定します。既定値: HEAP。有効な値:

  • ヒープ

  • GEMINI

前提条件

  • EMR Hadoop クラスターが作成されます。

  • プロジェクトが作成されました。

  • ジョブとデータファイルの処理に必要なリソース (JARパッケージ、データファイル名、パッケージとファイルのストレージパスなど) が取得されます。

    説明
    • Object Storage Service (OSS) を使用して、実行する JAR パッケージを保持することをお勧めします。

    • ローカルファイルのパスを使用する場合は、絶対パスを使用してください。

手順

  1. [データプラットフォーム] タブに移動します。
    1. Alibaba Cloud アカウントを使用して、Alibaba Cloud EMR コンソール にログインします。
    2. 上部のナビゲーションバーで、クラスターが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します
    3. [データプラットフォーム] タブをクリックします。
  2. 表示されるページの [プロジェクト] セクションで、管理するプロジェクトを見つけ、[アクション] 列の [ジョブの編集] をクリックします。
  3. Flink ジョブを作成します。

    1. 左側の [ジョブの編集] ペインで、操作を実行するフォルダを右クリックし、[ジョブの作成] を選択します。
    2. [ジョブの作成] ダイアログボックスで、[名前][説明] を指定し、Flink[ジョブの種類] ドロップダウンリストから を選択します。

    3. [OK] をクリックします。
  4. ジョブの内容を編集します。

    1. [コンテンツ] フィールドに、ジョブの送信に必要なコマンドライン パラメーターを構成します。

      • Flink Datastream、table、または SQL ジョブを JAR パッケージとして指定して構成できます。例:

        run -m yarn-cluster -yjm 1024 -ytm 2048 ossref://path/to/oss/of/WordCount.jar --input oss://path/to/oss/to/data --output oss://path/to/oss/to/result
      • EMR V3.28.2 以降のマイナーバージョンでは、PyFlink ジョブを構成できます。例:

        run -m yarn-cluster -yjm 1024 -ytm 2048 -py ossref://path/to/oss/of/word_count.py

        PyFlink ジョブに関連するパラメーターの詳細については、「Apache Flink 公式ドキュメント」をご参照ください。Apache Flink 公式ドキュメント

    2. [保存] をクリックします。

      説明

      Flink の Web UI には、クラスターのバージョンに応じてアクセスできます。

      • EMR V3.29.0 より前のバージョン:

        SSHトンネルを使用します。

      • EMR V3.29.0 以降:

        • 推奨。EMR コンソールを使用します。

        • SSHトンネルを使用します。