このトピックでは、Change Data Capture(CDC)データインジェスト YAML ジョブを活用して、リアルタイムデータを Alibaba Cloud Data Lake Formation(DLF)に書き込む際のベストプラクティスについて説明します。
Alibaba Cloud Data Lake Formation(DLF)は、メタデータ、データストレージ、データ管理を統合的に提供するフルマネージドプラットフォームです。DLF には、メタデータ管理、権限管理、ストレージ最適化などの機能が備わっています。詳細については、「Data Lake Formation とは?」をご参照ください。
データインジェストジョブでは、DLF の Paimon Catalog を宛先として使用できます。また、大規模なデータベース全体をデータレイクにインジェストすることも可能です。
MySQL データベース全体を DLF に同期する
以下の CDC YAML ジョブでは、MySQL データベース全体を DLF に同期します:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (任意)増分フェーズで新規作成されたテーブルからのデータ同期を有効化します。
scan.binlog.newly-added-table.enabled: true
# (任意)テーブルおよびフィールドのコメントを同期します。
include-comments.enabled: true
# (任意)無制限チャンクの配布を優先し、潜在的な TaskManager OutOfMemory 問題を防止します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意)解析フィルターを有効化して読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
# メタストアの種類。rest に設定します。
catalog.properties.metastore: rest
# トークンプロバイダー。dlf に設定します。
catalog.properties.token.provider: dlf
# DLF Rest Catalog Server へのアクセス URI。形式は http://[region-id]-vpc.dlf.aliyuncs.com です(例:http://cn-hangzhou-vpc.dlf.aliyuncs.com)。
catalog.properties.uri: dlf_uri
# DLF Catalog の名前。
catalog.properties.warehouse: your_warehouse
# (任意)削除ベクターを有効化して読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: trueMySQL ソースのパラメーターの詳細については、「MySQL」をご参照ください。
削除ベクターを有効化する設定(deletion-vectors.enabled)をテーブル作成パラメーターに追加してください。これにより、書き込みおよび更新パフォーマンスへの影響を最小限に抑えつつ、読み取りパフォーマンスを大幅に向上させることができます。これにより、ニアリアルタイムでの更新と高速クエリが可能になります。
DLF には自動ファイルマージ機能が備わっているため、テーブル作成パラメーターにファイルマージやバケット関連のパラメーター(例:bucket、num-sorted-run.compaction-trigger)を追加しないでください。
DLF のパーティションテーブルへの書き込み
データインジェストジョブのソーステーブルには、通常、パーティションフィールド情報が含まれません。パーティション付きの子孫テーブルに書き込むには、Flink CDC データインジェストジョブ開発リファレンスで定義されている partition-keys 設定を使用してパーティションフィールドを構成する必要があります(Flink CDC データインジェストジョブ開発リファレンスを参照)。以下の例は、この設定の構成方法を示しています:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (任意)増分フェーズで新規作成されたテーブルからのデータ同期を有効化します。
scan.binlog.newly-added-table.enabled: true
# (任意)テーブルおよびフィールドのコメントを同期します。
include-comments.enabled: true
# (任意)無制限チャンクの配布を優先し、潜在的な TaskManager OutOfMemory 問題を防止します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意)解析フィルターを有効化して読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意)削除ベクターを有効化して読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (任意)パーティションフィールドを設定します。
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,ptDLF のアペンドオンリー(Append-Only)テーブルへの書き込み
データインジェストジョブのソーステーブルには、完全な変更データが含まれています。削除操作を宛先テーブルにおける挿入操作に変換することでソフトデリートを実装するには、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。以下の例は、その構成方法を示しています:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (任意)増分フェーズで新規作成されたテーブルからのデータ同期を有効化します。
scan.binlog.newly-added-table.enabled: true
# (任意)テーブルおよびフィールドのコメントを同期します。
include-comments.enabled: true
# (任意)無制限チャンクの配布を優先し、潜在的な TaskManager OutOfMemory 問題を防止します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (任意)解析フィルターを有効化して読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意)削除ベクターを有効化して読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (任意)パーティションフィールドを設定します。
partition-keys: id,pt
# (任意)ソフトデリートを実装します。
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
# (任意)パーティションフィールドを設定します。
partition-keys: id,pt
# (任意)ソフトデリートを実装します。
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE変更タイプを新しいフィールドとして宛先テーブルに書き込むために、projection に __data_event_type を追加してください。converter-after-transform を SOFT_DELETE に設定すると、削除操作が挿入操作に変換されます。これにより、すべての変更操作が下流テーブルに完全に記録されます。詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。
Kafka CDC データをリアルタイムで DLF に同期する
リアルタイム配信を活用して、MySQL データを Kafka へリアルタイムで同期できます。その後、CDC YAML ジョブを構成して、Kafka のデータを DLF へ同期します。
仮に、Kafka の inventory Topic に customers および products の 2 つのテーブルのデータが格納されており、データ形式が Debezium JSON であるとします。以下の例のジョブでは、これらの 2 つのテーブルから DLF の対応する宛先テーブルへデータを同期します:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (任意)削除ベクターを有効化して読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true
# Debezium JSON にはプライマリキー情報が含まれないため、テーブルにプライマリキーを追加します。
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka データソースでは、canal-json、debezium-json(デフォルト)、json の各形式でデータを読み取ることが可能です。
Kafka メッセージの形式が変更された場合(例:新規フィールドの追加など)、その変更は Paimon テーブルスキーマへ自動的に同期されます。これにより、データ同期の柔軟性が向上します。
データ形式が debezium-json の場合、Debezium JSON メッセージにはプライマリキー情報が記録されないため、変換ルールを用いてテーブルにプライマリキーを手動で追加する必要があります:
transform: - source-table: \.*.\.* projection: \* primary-keys: id単一テーブルのデータが複数のパーティションに分散している場合、またはシャーディングを用いて異なるパーティションのテーブルをマージする場合は、debezium-json.distributed-tables または canal-json.distributed-tables の設定項目を true に設定してください。
Kafka データソースでは、複数のスキーマ推論ポリシーがサポートされています。schema.inference.strategy 設定項目を用いてポリシーを指定できます。スキーマ推論および変更同期ポリシーの詳細については、「Message Queue for Kafka」をご参照ください。