Flink Change Data Capture (CDC) は、Realtime Compute for Apache Flink が提供するデータインジェストツールです。ソースからご利用のデータレイクハウスへデータベース全体を同期することをサポートしています。このトピックでは、Paimon REST を介して Flink CDC を使用し、DLF カタログにリアルタイムでデータをインジェストする方法について説明します。
このトピックを完了すると、次のことができるようになります。
インジェスト送信先として DLF カタログを作成済みであること
DLF を指す Paimon シンクで Flink CDC YAML ジョブを設定済みであること
一般的な取り込みシナリオの例を実行します:データベース全体の同期、パーティションテーブル、追記専用テーブル、およびKafka ソース
前提条件
開始する前に、以下を確認してください。
Realtime Compute for Apache Flink ワークスペースがあること。詳細については、「ワークスペースの作成」をご参照ください。
Realtime Compute for Apache Flink ワークスペースと DLF カタログが同じリージョンにあること。
ご利用の Realtime Compute for Apache Flink ワークスペースの VPC が DLF の VPC ホワイトリストに追加されていること。詳細については、「VPC ホワイトリストの設定」をご参照ください。
エンジン要件
ご利用の Realtime Compute for Apache Flink ジョブは、Ververica Runtime (VVR) バージョン 11.1.0 以降で実行する必要があります。
DLF カタログの作成
詳細については、「DLF の使用開始」をご参照ください。
データインジェストジョブの作成と設定
Flink CDC を使用してデータをインジェストするための YAML ドラフトを作成します。詳細については、「データインジェスト用 Flink CDC ジョブの開発 (ベータ版)」をご参照ください。
シンクモジュールを設定します。
sink: type: paimon catalog.properties.metastore: rest catalog.properties.uri: dlf_uri catalog.properties.warehouse: your_warehouse catalog.properties.token.provider: dlf # (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。 commit.user: your_job_name # (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。 table.properties.deletion-vectors.enabled: trueプレースホルダーの値を実際の値に置き換えてください。
設定オプション 説明 必須 デフォルト 例 catalog.properties.metastoreメタストアのタイプ。 restに設定します。はい — restcatalog.properties.token.providerトークンプロバイダー。 dlfに設定します。はい — dlfcatalog.properties.uriDLF REST カタログサーバーにアクセスするための URI。形式: http://[region-id]-vpc.dlf.aliyuncs.com。リージョン ID については、「リージョンとエンドポイント」をご参照ください。はい — http://ap-southeast-1-vpc.dlf.aliyuncs.comcatalog.properties.warehousePaimon カタログの名前。 はい — dlf_testcommit.userデータ書き込み用のコミットユーザー。競合を避けるため、異なるジョブには一意のコミットユーザーを割り当ててください。 いいえ adminyour_job_nametable.properties.deletion-vectors.enabled書き込みへの影響を最小限に抑えつつ、読み取りパフォーマンスを向上させるために削除ベクターを有効にします。 いいえ — true
注意事項
ジョブを実行する前に、以下の制約事項に注意してください。
コミットユーザーの競合: デフォルトのコミットユーザーは
adminです。同じコミットユーザーで同じテーブルに対して同時データ書き込みジョブを実行すると、コミットの競合やデータ不整合が発生します。各ジョブには一意のcommit.userを割り当ててください。コンパクションまたはバケットオプションなし: DLF は自動ファイルコンパクションを提供します。
bucketやnum-sorted-run.compaction-triggerなどのコンパクションまたはバケットオプションを設定しないでください。削除ベクター:
table.properties.deletion-vectors.enabled: trueを設定すると、書き込みへの影響を最小限に抑えつつ、読み取りを大幅に高速化できます。
例
MySQL データベース全体から DLF へのデータインジェスト
以下のジョブは、MySQL データベース内のすべてのテーブルを DLF に同期します。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (オプション) 増分フェーズ中に作成されたテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) 潜在的な TaskManager メモリ不足 (OOM) エラーを防ぐために、無制限シャードを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 一致するテーブルからのみデータを逆シリアル化して読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: true推奨される MySQL ソース オプションは次のとおりです。
| オプション | 説明 |
|---|---|
scan.binlog.newly-added-table.enabled | 増分フェーズ中に作成されたテーブルからデータを同期します。 |
include-comments.enabled | テーブルとフィールドのコメントを同期します。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 無制限シャードを優先することで、潜在的な TaskManager メモリ不足 (OOM) エラーを防ぎます。 |
scan.only.deserialize.captured.tables.changelog.enabled | 一致するテーブルからのみデータを逆シリアル化して読み取りを高速化します。 |
パーティションテーブルへのデータインジェスト
非パーティションソーステーブルから DLF のパーティションテーブルにインジェストするには、変換モジュールに partition-keys オプションを追加します。詳細については、「Flink CDC を使用したデータインジェスト」をご参照ください。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (オプション) 増分フェーズ中に作成されたテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) 潜在的な TaskManager メモリ不足 (OOM) エラーを防ぐために、無制限シャードを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 一致するテーブルからのみデータを逆シリアル化して読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,pt追記専用テーブルへのデータインジェスト
インジェスト中に論理削除 (ソフトデリート) を実装するには、変換モジュールで converter-after-transform: SOFT_DELETE を使用します。これにより、削除操作が挿入操作に変換され、ダウンストリームテーブルはすべての変更操作を完全に記録します。projection 内の __data_event_type__ フィールドは、変更タイプを新しい列としてダウンストリームテーブルに書き込みます。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (オプション) 増分フェーズ中に作成されたテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) 潜在的な TaskManager メモリ不足 (OOM) エラーを防ぐために、無制限シャードを優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 一致するテーブルからのみデータを逆シリアル化して読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
partition-keys: id,pt
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
partition-keys: id,pt
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE詳細については、「Flink CDC を使用したデータインジェスト」をご参照ください。
Kafka から DLF へのリアルタイムデータ同期
以下のジョブは、Kafka inventory トピック (Debezium JSON 形式のテーブル customers と products) から CDC データを読み取り、対応する送信先テーブルに DLF で同期します。Debezium JSON メッセージにはプライマリキー情報が含まれていないため、プライマリキーは変換モジュールで明示的に指定されます。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: true
# Debezium JSON にはプライマリキー情報が含まれていないため、プライマリキーを明示的に指定します。
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka ソースに関する追加の注意事項:
Kafka ソースは、
canal-json、debezium-json(デフォルト)、およびjsonの 3 つのデータ形式をサポートしています。複数の Kafka パーティションから DLF の単一テーブルにインジェストする場合、
debezium-json.distributed-tablesまたはcanal-json.distributed-tablesをtrueに設定します。Kafka ソースは、
schema.inference.strategyオプションを介して複数のスキーマ推論ポリシーをサポートしています。詳細については、「Message Queue for Apache Kafka」をご参照ください。
詳細については、「Flink CDC を使用したデータインジェスト」をご参照ください。