
Realtime Compute for Apache Flink: Debezium

Last Updated: Jul 11, 2024


This topic provides usage examples of the Debezium format and describes the format options and data type mappings.

Background information

Debezium is a Change Data Capture (CDC) tool that can stream data changes from various databases, such as MySQL, PostgreSQL, Oracle, and Microsoft SQL Server, to Kafka in real time. Debezium provides a unified format schema for changelogs and can serialize messages by using JSON or Apache Avro. The Apache Kafka connector and Object Storage Service (OSS) support the Debezium format.

Flink can interpret Debezium JSON messages and Debezium Avro messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. The Debezium format is suitable for the following scenarios:

  • Synchronization of incremental data from a database to another system

  • Log auditing

  • Real-time materialized views of databases

  • Temporal joins of database tables

Flink can also interpret INSERT, UPDATE, or DELETE messages in the Flink SQL system as Debezium JSON messages or Debezium Avro messages and then forward the messages to data stores, such as Kafka.

Important

Flink SQL cannot encode a single UPDATE message that contains data of both the UPDATE_BEFORE and UPDATE_AFTER types. Instead, when Flink writes data to a downstream storage system, Flink converts data of the UPDATE_BEFORE type into data of the DELETE type and data of the UPDATE_AFTER type into data of the INSERT type, interprets the converted messages as Debezium messages, and writes them to the downstream storage system.

Example

Flink receives an update record of the products table in a MySQL database. The record is in the Debezium JSON format and contains the id, name, description, and weight columns.

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}
Note

For more information about the fields in the preceding example, see Debezium.

The JSON message in the preceding example describes an update event of the products table: in the row whose id is 111, the value of weight changed from 5.18 to 5.15. If the messages are synchronized to a Kafka topic named products_binlog, you can execute the following DDL statement to consume the topic and interpret the change events.

-- Use the debezium-json format to interpret Debezium JSON messages.
CREATE TABLE topic_products (
  -- Make sure that the schema of the topic_products table is the same as the schema of the products table in the MySQL database.
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  -- If the messages are Debezium Avro messages, use the debezium-avro-confluent format instead.
  'format' = 'debezium-json'
);

When you configure Debezium Kafka Connect, you can set the value.converter.schemas.enable option to true based on your business requirements so that the schema information is included in the message body. The following example shows a Debezium JSON message that includes the schema.

{
  "schema": {...},
  "payload": {
    "before": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.18
    },
    "after": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    },
    "source": {...},
    "op": "u",
    "ts_ms": 1589362330904,
    "transaction": null
  }
}

To interpret a Debezium JSON message that includes the schema, you must add the 'debezium-json.schema-include' = 'true' configuration to the WITH clause of the DDL statement in the preceding example. By default, the debezium-json.schema-include option is set to false. In most cases, we recommend that you do not include the schema description in Debezium JSON messages, because it makes the messages longer and reduces parsing performance.
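For example, the following DDL statement, which reuses the table schema and the sample Kafka endpoint from the preceding example, interprets Debezium JSON messages that include the schema. The table name is illustrative:

CREATE TABLE topic_products_with_schema (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',
  -- Parse messages whose body includes the schema description.
  'debezium-json.schema-include' = 'true'
);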

After you register the topic as a Flink table, you can consume the Debezium messages as a changelog source.

-- Maintain a real-time materialized view that calculates the latest average weight of each product in the products table of the MySQL database.
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- Synchronize all data and incremental changes of the products table in the MySQL database to the products index of Elasticsearch for subsequent queries.
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
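The preceding INSERT statement assumes that an Elasticsearch sink table named elasticsearch_products already exists. The following sketch shows what such a table may look like; the connector version, endpoint, and index name are assumptions that you must replace with your own values:

CREATE TABLE elasticsearch_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  -- The primary key lets update and delete events overwrite the matching document.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  -- Hypothetical endpoint and index name.
  'hosts' = 'http://localhost:9200',
  'index' = 'products'
);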

Format options

Flink provides the debezium-avro-confluent and debezium-json formats to interpret Debezium Avro messages and Debezium JSON messages, respectively.

debezium-avro-confluent

Use the debezium-avro-confluent format to interpret Debezium Avro messages.

format
Required: Yes | Default value: (none) | Data type: STRING
The format that you want to use. If you want to interpret Debezium Avro messages, set this option to debezium-avro-confluent.

debezium-avro-confluent.basic-auth.credentials-source
Required: No | Default value: (none) | Data type: STRING
The source of the basic authentication credentials for Schema Registry.

debezium-avro-confluent.basic-auth.user-info
Required: No | Default value: (none) | Data type: STRING
The basic authentication user information for Schema Registry.

debezium-avro-confluent.bearer-auth.credentials-source
Required: No | Default value: (none) | Data type: STRING
The source of the bearer authentication credentials for Schema Registry.

debezium-avro-confluent.bearer-auth.token
Required: No | Default value: (none) | Data type: STRING
The bearer authentication token for Schema Registry.

debezium-avro-confluent.properties
Required: No | Default value: (none) | Data type: MAP
A map of properties that is forwarded to Schema Registry. This option is suitable for options that are not officially exposed as Flink configuration options.
Important: Flink configuration options have higher priorities than this option.

debezium-avro-confluent.ssl.keystore.location
Required: No | Default value: (none) | Data type: STRING
The location of the SSL keystore.

debezium-avro-confluent.ssl.keystore.password
Required: No | Default value: (none) | Data type: STRING
The password of the SSL keystore.

debezium-avro-confluent.ssl.truststore.location
Required: No | Default value: (none) | Data type: STRING
The location of the SSL truststore.

debezium-avro-confluent.ssl.truststore.password
Required: No | Default value: (none) | Data type: STRING
The password of the SSL truststore.

debezium-avro-confluent.subject
Required: No | Default value: (none) | Data type: STRING
The Confluent Schema Registry subject under which the schema used by this format is registered during serialization. If this format is used as the value or key format of the Apache Kafka or Upsert Kafka connector, '<topic_name>-value' or '<topic_name>-key' is used as the default subject name. If you use the file system connector as a sink, this option is required.

debezium-avro-confluent.url
Required: Yes | Default value: (none) | Data type: STRING
The URL of Confluent Schema Registry that is used to obtain or register schemas.
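For example, the following sketch declares a Kafka source table that interprets Debezium Avro messages. The topic name, Kafka endpoint, and Schema Registry URL are placeholders:

CREATE TABLE topic_products_avro (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-avro-confluent',
  -- The Schema Registry URL is required. The value below is a placeholder.
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);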

debezium-json

Use the debezium-json format to interpret Debezium JSON messages.

format
Required: Yes | Default value: (none) | Data type: STRING
The format that you want to use. If you want to interpret Debezium JSON messages, set this option to debezium-json.

debezium-json.schema-include
Required: No | Default value: false | Data type: BOOLEAN
Specifies whether the Debezium JSON messages include the schema. When you configure Debezium Kafka Connect, you can set the value.converter.schemas.enable option to true so that the schema information is included in the message body. In this case, set this option to true. Valid values:
  • true: The schema is included in the Debezium JSON messages.
  • false: The schema is not included in the Debezium JSON messages.

debezium-json.ignore-parse-errors
Required: No | Default value: false | Data type: BOOLEAN
Specifies how parse errors are handled. Valid values:
  • true: If parsing fails, the current field or row is skipped.
  • false: If parsing fails, an error is returned and the deployment fails to start. This is the default value.

debezium-json.timestamp-format.standard
Required: No | Default value: SQL | Data type: STRING
The format of the input and output timestamps. Valid values:
  • SQL: Input timestamps are parsed in the yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123. Output timestamps use the same format.
  • ISO-8601: Input timestamps are parsed in the yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123. Output timestamps use the same format.

debezium-json.map-null-key.mode
Required: No | Default value: FAIL | Data type: STRING
The method that is used to handle null keys in map data. Valid values:
  • FAIL: An error is returned if a map key is null.
  • DROP: Map entries whose keys are null are discarded.
  • LITERAL: Null map keys are replaced with a string constant. The string constant is specified by the debezium-json.map-null-key.literal option.

debezium-json.map-null-key.literal
Required: No | Default value: null | Data type: STRING
The string constant that replaces null map keys when the debezium-json.map-null-key.mode option is set to LITERAL.

debezium-json.encode.decimal-as-plain-number
Required: No | Default value: false | Data type: BOOLEAN
Specifies how data of the DECIMAL type is encoded. Valid values:
  • true: Data of the DECIMAL type is written as a plain number and is never expressed in scientific notation. For example, 0.000000027 is expressed as 0.000000027.
  • false: Data of the DECIMAL type can be expressed in scientific notation. For example, 0.000000027 is expressed as 2.7E-8.
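The following sketch combines several of the preceding options in a single WITH clause. All values other than the option names are illustrative:

CREATE TABLE topic_products_tolerant (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',
  -- Skip fields or rows that cannot be parsed instead of failing the deployment.
  'debezium-json.ignore-parse-errors' = 'true',
  -- Parse and emit timestamps in the ISO-8601 format.
  'debezium-json.timestamp-format.standard' = 'ISO-8601',
  -- Replace null map keys with the string constant "unknown".
  'debezium-json.map-null-key.mode' = 'LITERAL',
  'debezium-json.map-null-key.literal' = 'unknown'
);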

Data type mappings

The Debezium format uses the JSON or Avro format for serialization and deserialization. For more information about data type mappings, see the JSON Format documentation and the Confluent Avro Format documentation.

Others

Available metadata

The format metadata fields that are described in the following table can be declared as read-only (VIRTUAL) columns in DDL statements.

Important

The format metadata fields are available only when the related connector forwards format metadata. Only the Apache Kafka connector can declare metadata fields for its value format.

schema
Data type: STRING NULL
A JSON string that describes the schema of the payload. The value is null if the schema is not included in the Debezium record.

ingestion-timestamp
Data type: TIMESTAMP_LTZ(3) NULL
The timestamp at which the connector processes the event. This key corresponds to the ts_ms field in the Debezium record.

source.timestamp
Data type: TIMESTAMP_LTZ(3) NULL
The timestamp at which the source system creates the event. This key corresponds to the source.ts_ms field in the Debezium record.

source.database
Data type: STRING NULL
The source database. This key corresponds to the source.db field in the Debezium record if the field is available.

source.schema
Data type: STRING NULL
The schema of the source database. This key corresponds to the source.schema field in the Debezium record if the field is available.

source.table
Data type: STRING NULL
The table of the source database. This key corresponds to the source.table or source.collection field in the Debezium record if the field is available.

source.properties
Data type: MAP<STRING, STRING> NULL
A map of various source properties. This key corresponds to the source field in the Debezium record.

The following sample code shows how to access Debezium metadata fields in Kafka:

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
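After the table is declared, the metadata columns can be queried in the same way as regular columns. For example, the following query, shown only as an illustration, filters events by their source table:

SELECT origin_database, origin_table, user_id, item_id, behavior
FROM KafkaTable
WHERE origin_table = 'orders';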

FAQ

What do I do if duplicate change events are delivered when a fault occurs?

Under normal operation, Debezium delivers each change event of the source table with exactly-once semantics, and Realtime Compute for Apache Flink can consume the change events that are generated by Debezium as expected. If a fault occurs, Debezium can guarantee only at-least-once delivery and may deliver duplicate change events to Kafka. When Realtime Compute for Apache Flink consumes these duplicate events, queries may return incorrect results or unexpected exceptions. To avoid this issue, we recommend that you set the deployment parameter table.exec.source.cdc-events-duplicate to true and define the primary key on the source table, as shown in the sketch below. This way, Realtime Compute for Apache Flink generates an additional stateful operator that uses the primary key to deduplicate the change events and produce a normalized changelog stream.
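A minimal sketch of this setup, which reuses the schema and sample Kafka endpoint from the preceding examples and assumes that id uniquely identifies a product:

-- Enable deduplication of CDC events. This adds a stateful normalization operator.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

CREATE TABLE topic_products_dedup (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  -- The primary key is used to deduplicate change events.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json'
);

The SET statement applies to environments such as the SQL client; depending on your environment, the parameter can also be set in the deployment configuration instead.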

Note

For more information about the message delivery semantics of Debezium, see Debezium.

What do I do if data generated by using the Debezium connector for PostgreSQL cannot be interpreted as expected?

If you use the Debezium connector for PostgreSQL to capture changes to Kafka, make sure that REPLICA IDENTITY of the monitored table is set to FULL. The default value is DEFAULT. If REPLICA IDENTITY is not set to FULL, Flink SQL cannot interpret Debezium data as expected.

If REPLICA IDENTITY is set to FULL, the UPDATE and DELETE events include the previous values of all columns. If REPLICA IDENTITY is not set to FULL, the before field of the UPDATE and DELETE events includes only the value of the primary key field. If REPLICA IDENTITY is not set to FULL and no primary key is specified, the value of the before field of the UPDATE and DELETE events is null. You can execute the ALTER TABLE <your-table-name> REPLICA IDENTITY FULL statement to modify the configuration of REPLICA IDENTITY.
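For example, the following statements, which assume a monitored table named products, change the setting in PostgreSQL and then verify it:

-- Make UPDATE and DELETE events include the previous values of all columns.
ALTER TABLE products REPLICA IDENTITY FULL;

-- Verify the setting. The value 'f' in pg_class.relreplident indicates FULL.
SELECT relreplident FROM pg_class WHERE relname = 'products';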

Note

For more information, see Debezium connector for PostgreSQL.