すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ダーティデータ収集

最終更新日:Mar 10, 2026

このトピックでは、Flink CDC データインジェストジョブでダーティデータコレクターを使用する方法について説明します。

機能概要

リアルタイムデータ同期のシナリオでは、フォーマットエラー、エンコーディングの問題、または互換性のないスキーマが原因で、ソースデータの解析に失敗することがあります。このように正しく処理できないデータは、ダーティデータと呼ばれます。

ダーティデータ収集は、Ververica Runtime (VVR) 11.5 以降のデータインジェストでサポートされています。この機能は Kafka データソースで利用できます。コネクタが解析できないデータに遭遇すると、システムは生データと例外情報をコレクターに書き込みます。コネクタの設定項目を使用して、ジョブがエラーを無視し、詳細を記録して実行を継続するように設定できます。

コネクタが解析不能なデータに遭遇すると、システムは自動的に元のメッセージと例外情報をキャッチし、指定されたコレクターに書き込みます。設定ポリシーを使用すると、次のことが可能になります:

  • 少量のダーティデータを許容し、パイプライン全体が中断するのを防ぎます。

  • 後のトラブルシューティングと解決のために、完全なコンテキストを記録します。

  • しきい値を設定して、例外のオーバーフローを防ぎます。

典型的なシナリオ

シナリオ

説明

ログ収集パイプライン

(例:アプリログなどの非構造化データソースから)

データ品質に一貫性がない場合があります。少量のダーティデータをスキップして、メインプロセスの実行を継続させることができます。

コア業務テーブルの同期

(例:注文やアカウント変更などの主要システム)

一貫性の要件が高いです。ダーティデータが発見されたときにアラートをトリガーして、迅速な介入を可能にします。

データ探索および調査フェーズ

まずデータセット全体を迅速に処理して全体のデータ分布を把握し、その後ダーティデータを処理します。

制限事項と注意事項

この機能を使用する前に、その制限事項と潜在的な脅威を理解する必要があります:

  • コネクタのサポート:現在、この機能をサポートしているのは Kafka データソースのみです。他のソースへのサポートは順次追加されています。

  • サポートされるコレクタータイプ:現在、logger タイプのみがサポートされています。このタイプはダーティデータをログファイルに書き込みます。

説明

この機能は、デバッグや初期の本番ステージに適しています。大量のダーティデータが継続して発生する場合は、上流システムでデータガバナンスを実施してください。

構文

ダーティデータコレクターの有効化

ダーティデータコレクターは、パイプラインモジュールで定義されます。構文は次のとおりです:

pipeline:
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger

パラメーター

説明

name

コレクターの名前です。Kafka-DQ-Collector のように、意味のある名前を付けます。

type

コレクターのタイプです。次の値が利用可能です:

  • logger:ダーティデータをログファイルに書き込みます。

説明

この設定項目が定義されていない場合、フォールトトレランスが有効になっていてもダーティデータは記録されません。

データソースでのフォールトトレランスポリシーの設定

ダーティデータ収集を設定するだけでは、解析エラーのスキップは有効になりません。この機能は、Kafka のフォールトトレランスポリシーと組み合わせて使用する必要があります。詳細については、「Kafka コネクタドキュメント」をご参照ください。次の例は、設定のサンプルを示しています:

source:
  type: kafka
  # Skip the first 100 parsing exceptions. If the number of exceptions exceeds 100, the job fails.
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

パラメーター

デフォルト

説明

ingestion.ignore-errors

false

解析エラーを無視するかどうかを指定します。

true に設定すると、データ処理はスキップされます。false に設定すると、ジョブは直ちに失敗します。

ingestion.error-tolerance.max-count

-1 (無制限)

許容されるダーティデータレコードの最大数です。

ingestion.ignore-errorstrue に設定されていて、収集されたダーティデータレコードの数がこの値を超えると、フェールオーバーがトリガーされてジョブが終了します。

Logger ダーティデータコレクター

Logger ダーティデータコレクターは、ダーティデータを別のログファイルに保存します。ダーティデータのログファイルを表示するには、次の手順を実行します:

  1. [ジョブの運用保守] ページに移動し、[ジョブログ] タブをクリックします。

  2. [運用ログ] をクリックし、[実行中の TaskManager] サブタブに移動して、オペレーター用のテクノロジーマネージャー (TM) ノードを選択します。

  3. [ログリスト] をクリックし、リストから yaml-dirty-data.out ログファイルを選択して、ダーティデータレコードをクエリして保存します。

現在、ダーティデータに対して次のメタデータが記録されます:

  • ダーティデータが処理されたときのタイムスタンプ

  • ダーティデータレコードを報告したオペレーターとサブタスクインデックス

  • 生のダーティデータの内容

  • 処理失敗の原因となった例外情報

ダーティデータレコードのフォーマット例

各レコードには、次のメタデータが含まれています:

text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---

フィールド

説明

タイムスタンプ

ダーティデータがキャッチされた時間。

オペレーター & サブタスク

エラーが発生した特定のオペレーターと並列インスタンス番号。

生データ

未解析の生メッセージの内容 (Base64 または文字列形式)。

例外

解析失敗の例外タイプとスタックの概要。

よくある質問

ダーティデータはチェックポイントに影響しますか?

いいえ、影響しません。ダーティデータは状態が更新される前にインターセプトされるため、チェックポイントの成功には影響しません。

この機能と Flink SQL の副次出力ストリームの違いは何ですか?

  • ダーティデータコレクター:逆シリアル化または解析に失敗したデータを処理します。

  • 副次出力:解析はできるが、ビジネスルールに準拠していないデータを処理します。