すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink CDC を使用したリアルタイムデータ分布

最終更新日:Nov 09, 2025

このトピックでは、データインジェスト YAML タスクを使用して、リアルタイムデータを一般的なメッセージキューに書き込むためのベストプラクティスについて説明します。

データベースの完全なデータと増分データを Kafka にリアルタイムで書き込む

データインジェスト YAML タスクを使用して、MySQL から Kafka にデータをインポートできます。その後、必要に応じてデータをさまざまなダウンストリームシステムに配布できます。このアプローチにより、複数のタスクからビジネスデータベースへの直接接続が回避され、その処理負荷が軽減されます。

MySQL バイナリログデータを Kafka に同期する

一部のシナリオでは、データ監査やデータ再生などのタスクのために、生のバイナリログデータを保存したい場合があります。データインジェスト YAML タスクは、生の MySQL バイナリログデータを Kafka に同期することをサポートしています。これにより、バイナリログデータを分散方式で読み取ることができ、データホットスポットの問題を解決するのに役立ちます。

データベース kafka_test には、customers と products という 2 つのテーブルが含まれていると仮定します。次のタスクは、これらのテーブルのデータを customers と products という名前の 2 つの Topic に同期します。

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  # (オプション) 新しく追加されたテーブルの完全データと増分データを同期します。
  scan.newly-added-table.enabled: true
  # (オプション) テーブルとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) TaskManager の OutOfMemory エラーの可能性を回避するために、無制限のチャンクの配布を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 読み取りを高速化するために解析フィルターを有効にします。
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # データ変更時刻をメタデータとして送信します。
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # Alibaba Cloud Kafka は、べき等書き込みまたはトランザクション書き込みをサポートしていません。べき等性機能を無効にします。
  properties.enable.idempotence: false
  # (オプション) アップストリームテーブルと Kafka Topic 間のマッピングを設定します。
  sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products

次のコードは、customers テーブルに対する UPDATE 文によって生成された Kafka メッセージのメッセージ本文のフォーマットを示しています。

// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": "kafka_test",
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": "kafka_test",
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000,
  "es": 0
}
説明
  • バイナリログは、json、canal-json、または debezium-json (デフォルト) 形式で書き込むことができます。詳細については、「Message Queue for Apache Kafka」をご参照ください。

  • sink.tableId-to-topic.mapping パラメーターを使用しない場合、Topic は database.table 形式を使用して Kafka に作成されます。たとえば、MySQL テーブル kafka_test.customers の場合、Kafka 内の対応する Topic 名は kafka_test.customers です。sink.tableId-to-topic.mapping を使用して、アップストリームテーブルと宛先 Topic 間のマッピングを構成できます。これにより、Kafka メッセージ内のソーステーブル名を保持したまま、宛先 Topic 名を変更できます。詳細については、「Message Queue for Apache Kafka」をご参照ください。

  • デフォルトでは、すべてのデータは Topic のパーティション 0 に書き込まれます。この動作は、partition.strategy 構成を使用して調整できます。詳細については、「Message Queue for Apache Kafka」をご参照ください。たとえば、`partition.strategy: hash-by-key` という構成設定を使用できます。この設定は、プライマリキーのハッシュ値に基づいて、各テーブルのデータを複数のパーティションに書き込みます。これにより、同じプライマリキーを持つデータが同じパーティションに送信され、順序が維持されることが保証されます。

  • Alibaba Cloud Message Queue for Apache Kafka は、べき等書き込みまたはトランザクション書き込みをサポートしていません。データインジェストの宛先として使用する場合は、べき等性機能を無効にするために、構成項目 properties.enable.idempotence: false をシンク構成に追加する必要があります。

Kafka から DLF へのリアルタイムインジェスト

MySQL バイナリログデータを Kafka に同期することで、MySQL データを Kafka に保存するためのソリューションが提供されます。その後、データインジェスト YAML タスクを構成して、Kafka データを DLF ストレージに同期できます。

inventory という名前の Kafka Topic に、customers と products という 2 つのテーブルのデータが debezium-json 形式で含まれていると仮定します。次のタスクは、これら 2 つのテーブルのデータを DLF 内の対応するテーブルに同期します。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (オプション) タスクを送信するためのユーザー名。競合を避けるために、各タスクに一意のユーザー名を設定します。
  commit.user: your_job_name
  # (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
  table.properties.deletion-vectors.enabled: true

# debezium-json にはプライマリキー情報が含まれていません。テーブルにプライマリキーを追加する必要があります。
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
説明
  • Kafka データソースは、canal-json および debezium-json (デフォルト) 形式でのデータの読み取りをサポートしています。

  • データ形式が debezium-json の場合、変換ルールを使用してテーブルにプライマリキーを手動で追加する必要があります。

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 単一テーブルのデータが複数のパーティションに分散している場合、またはシャーディング後に異なるパーティションのテーブルをマージする必要がある場合は、debezium-json.distributed-tables または canal-json.distributed-tables 構成項目を true に設定します。

  • Kafka データソースは、複数のスキーマ推論ポリシーをサポートしています。ポリシーは、schema.inference.strategy 構成項目を使用して設定できます。スキーマ推論と変更同期ポリシーの詳細については、「Message Queue for Apache Kafka」をご参照ください。