すべてのプロダクト
Search
ドキュメントセンター

Data Lake Formation:Flink CDC を使用した DLF へのアクセス

最終更新日:Nov 09, 2025

このトピックでは、Realtime Compute for Apache Flink の Flink CDC を使用して、Paimon REST API を介して DLF カタログにアクセスする方法について説明します。

前提条件

  • Realtime Compute for Apache Flink ワークスペースを作成済みであること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。

  • Realtime Compute for Apache Flink ワークスペースと DLF カタログが同じリージョンに存在すること。

  • Realtime Compute for Apache Flink ワークスペースの VPC を DLF の VPC ホワイトリストに追加済みであること。詳細については、「VPC ホワイトリストの設定」をご参照ください。

エンジン要件

Realtime Compute for Apache Flink ジョブは、Ververica Runtime (VVR) バージョン 11.1.0 以降を実行します。

DLF カタログの作成

詳細については、「DLF の概要」をご参照ください。

カタログに接続するための Flink CDC パラメーター

データインジェストジョブを作成するには、「Flink CDC データインジェストジョブの開発 (パブリックプレビュー)」をご参照ください。

Flink データインジェストジョブのシンクには、次の構成を使用します。

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
  commit.user: your_job_name
  # (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true

次の表にパラメーターを示します。

パラメーター

説明

必須

catalog.properties.metastore

メタストアのタイプ。`rest` に設定します。

はい

rest

catalog.properties.token.provider

トークンプロバイダー。`dlf` に設定します。

はい

dlf

catalog.properties.uri

DLF REST Catalog Server にアクセスするための URI。フォーマットは http://[region-id]-vpc.dlf.aliyuncs.com です。リージョン ID の詳細については、「サービスエンドポイント」をご参照ください。

はい

http://cn-hangzhou-vpc.dlf.aliyuncs.com

catalog.properties.warehouse

DLF カタログの名前。

はい

dlf_test

構成例

以下のセクションでは、Flink CDC YAML ジョブを使用して DLF データレイクにデータを同期するための典型的な構成例を示します。

MySQL データベース全体を DLF データレイクに同期する

次の YAML コードは、MySQL データベース全体を DLF に同期する CDC ジョブを示しています。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
  scan.binlog.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 読み取りを高速化するために、解析フィルターを有効にします。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
  commit.user: your_job_name
  # (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true
説明

[MySQL ソース構成] 次のパラメーターを設定します。詳細については、「MySQL」をご参照ください。

  1. パラメーター: scan.binlog.newly-added-table.enabled

    機能: 増分フェーズ中に新しく作成されたテーブルからデータを同期します。

  2. パラメーター: include-comments.enabled

    機能: テーブルとフィールドのコメントを同期します。

  3. パラメーター: scan.incremental.snapshot.unbounded-chunk-first.enabled

    機能: TaskManager の OutOfMemory エラーの可能性を防ぎます。

  4. パラメーター: scan.only.deserialize.captured.tables.changelog.enabled: true

    機能: ジョブに一致するテーブルのデータのみを解析して、読み取りを高速化します。

説明

[Paimon シンク構成]

  1. カタログ接続パラメーター

  • パラメータープレフィックス: catalog.properties

  • 機能: カタログ接続情報を指定します。

  1. テーブル作成パラメーター

  • パラメータープレフィックス: table.properties

  • 機能: テーブル作成情報を指定します。

  • 推奨構成: テーブル作成パラメーターに `deletion-vectors.enabled` 構成を追加します。これにより、書き込みおよび更新パフォーマンスへの影響を最小限に抑えながら読み取りパフォーマンスが大幅に向上し、ほぼリアルタイムの更新と高速クエリが実現します。

  • 追加情報: DLF は自動ファイルマージ機能を提供します。`bucket` や `num-sorted-run.compaction-trigger` などのファイルマージおよびバケット関連のパラメーターをテーブル作成パラメーターに追加しないでください。

  1. サブミッター

  • パラメーター名: commit.user

  • 機能: Paimon に書き込まれたファイルをコミットするユーザー。

  • 推奨構成: ジョブごとに異なるコミットユーザーを設定します。ジョブ名を使用できます。

  • 追加情報: デフォルトのコミットユーザーは `admin` です。デフォルトのユーザーを使用すると、複数のジョブが同じテーブルに書き込むときにコミットの競合や不整合が発生する可能性があります。

DLF データレイクのパーティションテーブルにデータを書き込む

データインジェストジョブのソーステーブルには、通常、パーティションフィールドは含まれていません。パーティション分割された子孫テーブルにデータを書き込むには、`partition-keys` を使用してパーティションフィールドを設定できます。詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。次のコードは構成例です。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
  scan.binlog.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 読み取りを高速化するために、解析フィルターを有効にします。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
  commit.user: your_job_name
  # (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # (オプション) パーティションフィールドを設定します。  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

DLF データレイクの追加入力専用テーブルにデータを書き込む

データインジェストジョブのソーステーブルには、完全な変更タイプが含まれています。論理削除のために削除操作を挿入操作に変換するように子孫テーブルを構成するには、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。次のコードは構成例です。

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
  scan.binlog.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 読み取りを高速化するために、解析フィルターを有効にします。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
  commit.user: your_job_name
  # (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    # (オプション) パーティションフィールドを設定します。
    partition-keys: id,pt
    # (オプション) 論理削除を実装します。
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    # (オプション) パーティションフィールドを設定します。
    partition-keys: id,pt
    # (オプション) 論理削除を実装します。
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
説明
  • プロジェクションに `__data_event_type` を追加して変更タイプを新しいフィールドとして子孫テーブルに書き込み、`converter-after-transform` を `SOFT_DELETE` に設定して削除操作を挿入操作に変換できます。これにより、ダウンストリームシステムはすべての変更操作を完全に記録できます。詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。

Kafka から DLF データレイクにデータをリアルタイムで同期する

Kafka の `inventory` Topic に `customers` と `products` の 2 つのテーブルのデータが格納されており、データ形式が Debezium JSON であると仮定します。次のジョブ例では、これら 2 つのテーブルのデータを DLF の対応する宛先テーブルに同期します。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
  commit.user: your_job_name
  # (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true

# Debezium JSON にはプライマリキー情報が含まれていません。テーブルにプライマリキーを個別に追加する必要があります。  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
説明
  • Kafka データソースは、`canal-json`、`debezium-json` (デフォルト)、および `json` フォーマットでのデータの読み取りをサポートしています。

  • データ形式が `debezium-json` の場合、Debezium JSON メッセージにはプライマリキー情報が記録されないため、変換ルールを使用してテーブルにプライマリキーを手動で追加する必要があります。

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 単一テーブルのデータが複数のパーティションに分散している場合、またはシャーディング後に異なるパーティションのテーブルをマージする必要がある場合は、debezium-json.distributed-tables または canal-json.distributed-tables パラメーターを `true` に設定できます。

  • Kafka データソースは、複数のスキーマ推論ポリシーをサポートしています。schema.inference.strategy パラメーターを使用してポリシーを設定できます。スキーマ推論と変更同期ポリシーの詳細については、「Kafka メッセージキュー」をご参照ください。

より詳細なジョブ構成については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。