このトピックでは、変更データキャプチャ(CDC)YAML データインジェスチョンジョブを使用してリアルタイムデータをデータウェアハウスに書き込むためのベストプラクティスについて説明します。
Flink CDC から Hologres データウェアハウスへのリアルタイム同期
CDC YAML データインジェスチョンジョブを使用して Hologres にデータを同期してリアルタイムデータウェアハウスを構築すると、Flink の強力なリアルタイム処理機能と、バイナリロギング、行と列のハイブリッドストレージ、強力なリソース分離などの Hologres の機能を最大限に活用できます。この組み合わせにより、効率的かつスケーラブルなリアルタイムデータ処理と分析が可能になります。
MySQL から Hologres データウェアハウスへのリアルタイム同期
次のコードは、MySQL データベース全体を Hologres に同期するための最も基本的な CDC YAML データインジェスチョンジョブを示しています。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
# (オプション) テーブルのコメントとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) データ読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADENより許容範囲の広い型マッピングを使用する
Hologres コネクタは列型変更イベントを処理できませんが、複数の型マッピング関係をサポートしています。データソースの変更をより適切にサポートするために、複数の MySQL データ型をより広範な Hologres 型にマッピングできます。これにより、不要な型変更イベントをスキップし、ジョブが期待どおりに実行されるようにすることができます。sink.type-normalize-strategy 設定項目を使用してマッピングを変更できます。デフォルト値は STANDARD です。詳細については、「データインジェスチョン YAML ジョブの Hologres コネクタ型マッピング」をご参照ください。
たとえば、ONLY_BIGINT_OR_TEXT を使用して、Hologres の int8 型と text 型のみに型をマッピングできます。この場合、MySQL の列の型が INT から BIGINT に変更されると、Hologres は両方の MySQL 型を int8 型にマッピングします。その結果、ジョブはサポートされていない型変換のエラーを報告しません。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
# (オプション) テーブルのコメントとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) データ読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXTパーティションテーブルにデータを書き込む
CDC YAML データインジェスチョンジョブで Hologres コネクタをシンクとして使用する場合、パーティションテーブルにデータを書き込むことができます。詳細については、「パーティションテーブルにデータを書き込む」をご参照ください。
Kafka から Hologres データウェアハウスへのリアルタイム同期
Flink CDC を使用してリアルタイムデータ配信を実装するでは、MySQL データを Kafka に格納するためのソリューションについて説明しています。また、CDC YAML データインジェスチョンジョブを使用して Kafka から Hologres にデータを同期し、リアルタイムデータウェアハウスを構築することもできます。
inventory という名前の Kafka トピックに、debezium-json 形式の customers テーブルと products テーブルの 2 つのテーブルのデータが含まれているとします。次のジョブは、2 つのテーブルのデータを Hologres の対応するテーブルに同期できます。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka データソースは、json、canal-json、および debezium-json(デフォルト)形式のデータの読み取りをサポートしています。
データ形式が debezium-json の場合は、変換ルールを使用してテーブルにプライマリキーを手動で追加する必要があります。
transform: - source-table: \.*.\.* projection: \* primary-keys: id1 つのテーブルのデータが複数のパーティションに分散されている場合、または異なるパーティションのシャーディングテーブルのデータをマージする必要がある場合は、debezium-json.distributed-tables または canal-json.distributed-tables 設定項目を true に設定する必要があります。
Kafka データソースは複数のスキーマ推論ポリシーをサポートしています。schema.inference.strategy 設定項目を使用してポリシーを設定できます。スキーマ推論と変更同期ポリシーの詳細については、「Message Queue for Kafka」をご参照ください。