このトピックでは、複雑なビジネスシナリオで Flink CDC データインジェストジョブを使用するためのベストプラクティスについて説明します。これらのシナリオには、ソーステーブルのスキーマ進化の処理、メタデータを注入したデータロジックの強化、計算列の追加、論理削除の実行、シャーディングされたテーブルのマージとデータベース全体の同期のための異種ルーティングの実装、テーブルのフィルタリングと特定のタイムスタンプからのジョブ開始による精密な制御などが含まれます。
新しく追加されたテーブルの同期
Flink CDC データインジェストジョブは、2 つの方法で新しく追加されたテーブルの同期をサポートしています。
履歴データのない新しい空のテーブルのホット同期:この方法は、履歴データがなく、後続の変更のみが含まれる新しいテーブルに適用されます。ジョブは再起動することなく、これらのテーブルを動的にキャプチャできます。
履歴データのあるテーブルの同期:この方法は、すでに履歴データが含まれている新しいテーブルに適用されます。このプロセスにはフルスキャンと増分同期が必要であり、変更を有効にするにはジョブを再起動する必要があります。
履歴データのない新しい空のテーブルのホット同期
scan.binlog.newly-added-table.enabled パラメーターを有効にすると、お使いの Flink CDC ジョブで、増分フェーズ中に新しく追加された空のテーブルをリアルタイムで同期できるようになります。この方法はジョブの再起動を必要としないため、推奨されています。
たとえば、Flink CDC データインジェストジョブが MySQL の `dlf_test` データベース内のすべてのテーブルを同期している場合、ソースデータベースに products という新しい空のテーブルが作成されたとしても、ジョブを再起動せずにこの新しいテーブルを同期できます。これを行うには、ジョブ構成で scan.binlog.newly-added-table.enabled パラメーターを `true` に設定します。以下は構成例です。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (任意) 増分フェーズ中に新しく作成されたテーブルのデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueこの構成で CDC YAML ジョブを実行すると、ジョブは dlf_test データベースから宛先に新しいテーブルを自動的に作成します。
scan.newly-added-table.enabled パラメーターは、scan.startup.mode が initial(デフォルト)に設定されている場合にのみ有効です。
履歴データのあるテーブルの同期
MySQL データベースに customers テーブルと products テーブルが含まれているが、起動時には customers テーブルのみを同期したいと仮定します。初期のジョブ構成は次のとおりです。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.customers
server-id: 8601-8604
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueジョブが一定期間実行された後、データベースからすべてのテーブルとその履歴データを同期したい場合は、ジョブを再起動する必要があります。そのためには、次の手順に従います。
ジョブを終了し、セーブポイントを作成します。
同期するすべてのテーブルが含まれるように、MySQL データソースの `tables` 構成を変更します。さらに、
scan.binlog.newly-added-table.enabledパラメーターを削除し、scan.newly-added-table.enabledパラメーターを有効にします。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (任意) 新しく追加されたテーブルのフルデータと増分データを同期します。
scan.newly-added-table.enabled: true
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueセーブポイントからジョブを再起動します。
scan.binlog.newly-added-table.enabled と scan.newly-added-table.enabled を同時に有効にすることはできません。
特定のテーブルの除外
Flink CDC データインジェストジョブでは、特定のテーブルを除外して、ダウンストリームの宛先で作成および同期されないようにすることができます。
例えば、MySQL の dlf_test データベースに customers や products などの複数のテーブルが含まれており、products_tmp テーブルを除外したい場合、ジョブを次のように構成できます。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
# (任意) 同期したくないテーブルを除外します。
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
# (任意) 増分フェーズ中に新しく作成されたテーブルのデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueこの構成の Flink CDC データインジェストジョブは、宛先の dlf_test データベースから、products_tmp テーブルを除くすべてのテーブルを自動的に作成します。また、このジョブはテーブルスキーマとデータをリアルタイムで同期します。
tables.exclude パラメーターは、正規表現を使用して複数のテーブルを照合することをサポートしています。tables.exclude と tables で指定されたテーブルに重複がある場合、重複するテーブルは除外され、同期されません。これは、除外が包含よりも優先されることを意味します。
メタデータと計算列による拡張
メタデータ列の追加
データを書き込む際に、transform モジュールを使用してメタデータ列を追加できます。例えば、以下のジョブ構成では、テーブル名、操作時間、操作タイプをダウンストリームテーブルに追加します。詳細については、「Transform モジュール」をご参照ください。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (任意) 新しく追加されたテーブルのフルデータと増分データを同期します。
scan.newly-added-table.enabled: true
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
# 操作時間をメタデータとして使用します。
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
# (任意) プライマリキーを変更します。
primary-keys: id,identifier
description: identifier、op_ts、op を追加
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueMySQL をソースとして使用する場合、metadata-column.include-list: op_ts を設定して、操作時間をメタデータとしてダウンストリームのシンクに送信する必要があります。詳細については、「MySQL」をご参照ください。
インジェストジョブのソースデータには、削除を含むすべての変更データキャプチャ (CDC) イベントタイプが含まれます。ダウンストリームテーブルで DELETE 操作を INSERT 操作に変換し、論理削除を実装するには、transform モジュールに converter-after-transform: SOFT_DELETE 構成を追加します。
計算列の追加
データを書き込む際に、transform モジュールを使用して計算列を追加できます。例えば、以下のジョブ構成では、created_at フィールドを変換して dt フィールドを生成し、その新しい dt フィールドをダウンストリームテーブルのパーティションフィールドとして使用します。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (任意) 新しく追加されたテーブルのフルデータと増分データを同期します。
scan.newly-added-table.enabled: true
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
# 操作時間をメタデータとして使用します。
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
# (任意) パーティションフィールドを設定します。
partition-keys: dt
description: dt を追加
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueMySQL をソースとして使用する場合、metadata-column.include-list: op_ts を設定して、操作時間をメタデータとしてダウンストリームのシンクに送信する必要があります。詳細については、「MySQL」をご参照ください。
テーブル名のマッピング
先祖テーブルを子孫テーブルに同期する際、route モジュールを使用してテーブル名を置き換える必要がある場合があります。以下のセクションでは、Flink CDC データインジェストジョブでテーブル名を置き換える典型的なシナリオの構成例を示します。
シャーディングされたデータベースとテーブルのマージ
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (任意) 新しく追加されたテーブルのフルデータと増分データを同期します。
scan.newly-added-table.enabled: true
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# dlf_test データベース内の、名前が product_ で始まり数字で終わるすべてのテーブルを dlf.products テーブルにマージします。
- source-table: dlf_test.product_[0-9]+
sink-table: dlf.products
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueデータベース全体の同期
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (任意) 新しく追加されたテーブルのフルデータと増分データを同期します。
scan.newly-added-table.enabled: true
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# テーブル名を一律に変更します。dlf_test データベースのすべてのテーブルを dlf データベースに同期し、各宛先テーブル名の前に ods_ を付けます。
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: true複雑なビジネスシナリオの包括的なケーススタディ
以下の Flink CDC データインジェストジョブ構成は、前のセクションで説明した特徴を組み合わせた、複雑なビジネスシナリオの包括的な例です。このコードを特定のビジネス要件に合わせて調整できます。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
# (任意) 同期したくないテーブルを除外します。
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
# (任意) 新しく追加されたテーブルのフルデータと増分データを同期します。
scan.newly-added-table.enabled: true
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
# 操作時間をメタデータとして使用します。
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
# (任意) プライマリキーを変更します。
primary-keys: id,identifier
# (任意) パーティションフィールドを設定します。
partition-keys: dt
# (任意) 削除されたデータを挿入に変換します。
converter-after-transform: SOFT_DELETE
route:
# dlf_test データベースのすべてのテーブルを dlf データベースに同期し、各宛先テーブル名の前に ods_ を付けます。
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: true特定のタイムスタンプからの開始
Flink CDC データインジェストジョブのステートレス起動を実行する際、データソースの開始時刻を指定できます。これにより、特定のバイナリログ (binlog) の位置からデータの読み取りを再開できます。
O&M ページでの構成
ジョブの運用保守 (O&M) ページで、ステートレス起動のためにソーステーブルの開始時刻を指定できます。

ジョブパラメーターの構成
ジョブのドラフトで、パラメーターを構成することにより、ソーステーブルの開始時刻を指定できます。
MySQL ソースでは、ジョブ構成で scan.startup.mode を `timestamp` に設定して開始時刻を指定できます。以下に構成例を示します。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (任意) ソーステーブルの開始時刻を指定するモードで開始します。
scan.startup.mode: timestamp
# タイムスタンプ起動モードで起動タイムスタンプを指定します。
scan.startup.timestamp-millis: 1667232000000
# (任意) 増分フェーズ中に新しく作成されたテーブルのデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (任意) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (任意) TaskManager の OutOfMemory エラーを防ぐために、unbounded チャンクを優先的にディスパッチします。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意) コミットユーザー名を指定します。競合を避けるため、ジョブごとに異なるユーザー名を設定することを推奨します。
commit.user: your_job_name
# (任意) 読み取りパフォーマンスを向上させるために、削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueO&M ページとジョブパラメーターの両方で開始時刻を指定した場合、O&M ページの構成が優先されます。