すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Debezium

最終更新日:Jan 08, 2025

形式, Debezium

このトピックでは、Debezium 形式の使用例を示し、形式オプションとデータ型マッピングについて説明します。

背景情報

Debezium は、MySQL、PostgreSQL、Oracle、Microsoft SQL Server などのさまざまなデータベースから Kafka にリアルタイムでデータ変更をストリーミングできる Change Data Capture (CDC) ツールです。Debezium は、変更ログの統一形式スキーマを提供し、JSON および Apache Avro を使用してメッセージをシリアル化できます。Kafka コネクタオブジェクトストレージサービス (OSS) は、Debezium 形式をサポートしています。

Flink は、Debezium JSON メッセージと Debezium Avro メッセージを、Flink SQL システムの INSERT、UPDATE、または DELETE メッセージとして解釈できます。Debezium 形式は、以下のシナリオに適しています。

  • データベースから別のシステムへの増分データの同期

  • ログ監査

  • データベースのリアルタイム マテリアライズド ビュー

  • データベース テーブルの時間的結合

Flink は、Flink SQL システムの INSERT、UPDATE、または DELETE メッセージを Debezium JSON メッセージまたは Debezium Avro メッセージとして解釈し、それらのメッセージを Kafka などのデータストアに転送することもできます。

重要

Flink SQL は、UPDATE_BEFORE 型と UPDATE_AFTER 型の両方のデータを含む単一の UPDATE メッセージを処理できません。代わりに、Flink がダウンストリーム ストレージ システムにデータを書き込む場合、Flink は UPDATE_BEFORE 型のデータを DELETE 型のデータに変換し、UPDATE_AFTER 型のデータを INSERT 型のデータに変換します。次に、Flink は変換されたメッセージを Debezium メッセージとして解釈し、Debezium メッセージをダウンストリーム ストレージ システムに書き込みます。

Flink は、MySQL データベースの products テーブルの更新レコードを受信します。レコードは Debezium JSON 形式であり、id、name、description、および weight 列が含まれています。

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",  // 大型 2 輪スクーター
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",  // 大型 2 輪スクーター
    "weight": 5.15
  },
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
};
説明

上記の例の各フィールドの説明については、Debezium をご参照ください。

上記の例の JSON メッセージには、products テーブルの更新イベントが含まれています。id の値が 111 の行で、weight の値が 5.18 から 5.15 に変更されています。メッセージが products_binlog という名前の Kafka トピックに同期されている場合は、次の DDL ステートメントを実行してこのトピックを使用し、変更イベントを解釈できます。

-- debezium-json 形式を使用して、Debezium JSON メッセージを解釈します。
CREATE TABLE topic_products (
-- topic_products テーブルのスキーマが、MySQL データベースの products テーブルのスキーマと同じであることを確認します。
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'debezium-json'
-- debezium-json 形式を使用して、Debezium JSON メッセージを解釈します。
-- Debezium メッセージが Avro メッセージの場合は、debezium-avro-confluent 形式を使用します。
);

Debezium Kafka Connect を構成する場合、ビジネス要件に基づいて value.converter.schemas.enable オプションを true に設定できます。このようにして、スキーマ情報がメッセージ本文に含まれます。次のサンプル コードは、Debezium JSON メッセージの例を示しています。

{
  "schema": {...},  // スキーマ
  "payload": {  // ペイロード
    "before": {
      "id": 111,
      "name": "scooter",  // スクーター
      "description": "Big 2-wheel scooter",  // 大型 2 輪スクーター
      "weight": 5.18
    },
    "after": {
      "id": 111,
      "name": "scooter",  // スクーター
      "description": "Big 2-wheel scooter",  // 大型 2 輪スクーター
      "weight": 5.15
    },
    "source": {...},  // ソース
    "op": "u",  // 操作
    "ts_ms": 1589362330904,  // タイムスタンプ (ミリ秒)
    "transaction": null  // トランザクション
  }
}

Debezium JSON メッセージを解釈するには、'debezium-json.schema-include' = 'true' 構成を上記の例の DDL ステートメントの WITH 句に追加する必要があります。デフォルトでは、debezium-json.schema-include オプションは false に設定されています。ほとんどの場合、スキーマの説明を Debezium JSON メッセージに含めないことをお勧めします。これは、メッセージが長くなり、解析パフォーマンスが低下するためです。

トピックを Flink テーブルとして登録した後、Debezium メッセージを changelog ソースとして使用できます。

-- MySQL データベースの products テーブルのリアルタイム マテリアライズド ビューで、同じ製品の最新の平均重量を計算します。
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- MySQL データベースの products テーブルのすべてのデータと増分変更を、後続のクエリのために Elasticsearch の products インデックスに同期します。
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

形式オプション

Flink は、Debezium Avro メッセージまたは Debezium JSON メッセージを解釈するための debezium-avro-confluent 形式と debezium-json 形式を提供します。

debezium-avro-confluent

Debezium Avro メッセージを解釈するには、debezium-avro-confluent 形式を使用します。

オプション

必須

デフォルト値

データ型

説明

format

はい

(なし)

STRING

使用する形式。Debezium Avro メッセージを解釈する場合は、このオプションを debezium-avro-confluent に設定します。

debezium-avro-confluent.basic-auth.credentials-source

いいえ

(なし)

STRING

スキーマレジストリの Basic 認証資格情報のソース。

debezium-avro-confluent.basic-auth.user-info

いいえ

(なし)

STRING

スキーマレジストリの Basic 認証ユーザー情報。

debezium-avro-confluent.bearer-auth.credentials-source

いいえ

(なし)

STRING

スキーマレジストリの Bearer 認証資格情報のソース。

debezium-avro-confluent.bearer-auth.token

いいえ

(なし)

STRING

スキーマレジストリの Bearer 認証トークン。

debezium-avro-confluent.properties

いいえ

(なし)

MAP

スキーマレジストリに転送されるプロパティ マッピング。このパラメータは、Flink 構成オプションを使用して公式に公開されていないオプションに適しています。

重要

Flink 構成オプションは、このオプションよりも優先順位が高くなります。

debezium-avro-confluent.ssl.keystore.location

いいえ

(なし)

STRING

SSL キーストアの場所。

debezium-avro-confluent.ssl.keystore.password

いいえ

(なし)

STRING

SSL キーストアのパスワード。

debezium-avro-confluent.ssl.truststore.location

いいえ

(なし)

STRING

SSL トラストストアの場所。

debezium-avro-confluent.ssl.truststore.password

いいえ

(なし)

STRING

SSL トラストストアのパスワード。

debezium-avro-confluent.subject

いいえ

(なし)

STRING

シリアル化中にこの形式で使用されるスキーマが登録される Confluent スキーマレジストリのサブジェクト。Apache Kafka コネクタと Upsert Kafka コネクタは、この形式が Kafka の値またはキー形式として使用される場合、デフォルトのサブジェクト名として「<topic_name>-value」または「<topic_name>-key」を使用します。ファイルシステム コネクタをシンクとして使用する場合は、debezium-avro-confluent.subject オプションが必要です。

debezium-avro-confluent.url

はい

(なし)

STRING

スキーマを取得または登録するための Confluent スキーマレジストリの URL。

debezium-json

Debezium JSON メッセージを解釈するには、debezium-json 形式を使用します。

オプション

必須

デフォルト値

データ型

説明

format

はい

(なし)

STRING

使用する形式。Debezium JSON メッセージを解釈する場合は、このオプションを debezium-json に設定します。

debezium-json.schema-include

いいえ

false

BOOLEAN

Debezium Kafka Connect を構成する場合、ビジネス要件に基づいて Kafka 構成の value.converter.schemas.enable オプションを true に設定できます。このようにして、スキーマ情報がメッセージ本文に含まれます。このオプションは、スキーマが Debezium JSON メッセージに含まれるかどうかを指定します。

有効な値:

  • true: スキーマは Debezium JSON メッセージに含まれます。

  • false: スキーマは Debezium JSON メッセージに含まれません。

debezium-json.ignore-parse-errors

いいえ

false

BOOLEAN

有効な値:

  • true: 解析に失敗した場合、現在のフィールドまたは行はスキップされます。

  • false: エラーが返され、デプロイの開始に失敗します。これはデフォルト値です。

debezium-json.timestamp-format.standard

いいえ

SQL

STRING

入力タイムスタンプと出力タイムスタンプの形式。有効な値:

  • SQL: yyyy-MM-dd HH:mm:ss.s{precision} 形式の入力タイムスタンプが解析されます。たとえば、入力タイムスタンプは 2020-12-30 12:13:14.123 です。出力タイムスタンプは入力タイムスタンプと同じ形式です。

  • ISO-8601: yyyy-MM-ddTHH:mm:ss.s{precision} 形式の入力タイムスタンプが解析されます。たとえば、入力タイムスタンプは 2020-12-30T12:13:14.123 です。出力タイムスタンプは入力タイムスタンプと同じ形式です。

debezium-json.map-null-key.mode

いいえ

FAIL

STRING

マップ内の null キー値を処理するために使用されるメソッド。有効な値:

  • FAIL: マップ内のキー値が null の場合、エラーが返されます。

  • DROP: マップ内のキー値が null のデータは破棄されます。

  • LITERAL: 文字列定数を使用して、マップ内の空のキー値を置き換えます。文字列定数の値は、canal-json.map-null-key.literal オプションで指定されます。

debezium-json.map-null-key.literal

いいえ

null

STRING

debezium-json.map-null-key.mode オプションが LITERAL に設定されている場合、指定された文字列定数を使用してマップ内の null キー値を置き換えます。

debezium-json.encode.decimal-as-plain-number

いいえ

false

BOOLEAN

有効な値:

  • true: DECIMAL 型のすべてのデータは変更されず、科学表記法形式では表現されません。たとえば、0.000000027 は 0.000000027 として表現されます。

  • false: DECIMAL 型のすべてのデータは科学表記法形式で表現されます。たとえば、0.000000027 は 2.7E-8 として表現されます。

データ型マッピング

Debezium は、シリアル化と逆シリアル化に JSON 形式を使用します。データ型マッピングの詳細については、「JSON 形式」のドキュメントと「Confluent Avro 形式」のドキュメントをご参照ください。

その他

使用可能なメタデータ

次の表で説明されている形式メタデータ フィールドは、DDL ステートメントで読み取り専用 (VIRTUAL) 列として宣言できます。

重要

形式メタデータ フィールドは、関連するコネクタが形式メタデータを転送する場合にのみ使用できます。Apache Kafka コネクタのみが、その値形式のメタデータ フィールドを宣言できます。

キー

データ型

説明

schema

STRING NULL

ペイロード スキーマを記述する JSON 文字列。スキーマが Debezium レコードに含まれていない場合、このオプションの値は null です。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

コネクタがイベントを処理するタイムスタンプ。このキーは、Debezium レコードの ts_ms フィールドに対応します。

source.timestamp

TIMESTAMP_LTZ(3) NULL

ソースシステムがイベントを作成するタイムスタンプ。このキーは、Debezium レコードの source.ts_ts フィールドに対応します。

source.database

STRING NULL

ソースデータベース。このキーは、フィールドが使用可能な場合、Debezium レコードの source.db フィールドに対応します。

source.schema

STRING NULL

ソースデータベースのスキーマ。このキーは、フィールドが使用可能な場合、Debezium レコードの source.schema フィールドに対応します。

source.table

STRING NULL

ソースデータベースのテーブル。このキーは、フィールドが使用可能な場合、Debezium レコードの source.table または source.collection フィールドに対応します。

source.properties

MAP<STRING, STRING> NULL

さまざまなソースプロパティのマッピング。このキーは、Debezium レコードの source フィールドに対応します。

次のサンプル コードは、Kafka の Debezium メタデータ フィールドにアクセスする方法の例を示しています。

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,  // 処理タイムスタンプ
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  // イベントタイム
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,  // データベース名
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,  // スキーマ名
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,  // テーブル名
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,  // プロパティ
  user_id BIGINT,  // ユーザーID
  item_id BIGINT,  // アイテムID
  behavior STRING  // 動作
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

FAQ

障害発生時に重複する変更イベントが配信された場合はどうすればよいですか?

通常の動作環境では、Debezium は exactly-once セマンティクスを使用して、ソース テーブルの各変更イベントを配信できます。その後、Realtime Compute for Apache Flink は、Debezium によって生成された変更イベントを想定どおりに消費できます。障害が発生した場合、Debezium は at-least-once 配信のみを保証できます。この場合、Debezium は重複する変更イベントを Kafka に配信する可能性があります。Realtime Compute for Apache Flink は、Kafka からデータを取得するときに重複イベントを取得します。これは、Realtime Compute for Apache Flink クエリの誤った結果または予期しない例外を引き起こす可能性があります。この問題を回避するには、デプロイ パラメータ table.exec.source.cdc-events-duplicate を true に設定し、ソースに主キーを定義することをお勧めします。このようにして、Realtime Compute for Apache Flink は追加のステートフル演算子を生成できます。主キーは、変更イベントの重複を排除し、正規化された changelog ストリームを生成するために使用されます。

説明

Debezium のメッセージ配信セマンティクスの詳細については、Debezium をご参照ください。

PostgreSQL 用 Debezium コネクタを使用して生成されたデータが想定どおりに解釈されない場合はどうすればよいですか?

PostgreSQL 用 Debezium コネクタを使用して Kafka への変更をキャプチャする場合は、監視対象テーブルの REPLICA IDENTITY が FULL に設定されていることを確認してください。デフォルト値は DEFAULT です。REPLICA IDENTITY が FULL に設定されていない場合、Flink SQL は Debezium データを想定どおりに解釈できません。

REPLICA IDENTITY が FULL に設定されている場合、UPDATE イベントと DELETE イベントには、すべての列の以前の値が含まれます。REPLICA IDENTITY が FULL に設定されていない場合、UPDATE イベントと DELETE イベントの before フィールドには、主キー フィールドの値のみが含まれます。REPLICA IDENTITY が FULL に設定されておらず、主キーが指定されていない場合、UPDATE イベントと DELETE イベントの before フィールドの値は null です。ALTER TABLE <your-table-name> REPLICA IDENTITY FULL ステートメントを実行して、REPLICA IDENTITY の構成を変更できます。

説明

詳細については、PostgreSQL 用 Debezium コネクタ をご参照ください。