このトピックでは、Flink CDC を使用してリアルタイムデータを Alibaba Cloud Data Lake Formation (DLF) に取り込むためのベストプラクティスについて説明します。
DLF は、データとメタデータの管理とストレージを統合するフルマネージドプラットフォームです。また、権限管理とストレージの最適化機能も備えています。
Flink CDC を使用してデータベース全体から DLF でホストされている Paimon カタログにデータをロードするデータインジェストタスクを作成できます。
MySQL から DLF にデータベース全体を取り込む
YAML でコードを記述して、MySQL から DLF にデータを取り込みます:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (オプション) 増分読み取り中に作成された新しいテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TM OOM エラーを防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 変更イベントのみを逆シリアル化して、データ読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
# メタストアのタイプ。rest に固定されています。
catalog.properties.metastore: rest
# トークンプロバイダー。dlf に固定されています。
catalog.properties.token.provider: dlf
# DLF Rest Catalog Server へのアクセスに使用される URI。フォーマット: http://[region-id]-vpc.dlf.aliyuncs.com。例: http://ap-southeast-1-vpc.dlf.aliyuncs.com。
catalog.properties.uri: dlf_uri
# カタログ名。
catalog.properties.warehouse: your_warehouse
# (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: trueMySQL ソース構成の詳細については、「MySQL」をご参照ください。
ほぼリアルタイムの書き込みと高速な読み取りの両方を実現するには、
table.properties.deletion-vectors.enabledオプションを有効にします。これにより、書き込みへの影響を最小限に抑えながら、読み取りパフォーマンスが大幅に向上します。DLF は自動コンパクション機能を備えているため、ファイルコンパクション (
num-sorted-run.compaction-trigger) とバケット (bucket) に関連するテーブルプロパティを追加しないでください。
DLF パーティションテーブルにデータを取り込む
非パーティションテーブルから DLF パーティションテーブルにデータをロードするには、transform モジュールで partition-keys オプションを使用します。コード例:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (オプション) 増分読み取り中に作成された新しいテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TM OOM を防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 変更イベントのみを逆シリアル化して、データ読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (オプション) パーティションフィールドを設定します。
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,ptDLF の追記専用テーブルにデータを取り込む
MySQL データを DLF に取り込み、ソースから削除されたデータに対して論理削除を実装します。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (オプション) 増分読み取り中に作成された新しいテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TM OOM を防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 変更イベントのみを逆シリアル化して、データ読み取りを高速化します。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (オプション) パーティションフィールドを設定します。
partition-keys: id,pt
# (オプション) 論理削除を実装します。
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
# (オプション) パーティションフィールドを設定します。
partition-keys: id,pt
# (オプション) 論理削除を実装します。
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETEprojectionで__data_event_type__を指定すると、イベントタイプが新しい列としてシンクテーブルに追加されます。converter-after-transformをSOFT_DELETEに設定すると、削除が挿入に変換されます。これにより、すべての変更操作がダウンストリームシステムに完全に記録されることが保証されます。
Kafka から DLF にリアルタイムでデータを取り込む
Flink CDC を使用して MySQL から Kafka にリアルタイムでデータを同期した後、さらにデータを DLF にロードしてストレージできます。
Kafka の inventory トピック (2 つのテーブル、customers と products を含む) からデータを取り込みます:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true
# Debezium JSON にはプライマリキー情報が含まれていません。テーブルにプライマリキーを追加する必要があります。
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka ソースの
value.formatの有効な値: canal-json、debezium-json (デフォルト)、および json。Kafka から Paimon へのスキーマ進化がサポートされており、データレプリケーションの柔軟性が向上します。
値のフォーマットが debezium-json の場合、transform モジュールでプライマリキーを明示的に指定する必要があります:
transform: - source-table: \.*.\.* projection: \* primary-keys: idシャードに分散されたデータを持つテーブルを取り込むには、debezium-json.distributed-tables または canal-json.distributed-tables を
trueに設定します。Kafka ソースは複数のスキーマ推論ポリシーをサポートしています。schema.inference.strategy オプションを使用してポリシーを設定できます。詳細については、「Kafka コネクタ」をご参照ください。