すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ダーティデータの収集

最終更新日:Jan 10, 2026

このトピックでは、Flink CDC データインジェストジョブでダーティデータコレクターを使用する方法について説明します。

機能概要

リアルタイムのデータ同期シナリオでは、フォーマットエラー、エンコーディングの問題、または互換性のないスキーマが原因で、ソースデータの解析に失敗することがあります。このように処理できないデータをダーティデータと呼びます。

データインジェスト機能は、Ververica Runtime (VVR) 11.5 以降、ダーティデータの収集をサポートしています。この機能は Kafka データソースで利用できます。コネクタが解析できないデータに遭遇すると、システムは生データと例外情報をコレクターに書き込みます。コネクタの設定項目を使用して、ジョブがエラーを無視し、詳細を記録して実行を継続するように設定できます。

コネクタが解析できないデータに遭遇すると、システムは自動的に生のメッセージと例外情報をキャッチし、指定されたコレクターに書き込みます。設定ポリシーを使用すると、次のことが可能になります:

  • 少量のダーティデータを許容し、パイプライン全体が中断するのを防ぎます。

  • トラブルシューティングと解決を容易にするために、完全なコンテキストを記録します。

  • しきい値を設定して、例外のオーバーフローを防ぎます。

典型的な利用シーン

シナリオ

目的

ログ収集パイプライン

(例:アプリログなどのソースからの非構造化データ)

少量の不正なデータをスキップしてデータ品質の不一致を処理し、メインプロセスが継続して実行されるようにします。

コアビジネステーブルの同期

(例:注文やアカウント変更などのキーシステム)

データの一貫性要件は高いです。目標は、発見後すぐにアラートをトリガーして、迅速な介入を可能にすることです。

データ探索および調査フェーズ

すべてのデータを迅速に処理して全体的な分布を理解し、後でダーティデータを処理します。

制限事項と注意事項

この機能を使用する前に、その機能と潜在的なリスクを理解してください:

  • コネクタのサポート:現在、この機能をサポートしているのは Kafka データソースのみです。他のデータソースのサポートは順次追加されています。

  • サポートされているコレクタータイプ:現在、logger タイプのみがサポートされています。このタイプはダーティデータをログファイルに書き込みます。

説明

この機能は、デバッグや初期の本番ステージに適しています。大量のダーティデータが継続して発生する場合は、上流システムでデータガバナンスを実施してください。

構文構造

ダーティデータコレクターの有効化

ダーティデータコレクターは Pipeline モジュールで定義されます。構文は次のとおりです:

pipeline:
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger

パラメーター

説明

name

コレクターの名前。Kafka-DQ-Collector のように、意味のある名前を使用してください。

type

コレクターのタイプ。次の値が利用可能です:

  • logger:ダーティデータをログファイルに書き込みます。

説明

この設定項目を定義しない場合、フォールトトレランスを有効にしてもダーティデータは記録されません。

データソースでのフォールトトレランスポリシーの設定

ダーティデータコレクターを有効にしても、解析エラーが自動的にスキップされるわけではありません。エラーをスキップするには、この機能を Kafka のフォールトトレランスポリシーと併用する必要があります。詳細については、Kafka コネクタドキュメントをご参照ください。次の例は設定を示しています:

yamlsource:
  type: kafka
  # 最初の 100 件の解析例外をスキップします。例外の数が 100 を超えると、ジョブは失敗します。
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

パラメーター

デフォルト値

説明

ingestion.ignore-errors

false

解析エラーを無視するかどうかを指定します。

このパラメーターを true に設定すると、データ処理はスキップされます。このパラメーターを false に設定すると、ジョブはすぐに失敗します。

ingestion.error-tolerance.max-count

-1 (無制限)

許容するダーティデータレコードの最大数。

ingestion.ignore-errorstrue に設定されていて、収集されたダーティデータレコードの数がこの値を超えると、フェールオーバーがトリガーされてジョブが終了します。

Logger ダーティデータコレクター

Logger ダーティデータコレクターは、ダーティデータを別のログファイルに保存します。ダーティデータログファイルを表示するには、次の手順に従います:

  1. [ジョブの O&M] ページに移動し、[ジョブログ] タブをクリックします。

  2. [運用ログ] をクリックし、[実行中のタスクマネージャー] サブタブをクリックして、対応するオペレーターのテクノロジーマネージャー (TM) ノードを選択します。

  3. [ログリスト] をクリックし、リスト内の yaml-dirty-data.out という名前のログファイルをクリックして、収集されたダーティデータレコードをクエリおよび保存します。

ダーティデータには、次のメタデータが記録されます:

  • ダーティデータが処理されたときのタイムスタンプ

  • ダーティデータレコードを生成したオペレーターとサブタスクインデックス

  • 生のダーティデータの内容

  • 処理の失敗を引き起こした例外情報

ダーティデータレコードのフォーマット例

各レコードには、次のメタデータが含まれています:

text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---

フィールド

説明

タイムスタンプ

ダーティデータがキャッチされた時間。

オペレーター & サブタスク

エラーを引き起こした特定のオペレーターと並列インスタンス番号。

生データ

未解析の生メッセージの内容 (Base64 または文字列形式)。

例外

解析失敗の例外タイプとスタックの概要。

よくある質問

ダーティデータはチェックポイントに影響しますか?

いいえ、影響しません。ダーティデータは状態が更新される前にインターセプトされるため、チェックポイントの成功には影響しません。

この機能と Flink SQL のサイド出力ストリームの違いは何ですか?

  • ダーティデータコレクター:逆シリアル化または解析に失敗したデータを処理します。

  • サイド出力:解析は可能ですが、ビジネスルールを満たさないデータを処理します。