このトピックでは、データインジェスト YAML タスクを使用して、リアルタイムデータを一般的なメッセージキューに書き込むためのベストプラクティスについて説明します。
データベースの完全なデータと増分データを Kafka にリアルタイムで書き込む
データインジェスト YAML タスクを使用して、MySQL から Kafka にデータをインポートできます。その後、必要に応じてデータをさまざまなダウンストリームシステムに配布できます。このアプローチにより、複数のタスクからビジネスデータベースへの直接接続が回避され、その処理負荷が軽減されます。
MySQL バイナリログデータを Kafka に同期する
一部のシナリオでは、データ監査やデータ再生などのタスクのために、生のバイナリログデータを保存したい場合があります。データインジェスト YAML タスクは、生の MySQL バイナリログデータを Kafka に同期することをサポートしています。これにより、バイナリログデータを分散方式で読み取ることができ、データホットスポットの問題を解決するのに役立ちます。
データベース kafka_test には、customers と products という 2 つのテーブルが含まれていると仮定します。次のタスクは、これらのテーブルのデータを customers と products という名前の 2 つの Topic に同期します。
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
# (オプション) 新しく追加されたテーブルの完全データと増分データを同期します。
scan.newly-added-table.enabled: true
# (オプション) テーブルとフィールドのコメントを同期します。
include-comments.enabled: true
# (オプション) TaskManager の OutOfMemory エラーの可能性を回避するために、無制限のチャンクの配布を優先します。
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
# (オプション) 読み取りを高速化するために解析フィルターを有効にします。
scan.only.deserialize.captured.tables.changelog.enabled: true
# データ変更時刻をメタデータとして送信します。
metadata-column.include-list: op_ts
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
# Alibaba Cloud Kafka は、べき等書き込みまたはトランザクション書き込みをサポートしていません。べき等性機能を無効にします。
properties.enable.idempotence: false
# (オプション) アップストリームテーブルと Kafka Topic 間のマッピングを設定します。
sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products次のコードは、customers テーブルに対する UPDATE 文によって生成された Kafka メッセージのメッセージ本文のフォーマットを示しています。
// debezium-json
{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": "kafka_test",
"table": "customers",
"ts_ms": 1728528674000
}
}
// canal-json
{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": "kafka_test",
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000,
"es": 0
}バイナリログは、json、canal-json、または debezium-json (デフォルト) 形式で書き込むことができます。詳細については、「Message Queue for Apache Kafka」をご参照ください。
sink.tableId-to-topic.mapping パラメーターを使用しない場合、Topic は database.table 形式を使用して Kafka に作成されます。たとえば、MySQL テーブル
kafka_test.customersの場合、Kafka 内の対応する Topic 名はkafka_test.customersです。sink.tableId-to-topic.mapping を使用して、アップストリームテーブルと宛先 Topic 間のマッピングを構成できます。これにより、Kafka メッセージ内のソーステーブル名を保持したまま、宛先 Topic 名を変更できます。詳細については、「Message Queue for Apache Kafka」をご参照ください。デフォルトでは、すべてのデータは Topic のパーティション 0 に書き込まれます。この動作は、
partition.strategy構成を使用して調整できます。詳細については、「Message Queue for Apache Kafka」をご参照ください。たとえば、`partition.strategy: hash-by-key` という構成設定を使用できます。この設定は、プライマリキーのハッシュ値に基づいて、各テーブルのデータを複数のパーティションに書き込みます。これにより、同じプライマリキーを持つデータが同じパーティションに送信され、順序が維持されることが保証されます。Alibaba Cloud Message Queue for Apache Kafka は、べき等書き込みまたはトランザクション書き込みをサポートしていません。データインジェストの宛先として使用する場合は、べき等性機能を無効にするために、構成項目
properties.enable.idempotence: falseをシンク構成に追加する必要があります。
Kafka から DLF へのリアルタイムインジェスト
MySQL バイナリログデータを Kafka に同期することで、MySQL データを Kafka に保存するためのソリューションが提供されます。その後、データインジェスト YAML タスクを構成して、Kafka データを DLF ストレージに同期できます。
inventory という名前の Kafka Topic に、customers と products という 2 つのテーブルのデータが debezium-json 形式で含まれていると仮定します。次のタスクは、これら 2 つのテーブルのデータを DLF 内の対応するテーブルに同期します。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
# (オプション) タスクを送信するためのユーザー名。競合を避けるために、各タスクに一意のユーザー名を設定します。
commit.user: your_job_name
# (オプション) 読み取りパフォーマンスを向上させるために削除ベクターを有効にします。
table.properties.deletion-vectors.enabled: true
# debezium-json にはプライマリキー情報が含まれていません。テーブルにプライマリキーを追加する必要があります。
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: idKafka データソースは、canal-json および debezium-json (デフォルト) 形式でのデータの読み取りをサポートしています。
データ形式が debezium-json の場合、変換ルールを使用してテーブルにプライマリキーを手動で追加する必要があります。
transform: - source-table: \.*.\.* projection: \* primary-keys: id単一テーブルのデータが複数のパーティションに分散している場合、またはシャーディング後に異なるパーティションのテーブルをマージする必要がある場合は、debezium-json.distributed-tables または canal-json.distributed-tables 構成項目を true に設定します。
Kafka データソースは、複数のスキーマ推論ポリシーをサポートしています。ポリシーは、schema.inference.strategy 構成項目を使用して設定できます。スキーマ推論と変更同期ポリシーの詳細については、「Message Queue for Apache Kafka」をご参照ください。