このトピックでは、Canalを使用してデータベースの変更データをApsaraMQ for RocketMQに同期する方法について説明します。
背景情報
変更データキャプチャ (CDC) は、データベース内のデータに対する変更を識別してキャプチャするために使用されるソリューションです。 CDCは、異種データソース間でデータを同期するために一般的に使用されます。 Canalは、データベース内の増分ログを解析するために使用できる軽量のCDCツールであり、増分データをサブスクライブして消費することができます。 Canalは、ApsaraMQ for RocketMQが提供するさまざまなメッセージ処理ポリシーを使用して、データ変更レコードをApsaraMQ for RocketMQに配信し、多様なビジネスロジックを実装できます。
Canalはオープンソースプロジェクトです。 詳細については、「canal」をご参照ください。
シナリオ
次の項目では、バイナリログに基づいて増分データがサブスクライブされ、使用される一般的なシナリオについて説明します。
データベースミラーリング
リアルタイムデータベースバックアップ
インデックスの構築とリアルタイムのインデックスメンテナンス (異種インデックスと転置インデックスのシャーディングなど)
ビジネスキャッシュの更新
ビジネスロジックに関連する増分データの処理
ソリューションの紹介
次の図は、Canalを使用してデータベースの変更データをApsaraMQ for RocketMQに同期するために使用されるCDCソリューションを示しています。
上の図では、CanalはMySQLデータベースからバイナリログをリッスンして受信するために使用されるライブラリとして偽装されています。 その後、ログはストレージまたはApsaraMQ for RocketMQなどのミドルウェアシステムに同期されます。
次のステップが実行されます。
MySQLの設定: MySQLのバイナリログ機能を有効にし、テストに必要なデータベースとテーブルを作成します。
Canalのデプロイ: サーバーとして機能するCanal Deployerをデプロイし、Canal Deployerを使用してMySQLデータベースのバイナリログを受信します。
Verify the result: データ変更後のメッセージ送信を確認します。
環境要件
Resources
実行状態のApsaraMQ for RocketMQインスタンスが作成されます。 詳細については、「インスタンスの作成」をご参照ください。
実行状態のMySQLインスタンスが作成されます。 このトピックでは、ApsaraDB RDS for MySQLインスタンスが作成されます。 詳細については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。
運河関連コンポーネントを展開および実行するために使用される機械を準備する。 このトピックの例では、Elastic Compute Service (ECS) インスタンスが作成され、使用されます。 詳細については、「コンソールでのECSインスタンスの作成と管理 (エクスプレスバージョン) 」をご参照ください。
ネットワーク
Canal Deployerがデプロイされているノードは、データベースおよびApsaraMQ for RocketMQインスタンスに接続できます。 ほとんどの場合、仮想プライベートクラウド (VPC) に存在するECSインスタンスとコンテナをノードとして使用できます。
バージョン
サービス | バージョン | 説明 |
運河 | 1.1.6 | その他のバージョンについては、「canal」をご参照ください。 |
MySQL | 8.0 | Canalは、エンジンがMySQL 5.1.x、5.5.x、5.6.x、5.7.x、8.0.xのソースデータベースをサポートしています。 |
ApsaraMQ for RocketMQ | 5.x |
|
1. MySQLの設定
1.1 バイナリロギング機能の有効化
ApsaraDB RDS for MySQLインスタンス
ApsaraDB RDS for MySQLインスタンスのバイナリログ機能は自動的に有効になり、Alibaba Cloudアカウントにはバイナリログをダンプする権限が自動的に付与されます。 このステップはスキップできます。
自己管理MySQLデータベース
バイナリログ機能を有効にし、バイナリログの形式としてROWを指定します。 次のサンプルコードは、my.cn fファイルの設定の例を示しています。
[mysqld] log-bin=mysql-bin # Enable the binary logging feature. binlog-format=ROW # Specify ROW as the format of binary logs. server_id=1 # Specify the server ID of the MySQL database. The server ID cannot be the same as the secondary node ID of Canal.
canalという名前のユーザーを作成し、セカンダリMySQLノードでユーザー権限を付与します。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
1.2 データベースの作成
次のSQL文を実行して、canalという名前のデータベースを作成します。
CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
1.3 テーブルの作成
次のSQL文を実行して、canalデータベースにstudentsという名前のテーブルを作成します。
CREATE TABLE students (
id INT AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
age INT,
gender VARCHAR(10),
PRIMARY KEY (id)
);
2. Canalのデプロイ
2.1 JDKのインストール
次のコマンドを実行してJDKをインストールします。
sudo yum install java-1.8.0-openjdk
2.2 Canal Deployerのダウンロード
次のコマンドを実行して、Canal Deployerのインストールパッケージをダウンロードします。
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
2.3 インストールパッケージを解凍する
次のコマンドを実行してcanal-serverという名前のディレクトリを作成し、ダウンロードしたインストールパッケージをディレクトリに解凍します。
# Create a directory named canal-server.
sudo mkdir -p /usr/local/canal-server
# Decompress the downloaded installation package to the canal-server directory.
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server
2.4 設定の変更
次のコマンドを実行して、canal.propertiesファイルを設定します。
sudo vi /usr/local/canal-server/conf/canal.properties
# The server mode.
canal.serverMode = rocketMQ
# The AccessKey ID that is used for authentication.
canal.aliyun.accessKey = 6W0xz2uPf******
# The AccessKey secret that is used for authentication.
canal.aliyun.secretKey = sK56k1DrGx******
# The method that is used to access message queues.
canal.mq.accessChannel = cloud
# The format in which messages are sent. ApsaraMQ for RocketMQ does not allow you to encapsulate multiple change records into a message. You must set canal.mq.flatMessage to false. After a consumer receives a message body, the consumer must deserialize the message body. For Java applications, you can use the com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) method to implement the deserialization. For applications written in other programming languages, you can refer to the preceding Java method to implement the deserialization.
canal.mq.flatMessage = false
# The name of the group on the ApsaraMQ for RocketMQ instance.
rocketmq.producer.group = canal_test
# Specify whether to enable the message trace feature.
rocketmq.enable.message.trace = false
# The topic that is used to store message traces.
rocketmq.customized.trace.topic =
# The namespace of the ApsaraMQ for RocketMQ instance. If the instance is an ApsaraMQ for RocketMQ 5.x instance, you do not need to configure this parameter.
rocketmq.namespace =
# The endpoint of the ApsaraMQ for RocketMQ instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for RocketMQ console.
rocketmq.namesrv.addr = rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
# The number of retries.
rocketmq.retry.times.when.send.failed = 0
# Specify whether to enable the VIP Netty channel to send messages.
rocketmq.vip.channel.enabled = false
# The message tag.
rocketmq.tag =
Alibaba Cloudが提供するAccessKey IDとAccessKeyシークレットの取得方法については、「AccessKeyペアの作成」をご参照ください。
次のコマンドを実行して、instance.propertiesファイルを設定します。
sudo vi /usr/local/canal-server/conf/example/instance.properties
# The endpoint of the ApsaraDB RDS for MySQL instance.
canal.instance.master.address=rm-uf62****.rwlb.rds.aliyuncs.com:3306
# The username that is used to access the ApsaraDB RDS for MySQL database.
canal.instance.dbUsername=xxx
# The password that is used to access the ApsaraDB RDS for MySQL database.
canal.instance.dbPassword=xxx
# The MySQL tables to which Canal listens. The value of this parameter can be a regular expression. The value canal\\..* specifies that all tables in the canal schema are listened to.
canal.instance.filter.regex=canal\\..*
# The name of the topic on the ApsaraMQ for RocketMQ instance.
canal.mq.topic=canal_topic
2.5 Canal Deployerを起動する
次のコマンドを実行してCanal Deployerを起動します。
/usr/local/canal-server/bin/startup.sh
2.6 スタートアップの確認
次のコマンドを実行して、canal.logファイルを表示し、Canalが起動しているかどうかを確認します。
sudo vi /usr/local/canal-server/logs/canal/canal.log
2024-07-15 17:24:12.154 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
次のコマンドを実行してexample.logファイルを表示し、Canalインスタンスが起動したことを確認します。
sudo vi /usr/local/canal-server/logs/example/example.log
2024-07-15 18:22:15.667 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-07-15 18:22:16.030 [main] INFO c.a.otter.canal.instance.core.AbstractCanalIn
3. 結果の検証
3.1 MySQLデータベースへのデータエントリの追加
次のSQL文を実行して、1.3 Create a tableで作成したstudentsテーブルにデータエントリを追加します。
INSERT INTO`students` (`name`, `age`, `gender`)VALUES('Tome', 18, 'male');
3.2 Canalから送信されたメッセージを表示する
ApsaraMQ for RocketMQ コンソールにログインします。 左側のナビゲーションウィンドウで、[インスタンス] をクリックします。 [インスタンス] ページで、設定したインスタンスの名前をクリックします。 [インスタンスの詳細] ページの左側のナビゲーションウィンドウで、[メッセージクエリ] をクリックして、Canalによって送信されたメッセージを表示します。