Realtime Compute for Apache Flink:Canal

Last Updated:Dec 22, 2023

This topic provides an example of how to use the Canal format and describes its parameters and data type mappings.

Background information

Canal is a Change Data Capture (CDC) tool that transmits MySQL changes to other systems in real time. Canal provides a unified data format for changelogs and can serialize messages as JSON or protobuf. protobuf is the default format for Canal. Connectors that support the Canal format include the Apache Kafka connector and the Object Storage Service (OSS) connector.

Flink can parse Canal JSON messages into INSERT, UPDATE, or DELETE messages and send the messages to the Flink SQL system. Canal is suitable for the following scenarios:

  • Synchronization of incremental data from a database to another system

  • Log auditing

  • Real-time materialized views of databases

  • Temporal joins of database tables

Flink can also encode the INSERT, UPDATE, or DELETE messages in the Flink SQL system into Canal JSON messages and send the messages to a data store, such as Kafka.

Important

Flink cannot merge an UPDATE_BEFORE message and an UPDATE_AFTER message into one UPDATE message. Instead, Flink encodes an UPDATE_BEFORE message as a Canal message of the DELETE type and an UPDATE_AFTER message as a Canal message of the INSERT type.
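
The encoding behavior described above can be illustrated with a minimal sketch. The table names, topic name, and columns below are hypothetical, and the orders table is assumed to already exist with a customer column.

```sql
-- Hypothetical sink table that writes Canal JSON messages to Kafka.
-- The table name, topic, and columns are assumptions for illustration.
CREATE TABLE order_counts_canal (
  customer STRING,
  order_cnt BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_counts_canal',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json'
);

-- orders is assumed to be an existing table with a customer column.
-- Each new order changes the count for its customer, so the aggregation emits
-- an UPDATE_BEFORE row (old count) and an UPDATE_AFTER row (new count).
-- The canal-json format writes the UPDATE_BEFORE row as a DELETE message
-- and the UPDATE_AFTER row as an INSERT message.
INSERT INTO order_counts_canal
SELECT customer, COUNT(*) AS order_cnt
FROM orders
GROUP BY customer;
```

A downstream consumer of this topic therefore sees a DELETE message and an INSERT message for each count change, and never a single UPDATE message.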

Example

Canal provides a uniform format for changelogs. The following sample shows the Canal JSON message for an update operation captured from the products table in a MySQL database.

{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}

Note

For more information about the fields in the sample code, see the Canal documentation.

The products table in the MySQL database has the following columns: id, name, description, and weight. The JSON message in the preceding sample code represents an update event on the products table: in the row whose id is 111, the value of weight changed from 5.15 to 5.18. The data field holds the row values after the update, and the old field holds the previous values of the changed columns. If the message is synchronized to a Kafka topic named products_binlog, you can execute the following DDL statement to consume the message from the topic and parse the change event.

CREATE TABLE topic_products (
  -- Make sure that the schema of the table that you create is the same as the schema of the products table in the MySQL database. 
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json'  -- Use the Canal JSON format. 
);

After you create a Flink table based on the data of the Kafka topic, you can use Canal messages as the source of changelogs.

-- Create a real-time materialized view based on the products table in the MySQL database. 
-- Calculate the most recent average weight for each product name. 
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- Synchronize all data and incremental changes of the products table in the MySQL database to the products index of Elasticsearch for subsequent queries. 
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
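
The INSERT statement above assumes that a sink table named elasticsearch_products already exists. A minimal sketch of such a table follows. It assumes the open-source Flink Elasticsearch 7 connector, and the host and index values are placeholders. The connector name and options on your platform may differ.

```sql
-- Assumption: open-source Flink Elasticsearch 7 connector.
-- The host address and index name are placeholders.
CREATE TABLE elasticsearch_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  -- The primary key lets the sink apply updates and deletes
  -- to the matching document instead of appending new documents.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'products'
);
```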

Parameters

Parameter: format
Required: No
Required: Yes
Default value: (none)
Data type: STRING
Description: The format that you want to use. If you want to use Canal, set this parameter to canal-json.

Parameter: canal-json.ignore-parse-errors
Required: No
Default value: false
Data type: BOOLEAN
Description: Valid values:

  • true: If the parsing fails, the current field or row is skipped.

  • false: If parsing fails, an error is returned and the deployment fails to start. This is the default value.

Parameter: canal-json.timestamp-format.standard
Required: No
Default value: SQL
Data type: STRING
Description: The formats of the input timestamp and output timestamp. Valid values:

  • SQL: The input timestamp in the yyyy-MM-dd HH:mm:ss.s{precision} format is parsed. For example, the input timestamp is 2020-12-30 12:13:14.123. The output timestamp is in the same format as the input timestamp.

  • ISO-8601: The input timestamp in the yyyy-MM-ddTHH:mm:ss.s{precision} format is parsed. For example, the input timestamp is 2020-12-30T12:13:14.123. The output timestamp is in the same format as the input timestamp.

Parameter: canal-json.map-null-key.mode
Required: No
Default value: FAIL
Data type: STRING
Description: The method that is used to handle a null key value in a map. Valid values:

  • FAIL: An error is returned if a key value in the map is null.

  • DROP: Data whose key value is null in the map is discarded.

  • LITERAL: A string constant is used to replace an empty key value in a map. The value of the string constant is specified by the canal-json.map-null-key.literal parameter.

Parameter: canal-json.map-null-key.literal
Required: No
Default value: null
Data type: STRING
Description: If the canal-json.map-null-key.mode parameter is set to LITERAL, the specified string constant is used to replace the null key value in the map.

Parameter: canal-json.encode.decimal-as-plain-number
Required: No
Default value: false
Data type: BOOLEAN
Description: Valid values:

  • true: All data of the DECIMAL type remains unchanged and is not expressed in the scientific notation format. For example, 0.000000027 is expressed as 0.000000027.

  • false: All data of the DECIMAL type is expressed in the scientific notation format. For example, 0.000000027 is expressed as 2.7E-8.

Parameter: canal-json.database.include
Required: No
Default value: (none)
Data type: STRING
Description: An optional regular expression that matches the database metadata field in Canal records. Only the changelogs of the specified database are read. Regular expressions are compatible with Pattern of Java.

Parameter: canal-json.table.include
Required: No
Default value: (none)
Data type: STRING
Description: An optional regular expression that matches the table metadata field in Canal records. Only the changelogs of the specified table are read. Regular expressions are compatible with Pattern of Java.
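
As a minimal sketch of how these parameters fit together, the following DDL statement reads only the changelogs of the products table in the inventory database and skips messages that cannot be parsed. The table name inventory_products is hypothetical. The topic, database, and table names are taken from the example earlier in this topic.

```sql
-- Hypothetical table name. The WITH options are the parameters described above.
CREATE TABLE inventory_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  -- Skip fields or rows that fail to parse instead of returning an error.
  'canal-json.ignore-parse-errors' = 'true',
  -- Read only records whose database field matches this regular expression.
  'canal-json.database.include' = 'inventory',
  -- Read only records whose table field matches this regular expression.
  'canal-json.table.include' = 'products'
);
```

The two include filters are useful when one Kafka topic carries changelogs from several databases or tables and the Flink table should see only one of them.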

Data type mappings

Canal uses the JSON format for serialization and deserialization. For more information about data type mappings, see JSON Format. The format also accepts INIT, an extension of the Canal change types that Data Transmission Service (DTS) uses when it stores data in Kafka clusters. For more information, see Data storage formats of Kafka clusters.

Additional information

Available metadata

The following format metadata fields can be declared as read-only (VIRTUAL) columns in DDL statements.

Important

Format metadata fields are available only if the related connector forwards format metadata. Only the Apache Kafka connector can declare metadata fields for its value format.

Key: database
Data type: STRING NULL
Description: The source database. This field corresponds to the database field in Canal records.

Key: table
Data type: STRING NULL
Description: The table of the source database. This field corresponds to the table field in Canal records.

Key: sql-type
Data type: MAP<STRING, INT> NULL
Description: SQL data type mappings. This field corresponds to the sqlType field in Canal records.

Key: pk-names
Data type: ARRAY<STRING> NULL
Description: The array of primary key names. This field corresponds to the pkNames field in Canal records.

Key: ingestion-timestamp
Data type: TIMESTAMP_LTZ(3) NULL
Description: The timestamp at which the connector processes the event. This field corresponds to the ts field in Canal records.

The following sample code shows how to access Canal metadata fields in a Kafka table.

CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);
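
Once declared, the metadata columns can be queried like any other column. The following query is a minimal sketch against the KafkaTable defined above. The filter value user_behavior is an assumed name for the source MySQL table.

```sql
-- Read the change events together with their Canal metadata.
-- The filter value 'user_behavior' is an assumed source table name.
SELECT
  origin_database,
  origin_table,
  origin_pk_names,
  origin_ts,
  user_id,
  item_id,
  behavior
FROM KafkaTable
WHERE origin_table = 'user_behavior';
```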

FAQ

What do I do if duplicate change events are delivered when a fault occurs?

Under normal operation, Canal delivers each change event with exactly-once semantics, and Flink consumes the change events that Canal generates as expected. If a fault occurs, Canal can guarantee only at-least-once delivery, so it may deliver duplicate change events to Kafka, and Flink then reads those duplicates when it consumes data from Kafka. Duplicates may lead to incorrect results or unexpected exceptions in Flink queries. To avoid this issue, we recommend that you set the deployment parameter table.exec.source.cdc-events-duplicate to true and define a primary key on the source. Flink then adds a stateful operator that uses the primary key to deduplicate change events and generate a normalized changelog stream.
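
A minimal sketch of a source table prepared for deduplication follows. The table name topic_products_dedup is hypothetical, and the columns and topic are taken from the example earlier in this topic. The sketch assumes that table.exec.source.cdc-events-duplicate is set to true in the deployment configuration, because the DDL statement alone does not enable deduplication.

```sql
-- Assumption: the deployment parameter table.exec.source.cdc-events-duplicate
-- is set to true in the deployment configuration, not in this statement.
-- The table name is hypothetical.
CREATE TABLE topic_products_dedup (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  -- The primary key is what Flink uses to deduplicate change events.
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json'
);
```

The additional stateful operator keeps state per primary key, so expect state size to grow with the number of distinct keys in the source.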