すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:リアルタイムインジェストの実装

最終更新日:Mar 10, 2026

このトピックでは、変更データキャプチャ (CDC) YAML データインジェストジョブを使用して、リアルタイムデータをデータウェアハウスに書き込むためのベストプラクティスについて説明します。

Flink CDC から Hologres データウェアハウスへのリアルタイム同期

CDC YAML データインジェストジョブを使用してデータを Hologres に同期し、リアルタイムデータウェアハウスを構築することで、Flink の強力なリアルタイム処理能力と、Hologres のバイナリログ、行列表ハイブリッドストレージ、強力なリソース隔離などの特徴を最大限に活用できます。この組み合わせにより、効率的でスケーラブルなリアルタイムのデータ処理と分析が可能になります。

MySQL から Hologres データウェアハウスへのリアルタイム同期

次のコードは、MySQL データベース全体を Hologres に同期するための最も基本的な CDC YAML データインジェストジョブを示しています。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  # (オプション) テーブルコメントとフィールドコメントを同期します。
  include-comments.enabled: true
  # (オプション) 非有界シャードのディストリビューションを優先して、TaskManager の OutOfMemory エラーの可能性を防ぎます。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 非有界シャードのディストリビューションを優先して、TaskManager の OutOfMemory エラーの可能性を防ぎます。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 解析フィルターを有効にして、データの読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true  

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
説明
  • ジョブの初回起動時にテーブルコメントとフィールドコメントを同期するために、MySQL ソースで `include-comments.enabled` を設定することを推奨します。詳細については、「MySQL」をご参照ください。

  • ジョブの初回起動時に TaskManager の OutOfMemory エラーが発生する可能性を防ぐために、MySQL ソースで `scan.incremental.snapshot.unbounded-chunk-first.enabled` を設定することを推奨します。詳細については、「MySQL」をご参照ください。

より寛容な型マッピングの使用

Hologres コネクタは列の型変更イベントを処理できませんが、複数の型マッピング関係をサポートしています。データソースの変更をより適切にサポートするために、複数の MySQL データ型をより広範な Hologres の型にマッピングできます。これにより、不要な型変更イベントをスキップし、ジョブが期待どおりに実行されるようになります。マッピングは、設定項目 sink.type-normalize-strategy を使用して変更できます。デフォルト値は STANDARD です。詳細については、「データインジェスト YAML ジョブの Hologres コネクタ型マッピング」をご参照ください。

たとえば、`ONLY_BIGINT_OR_TEXT` を使用して、Hologres の `int8` 型と `text` 型にのみマッピングできます。この場合、MySQL の列の型が `INT` から `BIGINT` に変更されても、Hologres は両方の MySQL の型を `int8` 型にマッピングします。その結果、ジョブはサポートされていない型変換のエラーを報告しません。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  # (オプション) テーブルコメントとフィールドコメントを同期します。
  include-comments.enabled: true
  # (オプション) 非有界シャードのディストリビューションを優先して、TaskManager の OutOfMemory エラーの可能性を防ぎます。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 非有界シャードのディストリビューションを優先して、TaskManager の OutOfMemory エラーの可能性を防ぎます。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) 解析フィルターを有効にして、データの読み取りを高速化します。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

パーティションテーブルへのデータ書き込み

CDC YAML データインジェストジョブで Hologres コネクタを sink として使用する場合、パーティションテーブルにデータを書き込むことができます。詳細については、「パーティションテーブルへのデータ書き込み」をご参照ください。

Kafka から Hologres データウェアハウスへのリアルタイム同期

リアルタイムディストリビューションの実装」では、MySQL データを Kafka に保存するソリューションについて説明しています。その後、データインジェスト YAML ファイルを使用してデータを Hologres に同期し、リアルタイムデータウェアハウスを構築できます。

`inventory` という名前の Kafka トピックに、`customers` と `products` の 2 つのテーブルのデータが `debezium-json` フォーマットで含まれていると仮定します。次のジョブは、これら 2 つのテーブルのデータを Hologres の対応するテーブルに同期できます。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
説明
  • Kafka データソースは、`json`、`canal-json`、および `debezium-json` (デフォルト) のフォーマットでのデータ読み取りをサポートしています。

  • データ形式が `debezium-json` の場合、変換ルールを使用してテーブルにプライマリキーを手動で追加する必要があります。

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 単一テーブルのデータが複数のパーティションに分散している場合、または異なるパーティションのシャーディングされたテーブルのデータをマージする必要がある場合は、設定項目 debezium-json.distributed-tables または canal-json.distributed-tables を `true` に設定する必要があります。

  • Kafka データソースは、複数のスキーマ推論ポリシーをサポートしています。ポリシーは、設定項目 schema.inference.strategy を使用して設定できます。スキーマの推論と変更の同期ポリシーの詳細については、「Message Queue for Kafka」をご参照ください。