このトピックでは、Flink CDC データインジェストジョブでダーティデータコレクターを使用する方法について説明します。
機能概要
リアルタイムデータ同期のシナリオでは、フォーマットエラー、エンコーディングの問題、または互換性のないスキーマが原因で、ソースデータの解析に失敗することがあります。このように正しく処理できないデータは、ダーティデータと呼ばれます。
ダーティデータ収集は、Ververica Runtime (VVR) 11.5 以降のデータインジェストでサポートされています。この機能は Kafka データソースで利用できます。コネクタが解析できないデータに遭遇すると、システムは生データと例外情報をコレクターに書き込みます。コネクタの設定項目を使用して、ジョブがエラーを無視し、詳細を記録して実行を継続するように設定できます。
コネクタが解析不能なデータに遭遇すると、システムは自動的に元のメッセージと例外情報をキャッチし、指定されたコレクターに書き込みます。設定ポリシーを使用すると、次のことが可能になります:
少量のダーティデータを許容し、パイプライン全体が中断するのを防ぎます。
後のトラブルシューティングと解決のために、完全なコンテキストを記録します。
しきい値を設定して、例外のオーバーフローを防ぎます。
典型的なシナリオ
シナリオ | 説明 |
ログ収集パイプライン (例:アプリログなどの非構造化データソースから) | データ品質に一貫性がない場合があります。少量のダーティデータをスキップして、メインプロセスの実行を継続させることができます。 |
コア業務テーブルの同期 (例:注文やアカウント変更などの主要システム) | 一貫性の要件が高いです。ダーティデータが発見されたときにアラートをトリガーして、迅速な介入を可能にします。 |
データ探索および調査フェーズ | まずデータセット全体を迅速に処理して全体のデータ分布を把握し、その後ダーティデータを処理します。 |
制限事項と注意事項
この機能を使用する前に、その制限事項と潜在的な脅威を理解する必要があります:
コネクタのサポート:現在、この機能をサポートしているのは Kafka データソースのみです。他のソースへのサポートは順次追加されています。
サポートされるコレクタータイプ:現在、
loggerタイプのみがサポートされています。このタイプはダーティデータをログファイルに書き込みます。
この機能は、デバッグや初期の本番ステージに適しています。大量のダーティデータが継続して発生する場合は、上流システムでデータガバナンスを実施してください。
構文
ダーティデータコレクターの有効化
ダーティデータコレクターは、パイプラインモジュールで定義されます。構文は次のとおりです:
pipeline:
dirty-data.collector:
name: Logger Dirty Data Collector
type: loggerパラメーター | 説明 |
| コレクターの名前です。 |
| コレクターのタイプです。次の値が利用可能です:
|
この設定項目が定義されていない場合、フォールトトレランスが有効になっていてもダーティデータは記録されません。
データソースでのフォールトトレランスポリシーの設定
ダーティデータ収集を設定するだけでは、解析エラーのスキップは有効になりません。この機能は、Kafka のフォールトトレランスポリシーと組み合わせて使用する必要があります。詳細については、「Kafka コネクタドキュメント」をご参照ください。次の例は、設定のサンプルを示しています:
source:
type: kafka
# Skip the first 100 parsing exceptions. If the number of exceptions exceeds 100, the job fails.
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100パラメーター | デフォルト | 説明 |
|
| 解析エラーを無視するかどうかを指定します。
|
|
| 許容されるダーティデータレコードの最大数です。
|
Logger ダーティデータコレクター
Logger ダーティデータコレクターは、ダーティデータを別のログファイルに保存します。ダーティデータのログファイルを表示するには、次の手順を実行します:
[ジョブの運用保守] ページに移動し、[ジョブログ] タブをクリックします。
[運用ログ] をクリックし、[実行中の TaskManager] サブタブに移動して、オペレーター用のテクノロジーマネージャー (TM) ノードを選択します。
[ログリスト] をクリックし、リストから
yaml-dirty-data.outログファイルを選択して、ダーティデータレコードをクエリして保存します。
現在、ダーティデータに対して次のメタデータが記録されます:
ダーティデータが処理されたときのタイムスタンプ
ダーティデータレコードを報告したオペレーターとサブタスクインデックス
生のダーティデータの内容
処理失敗の原因となった例外情報
ダーティデータレコードのフォーマット例
各レコードには、次のメタデータが含まれています:
text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---フィールド | 説明 |
タイムスタンプ | ダーティデータがキャッチされた時間。 |
オペレーター & サブタスク | エラーが発生した特定のオペレーターと並列インスタンス番号。 |
生データ | 未解析の生メッセージの内容 (Base64 または文字列形式)。 |
例外 | 解析失敗の例外タイプとスタックの概要。 |
よくある質問
ダーティデータはチェックポイントに影響しますか?
いいえ、影響しません。ダーティデータは状態が更新される前にインターセプトされるため、チェックポイントの成功には影響しません。
この機能と Flink SQL の副次出力ストリームの違いは何ですか?
ダーティデータコレクター:逆シリアル化または解析に失敗したデータを処理します。
副次出力:解析はできるが、ビジネスルールに準拠していないデータを処理します。