すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:データウェアハウスへのリアルタイムデータインジェスチョン

最終更新日:Aug 11, 2025

このトピックでは、変更データキャプチャ(CDC)YAML データインジェスチョンジョブを使用してリアルタイムデータをデータウェアハウスに書き込むためのベストプラクティスについて説明します。

Flink CDC から Hologres データウェアハウスへのリアルタイム同期

CDC YAML データインジェスチョンジョブを使用して Hologres にデータを同期してリアルタイムデータウェアハウスを構築すると、Flink の強力なリアルタイム処理機能と、バイナリロギング、行と列のハイブリッドストレージ、強力なリソース分離などの Hologres の機能を最大限に活用できます。この組み合わせにより、効率的かつスケーラブルなリアルタイムデータ処理と分析が可能になります。

MySQL から Hologres データウェアハウスへのリアルタイム同期

次のコードは、MySQL データベース全体を Hologres に同期するための最も基本的な CDC YAML データインジェスチョンジョブを示しています。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  # (オプション) テーブルのコメントとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) データ読み取りを高速化するために解析フィルターを有効にします。
  scan.only.deserialize.captured.tables.changelog.enabled: true  

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
説明
  • ジョブを初めて開始するときは、MySQL ソースの include-comments.enabled を設定して、テーブルのコメントとフィールドのコメントを同期することをお勧めします。詳細については、「MySQL」をご参照ください。

  • ジョブを初めて開始するときに TaskManager の OutOfMemory エラーが発生するのを防ぐために、MySQL ソースの scan.incremental.snapshot.unbounded-chunk-first.enabled を設定することをお勧めします。詳細については、「MySQL」をご参照ください。

より許容範囲の広い型マッピングを使用する

Hologres コネクタは列型変更イベントを処理できませんが、複数の型マッピング関係をサポートしています。データソースの変更をより適切にサポートするために、複数の MySQL データ型をより広範な Hologres 型にマッピングできます。これにより、不要な型変更イベントをスキップし、ジョブが期待どおりに実行されるようにすることができます。sink.type-normalize-strategy 設定項目を使用してマッピングを変更できます。デフォルト値は STANDARD です。詳細については、「データインジェスチョン YAML ジョブの Hologres コネクタ型マッピング」をご参照ください。

たとえば、ONLY_BIGINT_OR_TEXT を使用して、Hologres の int8 型と text 型のみに型をマッピングできます。この場合、MySQL の列の型が INT から BIGINT に変更されると、Hologres は両方の MySQL 型を int8 型にマッピングします。その結果、ジョブはサポートされていない型変換のエラーを報告しません。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  # (オプション) テーブルのコメントとフィールドのコメントを同期します。
  include-comments.enabled: true
  # (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (オプション) データ読み取りを高速化するために解析フィルターを有効にします。
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

パーティションテーブルにデータを書き込む

CDC YAML データインジェスチョンジョブで Hologres コネクタをシンクとして使用する場合、パーティションテーブルにデータを書き込むことができます。詳細については、「パーティションテーブルにデータを書き込む」をご参照ください。

Kafka から Hologres データウェアハウスへのリアルタイム同期

Flink CDC を使用してリアルタイムデータ配信を実装するでは、MySQL データを Kafka に格納するためのソリューションについて説明しています。また、CDC YAML データインジェスチョンジョブを使用して Kafka から Hologres にデータを同期し、リアルタイムデータウェアハウスを構築することもできます。

inventory という名前の Kafka トピックに、debezium-json 形式の customers テーブルと products テーブルの 2 つのテーブルのデータが含まれているとします。次のジョブは、2 つのテーブルのデータを Hologres の対応するテーブルに同期できます。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
説明
  • Kafka データソースは、json、canal-json、および debezium-json(デフォルト)形式のデータの読み取りをサポートしています。

  • データ形式が debezium-json の場合は、変換ルールを使用してテーブルにプライマリキーを手動で追加する必要があります。

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 1 つのテーブルのデータが複数のパーティションに分散されている場合、または異なるパーティションのシャーディングテーブルのデータをマージする必要がある場合は、debezium-json.distributed-tables または canal-json.distributed-tables 設定項目を true に設定する必要があります。

  • Kafka データソースは複数のスキーマ推論ポリシーをサポートしています。schema.inference.strategy 設定項目を使用してポリシーを設定できます。スキーマ推論と変更同期ポリシーの詳細については、「Message Queue for Kafka」をご参照ください。