Flink SQL をリアルタイムデータ処理に使用する場合、順序が正しくない変更ログイベントがデータの精度に影響を与える可能性があります。このトピックでは、Flink SQL での変更ログメカニズムの仕組み、順序が正しくない変更ログイベントの原因、およびこれらのイベントの処理方法について説明します。
Flink SQL での変更ログ
背景情報
MySQL などのリレーショナルデータベースのバイナリログ(binlog)には、INSERT、UPDATE、DELETE 操作など、データベース内のすべての変更操作が記録されます。同様に、Flink SQL の変更ログには、増分データ処理を容易にするために、すべてのデータ変更が記録されます。
MySQL では、binlog を使用してデータのバックアップ、リカバリ、同期、およびレプリケーションを実行できます。たとえば、binlog の操作レコードを読み取って解析し、増分データの同期とレプリケーションを実行できます。Change Data Capture(CDC)は、データ同期によく使用されるテクノロジーです。CDC ツールは、データベースのデータ変更を監視し、変更をイベントストリームに変換して、リアルタイム処理を容易にします。CDC ツールを使用して、リレーショナルデータベースのデータ変更をデータウェアハウスまたは他のシステムに送信し、リアルタイム分析とレポート作成を行うことができます。一般的な CDC ツールには、Debezium と Maxwell があります。Apache Flink は、FLINK-15331 リクエストに対応するために CDC サポートを追加しました。これにより、外部システムからの CDC データを統合して、リアルタイムデータの同期と分析を実装できます。
変更ログイベントの生成と処理
このトピックの「背景情報」セクションで説明したように、変更ログイベントは、binlog ファイルや CDC ツールなどの外部ソースによって生成できます。変更ログイベントは、Flink SQL の内部操作によっても生成できます。INSERT イベントのみを含む変更ログストリームは、追加ストリームまたは非更新ストリームと呼ばれます。UPDATE イベントなどの他のタイプのイベントを含む変更ログストリームは、更新ストリームと呼ばれます。グループ集計や重複除去など、Flink の一部の操作では、UPDATE イベントが生成される可能性があります。ほとんどの場合、UPDATE イベントを生成する演算子は状態を維持し、ステートフル演算子と呼ばれます。すべてのステートフル演算子が更新ストリームを使用できるわけではないことに注意してください。たとえば、over 集計とインターバル結合の演算子は、入力として更新ストリームをサポートしていません。
Ververica Runtime(VVR) 6.0 以降を使用する Realtime Compute for Apache Flink でサポートされているクエリ操作、対応するランタイム演算子、および更新ストリームの使用と生成のサポートについては、クエリの実行 をご参照ください。
変更ログイベントのタイプ
Apache Flink は、ストリーミング SQL 演算子の増分更新アルゴリズムを実装するために FLINK-6047 リクエストで提案された取り消しメカニズムを導入しました。このメカニズムでは、イベントは INSERT または DELETE に分類されます。この分類は、INSERT イベントのみをサポートするデータソースに適用されます。その後、Apache Flink は、FLINK-16987 リクエストに対応するために変更ログイベントシステムをリファクタリングしました。リファクタリングされたシステムでは、CDC エコシステムとの統合を容易にするために、次の変更ログイベントタイプを使用します。
/**
* 変更ログの行の種類。
*/
@PublicEvolving
public enum RowKind {
/**
* 挿入操作。
*/
INSERT,
/**
* 更新された行の以前の内容。
*/
UPDATE_BEFORE,
/**
* 更新された行の新しい内容。
*/
UPDATE_AFTER,
/**
* 削除操作。
*/
DELETE
}上記のコードに示すように、UPDATE_BEFORE と UPDATE_AFTER は別々のイベントタイプです。Apache Flink は、次の理由により、これらのイベントタイプを複合 UPDATE イベントタイプに結合しません。
UPDATE_BEFORE タイプと UPDATE_AFTER タイプのイベントは同じ構造です。唯一の違いは RowKind プロパティです。これにより、シリアル化が容易になります。対照的に、複合 UPDATE イベントタイプを使用する場合、イベントは異種であるか、INSERT イベントと DELETE イベントが UPDATE イベントと連携されます。たとえば、INSERT イベントには UPDATE_AFTER イベントのみが含まれ、DELETE イベントには UPDATE_BEFORE イベントのみが含まれます。
結合や集計などのデータシャッフル操作は、分散環境で頻繁に発生します。これには、次の 例 に示すように、特定のシナリオで複合 UPDATE イベントを個別の DELETE イベントと INSERT イベントに分割する必要があります。
例
このサンプルシナリオでは、複合 UPDATE イベントが DELETE イベントと INSERT イベントに分割されます。次の SQL コードは、このトピックの以降のセクションで説明する、順序が正しくない変更ログイベントの問題と対応するソリューションを示すためにも使用されます。
-- CDC ソーステーブル: s1 & s2
CREATE TEMPORARY TABLE s1 (
id BIGINT,
level BIGINT,
PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
CREATE TEMPORARY TABLE s2 (
id BIGINT,
attr VARCHAR,
PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
-- シンクテーブル: t1
CREATE TEMPORARY TABLE t1 (
id BIGINT,
level BIGINT,
attr VARCHAR,
PRIMARY KEY(id) NOT ENFORCED
)WITH (...);
-- s1 と s2 を結合し、結果を t1 に挿入します
INSERT INTO t1
SELECT
s1.*, s2.attr
FROM s1 JOIN s2
ON s1.level = s2.id;テーブル s1 のチェンジログストリームで、(id=1, level=10) レコードが時刻 t0 に挿入され、時刻 t1 に (id=1, level=20) に更新されると、3 つの個別のイベントが生成されます。次の表に、これらのイベントを示します。
s1 | イベントタイプ |
+I (id=1, level=10) | INSERT |
-U (id=1, level=10) | UPDATE_BEFORE |
+U (id=1, level=20) | UPDATE_AFTER |
テーブル s1 のプライマリキーは id です。ただし、結合操作では、ON 句で指定されているように、level 列に基づいてデータをシャッフルする必要があります。

結合演算子の並列度が 2 の場合、上記の 3 つのイベントが 2 つのタスクに送信される可能性があります。この場合、複合 UPDATE イベントタイプを使用しても、並列処理を確実にするために、シャッフル中にイベントを分割する必要があります。

順序が正しくない変更ログイベント
原因
このセクションでは、上記の 例 を使用し、次の条件を追加します。結合演算子は、テーブル s2 から +I (id=10, attr='a1') イベントと +I (id=20, attr='b1') イベントを受信し、次にテーブル s1 から 3 つの変更ログイベントを受信します。分散環境では、結合操作が 2 つのタスクによって並列で実行されるため、上記の変更ログイベントがダウンストリームのシンク演算子に到着する順序は異なる場合があります。次の表に、考えられるイベントの順序を示します。

ケース 1 | ケース 2 | ケース 3 |
+I (id=1, level=10, attr='a1') -U (id=1, level=10, attr='a1') +U (id=1, level=20, attr='b1') | +U (id=1, level=20, attr='b1') +I (id=1, level=10, attr='a1') -U (id=1, level=10, attr='a1') | +I (id=1, level=10, attr='a1') +U (id=1, level=20, attr='b1') -U (id=1, level=10, attr='a1') |
ケース 1 のイベントの順序は、シーケンシャル処理の場合と同じです。ケース 2 とケース 3 では、チェンジログイベントがダウンストリームオペレーターに順不同で到着するため、Flink SQL で誤った結果が生じる可能性があります。この例では、結果テーブルのプライマリキーは id です。外部ストレージがケース 2 とケース 3 で upsert 操作を実行すると、id が 1 のレコードが外部ストレージから誤って削除されます。しかし、期待される結果は、レコード (id=1, level=20, attr='b1') が存在することです。
解決策としての SinkUpsertMaterializer の使用
この 例 では、結合演算子は、出力が INSERT イベント(+I)と UPDATE イベント(-U および +U)を含むため、更新ストリームを生成します。順序が正しくない変更ログイベントが適切に処理されない場合、最終結果が正しくない可能性があります。
一意キーと upsert キー
一意キーとは、SQL 操作後に UNIQUE 制約を満たす 1 つ以上の列を指します。この例では、(s1.id)、(s1.id, s1.level)、および (s1.id, s2.id) はすべて一意キーです。
Flink SQL の変更ログメカニズムは binlog メカニズムに似ていますが、実装が簡素化されています。各更新のタイムスタンプを記録する代わりに、Flink は SQL プランナーのグローバル分析に基づいてプライマリキーの更新イベントの順序を決定します。一意キーのソートを維持する列は、upsert キーと呼ばれます。upsert キーが存在する場合、ダウンストリーム演算子は更新イベントを正しい順序で受信できます。シャッフル操作によって一意キーのソートが中断された場合、upsert キーは空になります。この場合、ダウンストリーム演算子は、カウントアルゴリズムなどのアルゴリズムを使用して、最終的な整合性を確保する必要があります。
この 例 では、テーブル s1 の行は level 列に基づいてシャッフルされます。その結果、結合演算子の出力には、同じ s1.id 値を持ち、一意キーでソートされていない行が含まれます。これは、upsert キーが空であることを意味します。この場合、Flink はすべての入力レコードを保存し、すべての列を比較して、UPDATE 操作と INSERT 操作を区別する必要があります。
さらに、シンクテーブルのプライマリキーは id 列であり、結合演算子の出力の upsert キーと一致しません。したがって、結合演算子の出力行をシンクテーブルで必要な行に正しく変換する必要があります。
SinkUpsertMaterializer
このトピックの「一意キーと upsert キー」セクションで説明したように、結合演算子が upsert キーがシンクテーブルのプライマリキーと一致しない更新ストリームを生成する場合、シンクテーブルのプライマリキーに基づいて変更ログイベントを生成するために中間ステップが必要です。したがって、Flink は、FLINK-20374 の問題を解決するために、結合演算子とそのダウンストリーム演算子を接続する SinkUpsertMaterializer 演算子を導入しました。
原因」セクションで説明されているイベントなど、順序が正しくない変更ログイベントは、特定のルールに従います。たとえば、特定の upsert キー、または upsert キーが空の場合はすべての列について、ADD イベント(+I および +U)は対応する RETRACT イベント(-D および -U)の前に発生します。同じ upsert キーを持つ変更ログイベントのペアは、データシャッフルが発生した場合でも同じタスクによって処理されます。これは、この例の変更ログイベントの順序に、このトピックの「原因」セクションで説明されているように、3 つのケースしかない理由も説明しています。
SinkUpsertMaterializer 演算子は、上記のルールに基づいて実装されています。次の図は、演算子の動作を示しています。SinkUpsertMaterializer 演算子は、状態に RowData 値のリストを保持します。行が入力されると、演算子は、推定された upsert キー、または upsert キーが空の場合は行全体に基づいて、行が RowData リストに存在するかどうかを確認します。次に、演算子は、ADD イベントの場合は状態の行を追加または更新し、RETRACT イベントの場合は状態から行を削除します。最後に、演算子はシンクテーブルのプライマリキーに基づいて変更ログイベントを生成します。詳細については、SinkUpsertMaterializer ソースコード をご参照ください。

次の図は、この例で、SinkUpsertMaterializer オペレーターが Join オペレーターからの出力チェンジログイベントを結果テーブルの入力チェンジログイベントに変換する方法を示しています。ケース 2 では、-U (id=1, level=10, attr='a1') イベントが到着すると、SinkUpsertMaterializer オペレーターは状態から最後の行を削除し、最後から 2 番目の行に対して UPDATE イベントを生成します。ケース 3 では、+U (id=1, level=20, attr='b1') イベントが到着すると、SinkUpsertMaterializer オペレーターはそのイベントをダウンストリームに渡します。次に、-U (id=1, level=10, attr='a1') イベントが到着すると、オペレーターはイベントを生成せずに状態から対応する行を削除します。このようにして、SinkUpsertMaterializer オペレーターは、ケース 2 とケース 3 で最終結果が期待どおりの (id=1, level=20, attr='b1') になることを保証します。

一般的なユースケース
The SinkUpsertMaterializer オペレーターは、次のシナリオで使用されます:
シンクテーブルにはプライマリキーがありますが、テーブルに書き込まれたデータが UNIQUE 制約を満たしていません。考えられる原因には、次の操作が含まれますが、これらに限定されません。
ソーステーブルにプライマリキーがない場合に、シンクテーブルのプライマリキーを定義します。
データをシンクテーブルに挿入するときにソーステーブルのプライマリキー列を含めないか、ソーステーブルのプライマリキー以外の列をシンクテーブルのプライマリキーとして使用します。
ソーステーブルのプライマリキー列で使用されるデータ型が、変換またはグループ集計後に精度が低下します。たとえば、列が BIGINT 型から INT 型に変換されます。
ソーステーブルのプライマリキー列またはグループ集計によって生成された一意キーに対してデータ変換を実行します。たとえば、複数のプライマリキーを連結またはマージして単一の列にします。
CREATE TABLE students ( student_id BIGINT NOT NULL, student_name STRING NOT NULL, course_id BIGINT NOT NULL, score DOUBLE NOT NULL, PRIMARY KEY(student_id) NOT ENFORCED ) WITH (...); CREATE TABLE performance_report ( student_info STRING NOT NULL PRIMARY KEY NOT ENFORCED, avg_score DOUBLE NOT NULL ) WITH (...); CREATE TEMPORARY VIEW v AS SELECT student_id, student_name, AVG(score) AS avg_score FROM students GROUP BY student_id, student_name; -- 連結された結果は UNIQUE 制約を満たさなくなりましたが、シンクテーブルのプライマリキーとして使用されます。 INSERT INTO performance_report SELECT CONCAT('id:', student_id, ',name:', student_name) AS student_info, avg_score FROM v;
このトピックの例に示すように、入力データの元のソートがシンクテーブルに書き込まれる前に中断されます。テーブル s1 と s2 で実行される結合操作は、s1 のプライマリキーに基づいていませんが、シンクテーブルはテーブル s1 と同じプライマリキーを持っています。これにより、データのソートが中断されます。
table.exec.sink.upsert-materializeパラメーターが'force'に設定されています。詳細については、このトピックの「パラメーター設定」セクションをご参照ください。
使用上の注意
SinkUpsertMaterializer 演算子は、状態に RowData 値のリストを保持します。これにより、状態サイズが増加し、状態データにアクセスするための追加の I/O オーバーヘッドが必要になり、スループットに影響を与える可能性があります。SinkUpsertMaterializer 演算子の使用は避けることをお勧めします。
パラメータ設定
table.exec.sink.upsert-materialize パラメータを使用して、SinkUpsertMaterializer 演算子を設定します。有効な値:
auto(デフォルト):Flink は、変更ログイベントの順序が正しくないかどうかを推測し、必要に応じて SinkUpsertMaterializer 演算子を追加します。
none:SinkUpsertMaterializer 演算子を使用しません。
force:常に SinkUpsertMaterializer 演算子を使用します。この場合、シンクテーブルにプライマリキーが指定されていない場合でも、演算子が追加されます。これにより、データのマテリアライゼーションが保証されます。
パラメータを auto に設定した場合、SinkUpsertMaterializer 演算子の追加は、必ずしもイベントの順序が正しくないことを意味するわけではないことに注意してください。たとえば、GROUPING SETS 句と COALESCE 関数を一緒に使用して null 値を変換する場合、SQL プランナーは、生成された upsert キーがシンクテーブルのプライマリキーと一致するかどうかを判断できない場合があります。この場合、Flink は結果の正確性を確保するために SinkUpsertMaterializer 演算子を追加します。ただし、SinkUpsertMaterializer 演算子なしで最終結果が正しい場合は、table.exec.sink.upsert-materialize パラメータを none に設定することをお勧めします。
SinkUpsertMaterializer の使用を避ける
SinkUpsertMaterializer 演算子の使用を避けるには、次の項目に注意してください。
重複除去やグループ集計などの操作で使用されるパーティションキーが、シンクテーブルのプライマリキーと同じであることを確認します。
単一の並列度がデータセットに適している場合に順序が乱れたデータを防ぐには、並列度を 1 に設定します。このシナリオでは、
table.exec.sink.upsert-materializeをnoneに設定してSinkUpsertMaterializerオペレーターを無効にします。シンク演算子とアップストリーム演算子(重複除去演算子やグループ集計演算子など)の演算子チェーンが作成され、6.0 より前の VVR バージョンを使用した場合にデータ精度の問題が発生しない場合は、デプロイを移行して VVR 6.0 以降を使用できます。table.exec.sink.upsert-materialize パラメータを none に設定し、他の設定を保持してください。デプロイの移行方法の詳細については、デプロイのエンジンバージョンのアップグレード をご参照ください。
SinkUpsertMaterializer 演算子を使用する必要がある場合は、次の項目に注意してください。
データをシンクテーブルに書き込むときに、CURRENT_TIMESTAMP や NOW などの非決定性関数によって生成された列を追加しないでください。これにより、upsert キーが使用できない場合に SinkUpsertMaterializer 演算子の状態の異常な膨張を防ぎます。
SinkUpsertMaterializer 演算子の状態が大きく、パフォーマンスに影響を与える場合は、デプロイの並列度を上げます。詳細については、デプロイのリソースの設定 をご参照ください。
既知の問題
SinkUpsertMaterializer 演算子は、順序が正しくない変更ログイベントの問題を解決しますが、次の理由により、状態サイズが継続的に増加する可能性があります。
状態の Time To Live(TTL)が設定されていないか、状態の TTL が長すぎるため、状態の保持期間が長すぎます。ただし、TTL が短すぎると、FLINK-29225 の問題で説明されているように、状態に不要なダーティデータが保持される可能性があります。この問題は、DELETE イベントと対応する ADD イベントの間の時間間隔が設定された TTL を超えた場合に発生します。この場合、Flink はログに次の警告メッセージを出力します。
int index = findremoveFirst(values, row); if (index == -1) { LOG.info(STATE_CLEARED_WARN_MSG); return; }ビジネス要件に基づいて TTL を設定することをお勧めします。詳細については、デプロイの設定 をご参照ください。VVR 8.0.7 以降を使用する Realtime Compute for Apache Flink では、演算子ごとに異なる TTL 値を設定して、大規模状態デプロイのリソース消費を削減できます。詳細については、演算子の並列度、チェーン戦略、および TTL の設定 をご参照ください。
SinkUpsertMaterializer 演算子によって受信された更新ストリームの upsert キーを推定できず、更新ストリームに非決定性列が含まれている場合、履歴データを予期どおりに削除できません。これにより、状態サイズが継続的に増加します。
参考資料
Realtime Compute for Apache Flink と Apache Flink のエンジンバージョンのマッピングについては、リリースノート をご参照ください。