本教程介绍如何使用Kafka Connect的Source Connector将MySQL的数据同步至消息队列Kafka版

背景信息

Kafka Connect主要用于将数据流输入和输出消息队列Kafka版。Kafka Connect主要通过各种Source Connector的实现,将数据从第三方系统输入到Kafka broker,通过各种Sink Connector实现,将数据从Kafka broker中导入到第三方系统。system

前提条件

在开始本教程前,请确保您已完成以下操作:

  • 已下载MySQL Source Connector。
    说明 本教程以0.5.2版本的MySQL Source Connector为例。
  • 已下载Kafka Connect。
    说明 本教程以0.10.2.2版本的Kafka Connect为例。
  • 已安装docker。

步骤一:配置Kafka Connect

  1. 将下载完成的MySQL Connector解压到指定目录。
  2. 在Kafka Connect的配置文件connect-distributed.properties中配置插件安装位置。
    plugin.path=/kafka/connect/plugins
    注意

    Kafka Connect的早期版本不支持配置plugin.path,您需要在CLASSPATH中指定插件位置。

    export CLASSPATH=/kafka/connect/plugins/mysql-connector/*

步骤二:启动Kafka Connect

在配置好connect-distributed.properties后,执行以下命令启动Kafka Connect。

  • VPC接入

    执行命令bin/connect-distributed.sh config/connect-distributed.properties启动Kafka Connect。

步骤三:安装MySQL

  1. 下载docker-compose-mysql.yaml
  2. 执行以下命令安装MySQL。
    export DEBEZIUM_VERSION=0.5
    docker-compose -f docker-compose-mysql.yaml up

步骤四:配置MySQL

  1. 执行以下命令开启MySQL的binlog写入功能,并配置binlog模式为row。
    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1 
  2. 执行以下命令设置MySQL的User权限。
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
    说明 示例中MySQL的User为debezium,密码为dbz

步骤五:启动MySQL Connector

  1. 下载register-mysql.json
  2. 编辑register-mysql.json
    • VPC接入
      ## Kafka接入点,通过控制台获取
      ## 您在控制台获取的默认接入点
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 需要提前在控制台创建同名Topic,在本例中创建topic:server1
      ## 所有Table的变更数据,会记录在server1.$DATABASE.$TABLE的Topic中,如 server1.inventory.products
      ## 因此用户需要提前在控制台中创建所有相关 Topic
      "database.server.name": "server1",
      ## 记录schema变化信息将记录在这个Topic中
      ## 需要提前在控制台创建
      "database.history.kafka.topic": "schema-changes-inventory"
  3. 配置好register-mysql.json后,您需要根据配置在控制台创建相应的Topic,相关操作步骤请参见步骤一:创建Topic
    按照本教程中的方式安装的MySQL,您可以看到MySQL中已经提前创建好了database:inventory。其中有四张表:
    • customers
    • orders
    • products
    • products_on_hand
    根据以上配置,您需要使用OpenAPI创建Topic:
    • server1
    • server1.inventory.customers
    • server1.inventory.orders
    • server1.inventory.products
    • server1.inventory.products_on_hand

    register-mysql.json中,配置了将schema变化信息记录在schema-changes-testDB,因此您还需要使用OpenAPI创建Topic:schema-changes-inventory。 使用OpenAPI创建Topic,请参见CreateTopic

  4. 执行以下命令启动MySQL Connector。
    > curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

结果验证

按照以下步骤操作确认消息队列Kafka版能否接收到MySQL的变更数据。

  1. 变更MySQL Table中的数据。
  2. 在控制台的消息查询页面,查询变更数据。