すべてのプロダクト
Search
ドキュメントセンター

Data Lake Formation:DLF への Flink CDC 接続

最終更新日:Mar 26, 2026

Flink Change Data Capture (CDC) は、Realtime Compute for Apache Flink が提供するデータインジェストツールです。ソースからご利用のデータレイクハウスへデータベース全体を同期することをサポートしています。このトピックでは、Paimon REST を介して Flink CDC を使用し、DLF カタログにリアルタイムでデータをインジェストする方法について説明します。

このトピックを完了すると、次のことができるようになります。

  • インジェスト送信先として DLF カタログを作成済みであること

  • DLF を指す Paimon シンクで Flink CDC YAML ジョブを設定済みであること

  • 一般的な取り込みシナリオの例を実行します:データベース全体の同期、パーティションテーブル、追記専用テーブル、およびKafka ソース

前提条件

開始する前に、以下を確認してください。

  • Realtime Compute for Apache Flink ワークスペースがあること。詳細については、「ワークスペースの作成」をご参照ください。

  • Realtime Compute for Apache Flink ワークスペースと DLF カタログが同じリージョンにあること。

  • ご利用の Realtime Compute for Apache Flink ワークスペースの VPC が DLF の VPC ホワイトリストに追加されていること。詳細については、「VPC ホワイトリストの設定」をご参照ください。

エンジン要件

ご利用の Realtime Compute for Apache Flink ジョブは、Ververica Runtime (VVR) バージョン 11.1.0 以降で実行する必要があります。

DLF カタログの作成

詳細については、「DLF の使用開始」をご参照ください。

データインジェストジョブの作成と設定

  1. Flink CDC を使用してデータをインジェストするための YAML ドラフトを作成します。詳細については、「データインジェスト用 Flink CDC ジョブの開発 (ベータ版)」をご参照ください。

  2. シンクモジュールを設定します。

    sink:
      type: paimon
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf
      # (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。
      commit.user: your_job_name
      # (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
      table.properties.deletion-vectors.enabled: true

    プレースホルダーの値を実際の値に置き換えてください。

    設定オプション説明必須デフォルト
    catalog.properties.metastoreメタストアのタイプ。rest に設定します。はいrest
    catalog.properties.token.providerトークンプロバイダー。dlf に設定します。はいdlf
    catalog.properties.uriDLF REST カタログサーバーにアクセスするための URI。形式: http://[region-id]-vpc.dlf.aliyuncs.com。リージョン ID については、「リージョンとエンドポイント」をご参照ください。はいhttp://ap-southeast-1-vpc.dlf.aliyuncs.com
    catalog.properties.warehousePaimon カタログの名前。はいdlf_test
    commit.userデータ書き込み用のコミットユーザー。競合を避けるため、異なるジョブには一意のコミットユーザーを割り当ててください。いいえadminyour_job_name
    table.properties.deletion-vectors.enabled書き込みへの影響を最小限に抑えつつ、読み取りパフォーマンスを向上させるために削除ベクターを有効にします。いいえtrue

注意事項

ジョブを実行する前に、以下の制約事項に注意してください。

  • コミットユーザーの競合: デフォルトのコミットユーザーは admin です。同じコミットユーザーで同じテーブルに対して同時データ書き込みジョブを実行すると、コミットの競合やデータ不整合が発生します。各ジョブには一意の commit.user を割り当ててください。

  • コンパクションまたはバケットオプションなし: DLF は自動ファイルコンパクションを提供します。bucketnum-sorted-run.compaction-trigger などのコンパクションまたはバケットオプションを設定しないでください。

  • 削除ベクター: table.properties.deletion-vectors.enabled: true を設定すると、書き込みへの影響を最小限に抑えつつ、読み取りを大幅に高速化できます。

MySQL データベース全体から DLF へのデータインジェスト

以下のジョブは、MySQL データベース内のすべてのテーブルを DLF に同期します。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (オプション) 増分フェーズ中に作成されたテーブルからデータを同期します。
  scan.binlog.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) 潜在的な TaskManager メモリ不足 (OOM) エラーを防ぐために、無制限シャードを優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 一致するテーブルからのみデータを逆シリアル化して読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。
  commit.user: your_job_name
  # (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
  table.properties.deletion-vectors.enabled: true

推奨される MySQL ソース オプションは次のとおりです。

オプション説明
scan.binlog.newly-added-table.enabled増分フェーズ中に作成されたテーブルからデータを同期します。
include-comments.enabledテーブルとフィールドのコメントを同期します。
scan.incremental.snapshot.unbounded-chunk-first.enabled無制限シャードを優先することで、潜在的な TaskManager メモリ不足 (OOM) エラーを防ぎます。
scan.only.deserialize.captured.tables.changelog.enabled一致するテーブルからのみデータを逆シリアル化して読み取りを高速化します。

パーティションテーブルへのデータインジェスト

非パーティションソーステーブルから DLF のパーティションテーブルにインジェストするには、変換モジュールに partition-keys オプションを追加します。詳細については、「Flink CDC を使用したデータインジェスト」をご参照ください。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (オプション) 増分フェーズ中に作成されたテーブルからデータを同期します。
  scan.binlog.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) 潜在的な TaskManager メモリ不足 (OOM) エラーを防ぐために、無制限シャードを優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 一致するテーブルからのみデータを逆シリアル化して読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。
  commit.user: your_job_name
  # (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

追記専用テーブルへのデータインジェスト

インジェスト中に論理削除 (ソフトデリート) を実装するには、変換モジュールで converter-after-transform: SOFT_DELETE を使用します。これにより、削除操作が挿入操作に変換され、ダウンストリームテーブルはすべての変更操作を完全に記録します。projection 内の __data_event_type__ フィールドは、変更タイプを新しい列としてダウンストリームテーブルに書き込みます。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (オプション) 増分フェーズ中に作成されたテーブルからデータを同期します。
  scan.binlog.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) 潜在的な TaskManager メモリ不足 (OOM) エラーを防ぐために、無制限シャードを優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 一致するテーブルからのみデータを逆シリアル化して読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。
  commit.user: your_job_name
  # (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    partition-keys: id,pt
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    partition-keys: id,pt
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE

詳細については、「Flink CDC を使用したデータインジェスト」をご参照ください。

Kafka から DLF へのリアルタイムデータ同期

以下のジョブは、Kafka inventory トピック (Debezium JSON 形式のテーブル customersproducts) から CDC データを読み取り、対応する送信先テーブルに DLF で同期します。Debezium JSON メッセージにはプライマリキー情報が含まれていないため、プライマリキーは変換モジュールで明示的に指定されます。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) コミットユーザー。競合を避けるため、異なるジョブには異なるコミットユーザーを設定してください。
  commit.user: your_job_name
  # (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
  table.properties.deletion-vectors.enabled: true

# Debezium JSON にはプライマリキー情報が含まれていないため、プライマリキーを明示的に指定します。
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

Kafka ソースに関する追加の注意事項:

  • Kafka ソースは、canal-jsondebezium-json (デフォルト)、および json の 3 つのデータ形式をサポートしています。

  • 複数の Kafka パーティションから DLF の単一テーブルにインジェストする場合、debezium-json.distributed-tables または canal-json.distributed-tablestrue に設定します。

  • Kafka ソースは、schema.inference.strategy オプションを介して複数のスキーマ推論ポリシーをサポートしています。詳細については、「Message Queue for Apache Kafka」をご参照ください。

詳細については、「Flink CDC を使用したデータインジェスト」をご参照ください。