すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:MySQL データベースから ApsaraMQ for Kafka インスタンスにデータを同期するために Kafka Connect を使用する

最終更新日:Jan 11, 2025

このトピックでは、Kafka Connect のソースコネクタを使用して、MySQL データベースから ApsaraMQ for Kafka インスタンスにデータを同期する方法について説明します。

背景情報

Kafka Connect は、ApsaraMQ for Kafka にデータストリームをインポートしたり、ApsaraMQ for Kafka からデータストリームをエクスポートしたりするために使用されます。 Kafka Connect は、さまざまなソースコネクタを使用してサードパーティシステムから ApsaraMQ for Kafka ブローカーにデータをインポートし、さまざまなシンクコネクタを使用して ApsaraMQ for Kafka ブローカーからサードパーティシステムにデータをエクスポートします。system

前提条件

Kafka Connect のソースコネクタを使用してデータを同期する前に、以下の操作が実行されていることを確認してください。

  • MySQL ソースコネクタパッケージがダウンロードされていること。

    説明

    このトピックの例では、MySQL ソースコネクタ V0.5.2 を使用しています。

  • Kafka Connect がダウンロードされていること。

    説明

    このトピックの例では、Kafka Connect V0.10.2.2 を使用しています。

  • Docker がインストールされていること。

手順 1:Kafka Connect を構成する

  1. ダウンロードした MySQL ソースコネクタパッケージを指定したディレクトリに解凍します。

  2. Kafka Connect の connect-distributed.properties 構成ファイルで、MySQL ソースコネクタのインストールパスを指定します。

    plugin.path=/kafka/connect/plugins
    重要

    Kafka Connect の以前のバージョンでは、plugin.path パラメーターはサポートされていません。 CLASSPATH パラメーターを使用してパスを指定する必要があります。

    export CLASSPATH=/kafka/connect/plugins/mysql-connector/*

手順 2:Kafka Connect を起動する

connect-distributed.properties 構成ファイルが構成された後、次のいずれかの方法を選択して Kafka Connect を起動します。

  • インターネットからアクセスする場合

    1. export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf" コマンドを実行して、java.security.auth.login.config を構成します。

    2. bin/connect-distributed.sh config/connect-distributed.properties コマンドを実行して、Kafka Connect を起動します。

  • 仮想プライベートクラウド (VPC) からアクセスする場合

    bin/connect-distributed.sh config/connect-distributed.properties コマンドを実行して、Kafka Connect を起動します。

手順 3:MySQL をインストールする

  1. docker-compose-mysql.yaml ファイルをダウンロードします。

  2. 次のコマンドを実行して MySQL をインストールします。

    export DEBEZIUM_VERSION=0.5
    docker-compose -f docker-compose-mysql.yaml up

手順 4:MySQL を構成する

  1. 構成ファイルに次のコンテンツを追加して、MySQL のバイナリロギングを有効にし、バイナリロギングモードとして row を指定します。

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1 
  2. 次のコマンドを実行して、MySQL ユーザーに権限を付与します。

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
    説明

    この例では、MySQL ユーザーの名前は debezium で、パスワードは dbz です。

手順 5:MySQL ソースコネクタを起動する

  1. register-mysql.json ファイルをダウンロードします。

  2. register-mysql.json ファイルを構成します。

    • VPC からアクセスする場合

      ## ApsaraMQ for Kafka インスタンスのエンドポイント。エンドポイントは ApsaraMQ for Kafka コンソールで取得できます。
      ## ApsaraMQ for Kafka コンソールで取得したデフォルトのエンドポイント。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## あらかじめ ApsaraMQ for Kafka コンソールで、MySQL データベースで指定されたトピックと同じ名前のトピックを作成する必要があります。この例では、server1 という名前のトピックが作成されます。
      ## すべてのテーブルの変更は、server1.$DATABASE.$TABLE の形式の名前を持つトピックに記録されます (例: server1.inventory.products)。
      ## したがって、事前に ApsaraMQ for Kafka コンソールですべての関連トピックを作成する必要があります。
      "database.server.name": "server1",
      ## スキーマの変更はこのトピックに記録されます。
      ## このトピックは、事前に ApsaraMQ for Kafka コンソールで作成する必要があります。
      "database.history.kafka.topic": "schema-changes-inventory"
    • インターネットからアクセスする場合

      ## ApsaraMQ for Kafka インスタンスのエンドポイント。エンドポイントは ApsaraMQ for Kafka コンソールで取得できます。データベースのスキーマの変更は、ApsaraMQ for Kafka ブローカーに保存されます。
      ## ApsaraMQ for Kafka コンソールで取得した SSL エンドポイント。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## あらかじめ ApsaraMQ for Kafka コンソールで、MySQL データベースで指定されたトピックと同じ名前のトピックを作成する必要があります。この例では、server1 という名前のトピックが作成されます。
      ## すべてのテーブルの変更は、server1.$DATABASE.$TABLE の形式の名前を持つトピックに記録されます (例: server1.testDB.products)。
      ## したがって、事前に ApsaraMQ for Kafka コンソールですべての関連トピックを作成する必要があります。
      "database.server.name": "server1",
      ## スキーマの変更はこのトピックに記録されます。
      ## このトピックは、事前に ApsaraMQ for Kafka コンソールで作成する必要があります。
      "database.history.kafka.topic": "schema-changes-inventory",
      ## SSL ベースのインターネットアクセスを有効にするには、次の構成を指定します。
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. register-mysql.json ファイルを構成した後、構成に基づいて ApsaraMQ for Kafka コンソールで関連トピックを作成する必要があります。詳細については、「ステップ 1: トピックを作成する」をご参照ください。

    このトピックの例では、database:inventory データベースが MySQL に事前に作成されています。データベースには次のテーブルが含まれています。

    • customers

    • orders

    • products

    • products_on_hand

    上記の構成に基づいて、CreateTopic オペレーションを呼び出して次のトピックを作成する必要があります。

    • server1

    • server1.inventory.customers

    • server1.inventory.orders

    • server1.inventory.products

    • server1.inventory.products_on_hand

    register-mysql.json ファイルの構成に基づいて、スキーマの変更は schema-changes-testDB に保存する必要があります。したがって、CreateTopic オペレーションを呼び出して schema-changes-inventory トピックを作成する必要があります。 CreateTopic オペレーションを呼び出してトピックを作成する方法については、「CreateTopic」をご参照ください。

  4. 次のコマンドを実行して MySQL ソースコネクタを起動します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

結果の確認

ApsaraMQ for Kafka が MySQL からデータを受信できるかどうかを確認するには、次の手順を実行します。

  1. MySQL 内のテーブルのデータを変更します。

  2. ApsaraMQ for Kafka コンソールにログインします。 [メッセージクエリ] ページで、変更されたデータをクエリします。