このトピックでは、Canal を使用して MySQL データベースから ApsaraMQ for Kafka にデータを同期する方法について説明します。
背景情報
Canal は、MySQL データベースの増分データを解析し、増分データをサブスクライブおよびコンシュームできるようにします。Canal はセカンダリ MySQL データベースの役割を担い、プライマリ MySQL データベースにダンプ要求を開始します。プライマリ MySQL データベースは、ダンプ要求を受信すると、バイナリログを Canal にプッシュします。Canal はバイナリログを解析し、増分データを取得してから、増分データを同期します。Canal は ApsaraMQ for Kafka に接続し、MySQL データベースの増分データを分析のために ApsaraMQ for Kafka に書き込むことができます。Canal の仕組みの詳細については、GitHub の Canal ドキュメントをご参照ください。
前提条件
Canal を使用してデータを同期する前に、以下の条件が満たされていることを確認してください。
MySQL がインストールおよび初期化され、関連設定が構成されていること。詳細については、Canal クイックスタートをご参照ください。
ApsaraMQ for Kafka コンソールでインスタンスが作成され、インスタンスにトピックが作成されていること。詳細については、手順 3: リソースの作成をご参照ください。
手順
Canal パッケージをダウンロードします。このトピックの例では、V1.1.5 を使用します。
次のコマンドを実行してディレクトリを作成します。この例では、/home/doc/tools/canal.deployer-1.1.5 ディレクトリが作成されます。
mkdir -p /home/doc/tools/canal.deployer-1.1.5Canal パッケージを /home/doc/tools/canal.deployer-1.1.5 ディレクトリにコピーし、パッケージを解凍します。
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5/home/doc/tools/canal.deployer-1.1.5 ディレクトリで、次のコマンドを実行して instance.properties ファイルを変更します。
vi conf/example/instance.propertiesinstance.properties ファイルのパラメーター テーブルの説明に基づいてパラメーターを設定します。
# MySQL データベースの情報に基づいて、以下のパラメーターを設定します。 ################################################# ... # データベースの URL。 canal.instance.master.address=192.168.XX.XX:3306 # データベースへの接続に使用するユーザー名とパスワード。 ... canal.instance.dbUsername=**** canal.instance.dbPassword=**** ... # mq config # Message Queue for Apache Kafka コンソールで作成したトピック。 canal.mq.topic=mysql_test # データベース名またはテーブル名に基づいて動的トピックを指定します。 #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* # データを同期する Message Queue for Apache Kafka トピックのパーティション。 canal.mq.partition=0 # 以下の 2 つのパラメーターは、canal.mq.partition パラメーターと一緒に設定することはできません。以下の 2 つのパラメーターを設定すると、データは Message Queue for Apache Kafka トピックの異なるパーティションに同期されます。 #canal.mq.partitionsNum=3 # データベース名.テーブル名:一意の主キー。複数のテーブル設定はコンマ (,) で区切ります。 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################表 1. instance.properties ファイルのパラメーター
パラメーター
必須
説明
canal.instance.master.address
はい
MySQL データベースの URL。
canal.instance.dbUsername
はい
MySQL データベースへの接続に使用するユーザー名。
canal.instance.dbPassword
はい
MySQL データベースへの接続に使用するパスワード。
canal.mq.topic
はい
ApsaraMQ for Kafka インスタンストピック。トピック管理ApsaraMQ for Kafka コンソールの [トピック] ページでトピックを作成できます。詳細については、手順 3: リソースの作成をご参照ください。
canal.mq.dynamicTopic
いいえ
動的トピックの照合に使用される正規表現。正規表現を指定すると、Canal は式を評価して、MySQL データベースの異なるテーブルのデータを Message Queue for Apache Kafka インスタンスの異なるトピックに同期します。詳細については、パラメーターの説明をご参照ください。
canal.mq.partition
いいえ
データベースデータを同期する ApsaraMQ for Kafka トピックのパーティション。
canal.mq.partitionsNum
いいえ
トピックのパーティション数。このパラメーターは、canal.mq.partitionHash パラメーターと一緒に使用して、ApsaraMQ for Kafka トピックの異なるパーティションにデータを同期します。
canal.mq.partitionHash
いいえ
パーティションの照合に使用される正規表現。詳細については、パラメーターの説明をご参照ください。
次のコマンドを実行して canal.properties ファイルを開きます。
vi conf/canal.propertiescanal.properties ファイルのパラメーター テーブルの説明に基づいてパラメーターを設定します。
クライアントをインターネット経由で ApsaraMQ for Kafka に接続する場合、認証と暗号化に SASL_SSL プロトコルが使用され、Secure Sockets Layer (SSL) エンドポイントが必要になります。エンドポイントの詳細については、エンドポイント間の比較をご参照ください。
# ... # 値を kafka に設定します。 canal.serverMode = kafka # ... # Message Queue for Apache Kafka の設定を構成します。 # Message Queue for Apache Kafka インスタンスの SSL エンドポイント。エンドポイントは、Message Queue for Apache Kafka コンソールの [インスタンスの詳細] ページで取得できます。 kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093 # 必要に応じてパラメーターを設定するか、以下のデフォルト設定を保持します。 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0 # クライアントがインターネット経由で Message Queue for Apache Kafka に接続する場合、認証と暗号化に SASL_SSL プロトコルが使用されます。ネットワークプロトコルと ID 認証メカニズムを指定する必要があります。 kafka.ssl.truststore.location= ../conf/kafka_client_truststore_jks kafka.ssl.truststore.password= KafkaOnsClient kafka.security.protocol= SASL_SSL kafka.sasl.mechanism = PLAIN kafka.ssl.endpoint.identification.algorithm =表 2. canal.properties ファイルのパラメーター
パラメーター
必須
説明
canal.serverMode
はい
Canal のサーバータイプ。このパラメーターを kafka に設定します。
kafka.bootstrap.servers
はい
ApsaraMQ for Kafka インスタンスのエンドポイント。アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソールの ページの セクションでエンドポイントを取得できます。
kafka.ssl.truststore.location
はい
ルート SSL 証明書 kafka.client.truststore.jks の保存パス。
説明クライアントがインターネット経由で Message Queue for Apache Kafka に接続する場合、メッセージ送信のセキュリティを確保するために認証と暗号化が必要です。つまり、SSL エンドポイントを使用して Message Queue for Apache Kafka に接続し、SASL_SSL プロトコルを使用して認証と暗号化を行う必要があります。詳細については、エンドポイント間の比較をご参照ください。
kafka.acks
はい
ブローカーがメッセージを受信した後、クライアントが ApsaraMQ for Kafka ブローカーから受信できる確認応答 (ACK) のレベル。有効な値:
0: クライアントはブローカーからの ACK を待機しません。
1: リーダーがメッセージを受信した後、クライアントは ACK を受信します。リーダーはメッセージをログに書き込みますが、すべてのフォロワーからの完全な ACK を待たずに応答します。
all: すべての同期レプリカがメッセージを受信した後、クライアントは ACK を受信します。リーダーは、同期レプリカの完全なセットがメッセージを確認応答するのを待機します。
kafka.compression.type
はい
データの圧縮に使用されるアルゴリズム。デフォルトでは、データは圧縮されません。有効な値:
none
gzip
snappy
kafka.batch.size
はい
クライアントが送信するバッチに累積されるメッセージの最大サイズ。単位: バイト。
このパラメーターは、バッチで送信できる最大バイト数を指定します。クライアントがブローカーにリクエストを送信するたびに、データはバッチに分散されます。これにより、送信されるリクエストの数が削減されます。バッチサイズが小さいとスループットが低下する可能性があり、バッチサイズが大きいとメモリスペースの無駄が多くなる可能性があります。このパラメーターを設定すると、メッセージ処理用に固定サイズのバッファーが割り当てられます。これは、クライアントとブローカーの両方のパフォーマンスを向上させるのに役立ちます。
説明kafka.batch.size パラメーターと kafka.linger.ms パラメーターは、バッチメッセージ処理を構成するために使用されます。バッチのサイズまたはクライアントがバッチにメッセージが累積されるのを待機する時間が指定されたしきい値を超えると、バッチ内のメッセージは送信準備完了になります。
kafka.linger.ms
はい
クライアントがバッチにメッセージが累積されるのを待機する最大時間。単位: ミリ秒。
クライアントは、待機時間が指定された値に達すると、ブローカーにリクエストを送信します。これは、バッチメッセージ処理を促進し、送信されるリクエストの数を削減します。
kafka.max.request.size
はい
クライアントから送信されるリクエストの最大バイト数。
kafka.buffer.memory
はい
クライアントがブローカーへの送信を待機しているメッセージをバッファリングするために使用できるメモリの合計サイズ。
kafka.max.in.flight.requests.per.connection
はい
クライアントが単一の接続で送信できる未確認のリクエストの数。このパラメーターを 1 に設定すると、クライアントは、ブローカーがリクエストに応答する前に、同じブローカーにリクエストを送信できません。
kafka.retries
はい
メッセージの送信に失敗した場合にクライアントがメッセージを再送信するかどうかを指定します。このパラメーターを 0 に設定すると、メッセージの送信に失敗した場合、クライアントはメッセージを再送信しません。このパラメーターを 0 より大きい値に設定すると、メッセージの送信に失敗した場合、クライアントはメッセージを再送信します。
kafka.ssl.truststore.password
はい
ルート SSL 証明書のトラストストアのパスワード。このパラメーターを KafkaOnsClient に設定します。
kafka.security.protocol
はい
認証と暗号化に SASL_SSL プロトコルを使用する場合は、このパラメーターを SASL_SSL に設定します。
kafka.sasl.mechanism
はい
ID 認証に使用される Simple Authentication and Security Layer (SASL) メカニズム。SSL エンドポイントを使用して Message Queue for Apache Kafka に接続する場合は、このパラメーターを PLAIN に設定します。
クライアントがインターネット経由で Message Queue for Apache Kafka に接続する場合、ID 認証に SASL が使用されます。bin/startup.sh ファイルで環境変数を構成し、kafka_client_producer_jaas.conf ファイルで ApsaraMQ for Kafka インスタンスの SASL ユーザーのユーザー名とパスワードを指定する必要があります。
vi bin/startup.shコマンドを実行し、startup.sh ファイルで環境変数を構成します。JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_jaas.conf"vi conf/kafka_client_producer_jaas.confコマンドを実行し、kafka_client_producer_jaas.conf ファイルで Message Queue for Apache Kafka インスタンスの SASL ユーザーのユーザー名とパスワードを指定します。説明Message Queue for Apache Kafka インスタンスでアクセス制御リスト (ACL) 機能が無効になっている場合、インスタンスの詳細ApsaraMQ for Kafka コンソールの ページで SASL ユーザーのデフォルトのユーザー名とパスワードを取得できます。
Message Queue for Apache Kafka インスタンスで ACL 機能が有効になっている場合、使用する SASL ユーザーが PLAIN タイプであり、ユーザーがメッセージを送受信する権限を持っていることを確認してください。詳細については、SASL ユーザーに権限を付与するをご参照ください。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="SASL ユーザーのユーザー名" password="SASL ユーザーのパスワード"; };
クライアントが仮想プライベートクラウド (VPC) 内の Message Queue for Apache Kafka に接続する場合、認証と暗号化は不要です。メッセージの送信には PLAINTEXT プロトコルが使用され、ApsaraMQ for Kafka への接続にはデフォルトエンドポイントが使用されます。この場合、canal.serverMode パラメーターと kafka.bootstrap.servers パラメーターを設定するだけで済みます。エンドポイントの詳細については、エンドポイント間の比較をご参照ください。
# ... # 値を kafka に設定します。 canal.serverMode = kafka # ... # Message Queue for Apache Kafka の設定を構成します。 # Message Queue for Apache Kafka インスタンスのデフォルトエンドポイント。エンドポイントは、Message Queue for Apache Kafka コンソールの [インスタンスの詳細] ページで取得できます。 kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092 # 必要に応じてパラメーターを設定するか、以下のデフォルト設定を保持します。 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0
/home/doc/tools/canal.deployer-1.1.5 ディレクトリで、次のコマンドを実行して Canal を起動します。
sh bin/startup.shログファイル /home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log を確認して、Canal が ApsaraMQ for Kafka に接続され、Canal が実行されていることを確認します。
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## canal サーバーを起動します。 2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## canal サーバー [10.1.XX.XX:11111] を起動します。 2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## canal サーバーが実行中です ......ログファイル /home/doc/tools/canal.deployer-1.1.5/logs/example/example.log を確認して、CanalInstance オブジェクトが起動されていることを確認します。
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - クラスパスリソース [canal.properties] からプロパティファイルをロードしています 2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - クラスパスリソース [example/instance.properties] からプロパティファイルをロードしています 2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - 1-example の CannalInstance を起動しています 2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - 起動に成功しました....
構成をテストする
Canal の起動後、データ同期テストを実行します。
mysql という名前の MySQL データベースで、T_Student という名前のテーブルを作成します。次のサンプルコードは、テーブル内のデータのクエリ方法の例を示しています。
mysql> select * from T_Student; +--------+---------+------+------+ | stuNum | stuName | age | sex | +--------+---------+------+------+ | 1 | Wang | 18 | girl | | 2 | Zhang | 17 | boy | +--------+---------+------+------+ 2 rows in set (0.00 sec)ログファイル /home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log を確認します。データベースで追加、削除、または更新操作が実行されるたびに、meta.log ファイルにレコードが生成されます。ログファイルを確認して、Canal がデータを収集したかどうかを確認します。
tail -f example/meta.log 2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306] 2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/192.168.XX.XX:3306] 2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/192.168.XX.XX:3306]ApsaraMQ for Kafka コンソールにログインし、メッセージをクエリして、MySQL データベースの増分データが ApsaraMQ for Kafka インスタンスに同期されているかどうかを確認します。コンソールでメッセージをクエリする方法の詳細については、メッセージのクエリをご参照ください。

データが同期された後、次のコマンドを実行して Canal を停止します。
sh bin/stop.sh