すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Canal を使用して MySQL データベースから Message Queue for Apache Kafka にデータを同期する

最終更新日:Mar 19, 2025

このトピックでは、Canal を使用して MySQL データベースから ApsaraMQ for Kafka にデータを同期する方法について説明します。

背景情報

Canal は、MySQL データベースの増分データを解析し、増分データをサブスクライブおよびコンシュームできるようにします。Canal はセカンダリ MySQL データベースの役割を担い、プライマリ MySQL データベースにダンプ要求を開始します。プライマリ MySQL データベースは、ダンプ要求を受信すると、バイナリログを Canal にプッシュします。Canal はバイナリログを解析し、増分データを取得してから、増分データを同期します。Canal は ApsaraMQ for Kafka に接続し、MySQL データベースの増分データを分析のために ApsaraMQ for Kafka に書き込むことができます。Canal の仕組みの詳細については、GitHub の Canal ドキュメントをご参照ください。Background information

前提条件

Canal を使用してデータを同期する前に、以下の条件が満たされていることを確認してください。

手順

  1. Canal パッケージをダウンロードします。このトピックの例では、V1.1.5 を使用します。

  2. 次のコマンドを実行してディレクトリを作成します。この例では、/home/doc/tools/canal.deployer-1.1.5 ディレクトリが作成されます。

    mkdir -p /home/doc/tools/canal.deployer-1.1.5
  3. Canal パッケージ/home/doc/tools/canal.deployer-1.1.5 ディレクトリにコピーし、パッケージを解凍します。

    tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5
  4. /home/doc/tools/canal.deployer-1.1.5 ディレクトリで、次のコマンドを実行して instance.properties ファイルを変更します。

    vi conf/example/instance.properties

    instance.properties ファイルのパラメーター テーブルの説明に基づいてパラメーターを設定します。

    # MySQL データベースの情報に基づいて、以下のパラメーターを設定します。
    #################################################
    ...
    # データベースの URL。
    canal.instance.master.address=192.168.XX.XX:3306
    # データベースへの接続に使用するユーザー名とパスワード。
    ...
    canal.instance.dbUsername=****
    canal.instance.dbPassword=****
    ...
    # mq config
    # Message Queue for Apache Kafka コンソールで作成したトピック。
    canal.mq.topic=mysql_test
    # データベース名またはテーブル名に基づいて動的トピックを指定します。
    #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
    # データを同期する Message Queue for Apache Kafka トピックのパーティション。
    canal.mq.partition=0
    # 以下の 2 つのパラメーターは、canal.mq.partition パラメーターと一緒に設定することはできません。以下の 2 つのパラメーターを設定すると、データは Message Queue for Apache Kafka トピックの異なるパーティションに同期されます。
    #canal.mq.partitionsNum=3
    # データベース名.テーブル名:一意の主キー。複数のテーブル設定はコンマ (,) で区切ります。
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################

    表 1. instance.properties ファイルのパラメーター

    パラメーター

    必須

    説明

    canal.instance.master.address

    はい

    MySQL データベースの URL。

    canal.instance.dbUsername

    はい

    MySQL データベースへの接続に使用するユーザー名。

    canal.instance.dbPassword

    はい

    MySQL データベースへの接続に使用するパスワード。

    canal.mq.topic

    はい

    ApsaraMQ for Kafka インスタンストピック。トピック管理ApsaraMQ for Kafka コンソールの [トピック] ページでトピックを作成できます。詳細については、手順 3: リソースの作成をご参照ください。

    canal.mq.dynamicTopic

    いいえ

    動的トピックの照合に使用される正規表現。正規表現を指定すると、Canal は式を評価して、MySQL データベースの異なるテーブルのデータを Message Queue for Apache Kafka インスタンスの異なるトピックに同期します。詳細については、パラメーターの説明をご参照ください。

    canal.mq.partition

    いいえ

    データベースデータを同期する ApsaraMQ for Kafka トピックのパーティション。

    canal.mq.partitionsNum

    いいえ

    トピックのパーティション数。このパラメーターは、canal.mq.partitionHash パラメーターと一緒に使用して、ApsaraMQ for Kafka トピックの異なるパーティションにデータを同期します。

    canal.mq.partitionHash

    いいえ

    パーティションの照合に使用される正規表現。詳細については、パラメーターの説明をご参照ください。

  5. 次のコマンドを実行して canal.properties ファイルを開きます。

    vi conf/canal.properties

    canal.properties ファイルのパラメーター テーブルの説明に基づいてパラメーターを設定します。

    • クライアントをインターネット経由で ApsaraMQ for Kafka に接続する場合、認証と暗号化に SASL_SSL プロトコルが使用され、Secure Sockets Layer (SSL) エンドポイントが必要になります。エンドポイントの詳細については、エンドポイント間の比較をご参照ください。

      # ...
      # 値を kafka に設定します。
      canal.serverMode = kafka
      # ...
      # Message Queue for Apache Kafka の設定を構成します。
      # Message Queue for Apache Kafka インスタンスの SSL エンドポイント。エンドポイントは、Message Queue for Apache Kafka コンソールの [インスタンスの詳細] ページで取得できます。
      kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
      # 必要に応じてパラメーターを設定するか、以下のデフォルト設定を保持します。
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
      
      # クライアントがインターネット経由で Message Queue for Apache Kafka に接続する場合、認証と暗号化に SASL_SSL プロトコルが使用されます。ネットワークプロトコルと ID 認証メカニズムを指定する必要があります。
      kafka.ssl.truststore.location= ../conf/kafka_client_truststore_jks
      kafka.ssl.truststore.password= KafkaOnsClient
      kafka.security.protocol= SASL_SSL
      kafka.sasl.mechanism = PLAIN
      kafka.ssl.endpoint.identification.algorithm =

      表 2. canal.properties ファイルのパラメーター

      パラメーター

      必須

      説明

      canal.serverMode

      はい

      Canal のサーバータイプ。このパラメーターを kafka に設定します。

      kafka.bootstrap.servers

      はい

      ApsaraMQ for Kafka インスタンスのエンドポイント。アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソールの ページの セクションでエンドポイントを取得できます。

      kafka.ssl.truststore.location

      はい

      ルート SSL 証明書 kafka.client.truststore.jks の保存パス。

      説明

      クライアントがインターネット経由で Message Queue for Apache Kafka に接続する場合、メッセージ送信のセキュリティを確保するために認証と暗号化が必要です。つまり、SSL エンドポイントを使用して Message Queue for Apache Kafka に接続し、SASL_SSL プロトコルを使用して認証と暗号化を行う必要があります。詳細については、エンドポイント間の比較をご参照ください。

      kafka.acks

      はい

      ブローカーがメッセージを受信した後、クライアントが ApsaraMQ for Kafka ブローカーから受信できる確認応答 (ACK) のレベル。有効な値:

      • 0: クライアントはブローカーからの ACK を待機しません。

      • 1: リーダーがメッセージを受信した後、クライアントは ACK を受信します。リーダーはメッセージをログに書き込みますが、すべてのフォロワーからの完全な ACK を待たずに応答します。

      • all: すべての同期レプリカがメッセージを受信した後、クライアントは ACK を受信します。リーダーは、同期レプリカの完全なセットがメッセージを確認応答するのを待機します。

      kafka.compression.type

      はい

      データの圧縮に使用されるアルゴリズム。デフォルトでは、データは圧縮されません。有効な値:

      • none

      • gzip

      • snappy

      kafka.batch.size

      はい

      クライアントが送信するバッチに累積されるメッセージの最大サイズ。単位: バイト。

      このパラメーターは、バッチで送信できる最大バイト数を指定します。クライアントがブローカーにリクエストを送信するたびに、データはバッチに分散されます。これにより、送信されるリクエストの数が削減されます。バッチサイズが小さいとスループットが低下する可能性があり、バッチサイズが大きいとメモリスペースの無駄が多くなる可能性があります。このパラメーターを設定すると、メッセージ処理用に固定サイズのバッファーが割り当てられます。これは、クライアントとブローカーの両方のパフォーマンスを向上させるのに役立ちます。

      説明

      kafka.batch.size パラメーターと kafka.linger.ms パラメーターは、バッチメッセージ処理を構成するために使用されます。バッチのサイズまたはクライアントがバッチにメッセージが累積されるのを待機する時間が指定されたしきい値を超えると、バッチ内のメッセージは送信準備完了になります。

      kafka.linger.ms

      はい

      クライアントがバッチにメッセージが累積されるのを待機する最大時間。単位: ミリ秒。

      クライアントは、待機時間が指定された値に達すると、ブローカーにリクエストを送信します。これは、バッチメッセージ処理を促進し、送信されるリクエストの数を削減します。

      kafka.max.request.size

      はい

      クライアントから送信されるリクエストの最大バイト数。

      kafka.buffer.memory

      はい

      クライアントがブローカーへの送信を待機しているメッセージをバッファリングするために使用できるメモリの合計サイズ。

      kafka.max.in.flight.requests.per.connection

      はい

      クライアントが単一の接続で送信できる未確認のリクエストの数。このパラメーターを 1 に設定すると、クライアントは、ブローカーがリクエストに応答する前に、同じブローカーにリクエストを送信できません。

      kafka.retries

      はい

      メッセージの送信に失敗した場合にクライアントがメッセージを再送信するかどうかを指定します。このパラメーターを 0 に設定すると、メッセージの送信に失敗した場合、クライアントはメッセージを再送信しません。このパラメーターを 0 より大きい値に設定すると、メッセージの送信に失敗した場合、クライアントはメッセージを再送信します。

      kafka.ssl.truststore.password

      はい

      ルート SSL 証明書のトラストストアのパスワード。このパラメーターを KafkaOnsClient に設定します。

      kafka.security.protocol

      はい

      認証と暗号化に SASL_SSL プロトコルを使用する場合は、このパラメーターを SASL_SSL に設定します。

      kafka.sasl.mechanism

      はい

      ID 認証に使用される Simple Authentication and Security Layer (SASL) メカニズム。SSL エンドポイントを使用して Message Queue for Apache Kafka に接続する場合は、このパラメーターを PLAIN に設定します。

      クライアントがインターネット経由で Message Queue for Apache Kafka に接続する場合、ID 認証に SASL が使用されます。bin/startup.sh ファイルで環境変数を構成し、kafka_client_producer_jaas.conf ファイルで ApsaraMQ for Kafka インスタンスの SASL ユーザーのユーザー名とパスワードを指定する必要があります。

      1. vi bin/startup.sh コマンドを実行し、startup.sh ファイルで環境変数を構成します。

        JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_jaas.conf"
      2. vi conf/kafka_client_producer_jaas.conf コマンドを実行し、kafka_client_producer_jaas.conf ファイルで Message Queue for Apache Kafka インスタンスの SASL ユーザーのユーザー名とパスワードを指定します。

        説明
        • Message Queue for Apache Kafka インスタンスでアクセス制御リスト (ACL) 機能が無効になっている場合、インスタンスの詳細ApsaraMQ for Kafka コンソールの ページで SASL ユーザーのデフォルトのユーザー名とパスワードを取得できます。

        • Message Queue for Apache Kafka インスタンスで ACL 機能が有効になっている場合、使用する SASL ユーザーが PLAIN タイプであり、ユーザーがメッセージを送受信する権限を持っていることを確認してください。詳細については、SASL ユーザーに権限を付与するをご参照ください。

        KafkaClient {  org.apache.kafka.common.security.plain.PlainLoginModule required
                       username="SASL ユーザーのユーザー名"  
                       password="SASL ユーザーのパスワード";
        };
    • クライアントが仮想プライベートクラウド (VPC) 内の Message Queue for Apache Kafka に接続する場合、認証と暗号化は不要です。メッセージの送信には PLAINTEXT プロトコルが使用され、ApsaraMQ for Kafka への接続にはデフォルトエンドポイントが使用されます。この場合、canal.serverMode パラメーターと kafka.bootstrap.servers パラメーターを設定するだけで済みます。エンドポイントの詳細については、エンドポイント間の比較をご参照ください。

      # ...
      # 値を kafka に設定します。
      canal.serverMode = kafka
      # ...
      # Message Queue for Apache Kafka の設定を構成します。
      # Message Queue for Apache Kafka インスタンスのデフォルトエンドポイント。エンドポイントは、Message Queue for Apache Kafka コンソールの [インスタンスの詳細] ページで取得できます。
      kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
      # 必要に応じてパラメーターを設定するか、以下のデフォルト設定を保持します。
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
  6. /home/doc/tools/canal.deployer-1.1.5 ディレクトリで、次のコマンドを実行して Canal を起動します。

    sh bin/startup.sh
    • ログファイル /home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log を確認して、Canal が ApsaraMQ for Kafka に接続され、Canal が実行されていることを確認します。

      2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## canal サーバーを起動します。
      2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## canal サーバー [10.1.XX.XX:11111] を起動します。
      2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## canal サーバーが実行中です ......
    • ログファイル /home/doc/tools/canal.deployer-1.1.5/logs/example/example.log を確認して、CanalInstance オブジェクトが起動されていることを確認します。

      2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - クラスパスリソース [canal.properties] からプロパティファイルをロードしています
      2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - クラスパスリソース [example/instance.properties] からプロパティファイルをロードしています
      2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - 1-example の CannalInstance を起動しています
      2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - 起動に成功しました....

構成をテストする

Canal の起動後、データ同期テストを実行します。

  1. mysql という名前の MySQL データベースで、T_Student という名前のテーブルを作成します。次のサンプルコードは、テーブル内のデータのクエリ方法の例を示しています。

    mysql> select * from T_Student;
    +--------+---------+------+------+
    | stuNum | stuName | age  | sex  |
    +--------+---------+------+------+
    |      1 | Wang    |   18 | girl |
    |      2 | Zhang    |   17 | boy |
    +--------+---------+------+------+
    2 rows in set (0.00 sec)

    ログファイル /home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log を確認します。データベースで追加、削除、または更新操作が実行されるたびに、meta.log ファイルにレコードが生成されます。ログファイルを確認して、Canal がデータを収集したかどうかを確認します。

    tail -f example/meta.log
    2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/192.168.XX.XX:3306]
  2. ApsaraMQ for Kafka コンソールにログインし、メッセージをクエリして、MySQL データベースの増分データが ApsaraMQ for Kafka インスタンスに同期されているかどうかを確認します。コンソールでメッセージをクエリする方法の詳細については、メッセージのクエリをご参照ください。

    Message query

  3. データが同期された後、次のコマンドを実行して Canal を停止します。

    sh bin/stop.sh