すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ダーティデータの収集

最終更新日:Jun 19, 2026

このトピックでは、Flink CDC データインジェストジョブでダーティデータコレクターを使用する方法について説明します。

概要

リアルタイムデータ同期において、ソースからのデータは、形式の誤り、エンコードエラー、スキーマの非互換性などの問題により、解析に失敗する場合があります。このような処理不可能なデータは、ダーティデータと呼ばれます。

VVR バージョン 11.5 以降、データインジェストは Kafka データソースでのダーティデータ収集をサポートしています。コネクターの設定オプションを使用すると、エラーを無視し、詳細をログに記録して、ジョブの実行を継続するように設定できます。

コネクターが解析できないデータを検出すると、システムは自動的に生データと例外情報を取得し、指定されたコレクターに書き込みます。これにより、以下のことが可能になります。

  • 少量のダーティデータを許容し、パイプライン全体が中断されるのを防ぎます。

  • 後のトラブルシューティングと分析のために、完全なコンテキストをログに記録します。

  • しきい値を設定し、過剰な数のエラーを防ぎます。

代表的なユースケース

ユースケース

目的

ログ収集パイプライン

(アプリケーションログなどの非構造化データソース向け)

データ品質に一貫性がないため、少数の不良レコードをスキップして、メインプロセスの実行を継続します。

コアビジネステーブル同期

(注文やアカウント変更などの重要なシステム向け)

高い整合性が求められます。エラー発生時には、手動で介入できるよう即座にアラートがトリガーされます。

データ探索および分析フェーズ

ダーティデータの問題に対処する前に、データセット全体を迅速に処理して全体的な構造を把握します。

制限事項と考慮事項

この機能を使用する前に、制限事項と潜在的なリスクを理解してください。

  • サポート対象のコネクター:この機能は現在、Kafka データソースでのみ利用可能です。他のソースへの対応は、今後のリリースで追加される予定です。

  • サポート対象のコレクタータイプ:現在、logger タイプのみがサポートされており、ダーティデータをログファイルに書き込みます。

説明

この機能は、デバッグおよび本番環境の初期段階に適しています。大量のダーティデータが継続的に発生する場合は、アップストリームシステムでデータガバナンス対策を実施することを推奨します。

構文

ダーティデータコレクターの有効化

ダーティデータコレクターは pipeline モジュールで定義します。構文は以下のとおりです。

pipeline:
  dirty-data.collector:
    name: Logger Dirty Data Collector
    type: logger

パラメーター

説明

name

コレクター名です。Kafka-DQ-Collector など、わかりやすい名前を使用することを推奨します。

type

コレクタータイプです。有効な値:

  • logger:ダーティデータをログファイルに書き込みます。

説明

このオプションを定義しない場合、エラー耐性が有効になっていても、システムはダーティデータを記録しません。

データソースでのエラー耐性の設定

ダーティデータコレクターを設定するだけでは、解析エラーをスキップできません。この機能は、Kafka エラー耐性ポリシーと組み合わせて使用する必要があります。詳細については、Kafka コネクタードキュメントをご参照ください。以下は一般的な設定例です。

source:
  type: kafka
  # 最初の 100 件の解析エラーをスキップします。100 件を超えるとジョブは失敗します。
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100

パラメーター

デフォルト

説明

ingestion.ignore-errors

false

解析エラーを無視するかどうかを指定します。

true に設定すると、ジョブは失敗したレコードをスキップします。false に設定すると、ジョブは即座に失敗します。

ingestion.error-tolerance.max-count

-1 (無制限)

許容するダーティデータレコードの最大数です。

ingestion.ignore-errorstrue の場合、収集したダーティデータレコード数がこの値を超えると、ジョブはフェールオーバーをトリガーして停止します。

ロガーダーティデータコレクター

ロガーダーティデータコレクターは、ダーティデータを個別のログファイルに保存します。ダーティデータログを表示するには、次の手順に従ってください。

  1. [運用保守] ページに移動し、[ジョブログ] タブをクリックします。

  2. [実行ログ] をクリックし、[実行中の TaskManager] サブタブを選択してから、該当するオペレーターの TaskManager ノードを選択します。

  3. [ログリスト] をクリックし、リスト内の yaml-dirty-data.out という名前のログファイルをクリックして、ダーティデータレコードを表示または保存します。

各ダーティデータレコードには、以下のメタデータが含まれます。

  • ダーティデータが処理されたタイムスタンプ

  • ダーティデータレコードを出力したオペレーターとサブタスクインデックス

  • 生データの内容

  • 処理失敗の原因となった例外情報

ダーティデータレコードの形式

各レコードには、以下のメタデータが含まれます。

text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---

フィールド

説明

タイムスタンプ

ダーティデータが取得された時刻です。

オペレーターとサブタスク

エラーが発生した特定のオペレーターとその並列サブタスクインデックスです。

生データ

解析されていない生データ (Base64 または文字列形式) です。

例外

解析失敗の例外タイプとスタックトレースの概要です。

よくある質問

ダーティデータはチェックポイントに影響しますか?

いいえ。ダーティデータは状態更新に含まれる前にインターセプトされるため、チェックポイントの失敗を引き起こすことはありません。

ダーティデータ収集とサイドアウトプット

  • ダーティデータコレクター:デシリアライズまたは解析できないデータを処理します。

  • サイドアウトプット:解析は可能でもビジネスロジック要件を満たさないデータを処理します。