Realtime Compute for Apache Flink: Canal

Last Updated: Mar 26, 2026

Canal is a Change Data Capture (CDC) tool that captures MySQL changes and streams them to downstream systems in real time. Canal provides a unified changelog format with JSON or protobuf serialization—protobuf is the default.

Realtime Compute for Apache Flink supports Canal JSON as both a source and a sink format. As a source, Flink parses Canal JSON messages into INSERT, UPDATE, or DELETE changelog events and feeds them into the Flink SQL system. As a sink, Flink encodes Flink SQL changelog events into Canal JSON messages and writes them to a data store such as Kafka.

Canal works with the following Flink connectors:

Use cases

  • Incremental data sync: Capture row-level changes from MySQL and propagate them to another system without full table scans.

  • Log auditing: Record every INSERT, UPDATE, and DELETE event for compliance and audit trails.

  • Real-time materialized views: Keep an aggregated or transformed view of a database table up to date as changes arrive.

  • Temporal joins: Enrich streaming data by joining it against a versioned snapshot of a database table.

How it works

When Canal captures a change event from the MySQL binary log (binlog), it produces a JSON message that includes the changed row data, the source database and table, the change type (INSERT, UPDATE, or DELETE), and timestamps.

Flink reads these messages from Kafka and translates them into Flink SQL changelog events:

  • A Canal INSERT event becomes a Flink INSERT message.

  • A Canal UPDATE event becomes two messages: an UPDATE_BEFORE message that carries the row values before the change, followed by an UPDATE_AFTER message that carries the row values after the change.

  • A Canal DELETE event becomes a Flink DELETE message.

Important

Flink cannot merge UPDATE_BEFORE and UPDATE_AFTER back into a single UPDATE message. When writing Canal JSON to a sink, Flink therefore encodes UPDATE_BEFORE as a Canal DELETE message and UPDATE_AFTER as a Canal INSERT message.
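A minimal sketch of the sink direction follows. It assumes the topic_products source table from the Example section; the sink table name products_canal_sink, the topic products_canal_out, and the broker address are hypothetical placeholders. Because the query forwards a changelog, each update in the input reaches the sink as an UPDATE_BEFORE/UPDATE_AFTER pair and, per the note above, is written as a Canal DELETE message followed by a Canal INSERT message.

```sql
-- Hypothetical sink table: the table name, topic, and broker address are placeholders.
CREATE TABLE products_canal_sink (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_canal_out',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json',
  -- Optional: write DECIMAL values without scientific notation
  'canal-json.encode.decimal-as-plain-number' = 'true'
);

-- Forward the changelog of topic_products (defined in the Example section).
-- An UPDATE_BEFORE/UPDATE_AFTER pair is written as a Canal DELETE message
-- followed by a Canal INSERT message.
INSERT INTO products_canal_sink
SELECT id, name, description, weight FROM topic_products;
```

The sink uses the regular Kafka connector rather than a keyed upsert sink, so every changelog row, including the DELETE half of an update, is preserved in the topic.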

Example

The following Canal JSON message captures an update to the products table in a MySQL database named inventory. The row with id=111 had its weight changed from 5.15 to 5.18.

Key fields in the message:

| Field | Description |
| --- | --- |
| data | Row values after the change. |
| old | Previous values of the changed columns only. In this example, only weight changed, so only weight appears. |
| type | Change type: INSERT, UPDATE, or DELETE. |
| es | Event timestamp from the source database, in milliseconds since the epoch. |
| ts | Timestamp at which Canal wrote the message to Kafka, in milliseconds since the epoch. es and ts differ: es reflects when MySQL committed the change, while ts reflects when Canal published it. |
| mysqlType | MySQL column type of each column. |
| sqlType | JDBC type code of each column. |

```json
{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}
```

For the full Canal JSON field specification, see the Canal documentation.

Assume this message is published to a Kafka topic named products_binlog. Create a Flink table to consume and parse it:

```sql
CREATE TABLE topic_products (
  -- Schema must match the products table in MySQL
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json'
);
```

Once the table is defined, run queries against it as a changelog source:

```sql
-- Build a real-time materialized view: average weight per product name
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- Sync all changes to an Elasticsearch index for search
-- (assumes a sink table named elasticsearch_products is already defined)
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
```
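To see how the UPDATE message from the example is translated, you can query the table directly. The comments below show the changelog rows that the mapping described in How it works implies for that single message, using the -U/+U notation that the Flink SQL client shows in changelog result mode. Treat them as an illustration derived from the message, not as captured output.

```sql
-- Inspect the raw changelog of the topic_products table defined above.
SELECT * FROM topic_products;

-- Changelog rows implied for the single UPDATE message in the example
-- (-U = UPDATE_BEFORE, +U = UPDATE_AFTER):
--   -U  111, scooter, Big 2-wheel scooter, 5.15
--   +U  111, scooter, Big 2-wheel scooter, 5.18
-- The -U row is the "data" row with the columns listed in "old"
-- (here, only weight) restored to their previous values.
```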

Parameters

| Parameter | Required | Default | Data type | Description |
| --- | --- | --- | --- | --- |
| format | Yes | (none) | STRING | Set to canal-json. |
| canal-json.ignore-parse-errors | No | false | BOOLEAN | Set to true to skip fields and rows that fail to parse; fields that fail to parse are set to null. By default (false), a parse failure stops the deployment with an error. Use true only when occasional malformed messages are acceptable and skipping them is preferable to a job failure. |
| canal-json.timestamp-format.standard | No | SQL | STRING | Timestamp format for input and output. SQL parses and writes timestamps in the yyyy-MM-dd HH:mm:ss.s{precision} format (for example, 2020-12-30 12:13:14.123). ISO-8601 parses and writes timestamps in the yyyy-MM-ddTHH:mm:ss.s{precision} format (for example, 2020-12-30T12:13:14.123). |
| canal-json.map-null-key.mode | No | FAIL | STRING | Behavior when serializing MAP data that contains a null key. FAIL returns an error. DROP discards the map entries whose key is null. LITERAL replaces the null key with the string specified by canal-json.map-null-key.literal. |
| canal-json.map-null-key.literal | No | null | STRING | The string that replaces a null MAP key when canal-json.map-null-key.mode is set to LITERAL. The default is the literal string null. |
| canal-json.encode.decimal-as-plain-number | No | false | BOOLEAN | Set to true to encode all DECIMAL values as plain numbers (for example, 0.000000027). By default (false), DECIMAL values may be written in scientific notation (for example, 2.7E-8). |
| canal-json.database.include | No | (none) | STRING | A regular expression, in Java Pattern syntax, that is matched against the database field of each Canal record. Only changelogs from matching databases are consumed. |
| canal-json.table.include | No | (none) | STRING | A regular expression, in Java Pattern syntax, that is matched against the table field of each Canal record. Only changelogs from matching tables are consumed. |
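When one Kafka topic carries Canal JSON for several databases or tables, the include options can narrow a source table down to the records it should parse. The following sketch assumes a hypothetical topic named mysql_binlog_all; the topic, table name, and filter values are placeholders, and ignore-parse-errors is enabled only to show where the option goes.

```sql
-- Hypothetical topic that carries Canal JSON for many databases and tables.
CREATE TABLE inventory_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql_binlog_all',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  -- Consume only records whose database field matches this regular expression
  'canal-json.database.include' = 'inventory',
  -- Consume only records whose table field matches this regular expression
  'canal-json.table.include' = 'products',
  -- Skip malformed messages instead of failing the deployment
  'canal-json.ignore-parse-errors' = 'true'
);
```

Filtering by database and table keeps rows from other tables, which usually have a different schema, out of this table's changelog.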

Data type mappings

The Canal JSON format uses the JSON format for serialization and deserialization. For data type mappings between Canal JSON and Flink SQL types, see JSON Format.

The Canal JSON format also accepts INIT, an extended Canal change type that Data Transmission Service (DTS) uses when it stores data in Kafka clusters. For details, see Data storage formats of Kafka clusters.

Available metadata

The following metadata fields can be declared as read-only VIRTUAL columns in a DDL statement. Metadata fields are available only when the connector forwards format metadata. Only the Apache Kafka connector does this, and only for its value format.

| Key | Data type | Description |
| --- | --- | --- |
| database | STRING NULL | Source database name. Corresponds to the database field in Canal records. |
| table | STRING NULL | Source table name. Corresponds to the table field in Canal records. |
| sql-type | MAP\<STRING, INT\> NULL | SQL data type mappings (JDBC type codes). Corresponds to the sqlType field in Canal records. |
| pk-names | ARRAY\<STRING\> NULL | Array of primary key column names. Corresponds to the pkNames field in Canal records. |
| ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | Timestamp at which Canal wrote the event to Kafka. Corresponds to the ts field in Canal records. |

Declare metadata fields with METADATA FROM 'value.<key>' and the VIRTUAL keyword:

```sql
CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);
```
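Once declared, metadata columns can be queried like ordinary columns. The following minimal sketch, assuming the KafkaTable definition above, keeps the origin of each change and its ingestion timestamp next to the row data, which can help with the log auditing use case. The filter value 'user_behavior' is a hypothetical MySQL table name.

```sql
-- Keep the origin of each change next to the row data.
-- 'user_behavior' is a hypothetical MySQL table name.
SELECT
  origin_database,
  origin_table,
  origin_pk_names,
  origin_ts,
  user_id,
  item_id,
  behavior
FROM KafkaTable
WHERE origin_table = 'user_behavior';
```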

Caveats

Duplicate change events

Canal normally delivers each change event with exactly-once semantics. If a fault occurs, Canal falls back to at-least-once semantics and may deliver duplicate events to Kafka, which can cause incorrect query results or unexpected exceptions in Flink.

To deduplicate events, set the job configuration parameter table.exec.source.cdc-events-duplicate to true and define a primary key on the source table. Flink generates an additional stateful operator that uses the primary key to deduplicate change events and produce a normalized changelog stream.
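A sketch of this setup follows. It assumes your environment accepts SET statements in SQL; if it does not, add the same key to the deployment's runtime configuration instead. It redefines the products_binlog source from the earlier example with a primary key; the table name topic_products_dedup is hypothetical. The key is declared NOT ENFORCED because Flink does not validate primary key constraints on external data.

```sql
-- Let Flink deduplicate CDC events by primary key for this job.
SET 'table.exec.source.cdc-events-duplicate' = 'true';

-- Same topic as the earlier example, now with a primary key.
-- Flink does not validate the key, so it must be declared NOT ENFORCED.
CREATE TABLE topic_products_dedup (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json'
);
```

The deduplicating operator keeps state per primary key, so state size grows with the number of distinct keys in the source table.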