このトピックでは、Flink CDC データインジェストジョブでダーティデータコレクターを使用する方法について説明します。
機能概要
リアルタイムのデータ同期シナリオでは、フォーマットエラー、エンコーディングの問題、または互換性のないスキーマが原因で、ソースデータの解析に失敗することがあります。このように処理できないデータをダーティデータと呼びます。
データインジェスト機能は、Ververica Runtime (VVR) 11.5 以降、ダーティデータの収集をサポートしています。この機能は Kafka データソースで利用できます。コネクタが解析できないデータに遭遇すると、システムは生データと例外情報をコレクターに書き込みます。コネクタの設定項目を使用して、ジョブがエラーを無視し、詳細を記録して実行を継続するように設定できます。
コネクタが解析できないデータに遭遇すると、システムは自動的に生のメッセージと例外情報をキャッチし、指定されたコレクターに書き込みます。設定ポリシーを使用すると、次のことが可能になります:
少量のダーティデータを許容し、パイプライン全体が中断するのを防ぎます。
トラブルシューティングと解決を容易にするために、完全なコンテキストを記録します。
しきい値を設定して、例外のオーバーフローを防ぎます。
典型的な利用シーン
シナリオ | 目的 |
ログ収集パイプライン (例:アプリログなどのソースからの非構造化データ) | 少量の不正なデータをスキップしてデータ品質の不一致を処理し、メインプロセスが継続して実行されるようにします。 |
コアビジネステーブルの同期 (例:注文やアカウント変更などのキーシステム) | データの一貫性要件は高いです。目標は、発見後すぐにアラートをトリガーして、迅速な介入を可能にすることです。 |
データ探索および調査フェーズ | すべてのデータを迅速に処理して全体的な分布を理解し、後でダーティデータを処理します。 |
制限事項と注意事項
この機能を使用する前に、その機能と潜在的なリスクを理解してください:
コネクタのサポート:現在、この機能をサポートしているのは Kafka データソースのみです。他のデータソースのサポートは順次追加されています。
サポートされているコレクタータイプ:現在、
loggerタイプのみがサポートされています。このタイプはダーティデータをログファイルに書き込みます。
この機能は、デバッグや初期の本番ステージに適しています。大量のダーティデータが継続して発生する場合は、上流システムでデータガバナンスを実施してください。
構文構造
ダーティデータコレクターの有効化
ダーティデータコレクターは Pipeline モジュールで定義されます。構文は次のとおりです:
pipeline:
dirty-data.collector:
name: Logger Dirty Data Collector
type: loggerパラメーター | 説明 |
| コレクターの名前。 |
| コレクターのタイプ。次の値が利用可能です:
|
この設定項目を定義しない場合、フォールトトレランスを有効にしてもダーティデータは記録されません。
データソースでのフォールトトレランスポリシーの設定
ダーティデータコレクターを有効にしても、解析エラーが自動的にスキップされるわけではありません。エラーをスキップするには、この機能を Kafka のフォールトトレランスポリシーと併用する必要があります。詳細については、Kafka コネクタドキュメントをご参照ください。次の例は設定を示しています:
yamlsource:
type: kafka
# 最初の 100 件の解析例外をスキップします。例外の数が 100 を超えると、ジョブは失敗します。
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100パラメーター | デフォルト値 | 説明 |
|
| 解析エラーを無視するかどうかを指定します。 このパラメーターを |
|
| 許容するダーティデータレコードの最大数。
|
Logger ダーティデータコレクター
Logger ダーティデータコレクターは、ダーティデータを別のログファイルに保存します。ダーティデータログファイルを表示するには、次の手順に従います:
[ジョブの O&M] ページに移動し、[ジョブログ] タブをクリックします。
[運用ログ] をクリックし、[実行中のタスクマネージャー] サブタブをクリックして、対応するオペレーターのテクノロジーマネージャー (TM) ノードを選択します。
[ログリスト] をクリックし、リスト内の
yaml-dirty-data.outという名前のログファイルをクリックして、収集されたダーティデータレコードをクエリおよび保存します。
ダーティデータには、次のメタデータが記録されます:
ダーティデータが処理されたときのタイムスタンプ
ダーティデータレコードを生成したオペレーターとサブタスクインデックス
生のダーティデータの内容
処理の失敗を引き起こした例外情報
ダーティデータレコードのフォーマット例
各レコードには、次のメタデータが含まれています:
text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---フィールド | 説明 |
タイムスタンプ | ダーティデータがキャッチされた時間。 |
オペレーター & サブタスク | エラーを引き起こした特定のオペレーターと並列インスタンス番号。 |
生データ | 未解析の生メッセージの内容 (Base64 または文字列形式)。 |
例外 | 解析失敗の例外タイプとスタックの概要。 |
よくある質問
ダーティデータはチェックポイントに影響しますか?
いいえ、影響しません。ダーティデータは状態が更新される前にインターセプトされるため、チェックポイントの成功には影響しません。
この機能と Flink SQL のサイド出力ストリームの違いは何ですか?
ダーティデータコレクター:逆シリアル化または解析に失敗したデータを処理します。
サイド出力:解析は可能ですが、ビジネスルールを満たさないデータを処理します。