このトピックでは、Realtime Compute for Apache Flink の Flink CDC を使用して、Paimon REST API を介して DLF カタログにアクセスする方法について説明します。
前提条件
Realtime Compute for Apache Flink ワークスペースを作成済みであること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
Realtime Compute for Apache Flink ワークスペースと DLF カタログが同じリージョンに存在すること。
Realtime Compute for Apache Flink ワークスペースの VPC を DLF の VPC ホワイトリストに追加済みであること。詳細については、「VPC ホワイトリストの設定」をご参照ください。
エンジン要件
Realtime Compute for Apache Flink ジョブは、Ververica Runtime (VVR) バージョン 11.1.0 以降を実行します。
DLF カタログの作成
詳細については、「DLF の概要」をご参照ください。
カタログに接続するための Flink CDC パラメーター
データインジェストジョブを作成するには、「Flink CDC データインジェストジョブの開発 (パブリックプレビュー)」をご参照ください。
Flink データインジェストジョブのシンクには、次の構成を使用します。
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true次の表にパラメーターを示します。
パラメーター | 説明 | 必須 | 例 |
| メタストアのタイプ。`rest` に設定します。 | はい | rest |
| トークンプロバイダー。`dlf` に設定します。 | はい | dlf |
| DLF REST Catalog Server にアクセスするための URI。フォーマットは | はい | http://cn-hangzhou-vpc.dlf.aliyuncs.com |
| DLF カタログの名前。 | はい | dlf_test |
構成例
以下のセクションでは、Flink CDC YAML ジョブを使用して DLF データレイクにデータを同期するための典型的な構成例を示します。
MySQL データベース全体を DLF データレイクに同期する
次の YAML コードは、MySQL データベース全体を DLF に同期する CDC ジョブを示しています。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true[MySQL ソース構成] 次のパラメーターを設定します。詳細については、「MySQL」をご参照ください。
パラメーター: scan.binlog.newly-added-table.enabled
機能: 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
パラメーター: include-comments.enabled
機能: テーブルとフィールドのコメントを同期します。
パラメーター: scan.incremental.snapshot.unbounded-chunk-first.enabled
機能: TaskManager の OutOfMemory エラーの可能性を防ぎます。
パラメーター: scan.only.deserialize.captured.tables.changelog.enabled: true
機能: ジョブに一致するテーブルのデータのみを解析して、読み取りを高速化します。
[Paimon シンク構成]
カタログ接続パラメーター
パラメータープレフィックス: catalog.properties
機能: カタログ接続情報を指定します。
テーブル作成パラメーター
パラメータープレフィックス: table.properties
機能: テーブル作成情報を指定します。
推奨構成: テーブル作成パラメーターに `deletion-vectors.enabled` 構成を追加します。これにより、書き込みおよび更新パフォーマンスへの影響を最小限に抑えながら読み取りパフォーマンスが大幅に向上し、ほぼリアルタイムの更新と高速クエリが実現します。
追加情報: DLF は自動ファイルマージ機能を提供します。`bucket` や `num-sorted-run.compaction-trigger` などのファイルマージおよびバケット関連のパラメーターをテーブル作成パラメーターに追加しないでください。
サブミッター
パラメーター名: commit.user
機能: Paimon に書き込まれたファイルをコミットするユーザー。
推奨構成: ジョブごとに異なるコミットユーザーを設定します。ジョブ名を使用できます。
追加情報: デフォルトのコミットユーザーは `admin` です。デフォルトのユーザーを使用すると、複数のジョブが同じテーブルに書き込むときにコミットの競合や不整合が発生する可能性があります。
DLF データレイクのパーティションテーブルにデータを書き込む
データインジェストジョブのソーステーブルには、通常、パーティションフィールドは含まれていません。パーティション分割された子孫テーブルにデータを書き込むには、`partition-keys` を使用してパーティションフィールドを設定できます。詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。次のコードは構成例です。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (オプション) パーティションフィールドを設定します。
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,ptDLF データレイクの追加入力専用テーブルにデータを書き込む
データインジェストジョブのソーステーブルには、完全な変更タイプが含まれています。論理削除のために削除操作を挿入操作に変換するように子孫テーブルを構成するには、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。次のコードは構成例です。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
# (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
scan.binlog.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory エラーの可能性を防ぐために、無制限のシャードの分散を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために、解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
# (オプション) パーティションフィールドを設定します。
partition-keys: id,pt
# (オプション) 論理削除を実装します。
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
# (オプション) パーティションフィールドを設定します。
partition-keys: id,pt
# (オプション) 論理削除を実装します。
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETEプロジェクションに `__data_event_type` を追加して変更タイプを新しいフィールドとして子孫テーブルに書き込み、`converter-after-transform` を `SOFT_DELETE` に設定して削除操作を挿入操作に変換できます。これにより、ダウンストリームシステムはすべての変更操作を完全に記録できます。詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。
Kafka から DLF データレイクにデータをリアルタイムで同期する
Kafka の `inventory` Topic に `customers` と `products` の 2 つのテーブルのデータが格納されており、データ形式が Debezium JSON であると仮定します。次のジョブ例では、これら 2 つのテーブルのデータを DLF の対応する宛先テーブルに同期します。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) ジョブをサブミットするユーザー名。競合を避けるために、ジョブごとに異なるユーザー名を設定します。
commit.user: your_job_name
# (オプション) 削除ベクターを有効にして、読み取りパフォーマンスを向上させます。
table.properties.deletion-vectors.enabled: true
# Debezium JSON にはプライマリキー情報が含まれていません。テーブルにプライマリキーを個別に追加する必要があります。
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka データソースは、`canal-json`、`debezium-json` (デフォルト)、および `json` フォーマットでのデータの読み取りをサポートしています。
データ形式が `debezium-json` の場合、Debezium JSON メッセージにはプライマリキー情報が記録されないため、変換ルールを使用してテーブルにプライマリキーを手動で追加する必要があります。
transform: - source-table: \.*.\.* projection: \* primary-keys: id単一テーブルのデータが複数のパーティションに分散している場合、またはシャーディング後に異なるパーティションのテーブルをマージする必要がある場合は、debezium-json.distributed-tables または canal-json.distributed-tables パラメーターを `true` に設定できます。
Kafka データソースは、複数のスキーマ推論ポリシーをサポートしています。schema.inference.strategy パラメーターを使用してポリシーを設定できます。スキーマ推論と変更同期ポリシーの詳細については、「Kafka メッセージキュー」をご参照ください。
より詳細なジョブ構成については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。