Canal は MySQL のバイナリログから行レベルの変更をキャプチャし、メッセージとして ApsaraMQ for RocketMQ に配信します。このチュートリアルでは、MySQL のバイナリロギングの設定、Canal Deployer のデプロイ、変更イベントが RocketMQ トピックに到達することの確認まで、セットアップの全手順を説明します。
仕組み
変更データキャプチャ (CDC) は、データベース内の INSERT、UPDATE、DELETE 操作を追跡し、ダウンストリームシステムにストリーミングします。Canal は、MySQL レプリカとして動作し、バイナリログイベントをリアルタイムで受信するオープンソースの CDC ツールです。ApsaraMQ for RocketMQ と組み合わせることで、Canal はデータベースの変更がダウンストリームの処理を自動的にトリガーするイベント駆動型アーキテクチャを実現します。

Canal はレプリカとして MySQL に接続し、バイナリログイベントを受信して ApsaraMQ for RocketMQ に転送します。その後、コンシューマーは RocketMQ トピックをサブスクライブして変更イベントを処理します。
一般的なユースケース
データベースミラーリング:読み取り専用レプリカまたはセカンダリデータストアの同期を維持します。
リアルタイムバックアップ:ディザスタリカバリのために、変更を永続的なメッセージキューにストリーミングします。
インデックスメンテナンス:データの変更に応じて、検索インデックス、転置インデックス、またはシャーディングされた異種インデックスを再構築します。
キャッシュの無効化:基になるデータが更新されたときに、ビジネスキャッシュを更新します。
イベント駆動型処理:増分データ変更に基づいてビジネスロジックをトリガーします。
前提条件
開始する前に、以下が準備できていることを確認してください。
ApsaraMQ for RocketMQ インスタンスが [実行中] の状態であること。セットアップ手順については、「インスタンスの作成」をご参照ください。
MySQL インスタンスが [実行中] の状態であること。このチュートリアルでは ApsaraDB RDS for MySQL を使用します。セットアップ手順については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。
Canal Deployer を実行するための Linux サーバー。このチュートリアルでは Elastic Compute Service (ECS) インスタンスを使用します。セットアップ手順については、「ECS インスタンスの作成と管理」をご参照ください。
Canal サーバー、MySQL インスタンス、ApsaraMQ for RocketMQ インスタンス間のネットワーク接続が確保されていること。通常、Virtual Private Cloud (VPC) 内の ECS インスタンスとコンテナはこの要件を満たします。
認証用の AccessKey ペア。手順については、「AccessKey ペアの作成」をご参照ください。
バージョンの互換性
| サービス | 使用バージョン | サポートされているバージョン |
|---|---|---|
| Canal | 1.1.6 | 他のバージョンについては、Canal のリリースをご参照ください。 |
| MySQL | 8.0 | 5.1.x、5.5.x、5.6.x、5.7.x、および 8.0.x |
| ApsaraMQ for RocketMQ | 5.x | 4.x および 5.x (5.x を推奨) |
ApsaraMQ for RocketMQ 5.x インスタンスについては、「ApsaraMQ for RocketMQ 5.x への変更データの同期」をご参照ください。
ステップ 1: MySQL の設定
バイナリロギングの有効化
ApsaraDB RDS for MySQL
バイナリロギングはデフォルトで有効になっており、ご利用の Alibaba Cloud アカウントには必要なダンプ権限が自動的に付与されています。
セルフマネージド MySQL
MySQL の設定ファイル (
my.cnf) を編集して、ROW フォーマットのバイナリロギングを有効にします。[mysqld] log-bin=mysql-bin # バイナリロギングを有効化 binlog-format=ROW # Canal に必須のフォーマット server_id=1 # Canal のセカンダリノード ID とは異なる ID にする必要がありますCanal 専用の MySQL ユーザーを作成し、レプリケーション権限を付与します。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
テストデータベースの作成
CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;テストテーブルの作成
canal データベースに students テーブルを作成します。
CREATE TABLE students (
id INT AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
age INT,
gender VARCHAR(10),
PRIMARY KEY (id)
);ステップ 2: Canal Deployer のデプロイ
JDK のインストール
sudo yum install java-1.8.0-openjdkCanal Deployer のダウンロードと展開
# Canal Deployer 1.1.6 をダウンロード
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
# インストールディレクトリを作成して展開
sudo mkdir -p /usr/local/canal-server
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-serverRocketMQ 向け Canal の設定
Canal の動作は 2 つの設定ファイルで制御されます:グローバル設定用の canal.properties と、インスタンスごとの設定用の instance.properties です。
グローバル設定 (canal.properties)
/usr/local/canal-server/conf/canal.properties を編集します。
sudo vi /usr/local/canal-server/conf/canal.properties以下のパラメーターを設定します。
# 出力先:変更イベントを RocketMQ に送信
canal.serverMode = rocketMQ
# Alibaba Cloud 認証
canal.aliyun.accessKey = <your-access-key-id>
canal.aliyun.secretKey = <your-access-key-secret>
# アクセスチャネル:Alibaba Cloud マネージドの RocketMQ の場合は "cloud" に設定
canal.mq.accessChannel = cloud
# メッセージフォーマット:ApsaraMQ for RocketMQ の場合は false にする必要があります
canal.mq.flatMessage = false
# RocketMQ プロデューサー設定
rocketmq.producer.group = canal_test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = <your-rocketmq-endpoint>
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =以下のプレースホルダーを実際の値に置き換えてください。
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-access-key-id> | ご利用の Alibaba Cloud アカウントの AccessKey ID | LTAI5tXxx |
<your-access-key-secret> | ご利用の Alibaba Cloud アカウントの AccessKey Secret | xXxXxXx |
<your-rocketmq-endpoint> | ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページにあるエンドポイント | rmq-cn-xxx.cn-hangzhou.rmq.aliyuncs.com:8080 |
本番環境の設定ファイルに AccessKey の認証情報をプレーンテキストで保存しないでください。より安全な認証のために、環境変数を使用するか、RAM ロールを ECS インスタンスにアタッチしてください。
主要なパラメーターの詳細
| パラメーター | 説明 |
|---|---|
canal.serverMode | 出力先のタイプ。ApsaraMQ for RocketMQ の場合は rocketMQ に設定します。 |
canal.mq.accessChannel | Alibaba Cloud マネージドの RocketMQ を使用する場合は cloud に設定します。 |
canal.mq.flatMessage | 必ず false に設定する必要があります。ApsaraMQ for RocketMQ は、複数の変更レコードを単一のフラットメッセージにカプセル化することをサポートしていません。コンシューマーはメッセージ本文を逆シリアル化する必要があります。Java の場合は、com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) を使用します。 |
rocketmq.namespace | ApsaraMQ for RocketMQ 5.x インスタンスの場合は空白のままにします。 |
rocketmq.namesrv.addr | ご利用の ApsaraMQ for RocketMQ インスタンスのエンドポイント ([インスタンス詳細] ページで確認できます)。 |
インスタンス設定 (instance.properties)
/usr/local/canal-server/conf/example/instance.properties を編集します。
sudo vi /usr/local/canal-server/conf/example/instance.properties以下のパラメーターを設定します。
# MySQL 接続
canal.instance.master.address = <your-mysql-endpoint>:3306
canal.instance.dbUsername = <your-db-username>
canal.instance.dbPassword = <your-db-password>
# テーブルフィルター:"canal" スキーマ内のすべてのテーブルをキャプチャ
canal.instance.filter.regex = canal\\..*
# ターゲット RocketMQ トピック
canal.mq.topic = canal_topic以下のプレースホルダーを実際の値に置き換えてください。
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-mysql-endpoint> | ご利用の MySQL インスタンスのエンドポイント | rm-uf62****.rwlb.rds.aliyuncs.com |
<your-db-username> | MySQL ユーザー名 | canal |
<your-db-password> | MySQL パスワード | canal |
主要なパラメーターの詳細
| パラメーター | 説明 |
|---|---|
canal.instance.master.address | ポート付きの MySQL エンドポイント。MySQL のデフォルトポートは 3306 です。 |
canal.instance.filter.regex | Canal がモニターするテーブルを制御する正規表現。canal\\..* は canal スキーマ内のすべてのテーブルをキャプチャします。 |
canal.mq.topic | 変更イベントを受信する RocketMQ トピック。このトピックは、ご利用の ApsaraMQ for RocketMQ インスタンスで事前に作成しておく必要があります。 |
Canal Deployer の起動
/usr/local/canal-server/bin/startup.sh起動の確認
Canal サーバーログをチェックして、正常に起動したことを確認します。
sudo cat /usr/local/canal-server/logs/canal/canal.log起動に成功すると、以下のような出力が生成されます。
2024-07-15 17:24:12.154 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......最後の the canal server is running now という行は、Canal が正常に起動し、RocketMQ に接続されたことを示します。
次に、インスタンスログをチェックして、Canal が MySQL に接続し、モニタリングを開始したことを確認します。
sudo cat /usr/local/canal-server/logs/example/example.log接続が成功したことを示す以下の主要な兆候を探してください。
2024-07-15 18:22:15.667 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$テーブルフィルターの初期化に関する WARN エントリは想定内のものであり、エラーを示すものではありません。これらは、Canal が canal.instance.filter.regex の設定を正しく読み込んだことを確認するものです。
インスタンスログにエラーが表示される場合は、以下の一般的な原因を確認してください。
| 症状 | 考えられる原因 | 解決策 |
|---|---|---|
| 接続拒否またはタイムアウト | 不正な MySQL エンドポイントまたはファイアウォールルール | canal.instance.master.address を確認し、Canal サーバーが MySQL ポート (デフォルトは 3306) に到達できることを確認してください。 |
| アクセス拒否 | 不正な MySQL 認証情報または権限不足 | canal.instance.dbUsername と canal.instance.dbPassword を確認してください。ユーザーに SELECT、REPLICATION SLAVE、REPLICATION CLIENT 権限があることを確認してください。 |
| RocketMQ プロデューサーの起動失敗 | 不正な RocketMQ エンドポイントまたは AccessKey | rocketmq.namesrv.addr、canal.aliyun.accessKey、および canal.aliyun.secretKey |
ステップ 3: 結果の確認
テストデータの挿入
students テーブルに行を挿入して、変更イベントをトリガーします。
INSERT INTO `students` (`name`, `age`, `gender`) VALUES ('Tome', 18, 'male');ApsaraMQ for RocketMQ でのメッセージの確認
ApsaraMQ for RocketMQ コンソールにログインします。
左側のナビゲーションウィンドウで、[インスタンス] をクリックします。
[インスタンス] ページで、設定したインスタンスの名前をクリックします。
[インスタンス詳細] ページの左側のナビゲーションウィンドウで、[メッセージクエリ] をクリックします。
クエリ結果に Canal からのメッセージが表示されることを確認します。

メッセージが表示されない場合は、Canal がバイナリログイベントを処理するまで数秒待ってから、クエリを再試行してください。30 秒経ってもメッセージが表示されない場合は、/usr/local/canal-server/logs/example/example.log にある Canal インスタンスログでエラーを確認してください。
設定リファレンス
canal.properties パラメーター
| パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
canal.serverMode | はい | tcp | 出力先。ApsaraMQ for RocketMQ の場合は rocketMQ に設定します。 |
canal.aliyun.accessKey | はい | -- | Alibaba Cloud 認証用の AccessKey ID。 |
canal.aliyun.secretKey | はい | -- | Alibaba Cloud 認証用の AccessKey Secret。 |
canal.mq.accessChannel | はい | local | Alibaba Cloud マネージドの RocketMQ の場合は cloud に設定します。 |
canal.mq.flatMessage | はい | false | ApsaraMQ for RocketMQ の場合は false にする必要があります。 |
rocketmq.producer.group | はい | -- | RocketMQ インスタンス上のプロデューサーグループ名。 |
rocketmq.namesrv.addr | はい | -- | RocketMQ インスタンスのエンドポイント。 |
rocketmq.namespace | いいえ | (空) | 5.x インスタンスの場合は空白のままにします。 |
rocketmq.enable.message.trace | いいえ | false | メッセージトレースを有効または無効にします。 |
rocketmq.customized.trace.topic | いいえ | (空) | メッセージトレースを保存するためのカスタムトピック。 |
rocketmq.retry.times.when.send.failed | いいえ | 0 | 送信失敗時のリトライ回数。 |
rocketmq.vip.channel.enabled | いいえ | false | VIP Netty チャネルを有効または無効にします。 |
rocketmq.tag | いいえ | (空) | すべてのメッセージに適用されるタグ。 |
instance.properties パラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
canal.instance.master.address | はい | host:port 形式の MySQL エンドポイント。 |
canal.instance.dbUsername | はい | レプリケーション権限を持つ MySQL ユーザー名。 |
canal.instance.dbPassword | はい | MySQL パスワード。 |
canal.instance.filter.regex | はい | モニター対象のテーブルをフィルタリングするための正規表現。 |
canal.mq.topic | はい | 変更イベントのターゲットとなる RocketMQ トピック。 |
次のステップ
異なるテーブルを異なる RocketMQ トピックにルーティングするには、
canal.mq.dynamicTopicを使用して動的トピックルーティングを設定します。詳細と例については、Canal のオープンソースドキュメントをご参照ください。ApsaraMQ for RocketMQ 5.x インスタンスと統合するには、「ApsaraMQ for RocketMQ 5.x への変更データの同期」をご参照ください。
本番環境へのデプロイでは、高可用性を実現するために複数のノードに Canal をデプロイし、一時的なネットワーク障害に対応するために
rocketmq.retry.times.when.send.failedを調整することを検討してください。