このトピックでは、Flink バッチ処理の基本原則と、その構成をチューニングする方法について説明します。
背景情報
ストリーム処理とバッチ処理の両方をサポートする統一コンピューティングフレームワークとして、Flink は 2 つの異なるデータパターンを処理できます。Flink は、ストリーム処理モードとバッチ処理モードの間で多くのコア実行メカニズムを共有しています。ただし、2 つのモードには、ジョブの実行、パラメーター設定、およびパフォーマンスチューニングに重要な違いがあります。このトピックでは、Flink バッチジョブに焦点を当てます。バッチジョブ固有の実行メカニズムと構成パラメーターについて説明します。これらの違いを理解することは、ジョブをより効率的にチューニングし、Flink バッチジョブの問題をトラブルシューティングするのに役立ちます。
Apache Flink 向け Realtime Compute は、バッチ処理専用のサポートも提供しています。ジョブ開発、運用保守 (O&M)、オーケストレーション、リソースキュー管理、データ結果探索などの機能を提供します。すぐに開始するには、「Flink バッチ処理クイックスタート」をご参照ください。
バッチジョブとストリームジョブの比較
Flink バッチジョブの構成パラメーターとチューニング方法について学ぶ前に、Flink バッチジョブとストリームジョブの実行メカニズムの違いを理解することが重要です。
実行モード
ストリームジョブ: ストリーム処理モードは、継続的で無限のデータストリームの処理に重点を置いています。その主な目的は、低レイテンシーのデータ処理を実現することです。このモードでは、データはノード間ですぐに渡され、パイプラインで処理されます。したがって、ストリームジョブのすべてのノードのサブタスクは、同時にデプロイおよび実行されます。

バッチジョブ: バッチ処理モードは、有界データセットの処理に重点を置いています。その主な目標は、高スループットのデータ処理を提供することです。この実行モードでは、ジョブは通常、複数のステージで構成されます。独立したステージは並列で実行でき、リソース使用率を向上させます。データ依存関係のあるステージの場合、ダウンストリームタはアップストリームタスクが完了するのを待ってから開始する必要があります。

データ転送
ストリームジョブ: 低レイテンシーを実現するために、ストリームジョブは中間データをメモリに保持し、永続化せずにネットワーク経由で直接転送します。ダウンストリームノードの処理能力が不十分な場合、アップストリームノードにバックプレッシャーを引き起こす可能性があります。
バッチジョブ: バッチジョブは、ダウンストリームタスクで使用するために、中間結果を外部ストレージシステムに書き込みます。デフォルトでは、これらの結果ファイルは TaskManager のローカルディスクに保存されます。リモートシャッフルサービスが使用されている場合、データファイルはリモートシャッフルサービスに保存されます。
リソース要件
ストリームジョブ: ストリームジョブは、起動時にすべてのリソースを事前に割り当てる必要があります。これにより、すべてのサブタスクを同時にデプロイして実行できます。
バッチジョブ: バッチジョブは、実行時にすべてのリソースを一度に取得する必要はありません。Flink は、入力データの準備が整うとタスクをバッチでスケジュールできます。これにより、既存のリソースをより効率的に使用できます。ジョブは限られたリソース (単一のスロットを含む) でもスムーズに実行できます。
タスクの失敗と再起動
ストリームジョブ: 障害が発生した場合、ストリームジョブは最新のチェックポイントまたはセーブポイントから回復できます。これにより、ジョブの進行状況のロールバックが最小限に抑えられます。ただし、中間結果は永続化されないため、回復中にすべてのタスクを再起動する必要があります。
バッチジョブ: バッチジョブは中間結果をディスクに保存します。タスクが失敗して再起動した場合、これらの中間結果を再利用できます。つまり、失敗したタスクとそのダウンストリームタスクのみを再起動する必要があり、グローバルなロールバックは必要ありません。これにより、障害後に再実行する必要があるタスクの数が減り、回復効率が向上します。ただし、バッチジョブにはチェックポイントメカニズムがないため、これらの再起動されたタスクは最初から実行する必要があります。
主要な構成パラメーターとチューニング方法
このセクションでは、Flink バッチジョブの主要な構成パラメーターについて説明します。
リソース構成
CPU とメモリ
ジョブのリソース構成ウィンドウで、JobManager と各 TaskManager の CPU およびメモリリソースを設定できます。以下にいくつかの構成の提案を示します:
JobManager リソース: JobManager に 1 CPU コアと少なくとも 4 GiB のメモリを割り当てます。これにより、スムーズなジョブのスケジューリングと管理が保証されます。
TaskManager リソース: スロット数に基づいてリソースを割り当てます。具体的には、各スロットに 1 CPU コアと 4 GiB のメモリを提供します。TaskManager に n 個のスロットがある場合、合計で n CPU コアと 4n GiB のメモリを割り当てます。
デフォルトでは、リアルタイムコンピューティングエンジンのバッチジョブは TaskManager ごとに 1 つのスロットを割り当てます。TaskManager のスケジューリングと管理のオーバーヘッドを削減するために、TaskManager ごとのスロット数を 2 または 4 に増やすことを検討してください。
ただし、各 TaskManager で使用可能なディスク領域は限られており、割り当てられた CPU コアに比例することに注意してください。具体的には、各 CPU コアには 20 GiB のディスク領域クォータが割り当てられます。TaskManager の最小ディスク領域は 20 GiB で、最大は 200 GiB です。
したがって、各 TaskManager のスロット数を増やすと、同じ TaskManager ノードで実行されるタスクが増えます。これにより、ローカルディスク領域が圧迫され、ディスク領域が不足する可能性があります。ディスク領域が不足すると、ジョブが失敗して再起動します。
大規模なジョブや複雑なトポロジを持つジョブの場合、JobManager と TaskManager はより高いリソース仕様を必要とする場合があります。このような場合、ジョブが効率的に実行され、安定性を維持できるように、必要に応じてリソース構成を増やしてください。
ジョブの実行中にリソース関連の問題が発生した場合は、トラブルシューティング情報について次のドキュメントをご参照ください:
安定したジョブ実行を保証するために、各 JobManager と TaskManager に少なくとも 0.5 CPU コアと 2 GiB のメモリを構成してください。
最大スロット数
Flink ジョブに割り当てることができる最大スロット数を構成できます。Flink バッチジョブはリソースに制約のある環境で実行できるため、最大スロット数を設定すると、バッチジョブが使用できるリソースの最大量が制限されます。これにより、バッチジョブが過剰なリソースを消費して他のジョブに影響を与えるのを防ぐことができます。詳細については、「バッチジョブの TaskManager の数が並列度を超えることがある」をご参照ください。
並列度の構成
ジョブのリソース構成で、グローバルな並列度を設定したり、自動並列度推論を有効にしたりできます。
グローバル並列度: グローバル並列度は、ジョブ内で並列に実行できるタスクの最大数を決定します。ページのジョブの並列度を直接入力できます。ジョブはこの値をグローバルなデフォルトの並列度として使用します。
自動推論: 自動推論が有効になっている場合、Flink バッチジョブは自動的に並列度を推論します。これは、各ノードによって消費される合計データ量を分析し、各サブタスクによって処理されると予想される平均データ量を分析することによって行われます。これにより、並列度構成を最適化できます。
さらに、Realtime Compute Engine Ververica Runtime (VVR) 8.0 以降では、次の設定項目が提供されています。ジョブの実行時パラメーター設定エリアでこれらを構成して、自動並列度推を微調整できます:
Realtime Compute Engine VVR 8.0 以降では、Flink バッチジョブに対して自動並列度推論機能がデフォルトで有効になっています。構成したグローバル並列度は、推論された並列度の上限として使用されます。Flink バッチジョブのパフォーマンスを向上させるには、Realtime Compute Engine VVR 8.0 以降を使用してください。
設定項目 | 説明 | デフォルト値 |
execution.batch.adaptive.auto-parallelism.enabled | 自動並列度推論を有効にするかどうかを指定します。 | true |
execution.batch.adaptive.auto-parallelism.min-parallelism | 自動的に設定できる最小並列度。 | 1 |
execution.batch.adaptive.auto-parallelism.max-parallelism | 自動的に設定できる最大並列度。このパラメーターが構成されていない場合、グローバル並列度がデフォルト値として使用されます。 | 128 |
execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task | 各タスクが処理すると予想される平均データボリューム。Flink は、この構成とノードが処理する必要がある実際のデータボリュームに基づいて、ノードの並列処理の次数を動的に決定します。 | 16MiB |
execution.batch.adaptive.auto-parallelism.default-source-parallelism | Source オペレーターのデフォルトの並列度。現在、Flink は Source ノードによって読み取られるデータ量を正確に認識できません。したがって、その並列度を構成します。このパラメーターが構成されていない場合、グローバル並列度が使用されます。 | 1 |
FAQ
バッチジョブの TaskManager の数が並列度を超えることがある
バッチジョブでは、TaskManager (TM) はオンデマンドで動的に作成および解放されます。例:
ジョブの並列度は 16 で、各 TM には 1 スロットのみが割り当てられます。
最初のオペレーターが開始すると、16 個の TM が作成されます。
一部のサブタスクが早期に終了した場合、対応する TM は一定期間非アクティブになった後に自動的に解放されます。
後続のオペレーターが実行を開始すると、システムは新しい TM を要求します。これにより、作成された TM の総数 (17、18、19 など) が元の並列度を超えます。
これは例外ではありません。これはバッチジョブのエラスティックなスケジューリングの通常の動作です。
TM の総数を厳密に制限するには、 [最大スロット数] パラメーターを構成できます。
並列度とスロット数の違い
並列度: ジョブ内の各オペレーターで同時に実行できるサブタスクの数を定義します。これは処理能力の理論上の上限です。
スロット: Flink のリソースユニット。1 つのスロットで 1 つのサブタスクを実行できます。
ストリームジョブ: デフォルトでは、スロット共有が有効になっています。起動時に、すべてのタスクがすぐに実行されるように、グローバル並列度と同じ数のスロットを要求します。
バッチジョブ: すべてのリソースを事前に割り当てる必要はありません。実際の並列タスク数は、グローバル並列度が高くても、現在利用可能なスロットによって制限されます。
例: 並列度が 4 のストリームジョブには 4 つのスロットが必要です。クラスターに利用可能なスロットが 4 つしかない場合、バッチジョブは最大 4 つのサブタスクを実行できます。残りのタスクは、スロットが解放されるのを待ってから実行されます。
バッチジョブがスタックする原因を特定する方法
TaskManager のメモリ、CPU、およびスレッドの使用状況を監視する方法については、「ジョブのパフォーマンスを表示する」をご参照ください。
メモリ問題のトラブルシューティング: まず、メモリ使用量を確認して、メモリ不足による頻繁なガベージコレクション (GC) が発生しているかどうかを判断します。メモリが不足していることを確認した場合は、TaskManager のメモリ構成を増やして、頻繁な GC によるパフォーマンスの問題を軽減します。
CPU 使用率の分析: 個々のスレッドが大量の CPU リソースを消費していないか確認します。これがジョブのスタッタリングの原因である可能性があります。
スレッドスタックのトレース: スレッドスタック情報を使用して、現在のノードの実行におけるボトルネックを分析します。
エラー: デバイスに空き容量がありません
リアルタイムコンピューティングエンジンでバッチジョブを実行するときに「No space left on device」エラーが発生した場合、通常、TaskManager が中間結果ファイルを保存するために使用するローカルディスク領域が使い果たされたことを示します。各 TaskManager で使用可能なディスク領域は限られており、割り当てられた CPU コアに比例します。
解決策:
各 TaskManager のスロット数を減らします。これにより、単一ノードでの並列タスク数が減り、ローカルディスク領域への要求が減少します。
TaskManager の CPU コア数を増やします。これにより、TaskManager のディスク領域が増加します。
関連ドキュメント
Apache Flink 向け Realtime Compute の主要な機能を使用してバッチ処理を開始するには、「Flink バッチ処理クイックスタート」をご参照ください。
実行時パラメーターの構成方法については、「カスタムジョブ実行時パラメーターを構成するにはどうすればよいですか?」をご参照ください。