This tutorial describes how to synchronize data from a MySQL database to Message Queue for Apache Kafka through a source connector of Kafka Connect.

Background information

Kafka Connect is used to import data streams to and export data streams from Message Queue for Apache Kafka. In Kafka Connect, data is imported from third-party systems to a Kafka broker through various implementations of source connectors, and exported from the Kafka broker to the third-party systems through various implementations of sink connectors.

Prerequisites

Ensure that you have completed the following operations:

  • You have downloaded a MySQL source connector.
    Note In this tutorial, MySQL connector 0.5.2 is used as an example.
  • You have downloaded Kafka Connect.
    Note In this tutorial, Kafka Connect 0.10.2.2 is used as an example.
  • You have installed Docker.

Step 1: Configure Kafka Connect

  1. Decompress the downloaded MySQL source connector package to the specified directory.
  2. In the configuration file connect-distributed.properties of Kafka Connect, configure the plug-in installation path.
    plugin.path=/kafka/connect/plugins
    Notice

    In Kafka Connect 0.10.2.2 and earlier, the configuration of plugin.path is not supported, and you need to specify the plug-in path in CLASSPATH.

    export CLASSPATH=/kafka/connect/plugins/mysql-connector/*

Step 2: Start Kafka Connect

After connect-distributed.properties is configured, run the following command to start Kafka Connect:

  • Access from VPC

    Run the following command to start Kafka Connect: bin/connect-distributed.sh config/connect-distributed.properties.

Step 3: Install MySQL

  1. Download docker-compose-mysql.yaml.
  2. Run the following command to install MySQL:
    export DEBEZIUM_VERSION=0.5 docker-compose -f docker-compose-mysql.yaml up

Step 4: Configure MySQL

  1. Run the following command to enable binary logging for MySQL and set the binary logging mode to row.
    [mysqld] log-bin=mysql-bin binlog-format=ROW server_id=1 
  2. Run the following command to grant permissions to a role in the User table of MySQL.
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *. * TO 'debezium' IDENTIFIED BY 'dbz';
    Note In this example, the permission is granted to debezium in the User table of MySQL and the password is dbz.

Step 5: Start the MySQL source connector

  1. Download register-mysql.json.
  2. Edit register-mysql.json.
    • Access from VPC
      ## The endpoint of the Message Queue for Apache Kafka instance, which can be obtained in the console. ## The default endpoint that you obtained in the console. "database.history.kafka.bootstrap.servers" : "kafka:9092", ## You need to create a topic with this name in the console in advance. In this example, create a server1 topic. ## All table change data is recorded in the server1. $DATABASE. for example, the server1.inventory.products topic. ## Therefore, you must create all the related topics in the console in advance. "database.server.name": "server1", ## Schema changes are recorded in this topic. ## You must create this topic in the console in advance. "database.history.kafka.topic": "schema-changes-inventory"
  3. After configuring register-mysql.json, you must create the corresponding topics in the console according to the configuration. For more information about the steps, see Step 1: Create a topic.
    Note
    In the MySQL installed according to this tutorial, you can see that database:inventory has been created in advance. The database contains the following tables:
    • customers
    • orders
    • products
    • products_on_hand
    According to the preceding configuration, you need to create the following topics by calling the CreateTopic API operation:
    • server1
    • server1.inventory.customers
    • server1.inventory.orders
    • server1.inventory.products
    • server1.inventory.products_on_hand

    According to the configuration in register-mysql.json, schema, change information needs to be stored in schema-changes-testDB. Therefore, you need to create the schema-changes-inventory topic by calling the CreateTopic API operation. For more information about how to create a topic by calling the CreateTopic API operation, see CreateTopic.

  4. Run the following command to start the MySQL source connector.
    > curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

Verification

Follow the following steps to check whether Message Queue for Apache Kafka can receive change data from MySQL.

  1. Modify the data of the table in MySQL.
  2. In the console, click Message Query and then query the change data on the page that appears.