This topic describes how to synchronize data from a MySQL database to Message Queue for Apache Kafka by using a source connector of Kafka Connect.

Background information

Kafka Connect is used to import data streams to and export data streams from Message Queue for Apache Kafka. Kafka Connect uses various source connectors to import data from third-party systems to Message Queue for Apache Kafka brokers, and uses various sink connectors to export data from Message Queue for Apache Kafka brokers to third-party systems. system

Prerequisites

Before you begin, make sure that the following requirements are met:

  • A MySQL source connector is downloaded.
    Note In this topic, MySQL source connector V0.5.2 is used as an example.
  • Kafka Connect is downloaded.
    Note In this topic, Kafka Connect V0.10.2.2 is used as an example.
  • Docker is installed.

Step 1: Configure Kafka Connect

  1. Decompress the downloaded MySQL source connector package to the specified directory.
  2. In the connect-distributed.properties configuration file of Kafka Connect, specify the installation path of the MySQL source connector.
    plugin.path=/kafka/connect/plugins
    Notice

    In earlier versions of Kafka Connect, the plugin.path parameter is not supported. You must specify the path by using the CLASSPATH parameter.

    export CLASSPATH=/kafka/connect/plugins/mysql-connector/*

Step 2: Start Kafka Connect

After the connect-distributed.properties configuration file is configured, run the following commands as required to start Kafka Connect:

  • Access from the Internet
    1. Run the export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf" commend to configure java.security.auth.login.config.
    2. Run the bin/connect-distributed.sh config/connect-distributed.properties command to start Kafka Connect.
  • Access from a virtual private cloud (VPC)

    Run the bin/connect-distributed.sh config/connect-distributed.properties command to start Kafka Connect.

Step 3: Install MySQL

  1. Download the docker-compose-mysql.yaml file.
  2. Run the following commands to install MySQL:
    export DEBEZIUM_VERSION=0.5
    docker-compose -f docker-compose-mysql.yaml up

Step 4: Configure MySQL

  1. Run the following commands to enable binary logging for MySQL and set the binary logging mode to row:
    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1 
  2. Run the following command to grant permissions to the MySQL user:
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
    Note In this example, the name of the MySQL user is debezium and the password is dbz.

Step 5: Start the MySQL source connector

  1. Download the register-mysql.json file.
  2. Configure the register-mysql.json file.
    • Access from a VPC
      ## Specify a Message Queue for Apache Kafka endpoint, which can be obtained in the Message Queue for Apache Kafka console. 
      ## The default endpoint that you obtained in the Message Queue for Apache Kafka console. 
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## You must create a topic in the Message Queue for Apache Kafka console with the same name as the specified topic in the MySQL database in advance. In this example, create a topic named server1. 
      ## All table changes are recorded in topics named in the format of server1.$DATABASE.$TABLE, such as server1.inventory.products. 
      ## Therefore, you must create all related topics in the Message Queue for Apache Kafka console in advance. 
      "database.server.name": "server1",
      ## Schema changes are recorded in this topic. 
      ## You must create this topic in the Message Queue for Apache Kafka console in advance. 
      "database.history.kafka.topic": "schema-changes-inventory"
    • Access from the Internet
      ## Specify the Message Queue for Apache Kafka endpoint, which can be obtained in the Message Queue for Apache Kafka console. Synchronize the schema changes of the database to Message Queue for Apache Kafka. 
      ## The SSL endpoint that you obtained in the Message Queue for Apache Kafka console. 
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## You must create a topic in the Message Queue for Apache Kafka console with the same name as the specified topic in the MySQL database in advance. In this example, create a topic named server1. 
      ## All table changes are recorded in topics named in the format of server1.$DATABASE.$TABLE, such as server1.testDB.products. 
      ## Therefore, you must create all related topics in the Message Queue for Apache Kafka console in advance. 
      "database.server.name": "server1",
      ## Schema changes are recorded in this topic. 
      ## You must create this topic in the Message Queue for Apache Kafka console in advance. 
      "database.history.kafka.topic": "schema-changes-inventory",
      ## Specify the following configurations to enable SSL-based Internet access: 
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. After you configure the register-mysql.json file, you must create the related topics in the Message Queue for Apache Kafka console based on the configurations. For more information about the operations, see Step 1: Create a topic.
    In the example of this topic, the database:inventory database is created in advance in MySQL. The database contains the following tables:
    • customers
    • orders
    • products
    • products_on_hand
    You must create the following topics by calling the CreateTopic operation based on the preceding configurations:
    • server1
    • server1.inventory.customers
    • server1.inventory.orders
    • server1.inventory.products
    • server1.inventory.products_on_hand

    As configured in the register-mysql.json file, schema changes need to be stored in schema-changes-testDB. Therefore, you must create the schema-changes-inventory topic by calling the CreateTopic operation. For more information about how to create a topic by calling the CreateTopic operation, see CreateTopic.

  4. Run the following command to start the MySQL source connector:
    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

Verify the results

Perform the following steps to check whether Message Queue for Apache Kafka can receive data from MySQL.

  1. Modify the data of a table in MySQL.
  2. Log on to the Message Queue for Apache Kafka console. On the Message Query page, query the table change data.