このトピックでは、変更データキャプチャ (CDC) YAML データインジェストジョブを使用して、リアルタイムデータをデータウェアハウスに書き込むためのベストプラクティスについて説明します。
Flink CDC から Hologres データウェアハウスへのリアルタイム同期
CDC YAML データインジェストジョブを使用してデータを Hologres に同期し、リアルタイムデータウェアハウスを構築することで、Flink の強力なリアルタイム処理能力と、Hologres のバイナリログ、行列表ハイブリッドストレージ、強力なリソース隔離などの特徴を最大限に活用できます。この組み合わせにより、効率的でスケーラブルなリアルタイムのデータ処理と分析が可能になります。
MySQL から Hologres データウェアハウスへのリアルタイム同期
次のコードは、MySQL データベース全体を Hologres に同期するための最も基本的な CDC YAML データインジェストジョブを示しています。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
# (オプション) テーブルコメントとフィールドコメントを同期します。
include-comments.enabled: true
# (オプション) 非有界シャードのディストリビューションを優先して、TaskManager の OutOfMemory エラーの可能性を防ぎます。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 非有界シャードのディストリビューションを優先して、TaskManager の OutOfMemory エラーの可能性を防ぎます。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 解析フィルターを有効にして、データの読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
より寛容な型マッピングの使用
Hologres コネクタは列の型変更イベントを処理できませんが、複数の型マッピング関係をサポートしています。データソースの変更をより適切にサポートするために、複数の MySQL データ型をより広範な Hologres の型にマッピングできます。これにより、不要な型変更イベントをスキップし、ジョブが期待どおりに実行されるようになります。マッピングは、設定項目 sink.type-normalize-strategy を使用して変更できます。デフォルト値は STANDARD です。詳細については、「データインジェスト YAML ジョブの Hologres コネクタ型マッピング」をご参照ください。
たとえば、`ONLY_BIGINT_OR_TEXT` を使用して、Hologres の `int8` 型と `text` 型にのみマッピングできます。この場合、MySQL の列の型が `INT` から `BIGINT` に変更されても、Hologres は両方の MySQL の型を `int8` 型にマッピングします。その結果、ジョブはサポートされていない型変換のエラーを報告しません。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
# (オプション) テーブルコメントとフィールドコメントを同期します。
include-comments.enabled: true
# (オプション) 非有界シャードのディストリビューションを優先して、TaskManager の OutOfMemory エラーの可能性を防ぎます。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 非有界シャードのディストリビューションを優先して、TaskManager の OutOfMemory エラーの可能性を防ぎます。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 解析フィルターを有効にして、データの読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
パーティションテーブルへのデータ書き込み
CDC YAML データインジェストジョブで Hologres コネクタを sink として使用する場合、パーティションテーブルにデータを書き込むことができます。詳細については、「パーティションテーブルへのデータ書き込み」をご参照ください。
Kafka から Hologres データウェアハウスへのリアルタイム同期
「リアルタイムディストリビューションの実装」では、MySQL データを Kafka に保存するソリューションについて説明しています。その後、データインジェスト YAML ファイルを使用してデータを Hologres に同期し、リアルタイムデータウェアハウスを構築できます。
`inventory` という名前の Kafka トピックに、`customers` と `products` の 2 つのテーブルのデータが `debezium-json` フォーマットで含まれていると仮定します。次のジョブは、これら 2 つのテーブルのデータを Hologres の対応するテーブルに同期できます。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
-
Kafka データソースは、`json`、`canal-json`、および `debezium-json` (デフォルト) のフォーマットでのデータ読み取りをサポートしています。
-
データ形式が `debezium-json` の場合、変換ルールを使用してテーブルにプライマリキーを手動で追加する必要があります。
transform: - source-table: \.*.\.* projection: \* primary-keys: id -
単一テーブルのデータが複数のパーティションに分散している場合、または異なるパーティションのシャーディングされたテーブルのデータをマージする必要がある場合は、設定項目 debezium-json.distributed-tables または canal-json.distributed-tables を `true` に設定する必要があります。
-
Kafka データソースは、複数のスキーマ推論ポリシーをサポートしています。ポリシーは、設定項目 schema.inference.strategy を使用して設定できます。スキーマの推論と変更の同期ポリシーの詳細については、「Message Queue for Kafka」をご参照ください。