Debezium format
This topic provides usage examples of the Debezium format and describes the format options and data type mappings.
Background information
Debezium is a Change Data Capture (CDC) tool that can stream data changes from various databases, such as MySQL, PostgreSQL, Oracle, and Microsoft SQL Server, to Kafka in real time. Debezium provides a unified format schema for changelogs and can serialize messages by using JSON or Apache Avro. The Apache Kafka connector and Object Storage Service (OSS) support the Debezium format.
Flink can interpret Debezium JSON messages and Debezium Avro messages as INSERT, UPDATE, or DELETE messages in the Flink SQL system. The Debezium format is suitable for the following scenarios:
Synchronization of incremental data from a database to another system
Log auditing
Real-time materialized views of databases
Temporal joins of database tables
Flink can also encode INSERT, UPDATE, or DELETE messages in the Flink SQL system as Debezium JSON messages or Debezium Avro messages and then forward the messages to data stores, such as Kafka.
Flink SQL cannot encode the UPDATE_BEFORE and UPDATE_AFTER data of an update operation as a single UPDATE message. Instead, when Flink writes data to a downstream storage system, Flink converts data of the UPDATE_BEFORE type into DELETE messages and data of the UPDATE_AFTER type into INSERT messages, encodes the converted messages in the Debezium format, and then writes them to the downstream storage system.
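For example, the following is a minimal sketch of a Kafka sink table that emits changes in the Debezium JSON format. The table name dbz_sink and the topic name products_avg_weight are hypothetical.
CREATE TABLE dbz_sink (
  name STRING,
  avg_weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  -- Hypothetical output topic that receives the Debezium JSON messages.
  'topic' = 'products_avg_weight',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'
);
-- A changelog-producing query, such as a GROUP BY aggregation, can be written
-- to this sink. Each update is encoded as a DELETE message (UPDATE_BEFORE)
-- followed by an INSERT message (UPDATE_AFTER).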
Example
Flink receives an update record of the products table in a MySQL database. The record is in the Debezium JSON format and contains the id, name, description, and weight columns.
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
For more information about the fields in the preceding example, see Debezium.
The JSON message in the preceding example describes an update event on the products table: in the row whose id is 111, the value of weight changed from 5.18 to 5.15. If such messages are synchronized to a Kafka topic named products_binlog, you can execute the following DDL statement to consume this topic and interpret the change events.
-- Use the debezium-json format to interpret Debezium JSON messages.
CREATE TABLE topic_products (
-- Make sure that the schema of the topic_products table is the same as the schema of the products table in the MySQL database.
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'debezium-json'
-- If the messages are Debezium Avro messages, use the debezium-avro-confluent format instead.
);
When you configure Debezium Kafka Connect, you can set the value.converter.schemas.enable option to true based on your business requirements. This way, the schema information is included in the message body. The following sample shows a Debezium JSON message that includes the schema.
{
"schema": {...},
"payload": {
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
}
To interpret a Debezium JSON message that includes the schema, you must add the 'debezium-json.schema-include' = 'true' configuration to the WITH clause of the DDL statement in the preceding example. By default, the debezium-json.schema-include option is set to false. In most cases, we recommend that you do not include the schema description in Debezium JSON messages because this makes the messages verbose and reduces parsing performance.
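For example, the following sketch adjusts the preceding DDL statement for messages that include the schema. The table name topic_products_with_schema is used only for illustration.
CREATE TABLE topic_products_with_schema (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',
  -- Required because the schema part is included in each message.
  'debezium-json.schema-include' = 'true'
);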
After you register the topic as a Flink table, you can use Debezium messages as the changelog source.
-- Maintain a real-time materialized view of the products table in the MySQL database:
-- calculate the latest average weight of each product.
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- Synchronize all data and incremental changes of the products table in the MySQL database to the products index of Elasticsearch for subsequent queries.
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
Format options
Flink provides the debezium-avro-confluent and debezium-json formats to interpret Debezium Avro messages or Debezium JSON messages.
debezium-avro-confluent
Use the debezium-avro-confluent format to interpret Debezium Avro messages.
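For example, the following is a minimal sketch of a source table that uses this format. The Schema Registry URL is a placeholder that you must replace with your own address.
CREATE TABLE topic_products_avro (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-avro-confluent',
  -- Placeholder Schema Registry address.
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);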
Option | Required | Default value | Data type | Description |
format | Yes | (none) | STRING | The format that you want to use. If you want to interpret Debezium Avro messages, set this option to debezium-avro-confluent. |
debezium-avro-confluent.basic-auth.credentials-source | No | (none) | STRING | The source of basic authentication credentials for Schema Registry. |
debezium-avro-confluent.basic-auth.user-info | No | (none) | STRING | The basic authentication user information of Schema Registry. |
debezium-avro-confluent.bearer-auth.credentials-source | No | (none) | STRING | The source of bearer authentication credentials for Schema Registry. |
debezium-avro-confluent.bearer-auth.token | No | (none) | STRING | The bearer authentication token for Schema Registry. |
debezium-avro-confluent.properties | No | (none) | MAP | A map of properties that is forwarded to Schema Registry. Use this option for Schema Registry settings that are not exposed through dedicated Flink configuration options. Note that dedicated Flink configuration options have higher priority than the properties in this map. |
debezium-avro-confluent.ssl.keystore.location | No | (none) | STRING | The location of the SSL keystore. |
debezium-avro-confluent.ssl.keystore.password | No | (none) | STRING | The password of the SSL keystore. |
debezium-avro-confluent.ssl.truststore.location | No | (none) | STRING | The location of the SSL truststore. |
debezium-avro-confluent.ssl.truststore.password | No | (none) | STRING | The password of the SSL truststore. |
debezium-avro-confluent.subject | No | (none) | STRING | The Confluent Schema Registry subject under which the schema used by this format is registered during serialization. The Apache Kafka and Upsert Kafka connectors use '<topic_name>-value' or '<topic_name>-key' as the default subject name if this format is used as the value or key format of Kafka. If you use the file system connector as a sink, the debezium-avro-confluent.subject option is required. |
debezium-avro-confluent.url | Yes | (none) | STRING | The URL of Confluent Schema Registry to obtain or register schemas. |
debezium-json
Use the debezium-json format to interpret Debezium JSON messages.
Option | Required | Default value | Data type | Description |
format | Yes | (none) | STRING | The format that you want to use. If you want to interpret Debezium JSON messages, set this option to debezium-json. |
debezium-json.schema-include | No | false | BOOLEAN | Specifies whether the Debezium JSON messages include the schema. Set this option to true if you set the value.converter.schemas.enable option to true when you configure Debezium Kafka Connect, so that the schema information is included in the message body. Valid values: true: the Debezium JSON messages include the schema. false (default): the Debezium JSON messages do not include the schema. |
debezium-json.ignore-parse-errors | No | false | BOOLEAN | Specifies whether to ignore parse errors. Valid values: true: fields and rows that contain parse errors are skipped, and the fields that cannot be parsed are set to null. false (default): an error is thrown when a parse error occurs. |
debezium-json.timestamp-format.standard | No | SQL | STRING | The format of the input and output timestamps. Valid values: SQL: timestamps are parsed and output in the yyyy-MM-dd HH:mm:ss.s{precision} format, for example, 2020-12-30 12:13:14.123. ISO-8601: timestamps are parsed and output in the yyyy-MM-ddTHH:mm:ss.s{precision} format, for example, 2020-12-30T12:13:14.123. |
debezium-json.map-null-key.mode | No | FAIL | STRING | The method that is used to handle a null key in a map. Valid values: FAIL (default): an exception is thrown when a map contains a null key. DROP: entries that contain a null key are dropped. LITERAL: a null key is replaced with the string constant that is specified by the debezium-json.map-null-key.literal option. |
debezium-json.map-null-key.literal | No | null | STRING | If the debezium-json.map-null-key.mode option is set to LITERAL, the specified string constant is used to replace the null key value in the map. |
debezium-json.encode.decimal-as-plain-number | No | false | BOOLEAN | Specifies how decimal values are encoded during serialization. Valid values: true: all decimals are encoded as plain numbers. false (default): some decimals may be encoded in scientific notation, for example, 0.000000027 as 2.7E-8. |
Data type mappings
The Debezium format uses JSON or Avro for serialization and deserialization. For more information about data type mappings, see the JSON format documentation and the Confluent Avro format documentation.
Others
Available metadata
The format metadata fields that are described in the following table can be declared as read-only (VIRTUAL) columns in DDL statements.
The format metadata fields are available only when the related connector forwards format metadata. Only the Apache Kafka connector can declare metadata fields for its value format.
Key | Data type | Description |
schema | STRING NULL | A JSON string that describes the payload schema. If the schema is not included in the Debezium record, this field is null. |
ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp at which the connector processes the event. This key corresponds to the ts_ms field in the Debezium record. |
source.timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp at which the source system created the event. This key corresponds to the source.ts_ms field in the Debezium record. |
source.database | STRING NULL | The source database. This key corresponds to the source.db field in the Debezium record if the field is available. |
source.schema | STRING NULL | The schema of the source database. This key corresponds to the source.schema field in the Debezium record if the field is available. |
source.table | STRING NULL | The table of the source database. This key corresponds to the source.table or source.collection field in the Debezium record if the field is available. |
source.properties | MAP<STRING, STRING> NULL | The mapping of various source properties. This key corresponds to the source field in the Debezium record. |
The following sample code provides an example on how to access Debezium metadata fields in Kafka:
CREATE TABLE KafkaTable (
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
FAQ
What do I do if duplicate change events are delivered when a fault occurs?
In a normal operating environment, Debezium delivers each change event of the source table with exactly-once semantics, and Realtime Compute for Apache Flink consumes the change events that are generated by Debezium as expected. If a fault occurs, Debezium can guarantee only at-least-once delivery and may deliver duplicate change events to Kafka. In this case, Realtime Compute for Apache Flink obtains duplicate events when it consumes data from Kafka, which may cause incorrect results or unexpected exceptions for queries. To avoid this issue, we recommend that you set the deployment parameter table.exec.source.cdc-events-duplicate to true and define a primary key on the source table. This way, Realtime Compute for Apache Flink generates an additional stateful operator that uses the primary key to deduplicate change events and produce a normalized changelog stream.
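The following sketch shows this configuration for the topic_products table from the earlier example. The assumption that id is the primary key is for illustration only.
-- Enable deduplication of CDC events. The exact way to set deployment
-- parameters depends on your environment; in the SQL client, you can
-- use a SET statement.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

CREATE TABLE topic_products_dedup (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  -- The primary key is used by the stateful operator to deduplicate events.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json'
);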
For more information about the message delivery semantics of Debezium, see Debezium.
What do I do if data generated by using the Debezium connector for PostgreSQL cannot be interpreted as expected?
If you use the Debezium connector for PostgreSQL to capture changes to Kafka, make sure that REPLICA IDENTITY of the monitored table is set to FULL. The default value is DEFAULT. If REPLICA IDENTITY is not set to FULL, Flink SQL cannot interpret Debezium data as expected.
If REPLICA IDENTITY is set to FULL, the UPDATE and DELETE events include the previous values of all columns. If REPLICA IDENTITY is not set to FULL, the before field of the UPDATE and DELETE events includes only the value of the primary key field. If REPLICA IDENTITY is not set to FULL and no primary key is specified, the value of the before field of the UPDATE and DELETE events is null. You can execute the ALTER TABLE <your-table-name> REPLICA IDENTITY FULL statement to modify the configuration of REPLICA IDENTITY.
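For example, the following PostgreSQL statements set and then verify REPLICA IDENTITY. The table name products is a placeholder.
-- Include the previous values of all columns in UPDATE and DELETE events.
ALTER TABLE products REPLICA IDENTITY FULL;

-- Verify the setting: 'f' means FULL, 'd' means DEFAULT.
SELECT relreplident FROM pg_class WHERE relname = 'products';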
For more information, see Debezium connector for PostgreSQL.