このトピックでは、Flink CDC データインジェストジョブでダーティデータコレクターを使用する方法について説明します。
概要
リアルタイムデータ同期において、ソースからのデータは、形式の誤り、エンコードエラー、スキーマの非互換性などの問題により、解析に失敗する場合があります。このような処理不可能なデータは、ダーティデータと呼ばれます。
VVR バージョン 11.5 以降、データインジェストは Kafka データソースでのダーティデータ収集をサポートしています。コネクターの設定オプションを使用すると、エラーを無視し、詳細をログに記録して、ジョブの実行を継続するように設定できます。
コネクターが解析できないデータを検出すると、システムは自動的に生データと例外情報を取得し、指定されたコレクターに書き込みます。これにより、以下のことが可能になります。
-
少量のダーティデータを許容し、パイプライン全体が中断されるのを防ぎます。
-
後のトラブルシューティングと分析のために、完全なコンテキストをログに記録します。
-
しきい値を設定し、過剰な数のエラーを防ぎます。
代表的なユースケース
|
ユースケース |
目的 |
|
ログ収集パイプライン (アプリケーションログなどの非構造化データソース向け) |
データ品質に一貫性がないため、少数の不良レコードをスキップして、メインプロセスの実行を継続します。 |
|
コアビジネステーブル同期 (注文やアカウント変更などの重要なシステム向け) |
高い整合性が求められます。エラー発生時には、手動で介入できるよう即座にアラートがトリガーされます。 |
|
データ探索および分析フェーズ |
ダーティデータの問題に対処する前に、データセット全体を迅速に処理して全体的な構造を把握します。 |
制限事項と考慮事項
この機能を使用する前に、制限事項と潜在的なリスクを理解してください。
-
サポート対象のコネクター:この機能は現在、Kafka データソースでのみ利用可能です。他のソースへの対応は、今後のリリースで追加される予定です。
-
サポート対象のコレクタータイプ:現在、
loggerタイプのみがサポートされており、ダーティデータをログファイルに書き込みます。
この機能は、デバッグおよび本番環境の初期段階に適しています。大量のダーティデータが継続的に発生する場合は、アップストリームシステムでデータガバナンス対策を実施することを推奨します。
構文
ダーティデータコレクターの有効化
ダーティデータコレクターは pipeline モジュールで定義します。構文は以下のとおりです。
pipeline:
dirty-data.collector:
name: Logger Dirty Data Collector
type: logger
|
パラメーター |
説明 |
|
|
コレクター名です。 |
|
|
コレクタータイプです。有効な値:
|
このオプションを定義しない場合、エラー耐性が有効になっていても、システムはダーティデータを記録しません。
データソースでのエラー耐性の設定
ダーティデータコレクターを設定するだけでは、解析エラーをスキップできません。この機能は、Kafka エラー耐性ポリシーと組み合わせて使用する必要があります。詳細については、Kafka コネクタードキュメントをご参照ください。以下は一般的な設定例です。
source:
type: kafka
# 最初の 100 件の解析エラーをスキップします。100 件を超えるとジョブは失敗します。
ingestion.ignore-errors: true
ingestion.error-tolerance.max-count: 100
|
パラメーター |
デフォルト |
説明 |
|
|
|
解析エラーを無視するかどうかを指定します。
|
|
|
|
許容するダーティデータレコードの最大数です。
|
ロガーダーティデータコレクター
ロガーダーティデータコレクターは、ダーティデータを個別のログファイルに保存します。ダーティデータログを表示するには、次の手順に従ってください。
-
[運用保守] ページに移動し、[ジョブログ] タブをクリックします。
-
[実行ログ] をクリックし、[実行中の TaskManager] サブタブを選択してから、該当するオペレーターの TaskManager ノードを選択します。
-
[ログリスト] をクリックし、リスト内の
yaml-dirty-data.outという名前のログファイルをクリックして、ダーティデータレコードを表示または保存します。
各ダーティデータレコードには、以下のメタデータが含まれます。
-
ダーティデータが処理されたタイムスタンプ
-
ダーティデータレコードを出力したオペレーターとサブタスクインデックス
-
生データの内容
-
処理失敗の原因となった例外情報
ダーティデータレコードの形式
各レコードには、以下のメタデータが含まれます。
text[2025-04-05 10:23:45] [Operator: SourceKafka -> Subtask: 2]
Raw Data: {"id": "abc", "ts": "invalid-timestamp"}
Exception: java.time.format.DateTimeParseException: Text 'invalid-timestamp' could not be parsed at index 0
---
|
フィールド |
説明 |
|
タイムスタンプ |
ダーティデータが取得された時刻です。 |
|
オペレーターとサブタスク |
エラーが発生した特定のオペレーターとその並列サブタスクインデックスです。 |
|
生データ |
解析されていない生データ (Base64 または文字列形式) です。 |
|
例外 |
解析失敗の例外タイプとスタックトレースの概要です。 |
よくある質問
ダーティデータはチェックポイントに影響しますか?
いいえ。ダーティデータは状態更新に含まれる前にインターセプトされるため、チェックポイントの失敗を引き起こすことはありません。
ダーティデータ収集とサイドアウトプット
-
ダーティデータコレクター:デシリアライズまたは解析できないデータを処理します。
-
サイドアウトプット:解析は可能でもビジネスロジック要件を満たさないデータを処理します。