This tutorial describes how to synchronize data from a MySQL database to Message Queue for Apache Kafka through a source connector of Kafka Connect.
Ensure that you have completed the following operations:
Step 1: Configure Kafka Connect
- Decompress the downloaded MySQL source connector package to the specified directory.
- In the configuration file connect-distributed.properties of Kafka Connect, configure the plug-in installation path.
In Kafka Connect 0.10.2.2 and earlier, the configuration of plugin.path is not supported, and you need to specify the plug-in path in CLASSPATH.
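As an illustration of the plugin.path setting described above, the following sketch sets the property in connect-distributed.properties. The helper name set_plugin_path and the directory /kafka/connect/plugins are assumptions made for this example, not values from this tutorial; use the directory where you decompressed the connector package.

```shell
#!/bin/sh
# Sketch: set plugin.path in a Kafka Connect properties file.
# set_plugin_path is a hypothetical helper, not part of Kafka.
set_plugin_path() {
  config_file="$1"   # path to connect-distributed.properties
  plugin_dir="$2"    # directory that contains the decompressed connector
  tmp_file="${config_file}.tmp"

  # Keep every existing line except a previous plugin.path entry.
  if [ -f "$config_file" ]; then
    grep -v '^plugin\.path=' "$config_file" > "$tmp_file" || true
  else
    : > "$tmp_file"
  fi

  # Append the new plug-in installation path.
  printf 'plugin.path=%s\n' "$plugin_dir" >> "$tmp_file"
  mv "$tmp_file" "$config_file"
}

# Example (the directory below is an assumed location):
# set_plugin_path config/connect-distributed.properties /kafka/connect/plugins
```

Rewriting the file instead of appending blindly keeps a single plugin.path entry when the helper is run more than once.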
Step 2: Start Kafka Connect
After connect-distributed.properties is configured, start Kafka Connect.
- Access from VPC
Run the following command to start Kafka Connect:
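For orientation, a standard Apache Kafka distribution starts Kafka Connect in distributed mode by passing the properties file to bin/connect-distributed.sh. The following sketch wraps that call. The helper name start_kafka_connect and the assumption that the script and the configuration file sit under one Kafka installation directory are illustrative only; adjust them to your environment.

```shell
#!/bin/sh
# Sketch: start Kafka Connect in distributed mode.
# start_kafka_connect is a hypothetical wrapper; the layout
# <kafka_home>/bin and <kafka_home>/config is an assumption.
start_kafka_connect() {
  kafka_home="$1"
  script="$kafka_home/bin/connect-distributed.sh"
  config="$kafka_home/config/connect-distributed.properties"

  if [ ! -x "$script" ]; then
    echo "start script not found or not executable: $script" >&2
    return 1
  fi
  if [ ! -f "$config" ]; then
    echo "configuration file not found: $config" >&2
    return 1
  fi

  # Runs in the foreground until the worker is stopped.
  "$script" "$config"
}

# Example (assumed installation directory):
# start_kafka_connect /opt/kafka
```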
Step 3: Install MySQL
- Download docker-compose-mysql.yaml.
- Run the following command to install MySQL:
export DEBEZIUM_VERSION=0.5
docker-compose -f docker-compose-mysql.yaml up
Step 4: Configure MySQL
- Add the following settings to the MySQL configuration file (for example, my.cnf) to enable binary logging and set the binary logging format to row.
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
- Run the following command to grant the required permissions to a MySQL user.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
Note In this example, the permissions are granted to the MySQL user debezium, and the password is dbz.
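Before restarting MySQL, it can help to confirm that the configuration file actually contains the binary logging settings shown in this step. The following is a minimal sketch. check_binlog_config is a hypothetical helper, and the file path you pass to it depends on your installation. It only inspects the file text in the key=value form used in this tutorial and does not query the running server.

```shell
#!/bin/sh
# Sketch: check a MySQL option file for the settings used in this step.
# check_binlog_config is a hypothetical helper, not a MySQL tool.
check_binlog_config() {
  cnf="$1"
  status=0

  # MySQL accepts both '-' and '_' in option names, so match either.
  grep -Eq '^[[:space:]]*log[-_]bin[[:space:]]*=' "$cnf" \
    || { echo "missing: log-bin" >&2; status=1; }
  grep -Eiq '^[[:space:]]*binlog[-_]format[[:space:]]*=[[:space:]]*ROW[[:space:]]*$' "$cnf" \
    || { echo "missing: binlog-format=ROW" >&2; status=1; }
  grep -Eq '^[[:space:]]*server[-_]id[[:space:]]*=[[:space:]]*[0-9]+' "$cnf" \
    || { echo "missing: server_id" >&2; status=1; }

  return "$status"
}

# Example (assumed file location):
# check_binlog_config /etc/mysql/my.cnf
```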
Step 5: Start the MySQL source connector
- Download register-mysql.json.
- Edit register-mysql.json.
- Access from VPC
## The endpoint of the Message Queue for Apache Kafka instance, which can be obtained in the console.
## Use the default endpoint that you obtained in the console.
"database.history.kafka.bootstrap.servers" : "kafka:9092",
## You need to create a topic with this name in the console in advance. In this example, create a server1 topic.
## All table change data is recorded in server1.$DATABASE.$TABLE topics, for example, the server1.inventory.products topic.
## Therefore, you must create all the related topics in the console in advance.
"database.server.name": "server1",
## Schema changes are recorded in this topic.
## You must create this topic in the console in advance.
"database.history.kafka.topic": "schema-changes-inventory"
- After configuring register-mysql.json, you must create the corresponding topics in
the console according to the configuration. For more information about the steps,
see Step 1: Create a topic.
Note In the MySQL instance installed according to this tutorial, the inventory database has been created in advance. The database contains the following tables:
According to the configuration in register-mysql.json, schema change information is stored in the schema-changes-inventory topic. Therefore, you need to create the schema-changes-inventory topic by calling the CreateTopic API operation. For more information about how to create a topic by calling the CreateTopic API operation, see CreateTopic.
- Run the following command to start the MySQL source connector.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
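The topics that the configuration in register-mysql.json relies on have to exist before the connector is registered. As a sketch of the naming rule described in the configuration comments (one topic named after database.server.name, one server.database.table topic per captured table, and the schema change topic), the hypothetical helper below prints the names to create. The function name and its argument order are assumptions made for this example; of the table names, only products is taken from this tutorial.

```shell
#!/bin/sh
# Sketch: print the topic names that must exist before the connector starts.
# list_required_topics is a hypothetical helper.
# Usage: list_required_topics <server_name> <database> <history_topic> <table>...
list_required_topics() {
  server_name="$1"
  database="$2"
  history_topic="$3"
  shift 3

  # Topic named after database.server.name.
  printf '%s\n' "$server_name"

  # One topic per captured table: <server>.<database>.<table>.
  for table in "$@"; do
    printf '%s.%s.%s\n' "$server_name" "$database" "$table"
  done

  # Topic that stores schema changes (database.history.kafka.topic).
  printf '%s\n' "$history_topic"
}

# Example:
# list_required_topics server1 inventory schema-changes-inventory products
```

With the arguments in the example, the helper prints server1, server1.inventory.products, and schema-changes-inventory, one name per line. Pass every table of the inventory database to obtain the full list.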
Perform the following steps to check whether Message Queue for Apache Kafka receives change data from MySQL.
- Modify the data of the table in MySQL.
- In the console, click Message Query and then query the change data on the page that appears.