This topic describes how to use the Maxwell format and provides an example, together with the parameters and data type mappings of the format.
Background information
Maxwell is a Change Data Capture (CDC) tool that streams changes from MySQL to Kafka, Kinesis, and other streaming platforms in real time. Maxwell provides a unified format for changelogs and serializes messages as JSON. Connectors that support the Maxwell format include the Apache Kafka connector and the Object Storage Service (OSS) connector.
Flink can parse Maxwell JSON messages into INSERT, UPDATE, or DELETE messages and send the messages to the Flink SQL system. Maxwell is suitable for the following scenarios:
Synchronization of incremental data from a database to another system
Log auditing
Real-time materialized views of databases
Temporal joins of database tables
Flink can also encode INSERT, UPDATE, or DELETE messages in the Flink SQL system into Maxwell JSON messages and send the messages to a data store, such as Kafka.
Flink cannot merge an UPDATE_BEFORE message and an UPDATE_AFTER message into one UPDATE message. Instead, Flink encodes an UPDATE_BEFORE message as a Maxwell message of the DELETE type and an UPDATE_AFTER message as a Maxwell message of the INSERT type.
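The mapping between Flink changelog row kinds and Maxwell message types can be sketched in a few lines of Python. This is an illustrative model only, not Flink's actual serializer; the function and message layout are simplified assumptions.

```python
import json

# Illustrative sketch (not Flink's actual serializer): how changelog row kinds
# map to Maxwell "type" values when Flink encodes a stream as maxwell-json.
ROW_KIND_TO_MAXWELL_TYPE = {
    "INSERT": "insert",
    "UPDATE_BEFORE": "delete",  # no merge: the before-image becomes a delete
    "UPDATE_AFTER": "insert",   # and the after-image becomes an insert
    "DELETE": "delete",
}

def encode(row_kind: str, data: dict, database: str, table: str) -> str:
    """Serialize one changelog row as a simplified Maxwell-style JSON message."""
    return json.dumps({
        "database": database,
        "table": table,
        "type": ROW_KIND_TO_MAXWELL_TYPE[row_kind],
        "data": data,
    })

# An UPDATE in the Flink SQL system is emitted as two Maxwell messages:
before = encode("UPDATE_BEFORE", {"id": 111, "weight": 5.18}, "test", "products")
after = encode("UPDATE_AFTER", {"id": 111, "weight": 5.15}, "test", "products")
```

This is why a downstream consumer of maxwell-json output sees a delete followed by an insert for each update, rather than a single update message.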
Example
The products table in a MySQL database contains the following columns: id, name, description, and weight. The following sample code provides an example of a JSON message that includes an update operation captured from the products table.
```json
{
  "database": "test",
  "table": "products",
  "type": "update",
  "ts": 1477053217,
  "xid": 23396,
  "commit": true,
  "position": "master.000006:800911",
  "server_id": 23042,
  "thread_id": 108,
  "primary_key": [111],
  "primary_key_columns": ["id"],
  "data": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "old": {
    "weight": 5.18
  }
}
```

For more information about each field in the preceding example, see the Maxwell documentation.
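As a quick sanity check, the before and after values in such an update record can be extracted with plain JSON parsing. The snippet below is illustrative only; Flink's maxwell-json format performs the equivalent work internally when it emits UPDATE_BEFORE and UPDATE_AFTER rows.

```python
import json

# Illustrative only: extract the before/after values of updated columns
# from a Maxwell update record.
record = json.loads("""
{
  "database": "test", "table": "products", "type": "update",
  "data": {"id": 111, "name": "scooter", "weight": 5.15},
  "old": {"weight": 5.18}
}
""")

if record["type"] == "update":
    after = record["data"]
    # "old" holds only the columns whose values changed;
    # merge it over "data" to rebuild the full before-image.
    before = {**after, **record["old"]}
    changed = {k: (before[k], after[k]) for k in record["old"]}
```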
The JSON message in the preceding example describes an update event on the products table: in the row whose id is 111, the value of weight changed from 5.18 to 5.15. If the messages are synchronized to a Kafka topic named products_binlog, you can execute the following DDL statement to consume the topic and parse the change events.
```sql
CREATE TABLE topic_products (
  -- Make sure that the schema of the table that you create is the same as the schema of the products table in the MySQL database.
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'maxwell-json'
);
```

After you create a Flink table based on the data of the Kafka topic, you can use Maxwell messages as a changelog source.
```sql
-- Create a real-time materialized view based on the products table in the MySQL database.
-- Calculate the most recent average weight of the same products.
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- Synchronize all data and incremental changes of the products table in the MySQL database
-- to the products index of Elasticsearch for subsequent queries.
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
```

Parameters
| Parameter | Required | Default value | Data type | Description |
| --- | --- | --- | --- | --- |
| format | Yes | (none) | STRING | The format that you want to use. To use the Maxwell format, set this parameter to maxwell-json. |
| maxwell-json.ignore-parse-errors | No | false | BOOLEAN | Valid values:<br>true: skip fields and rows that have parse errors instead of failing. Fields that cannot be parsed are set to null.<br>false: throw an exception when a parse error occurs. This is the default value. |
| maxwell-json.timestamp-format.standard | No | SQL | STRING | The format of the input and output timestamps. Valid values:<br>SQL: parse input timestamps in the yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123, and output timestamps in the same format. This is the default value.<br>ISO-8601: parse input timestamps in the yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123, and output timestamps in the same format. |
| maxwell-json.map-null-key.mode | No | FAIL | STRING | The method that is used to handle a null key in map data. Valid values:<br>FAIL: throw an exception when a map key is null. This is the default value.<br>DROP: drop the entries whose keys are null.<br>LITERAL: replace null keys with the string constant specified by the maxwell-json.map-null-key.literal parameter. |
| maxwell-json.map-null-key.literal | No | null | STRING | The string constant that is used to replace null keys in map data when the maxwell-json.map-null-key.mode parameter is set to LITERAL. |
| maxwell-json.encode.decimal-as-plain-number | No | false | BOOLEAN | Valid values:<br>true: encode all decimals as plain numbers.<br>false: some decimals may be encoded in scientific notation, for example, 0.000000027 as 2.7E-8. This is the default value. |
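The three map-null-key modes can be illustrated with a short Python sketch. This mimics the observable behavior when a MAP column is serialized; it is an assumption-based model, not Flink's actual implementation.

```python
# Hypothetical sketch of the three maxwell-json.map-null-key.mode behaviors
# when serializing a MAP column; not Flink's actual implementation.
def serialize_map(m: dict, mode: str, literal: str = "null") -> dict:
    if mode == "FAIL":
        if None in m:
            raise ValueError("map key is null")
        return dict(m)
    if mode == "DROP":
        # Drop the entries whose keys are null.
        return {k: v for k, v in m.items() if k is not None}
    if mode == "LITERAL":
        # Replace null keys with the configured string constant.
        return {(literal if k is None else k): v for k, v in m.items()}
    raise ValueError(f"unknown mode: {mode}")

data = {None: 1, "a": 2}
```

For example, `serialize_map(data, "DROP")` keeps only the `"a"` entry, while `serialize_map(data, "LITERAL", "key_null")` renames the null key to `"key_null"`.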
Data type mappings
Maxwell uses the JSON format for serialization and deserialization. For more information about data type mappings, see JSON Format.
Others
Available metadata
The following format metadata fields can be declared as read-only (VIRTUAL) columns in DDL statements.
Format metadata fields are available only if the related connector forwards format metadata. Only the Kafka connector can declare the metadata field that is used to specify the value format.
| Key | Data type | Description |
| --- | --- | --- |
| database | STRING NULL | The source database. This field corresponds to the database field in Maxwell records. |
| table | STRING NULL | The table of the source database. This field corresponds to the table field in Maxwell records. |
| primary-key-columns | ARRAY&lt;STRING&gt; NULL | The array of primary key names. This field corresponds to the primary_key_columns field in Maxwell records. |
| ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | The timestamp at which the connector processes the event. This field corresponds to the ts field in Maxwell records. |
The following sample code provides an example on how to access Maxwell metadata fields in Kafka.
```sql
CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
  origin_ts TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'maxwell-json'
);
```

FAQ
What do I do if duplicate change events are delivered when a fault occurs?
In normal operation, Maxwell delivers each change event exactly once, and Flink consumes the change events that are generated by Maxwell as expected. However, if a failure occurs, Maxwell guarantees only at-least-once delivery and may write duplicate change events to Kafka. When Flink consumes these duplicates, queries may produce incorrect results or unexpected exceptions. To avoid this issue, we recommend that you set the deployment parameter table.exec.source.cdc-events-duplicate to true and define a primary key on the source table. Flink then generates an additional stateful operator that uses the primary key to deduplicate the change events and produce a normalized changelog stream.
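The deduplication idea can be sketched as follows. This is a simplified model of primary-key-based deduplication of re-delivered events; Flink's stateful normalization operator is considerably more involved.

```python
# Simplified sketch of primary-key-based deduplication of at-least-once
# change events; illustrates the idea behind
# table.exec.source.cdc-events-duplicate, not Flink's actual operator.
def deduplicate(events):
    """Drop an event that exactly repeats the previous event for the same
    primary key, as happens when Maxwell re-delivers after a failure."""
    seen_last = {}  # primary key -> last event accepted for that key
    out = []
    for event in events:
        key = event["data"]["id"]
        if seen_last.get(key) == event:
            continue  # exact re-delivery of the previous event for this key
        seen_last[key] = event
        out.append(event)
    return out

events = [
    {"type": "insert", "data": {"id": 111, "weight": 5.18}},
    {"type": "insert", "data": {"id": 111, "weight": 5.18}},  # duplicate
    {"type": "update", "data": {"id": 111, "weight": 5.15}},
]
```

Keeping per-key state is why the Flink operator is stateful: it must remember the last event seen for each primary key to recognize a re-delivery.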