
Realtime Compute for Apache Flink: Maxwell

Last Updated: Sep 19, 2023

This topic describes the parameters and data type mappings of the Maxwell format and provides an example of how to use it.

Background information

Maxwell is a Change Data Capture (CDC) tool that streams changes from MySQL to Kafka, Kinesis, and other streaming platforms in real time. Maxwell provides a unified format for changelogs and serializes messages as JSON. Connectors that support the Maxwell format include the Apache Kafka connector and the Object Storage Service (OSS) connector.

Flink can parse Maxwell JSON messages into INSERT, UPDATE, or DELETE messages and send the messages to the Flink SQL system. Maxwell is suitable for the following scenarios:

  • Synchronization of incremental data from a database to another system

  • Log auditing

  • Real-time materialized views of databases

  • Temporal joins of database tables

Flink can also encode INSERT, UPDATE, or DELETE messages in the Flink SQL system into Maxwell JSON messages and send the messages to a data store, such as Kafka.

Important

Flink cannot merge an UPDATE_BEFORE message and an UPDATE_AFTER message into one UPDATE message. Instead, Flink encodes an UPDATE_BEFORE message as a Maxwell message of the DELETE type and an UPDATE_AFTER message as a Maxwell message of the INSERT type.
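
The following DDL statement is a minimal sketch of a Kafka sink table that encodes a changelog stream in the Maxwell JSON format. The topic name products_maxwell, the table schema, and the source table changelog_source are assumptions for illustration, not part of this topic's examples.

-- A hypothetical Kafka sink table that encodes changelog messages in the Maxwell JSON format. 
CREATE TABLE maxwell_sink (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_maxwell',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'maxwell-json'
);

-- Each UPDATE in the changelog is written as two Maxwell messages:
-- a DELETE-type message for the UPDATE_BEFORE row and an INSERT-type message for the UPDATE_AFTER row.
INSERT INTO maxwell_sink SELECT * FROM changelog_source;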

Example

The products table in a MySQL database contains the following columns: id, name, description, and weight. The following sample code provides an example of a Maxwell JSON message that captures an update operation on the products table.

{
   "database":"test",
   "table":"e",
   "type":"insert",
   "ts":1477053217,
   "xid":23396,
   "commit":true,
   "position":"master.000006:800911",
   "server_id":23042,
   "thread_id":108,
   "primary_key": [1, "2016-10-21 05:33:37.523000"],
   "primary_key_columns": ["id", "c"],
   "data":{
     "id":111,
     "name":"scooter",
     "description":"Big 2-wheel scooter",
     "weight":5.15
   },
   "old":{
     "weight":5.18,
   }
}
Note

For more information about each field in the preceding example, see the Maxwell documentation.

The JSON message in the preceding example describes an update event on the products table: in the row whose id is 111, the value of weight is changed from 5.18 to 5.15. If the message is synchronized to a Kafka topic named products_binlog, you can execute the following DDL statement to consume this topic and parse the change events.

CREATE TABLE topic_products (
  -- Make sure that the schema of the table that you create is the same as the schema of the products table in the MySQL database. 
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'maxwell-json'
);

After you create a Flink table based on the data of the Kafka topic, you can use Maxwell messages as the source of changelogs.

-- Create a real-time materialized view based on the products table in the MySQL database. 
-- Calculate the most recent average weight of the same products. 
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

-- Synchronize all data and incremental changes of the products table in the MySQL database to the products index of Elasticsearch for subsequent queries. 
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
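
The INSERT statement in the preceding example requires the elasticsearch_products sink table to be defined first. The following DDL statement is a minimal sketch of such a table; the Elasticsearch endpoint and index name are assumptions for illustration.

-- A hypothetical Elasticsearch sink table that matches the schema of topic_products. 
CREATE TABLE elasticsearch_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'products'
);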

Parameters

format

Required: Yes. Default value: (none). Data type: STRING.

The format that you want to use. To use the Maxwell format, set this parameter to maxwell-json.

maxwell-json.ignore-parse-errors

Required: No. Default value: false. Data type: BOOLEAN.

Specifies how parse errors are handled. Valid values:

  • true: If parsing fails, the current field or row is skipped.

  • false: If parsing fails, an error is returned and the deployment fails to start. This is the default value.

maxwell-json.timestamp-format.standard

Required: No. Default value: SQL. Data type: STRING.

The format of the input and output timestamps. Valid values:

  • SQL: Input timestamps are parsed in the yyyy-MM-dd HH:mm:ss.s{precision} format, for example, 2020-12-30 12:13:14.123. Output timestamps are written in the same format.

  • ISO-8601: Input timestamps are parsed in the yyyy-MM-ddTHH:mm:ss.s{precision} format, for example, 2020-12-30T12:13:14.123. Output timestamps are written in the same format.

maxwell-json.map-null-key.mode

Required: No. Default value: FAIL. Data type: STRING.

The method that is used to handle null keys in map data. Valid values:

  • FAIL: An error is returned if a map key is null.

  • DROP: Map entries whose keys are null are discarded.

  • LITERAL: Null map keys are replaced with a string constant. The string constant is specified by the maxwell-json.map-null-key.literal parameter.

maxwell-json.map-null-key.literal

Required: No. Default value: null. Data type: STRING.

The string constant that replaces null map keys when the maxwell-json.map-null-key.mode parameter is set to LITERAL.

maxwell-json.encode.decimal-as-plain-number

Required: No. Default value: false. Data type: BOOLEAN.

Specifies how DECIMAL data is encoded. Valid values:

  • true: DECIMAL data is written as plain numbers and is never expressed in scientific notation. For example, 0.000000027 is expressed as 0.000000027.

  • false: DECIMAL data can be expressed in scientific notation. For example, 0.000000027 is expressed as 2.7E-8.
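
The following DDL statement is a sketch of how the optional parameters can be combined in the WITH clause of a table that uses the maxwell-json format. The table name and parameter values are assumptions for illustration.

CREATE TABLE topic_products_tuned (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'maxwell-json',
  -- Skip fields or rows that fail to parse instead of failing the deployment.
  'maxwell-json.ignore-parse-errors' = 'true',
  -- Parse and emit timestamps in the ISO-8601 format.
  'maxwell-json.timestamp-format.standard' = 'ISO-8601',
  -- Replace null map keys with the string constant "null_key" when encoding.
  'maxwell-json.map-null-key.mode' = 'LITERAL',
  'maxwell-json.map-null-key.literal' = 'null_key'
);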

Data type mappings

Maxwell uses the JSON format for serialization and deserialization. For more information about data type mappings, see JSON Format.

Others

Available metadata

The following format metadata fields can be declared as read-only (VIRTUAL) columns in DDL statements.

Important

Format metadata fields are available only if the related connector forwards format metadata. Currently, only the Kafka connector can expose metadata fields for its value format.

database

Data type: STRING NULL

The name of the source database. This field corresponds to the database field in Maxwell records.

table

Data type: STRING NULL

The name of the table in the source database. This field corresponds to the table field in Maxwell records.

primary-key-columns

Data type: ARRAY<STRING> NULL

The array of primary key column names. This field corresponds to the primary_key_columns field in Maxwell records.

ingestion-timestamp

Data type: TIMESTAMP_LTZ(3) NULL

The timestamp at which the connector processes the event. This field corresponds to the ts field in Maxwell records.

The following sample code provides an example of how to access Maxwell metadata fields in Kafka.

CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'maxwell-json'
);
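
After the table is created, the metadata columns can be queried in the same way as physical columns. The following query is a minimal usage sketch.

SELECT origin_database, origin_table, origin_ts, user_id, behavior
FROM KafkaTable;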

FAQ

What do I do if duplicate change events are delivered when a fault occurs?

Under normal operation, Maxwell delivers each change event exactly once, and Flink consumes the change events that Maxwell generates as expected. However, if a fault occurs, Maxwell guarantees only at-least-once delivery and may write duplicate change events to Kafka. As a result, Flink may consume duplicate change events, which can cause incorrect results or unexpected exceptions in Flink queries. To avoid this issue, we recommend that you set the deployment parameter table.exec.source.cdc-events-duplicate to true and define a primary key on the source table. This way, Flink generates an additional stateful operator that uses the primary key to deduplicate change events and produce a normalized changelog stream.
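
The following statements are a minimal sketch of this configuration, based on the topic_products example above. The SET syntax applies to environments that accept SQL-level configuration; depending on your environment, you may instead set the parameter in the deployment configuration. The assumption that id uniquely identifies each row must hold for your source data.

-- Enable deduplication of change events that may be delivered more than once. 
SET 'table.exec.source.cdc-events-duplicate' = 'true';

CREATE TABLE topic_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2),
  -- The primary key is used by the stateful operator to deduplicate change events. 
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'maxwell-json'
);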