すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:データレイクへのリアルタイムデータベースデータのインジェスト

最終更新日:Feb 06, 2026

このトピックでは、Change Data Capture(CDC)データインジェスト YAML ジョブを活用して、リアルタイムデータを Alibaba Cloud Data Lake Formation(DLF)に書き込む際のベストプラクティスについて説明します。

Alibaba Cloud Data Lake Formation(DLF)は、メタデータ、データストレージ、データ管理を統合的に提供するフルマネージドプラットフォームです。DLF には、メタデータ管理、権限管理、ストレージ最適化などの機能が備わっています。詳細については、「Data Lake Formation とは?」をご参照ください。

データインジェストジョブでは、DLF の Paimon Catalog を宛先として使用できます。また、大規模なデータベース全体をデータレイクにインジェストすることも可能です。

MySQL データベース全体を DLF に同期する

以下の CDC YAML ジョブでは、MySQL データベース全体を DLF に同期します:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (任意)増分フェーズで新規作成されたテーブルからのデータ同期を有効化します。
  scan.binlog.newly-added-table.enabled: true
  # (任意)テーブルおよびフィールドのコメントを同期します。
  include-comments.enabled: true
  # (任意)無制限チャンクの配布を優先し、潜在的な TaskManager OutOfMemory 問題を防止します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (任意)解析フィルターを有効化して読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  # メタストアの種類。rest に設定します。
  catalog.properties.metastore: rest
  # トークンプロバイダー。dlf に設定します。
  catalog.properties.token.provider: dlf
  # DLF Rest Catalog Server へのアクセス URI。形式は http://[region-id]-vpc.dlf.aliyuncs.com です(例:http://cn-hangzhou-vpc.dlf.aliyuncs.com)。
  catalog.properties.uri: dlf_uri
  # DLF Catalog の名前。
  catalog.properties.warehouse: your_warehouse
  # (任意)削除ベクターを有効化して読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true
説明
  • MySQL ソースのパラメーターの詳細については、「MySQL」をご参照ください。

  • 削除ベクターを有効化する設定(deletion-vectors.enabled)をテーブル作成パラメーターに追加してください。これにより、書き込みおよび更新パフォーマンスへの影響を最小限に抑えつつ、読み取りパフォーマンスを大幅に向上させることができます。これにより、ニアリアルタイムでの更新と高速クエリが可能になります。

  • DLF には自動ファイルマージ機能が備わっているため、テーブル作成パラメーターにファイルマージやバケット関連のパラメーター(例:bucket、num-sorted-run.compaction-trigger)を追加しないでください。

DLF のパーティションテーブルへの書き込み

データインジェストジョブのソーステーブルには、通常、パーティションフィールド情報が含まれません。パーティション付きの子孫テーブルに書き込むには、Flink CDC データインジェストジョブ開発リファレンスで定義されている partition-keys 設定を使用してパーティションフィールドを構成する必要があります(Flink CDC データインジェストジョブ開発リファレンスを参照)。以下の例は、この設定の構成方法を示しています:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (任意)増分フェーズで新規作成されたテーブルからのデータ同期を有効化します。
  scan.binlog.newly-added-table.enabled: true
  # (任意)テーブルおよびフィールドのコメントを同期します。
  include-comments.enabled: true
  # (任意)無制限チャンクの配布を優先し、潜在的な TaskManager OutOfMemory 問題を防止します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (任意)解析フィルターを有効化して読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (任意)削除ベクターを有効化して読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    # (任意)パーティションフィールドを設定します。
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

DLF のアペンドオンリー(Append-Only)テーブルへの書き込み

データインジェストジョブのソーステーブルには、完全な変更データが含まれています。削除操作を宛先テーブルにおける挿入操作に変換することでソフトデリートを実装するには、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。以下の例は、その構成方法を示しています:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  # (任意)増分フェーズで新規作成されたテーブルからのデータ同期を有効化します。
  scan.binlog.newly-added-table.enabled: true
  # (任意)テーブルおよびフィールドのコメントを同期します。
  include-comments.enabled: true
  # (任意)無制限チャンクの配布を優先し、潜在的な TaskManager OutOfMemory 問題を防止します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (任意)解析フィルターを有効化して読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (任意)削除ベクターを有効化して読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    # (任意)パーティションフィールドを設定します。
    partition-keys: id,pt
    # (任意)ソフトデリートを実装します。
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    # (任意)パーティションフィールドを設定します。
    partition-keys: id,pt
    # (任意)ソフトデリートを実装します。
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
説明
  • 変更タイプを新しいフィールドとして宛先テーブルに書き込むために、projection に __data_event_type を追加してください。converter-after-transform を SOFT_DELETE に設定すると、削除操作が挿入操作に変換されます。これにより、すべての変更操作が下流テーブルに完全に記録されます。詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。

Kafka CDC データをリアルタイムで DLF に同期する

リアルタイム配信を活用して、MySQL データを Kafka へリアルタイムで同期できます。その後、CDC YAML ジョブを構成して、Kafka のデータを DLF へ同期します。

仮に、Kafka の inventory Topic に customers および products の 2 つのテーブルのデータが格納されており、データ形式が Debezium JSON であるとします。以下の例のジョブでは、これらの 2 つのテーブルから DLF の対応する宛先テーブルへデータを同期します:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (任意)削除ベクターを有効化して読み取りパフォーマンスを向上させます。
  table.properties.deletion-vectors.enabled: true

# Debezium JSON にはプライマリキー情報が含まれないため、テーブルにプライマリキーを追加します。
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
説明
  • Kafka データソースでは、canal-json、debezium-json(デフォルト)、json の各形式でデータを読み取ることが可能です。

  • Kafka メッセージの形式が変更された場合(例:新規フィールドの追加など)、その変更は Paimon テーブルスキーマへ自動的に同期されます。これにより、データ同期の柔軟性が向上します。

  • データ形式が debezium-json の場合、Debezium JSON メッセージにはプライマリキー情報が記録されないため、変換ルールを用いてテーブルにプライマリキーを手動で追加する必要があります:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 単一テーブルのデータが複数のパーティションに分散している場合、またはシャーディングを用いて異なるパーティションのテーブルをマージする場合は、debezium-json.distributed-tables または canal-json.distributed-tables の設定項目を true に設定してください。

  • Kafka データソースでは、複数のスキーマ推論ポリシーがサポートされています。schema.inference.strategy 設定項目を用いてポリシーを指定できます。スキーマ推論および変更同期ポリシーの詳細については、「Message Queue for Kafka」をご参照ください。