すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:例外処理ガイド

最終更新日:Oct 24, 2025

このガイドでは、開発者向けに、明確で実用的な例外処理のガイドラインを提供します。これらのガイドラインに従うことで、コードが Flink のフォールトトレランスと効果的に連携し、ジョブの安定性と可観測性が向上します。

背景

Apache Flink は、チェックポイント、自動再起動、および 1 回限りのセマンティクスを通じて、堅牢なフォールトトレランスを提供します。

しかし、DataStream ジョブのユーザーコードや SQL ジョブの UDF における TaskCancelledExceptionOutOfMemoryErrorClassCastException などの不適切な例外処理は、これらのメカニズムを損なう可能性があります。不適切な例外処理は Flink の回復を妨げ、状態の不整合やデータ損失につながる可能性があります。

中心的な原則

システムレベルのエラー回復は Flink に委任し、ビジネス固有の例外の処理に集中します。

例外の種類と推奨される処理戦略

例外の種類

推奨される処理戦略

ビジネス例外 (回復可能)

JSON の解析エラー、データフィールドの欠落、またはビジネスルールの違反。

エラーレコードをキャッチ、ログに記録し、サイド出力に出力します。これにより、メインデータフローの継続性が維持されます。

外部依存関係の例外 (部分的に回復可能)

HTTP タイムアウト、データベース接続、またはサードパーティ API エラー (例: 5xx) などの問題。

バックオフポリシーを使用した限定的なリトライ戦略を採用し、リトライが失敗した場合は例外にエスカレーションします。

システム例外 (回復不能)

`TaskCancelledException``OutOfMemoryError``ClassCastException`、および状態アクセスに関する問題。

これらはキャッチしないでください。Flink の組み込みのエラー回復システムに、これらの重大な障害の管理と対処を任せます。

ベストプラクティス

1. 包括的な例外のキャッチを避ける

避けるべきこと:

広範な Exception 型をキャッチすると、Flink の重大な内部エラー (チェックポイントの問題やタスクのキャンセルなど) がマスクされる可能性があります。これにより、ジョブは動作しているように見えても、データを処理できなくなることがあります。

try {
    // ユーザーロジック
} catch (Exception e) {
    LOG.warn("Something went wrong", e);
}

推奨されること:

特定の回復可能なビジネス例外のみをキャッチし、サイド出力を使用してエラーレコードを出力します。

try {
    processRecord(value);
} catch (JsonParseException e) {
    LOG.warn("Invalid input record: {}", value, e);
    ctx.output(ERROR_TAG, new ErrorRecord(value, e.getMessage()));
}

2. システムレベルのエラーは Flink に委任する

回復不能な状態エラー (例: 未初期化の状態、デシリアライズの失敗) が発生した場合は、例外をスローします。これにより、Flink のフェールオーバーメカニズムがトリガーされます。

if (state.value() == null) {
    // 状態が正しく初期化されていません
    throw new IllegalStateException("State not properly initialized");
}

その後、Flink は設定された再起動戦略と最後の成功したチェックポイントを使用して、ジョブを自動的に復元します。

3. 外部呼び出しのリトライを制限する

外部システム (例: データベース、HTTP サービス) とやり取りする際は、無限リトライメカニズムの実装を避けてください。

int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
    try {
        callExternalService(record);
        return;
    } catch (IOException e) {
        if (i == maxRetries - 1) {
            throw new RuntimeException("Failed after " + maxRetries + " attempts", e);
        }
        Thread.sleep(1000 * (i + 1)); // エクスポネンシャルバックオフを使用します。
    }
}

4. 完全な例外コンテキストを保持する

e.getMessage() のみをログに記録するだけでは、トラブルシューティングには不十分なことがよくあります。代わりに、次のことを行います。

  • 完全なスタックトレースをログに記録します。

  • 元の入力データを保持します。

  • サイド出力を使用して、ダウンストリームのモニタリングやリプレイのために構造化されたエラーイベントを出力します。

ctx.output(ERROR_TAG, new ErrorRecord(
    originalInput,
    Instant.now(),
    e.getClass().getSimpleName(),
    e.getMessage(),
    ExceptionUtils.stringifyException(e)
));

5. 例外処理パスを検証する

  • 単体テストと統合テストに例外シナリオを含めます。

  • コードレビュー中に、広範な catch (Exception) を使用したり、例外をサイレントに無視したりするロジックを特に精査します。

  • すべての例外処理ブロックについて、「このエラーは本当に回復可能か?」、「回復可能である場合、回復ロジックは状態の整合性に影響を与えるか?」と自問します。

効果的な例外処理は、単にジョブのクラッシュを防ぐだけではありません。適切な例外処理により、エラーの追跡、回復、および状態の整合性の維持が保証されます。常に Flink のフォールトトレランス機能を信頼し、その回復プロセスを妨げないように注意してください。SonarQube ルールなどの静的コード解析ツールを使用して、不適切な例外処理を自動的にフラグ付けすることで、これらのプラクティスを CI/CD パイプラインに統合します。