This topic provides an example of how to use Canal and describes the parameters and data type mappings of Canal.
Background information
Canal is a Change Data Capture (CDC) tool that can transmit MySQL changes to other systems in real time. Canal provides a unified data format for changelogs and can serialize messages by using JSON or protobuf. protobuf is the default format for Canal. Connectors that support Canal include the Apache Kafka connector and the Object Storage Service (OSS) connector.
Flink can parse Canal JSON messages into INSERT, UPDATE, or DELETE messages and send the messages to the Flink SQL system. Canal is suitable for the following scenarios:
Synchronization of incremental data from a database to another system
Log auditing
Real-time materialized views of databases
Temporal joins of database tables
Flink can also encode the INSERT, UPDATE, or DELETE messages in the Flink SQL system into Canal JSON messages and send the messages to a data store, such as Kafka.
Flink cannot merge an UPDATE_BEFORE message and an UPDATE_AFTER message into one UPDATE message. Instead, Flink encodes an UPDATE_BEFORE message as a Canal message of the DELETE type and an UPDATE_AFTER message as a Canal message of the INSERT type.
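For example, the following DDL statement declares a Kafka sink table that uses the Canal JSON format. This is a minimal sketch: the kafka_canal_sink table name, the products_changelog topic, the broker address, and the some_changelog_source table are placeholder names, and the schema is assumed to match the changelog that you write.
CREATE TABLE kafka_canal_sink (
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_changelog',  -- Placeholder topic name.
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'canal-json'  -- Encode the changelog as Canal JSON messages.
);
-- some_changelog_source is a hypothetical table that produces INSERT, UPDATE, and DELETE rows.
-- Each UPDATE row is written to Kafka as a Canal DELETE message followed by a Canal INSERT message.
INSERT INTO kafka_canal_sink SELECT * FROM some_changelog_source;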
Example
Canal provides a uniform format for changelogs. The following sample code provides an example on how to capture update operations from the products table in a MySQL database.
{
    "data": [
        {
            "id": "111",
            "name": "scooter",
            "description": "Big 2-wheel scooter",
            "weight": "5.18"
        }
    ],
    "database": "inventory",
    "es": 1589373560000,
    "id": 9,
    "isDdl": false,
    "mysqlType": {
        "id": "INTEGER",
        "name": "VARCHAR(255)",
        "description": "VARCHAR(512)",
        "weight": "FLOAT"
    },
    "old": [
        {
            "weight": "5.15"
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "description": 12,
        "weight": 7
    },
    "table": "products",
    "ts": 1589373560798,
    "type": "UPDATE"
}
For more information about the fields in the sample code, see the Canal documentation.
The products table in the MySQL database has the following columns: id, name, description, and weight. The JSON message in the preceding sample code describes an update event on the products table: in the row whose id is 111, the value of weight changed from 5.15 to 5.18. If the message is synchronized to a Kafka topic named products_binlog, you can execute the following DDL statement to consume the message from the topic and parse the change events.
CREATE TABLE topic_products (
    -- Make sure that the schema of the table that you create is the same as the schema of the products table in the MySQL database.
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_binlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json'  -- Use the Canal JSON format.
);
After you create a Flink table based on the data of the Kafka topic, you can use Canal messages as the source of changelogs.
-- Create a real-time materialized view based on the products table in the MySQL database.
-- Calculate the most recent average weight of the same products.
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- Synchronize all data and incremental changes of the products table in the MySQL database to the products index of Elasticsearch for subsequent queries.
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
Parameters
Parameter | Required | Default value | Data type | Description |
format | Yes | (none) | STRING | The format that you want to use. If you want to use Canal, set this parameter to canal-json. |
canal-json.ignore-parse-errors | No | false | BOOLEAN | Specifies whether to ignore fields and rows that have parse errors. Valid values: true: Fields and rows that have parse errors are skipped, and the fields that cannot be parsed are set to null. false (default): An error is reported when a parse error occurs. |
canal-json.timestamp-format.standard | No | SQL | STRING | The formats of the input timestamp and output timestamp. Valid values: SQL (default): Timestamps are parsed and output in the yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123. ISO-8601: Timestamps are parsed and output in the yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123. |
canal-json.map-null-key.mode | No | FAIL | STRING | The method that is used to handle a null key value in a map. Valid values: FAIL (default): An error is reported when a key value in a map is null. DROP: The entry whose key value is null is dropped from the map. LITERAL: The null key value is replaced with the string constant that is specified by the canal-json.map-null-key.literal parameter. |
canal-json.map-null-key.literal | No | null | STRING | If the canal-json.map-null-key.mode parameter is set to LITERAL, the specified string constant is used to replace the null key value in the map. |
canal-json.encode.decimal-as-plain-number | No | false | BOOLEAN | Specifies whether to encode all DECIMAL values as plain numbers. Valid values: true: All DECIMAL values are encoded as plain numbers. false (default): Some DECIMAL values may be encoded in scientific notation. For example, 0.000000027 is encoded as 2.7E-8. |
canal-json.database.include | No | (none) | STRING | An optional regular expression that matches the database metadata field in Canal records. Only the changelogs of the specified database are read. Regular expressions are compatible with Pattern of Java. |
canal-json.table.include | No | (none) | STRING | An optional regular expression that matches the table metadata field in Canal records. Only the changelogs of the specified table are read. Regular expressions are compatible with Pattern of Java. |
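For reference, the following DDL statement sketches how some of these parameters can be combined in the WITH clause. The topic_products_filtered table name is a placeholder, and the database and table filters are example values based on the preceding sample record.
CREATE TABLE topic_products_filtered (
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_binlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json',
    'canal-json.ignore-parse-errors' = 'true',  -- Skip records that cannot be parsed instead of failing.
    'canal-json.database.include' = 'inventory',  -- Read only the changelogs of the inventory database.
    'canal-json.table.include' = 'products'  -- Read only the changelogs of the products table.
);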
Data type mappings
Canal uses the JSON format for serialization and deserialization. For more information about data type mappings, see JSON Format. The Canal JSON format is also compatible with the Canal extension change type INIT that is used by Data Transmission Service (DTS) to store data in Kafka clusters. For more information, see Data storage formats of Kafka clusters.
Additional information
Available metadata
The following format metadata fields can be declared as read-only (VIRTUAL) columns in DDL statements.
Format metadata fields are available only if the related connector forwards format metadata. Only the Apache Kafka connector can declare metadata fields for its value format.
Key | Data type | Description |
database | STRING NULL | The source database. This field corresponds to the database field in Canal records. |
table | STRING NULL | The table of the source database. This field corresponds to the table field in Canal records. |
sql-type | MAP<STRING, INT> NULL | SQL data type mappings. This field corresponds to the sqlType field in Canal records. |
pk-names | ARRAY<STRING> NULL | The array of primary key names. This field corresponds to the pkNames field in Canal records. |
ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp at which the connector processes the event. This field corresponds to the ts field in Canal records. |
The following sample code provides an example on how to access Canal metadata fields in Kafka.
CREATE TABLE KafkaTable (
    origin_database STRING METADATA FROM 'value.database' VIRTUAL,
    origin_table STRING METADATA FROM 'value.table' VIRTUAL,
    origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
    origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
    origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'canal-json'
);
FAQ
What do I do if duplicate change events are delivered when a fault occurs?
Under normal conditions, Canal delivers each change event with exactly-once semantics, and Flink consumes the change events that are generated by Canal as expected. If a fault occurs, Canal can guarantee only at-least-once delivery. In this case, Canal may deliver duplicate change events to Kafka, and Flink may obtain duplicate change events when it consumes data from Kafka. This may lead to incorrect results or unexpected exceptions in Flink queries. To avoid this issue, we recommend that you set the deployment parameter table.exec.source.cdc-events-duplicate to true and define the primary key on the source table. This way, Flink generates an additional stateful operator that uses the primary key to deduplicate the change events and generate a normalized changelog stream.
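The following statements show one way to apply this recommendation, based on the topic_products table from the preceding example. This is a sketch: depending on your environment, table.exec.source.cdc-events-duplicate can also be set in the deployment configuration instead of with a SET statement.
-- Deduplicate change events that Canal may deliver more than once.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

CREATE TABLE topic_products (
    id BIGINT,
    name STRING,
    description STRING,
    weight DECIMAL(10, 2),
    -- The primary key is used to deduplicate change events and generate a normalized changelog stream.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'products_binlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json'
);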