All Products
Search
Document Center

ApsaraMQ for Kafka:Use Kafka Connect to synchronize data from a MySQL database to an ApsaraMQ for Kafka instance

Last Updated:Dec 28, 2023

This topic describes how to use a source connector of Kafka Connect to synchronize data from a MySQL database to an ApsaraMQ for Kafka instance.

Background information

Kafka Connect is used to import data streams to and export data streams from ApsaraMQ for Kafka. Kafka Connect uses various source connectors to import data from third-party systems to ApsaraMQ for Kafka brokers and various sink connectors to export data from ApsaraMQ for Kafka brokers to third-party systems.system

Prerequisites

Before you use a source connector of Kafka Connect to synchronize data, make sure that the following operations are performed:

  • The MySQL source connector package is downloaded.

    Note

    In the example of this topic, MySQL source connector V0.5.2 is used.

  • Kafka Connect is downloaded.

    Note

    In the example of this topic, Kafka Connect V0.10.2.2 is used.

  • Docker is installed.

Step 1: Configure Kafka Connect

  1. Decompress the downloaded MySQL source connector package to the specified directory.

  2. In the connect-distributed.properties configuration file of Kafka Connect, specify the installation path of the MySQL source connector.

    plugin.path=/kafka/connect/plugins
    Important

    In earlier versions of Kafka Connect, the plugin.path parameter is not supported. You must specify the path by using the CLASSPATH parameter.

    export CLASSPATH=/kafka/connect/plugins/mysql-connector/*

Step 2: Start Kafka Connect

After the connect-distributed.properties configuration file is configured, select one of the following methods to start Kafka Connect:

  • Access from the Internet

    1. Run the export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf" command to configure java.security.auth.login.config.

    2. Run the bin/connect-distributed.sh config/connect-distributed.properties command to start Kafka Connect.

  • Access from a virtual private cloud (VPC)

    Run the bin/connect-distributed.sh config/connect-distributed.properties command to start Kafka Connect.

Step 3: Install MySQL

  1. Download the docker-compose-mysql.yaml file.

  2. Run the following commands to install MySQL:

    export DEBEZIUM_VERSION=0.5
    docker-compose -f docker-compose-mysql.yaml up

Step 4: Configure MySQL

  1. Add the following content to the configuration file to enable binary logging for MySQL and specify row as the binary logging mode:

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1 
  2. Run the following command to grant permissions to the MySQL user:

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
    Note

    In this example, the name of the MySQL user is debezium and the password is dbz.

Step 5: Start the MySQL source connector

  1. Download the register-mysql.json file.

  2. Configure the register-mysql.json file.

    • Access from a VPC

      ## The endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the ApsaraMQ for Kafka console. 
      ## The default endpoint that you obtained in the ApsaraMQ for Kafka console. 
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## You must create a topic that has the same name as the specified topic in the MySQL database in the ApsaraMQ for Kafka console in advance. In this example, a topic named server1 is created. 
      ## All table changes are recorded in topics named in the format of server1.$DATABASE.$TABLE, such as server1.inventory.products. 
      ## Therefore, you must create all related topics in the ApsaraMQ for Kafka console in advance. 
      "database.server.name": "server1",
      ## Schema changes are recorded in this topic. 
      ## You must create this topic in the ApsaraMQ for Kafka console in advance. 
      "database.history.kafka.topic": "schema-changes-inventory"
    • Access from the Internet

      ## The endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the ApsaraMQ for Kafka console. Schema changes of the database are stored on the ApsaraMQ for Kafka broker. 
      ## The SSL endpoint that you obtained in the ApsaraMQ for Kafka console. 
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## You must create a topic that has the same name as the specified topic in the MySQL database in the ApsaraMQ for Kafka console in advance. In this example, a topic named server1 is created. 
      ## All table changes are recorded in topics named in the format of server1.$DATABASE.$TABLE, such as server1.testDB.products. 
      ## Therefore, you must create all related topics in the ApsaraMQ for Kafka console in advance. 
      "database.server.name": "server1",
      ## Schema changes are recorded in this topic. 
      ## You must create this topic in the ApsaraMQ for Kafka console in advance. 
      "database.history.kafka.topic": "schema-changes-inventory",
      ## Specify the following configurations to enable SSL-based Internet access: 
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. After you configure the register-mysql.json file, you must create the related topics in the ApsaraMQ for Kafka console based on the configurations. For more information, see Step 1: Create a topic.

    In the example of this topic, the database:inventory database is created in advance in MySQL. The database contains the following tables:

    • customers

    • orders

    • products

    • products_on_hand

    You must create the following topics by calling the CreateTopic operation based on the preceding configurations:

    • server1

    • server1.inventory.customers

    • server1.inventory.orders

    • server1.inventory.products

    • server1.inventory.products_on_hand

    Based on the configurations in the register-mysql.json file, schema changes need to be stored in schema-changes-testDB. Therefore, you must create the schema-changes-inventory topic by calling the CreateTopic operation. For information about how to create a topic by calling the CreateTopic operation, see CreateTopic.

  4. Run the following command to start the MySQL source connector:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

Verify the result

Perform the following steps to check whether ApsaraMQ for Kafka can receive data from MySQL:

  1. Modify the data of a table in MySQL.

  2. Log on to the ApsaraMQ for Kafka console. On the Message Query page, query the changed data.