すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink CDC を使用して DLF データレイクにリアルタイムでデータを取り込む

最終更新日:Nov 21, 2025

このトピックでは、Flink CDC を使用してリアルタイムデータを Alibaba Cloud Data Lake Formation (DLF) に取り込むためのベストプラクティスについて説明します。

DLF は、データとメタデータの管理とストレージを統合するフルマネージドプラットフォームです。また、権限管理とストレージの最適化機能も備えています。

Flink CDC を使用してデータベース全体から DLF でホストされている Paimon カタログにデータをロードするデータインジェストタスクを作成できます。

MySQL から DLF にデータベース全体を取り込む

YAML でコードを記述して、MySQL から DLF にデータを取り込みます:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (オプション) 増分読み取り中に作成された新しいテーブルからデータを同期します。
  scan.binlog.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) TM OOM エラーを防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 変更イベントのみを逆シリアル化して、データ読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  # メタストアのタイプ。rest に固定されています。
  catalog.properties.metastore: rest
  # トークンプロバイダー。dlf に固定されています。
  catalog.properties.token.provider: dlf
  # DLF Rest Catalog Server へのアクセスに使用される URI。フォーマット: http://[region-id]-vpc.dlf.aliyuncs.com。例: http://ap-southeast-1-vpc.dlf.aliyuncs.com。
  catalog.properties.uri: dlf_uri
  # カタログ名。
  catalog.properties.warehouse: your_warehouse
  # (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true
説明
  • MySQL ソース構成の詳細については、「MySQL」をご参照ください。

  • ほぼリアルタイムの書き込みと高速な読み取りの両方を実現するには、table.properties.deletion-vectors.enabled オプションを有効にします。これにより、書き込みへの影響を最小限に抑えながら、読み取りパフォーマンスが大幅に向上します。

  • DLF は自動コンパクション機能を備えているため、ファイルコンパクション (num-sorted-run.compaction-trigger) とバケット (bucket) に関連するテーブルプロパティを追加しないでください。

DLF パーティションテーブルにデータを取り込む

非パーティションテーブルから DLF パーティションテーブルにデータをロードするには、transform モジュールで partition-keys オプションを使用します。コード例:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (オプション) 増分読み取り中に作成された新しいテーブルからデータを同期します。
  scan.binlog.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) TM OOM を防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 変更イベントのみを逆シリアル化して、データ読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # (オプション) パーティションフィールドを設定します。
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

DLF の追記専用テーブルにデータを取り込む

MySQL データを DLF に取り込み、ソースから削除されたデータに対して論理削除を実装します。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (オプション) 増分読み取り中に作成された新しいテーブルからデータを同期します。
  scan.binlog.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) TM OOM を防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 変更イベントのみを逆シリアル化して、データ読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    # (オプション) パーティションフィールドを設定します。
    partition-keys: id,pt
    # (オプション) 論理削除を実装します。
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    # (オプション) パーティションフィールドを設定します。
    partition-keys: id,pt
    # (オプション) 論理削除を実装します。
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
説明
  • projection__data_event_type__ を指定すると、イベントタイプが新しい列としてシンクテーブルに追加されます。converter-after-transformSOFT_DELETE に設定すると、削除が挿入に変換されます。これにより、すべての変更操作がダウンストリームシステムに完全に記録されることが保証されます。

Kafka から DLF にリアルタイムでデータを取り込む

Flink CDC を使用して MySQL から Kafka にリアルタイムでデータを同期した後、さらにデータを DLF にロードしてストレージできます。

Kafka の inventory トピック (2 つのテーブル、customersproducts を含む) からデータを取り込みます:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true

# Debezium JSON にはプライマリキー情報が含まれていません。テーブルにプライマリキーを追加する必要があります。
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
説明
  • Kafka ソースの value.format の有効な値: canal-json、debezium-json (デフォルト)、および json。

  • Kafka から Paimon へのスキーマ進化がサポートされており、データレプリケーションの柔軟性が向上します。

  • 値のフォーマットが debezium-json の場合、transform モジュールでプライマリキーを明示的に指定する必要があります:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • シャードに分散されたデータを持つテーブルを取り込むには、debezium-json.distributed-tables または canal-json.distributed-tablestrue に設定します。

  • Kafka ソースは複数のスキーマ推論ポリシーをサポートしています。schema.inference.strategy オプションを使用してポリシーを設定できます。詳細については、「Kafka コネクタ」をご参照ください。