このトピックでは、複雑なビジネスシナリオで Flink Change Data Capture (CDC) データインジェストジョブを使用するためのベストプラクティスについて説明します。これらのシナリオには、ソーステーブルのスキーマ進化、データロジックの強化 (メタデータの注入、計算列の追加、ソフトデリートの実行など)、異種ルーティング (シャード化されたテーブルのマージやデータベース全体の同期など)、および精密な制御 (テーブルのフィルタリングや特定のタイムスタンプからの開始など) が含まれます。
新しく追加されたテーブルの同期
Flink CDC データインジェストジョブは、2 つのシナリオで新しく追加されたテーブルの同期をサポートします。
新しい空のテーブルのホット同期: 既存データがない新しいテーブルを動的にキャプチャできます。ジョブは再起動を必要とせずに、これらのテーブルへの後続の変更をキャプチャします。
既存データを持つ新しいテーブルの同期: すでに既存データが含まれている新しいテーブルには、完全同期と増分同期が必要です。これらのテーブルを含めるには、ジョブを再起動する必要があります。
既存データのない新しい空のテーブルのホット同期
Flink CDC ジョブで scan.binlog.newly-added-table.enabled パラメーターを有効にすると、増分フェーズ中に新しい空のテーブルをリアルタイムで同期できます。これらのテーブルは既存データなしで作成され、ジョブの再起動は必要ありません。この構成が推奨されます。
たとえば、Flink CDC データインジェストジョブが MySQL dlf_test データベース内のすべてのテーブルを同期しているとします。ソースデータベースに、既存データのない products という名前の新しい空のテーブルが作成されます。ジョブを再起動せずにこの新しいテーブルを同期するには、scan.binlog.newly-added-table.enabled: true パラメーターを有効にします。構成は次のとおりです。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueこの構成の CDC YAML ジョブが実行されると、ジョブは dlf_test データベースから宛先へ、新しく追加された空のテーブルを自動的に同期します。
既存データを持つ新しいテーブルの同期
MySQL データベースに customers テーブルと products テーブルがすでに存在しているが、起動時には customers テーブルのみを同期したいとします。ジョブの構成は次のとおりです。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.customers
server-id: 8601-8604
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueジョブがしばらく実行された後、データベースからすべてのテーブルとその既存データを同期したい場合は、ジョブを再起動する必要があります。次のステップに従ってください。
ジョブを終了し、セーブポイントを作成します。
MySQL データソースの構成で、tables パラメーターを目的のテーブルに一致するように変更します。
scan.binlog.newly-added-table.enabledパラメーターを削除し、scan.newly-added-table.enabledを有効にします。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (オプション) 新しく追加されたテーブルの完全データと増分データを同期します。
scan.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueセーブポイントからジョブを再起動します。
scan.binlog.newly-added-table.enabled と scan.newly-added-table.enabled を同時に有効にすることはできません。
特定のテーブルの除外
Flink CDC データインジェストジョブでは、特定のテーブルをフィルターで除外して、ダウンストリームで作成および同期されないようにすることができます。
たとえば、MySQL データベース dlf_test には customers や products などの複数のテーブルが含まれています。products_tmp という名前のテーブルを除外したい場合、ジョブを次のように構成します。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
# (オプション) 同期したくないテーブルを除外します。
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
# (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueこの構成の Flink CDC データインジェストジョブは、products_tmp テーブルを除き、dlf_test データベースから宛先まですべてのテーブルを自動的に同期します。また、ジョブはテーブルスキーマとデータをリアルタイムで同期し続けます。
tables.exclude パラメーターは、複数のテーブルに一致する正規表現をサポートします。テーブルが tables パラメーターと tables.exclude パラメーターの両方で指定されている場合、除外ルールが優先され、そのテーブルは同期されません。
メタデータと計算列による拡張
メタデータ列の追加
データを書き込む際に、transform モジュールを使用してメタデータ列を追加できます。たとえば、次のジョブ構成では、テーブル名、操作時間、操作タイプをダウンストリームテーブルに書き込みます。詳細については、「Transform モジュール」をご参照ください。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (オプション) 新しく追加されたテーブルの完全データと増分データを同期します。
scan.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
# 操作時間をメタデータとして使用します。
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
# (オプション) プライマリキーを変更します。
primary-keys: id,identifier
description: add identifier, op_ts and op
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueMySQL をソースとして使用する場合、metadata-column.include-list: op_ts を追加して、操作時間をメタデータとしてダウンストリームに送信する必要があります。詳細については、「MySQL」をご参照ください。
ダウンストリームテーブルでソフトデリートを実行したい場合は、delete 操作を insert 操作に変換できます。これを行うには、transform モジュールに converter-after-transform: SOFT_DELETE 構成を追加します。ソースデータには削除を含むすべての変更タイプが含まれているため、これは便利です。
計算列の追加
データを書き込む際に、transform モジュールを使用して計算列を追加できます。たとえば、次のジョブ構成では created_at フィールドを変換して dt フィールドを生成し、それをダウンストリームテーブルのパーティションフィールドとして使用します。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (オプション) 新しく追加されたテーブルの完全データと増分データを同期します。
scan.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
# 操作時間をメタデータとして使用します。
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
# (オプション) パーティションフィールドを設定します。
partition-keys: dt
description: add dt
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueMySQL をソースとして使用する場合、metadata-column.include-list: op_ts を追加して、操作時間をメタデータとしてダウンストリームに送信する必要があります。詳細については、「MySQL」をご参照ください。
テーブル名のマッピング
ソーステーブルから宛先テーブルにデータを同期する際、テーブル名を変更する必要がある場合があります。これは routing モジュールを使用して行うことができます。以下のセクションでは、典型的なシナリオを説明し、Flink CDC データインジェストジョブの構成例を示します。
シャード化されたテーブルのマージ
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (オプション) 新しく追加されたテーブルの完全データと増分データを同期します。
scan.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# dlf_test データベース内の、名前が 'product_' で始まり数字で終わるすべてのテーブルを dlf.products にマージします。
- source-table: dlf_test.product_[0-9]+
sink-table: dlf.products
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueデータベース全体の同期
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (オプション) 新しく追加されたテーブルの完全データと増分データを同期します。
scan.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
route:
# テーブル名を一様に変更します。dlf_test データベースから dlf データベースにすべてのテーブルを同期します。新しいテーブル名にはプレフィックス 'ods_' が付きます。
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: true複雑なビジネスシナリオ向けの包括的なユースケース
以下の Flink CDC データインジェストジョブは、このトピックで述べた特徴を組み合わせて複雑なビジネスシナリオを処理する包括的な例です。特定のビジネスニーズに合わせてこの構成を適応させることができます。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
# (オプション) 同期したくないテーブルを除外します。
tables.exclude: dlf_test.products_tmp
server-id: 8601-8604
# (オプション) 新しく追加されたテーブルの完全データと増分データを同期します。
scan.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
# 操作時間をメタデータとして使用します。
metadata-column.include-list: op_ts
transform:
- source-table: dlf_test.customers
projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
# (オプション) プライマリキーを変更します。
primary-keys: id,identifier
# (オプション) パーティションフィールドを設定します。
partition-keys: dt
# (オプション) 削除されたデータを挿入に変換します。
converter-after-transform: SOFT_DELETE
route:
# dlf_test データベースから dlf データベースにすべてのテーブルを同期します。新しいテーブル名にはプレフィックス 'ods_' が付きます。
- source-table: dlf_test.\.*
sink-table: dlf.ods_<>
replace-symbol: <>
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: true特定のタイムスタンプからの開始
Flink CDC データインジェストジョブのステートレスな開始を実行する際、データソースの開始時刻を指定できます。これにより、特定のバイナリログ (binlog) の位置からデータの読み取りを再開できます。
O&M ページでの構成
ジョブの O&M ページでは、ジョブのステートレスな開始を選択する際に、ソーステーブルの開始時刻を指定できます。

ジョブパラメーターの構成
ジョブの下書きでは、パラメーターを構成することでソーステーブルの開始時刻を指定できます。
MySQL ソースの場合、ジョブ構成で scan.startup.mode: timestamp を設定して開始時刻を指定できます。以下は構成例です。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: dlf_test.\.*
server-id: 8601-8604
# (オプション) ソーステーブルの開始時刻を指定するモードで開始します。
scan.startup.mode: timestamp
# タイムスタンプ起動モードで起動タイムスタンプを指定します。
scan.startup.timestamp-millis: 1667232000000
# (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory 問題の可能性を防ぐために、無制限のチャンクのディスパッチを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミット用のユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: trueO&M ページで指定された開始時刻は、ジョブパラメーターで指定された開始時刻よりも優先されます。