E-MapReduce (EMR) V3.27.X 以前のバージョンは、オープンソースバージョンの Flink を使用します。EMR V3.27.X 以降のバージョンは、エンタープライズグレードのコンピューティングエンジンである Ververica Runtime (VVR) を使用します。VVR は Flink と完全に互換性があります。このトピックでは、VVR ベースの Flink ジョブを構成する方法について説明します。
背景情報
Apache Flink の創設チームによって正式にリリースされたエンタープライズ版 Flink は、グローバルに統一されたブランドを維持しています。
VVR は、オープンソースの Flink よりも 3 ~ 5 倍優れたパフォーマンスを持つエンタープライズ版の状態バックエンドを提供します。 VVR エンジンと EMR データ開発機能を使用して、EMR Hadoop クラスターでジョブを送信できます。 VVR はオープンソースの Flink 1.10 をサポートし、デフォルトでビジネス GeminiStateBackend を提供します。これにより、次の利点が得られます。
新しいデータ構造を使用して、ランダムクエリの速度を向上させ、頻繁なディスク I/O 操作を削減します。
キャッシュポリシーを最適化します。メモリが十分な場合、ホットデータはディスクに保存されず、コンパクション後もキャッシュエントリは期限切れになりません。
Java を使用して GeminiStateBackend を実装します。これにより、RocksDB によって発生する Java Native Interface(JNI)のオーバーヘッドが解消されます。
オフヒープメモリを使用し、GeminiDB に基づく効率的なメモリアロケータを実装することで、Java 仮想マシン ( JVM ) のガベージコレクションの影響を排除します。
非同期増分チェックポイントをサポートしています。これにより、データ同期中にメモリインデックスのみがコピーされるようになります。 RocksDB とは異なり、GeminiStateBackend は I/O 操作によって発生するジッターを回避します。
タイマーのローカルリカバリとストレージをサポートします。
GeminiStateBackend を使用する場合は、コードで状態バックエンドのタイプを指定しないでください。 GeminiStateBackend を使用して Flink コンポーネントを起動するには、TaskManager に 1,728 MiB 以上のメモリが必要です。
Flink のチェックポイントと状態バックエンドの基本構成は、GeminiStateBackend にも適用されます。詳細については、「Configuration」をご参照ください。
要件に基づいてパラメーターを構成できます。次の表に、いくつかの特別なパラメーターについて説明します。
パラメーター | 説明 |
state.backend.gemini.memory.managed | `true`、`false`。
|
state.backend.gemini.offheap.size | state.backend.gemini.memory.managed パラメーターが false に設定されている場合、各バックエンドのメモリを指定します。デフォルト値:2。単位:GiB。 |
state.backend.gemini.local.dir | GeminiDB のローカルデータファイルを格納するディレクトリを指定します。 |
state.backend.gemini.timer-service.factory | タイマーサービスの状態の保存場所を指定します。既定値: HEAP。有効な値:
|
前提条件
EMR Hadoop クラスターが作成されます。
プロジェクトが作成されました。
ジョブとデータファイルの処理に必要なリソース (JARパッケージ、データファイル名、パッケージとファイルのストレージパスなど) が取得されます。
説明Object Storage Service (OSS) を使用して、実行する JAR パッケージを保持することをお勧めします。
ローカルファイルのパスを使用する場合は、絶対パスを使用してください。
手順
- [データプラットフォーム] タブに移動します。
- Alibaba Cloud アカウントを使用して、Alibaba Cloud EMR コンソール にログインします。
- 上部のナビゲーションバーで、クラスターが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します。
- [データプラットフォーム] タブをクリックします。
- 表示されるページの [プロジェクト] セクションで、管理するプロジェクトを見つけ、[アクション] 列の [ジョブの編集] をクリックします。
Flink ジョブを作成します。
- 左側の [ジョブの編集] ペインで、操作を実行するフォルダを右クリックし、[ジョブの作成] を選択します。
[ジョブの作成] ダイアログボックスで、[名前] と [説明] を指定し、Flink[ジョブの種類] ドロップダウンリストから を選択します。
- [OK] をクリックします。
ジョブの内容を編集します。
[コンテンツ] フィールドに、ジョブの送信に必要なコマンドライン パラメーターを構成します。
Flink Datastream、table、または SQL ジョブを JAR パッケージとして指定して構成できます。例:
run -m yarn-cluster -yjm 1024 -ytm 2048 ossref://path/to/oss/of/WordCount.jar --input oss://path/to/oss/to/data --output oss://path/to/oss/to/resultEMR V3.28.2 以降のマイナーバージョンでは、PyFlink ジョブを構成できます。例:
run -m yarn-cluster -yjm 1024 -ytm 2048 -py ossref://path/to/oss/of/word_count.pyPyFlink ジョブに関連するパラメーターの詳細については、「Apache Flink 公式ドキュメント」をご参照ください。Apache Flink 公式ドキュメント
[保存] をクリックします。
説明Flink の Web UI には、クラスターのバージョンに応じてアクセスできます。
EMR V3.29.0 より前のバージョン:
SSHトンネルを使用します。
EMR V3.29.0 以降:
推奨。EMR コンソールを使用します。
SSHトンネルを使用します。