レプリカの同期、キャッシュのリフレッシュ、検索インデックスの更新など、ご利用のアプリケーションがデータベースの変更にリアルタイムで対応する必要がある場合、それらの変更を確実にキャプチャし、ダウンストリームに配信する方法が必要です。Canal は、軽量なオープンソースの変更データキャプチャ (CDC) ツールであり、MySQL のバイナリログを解析し、行レベルの各変更をメッセージとして ApsaraMQ for RocketMQ に配信します。その後、コンシューマーはこれらのメッセージを処理して、データ同期、キャッシュの無効化、イベント駆動型ワークフローなどのビジネスロジックを駆動します。
このトピックでは、Elastic Compute Service (ECS) インスタンスに Canal をデプロイし、MySQL と ApsaraMQ for RocketMQ の両方に接続し、エンドツーエンドのメッセージ配信を検証する手順について説明します。
仕組み
Canal はレプリケーションクライアントとして MySQL に接続し、バイナリログを読み取り、各変更レコードをメッセージとして ApsaraMQ for RocketMQ に転送します。その後、ダウンストリームのコンシューマーがこれらのメッセージをリアルタイムで処理します。

設定には 3 つの手順が含まれます:
MySQL の設定:バイナリロギングを有効にし、テスト用のデータベースとテーブルを作成します。
Canal のデプロイ:ECS インスタンスに Canal Deployer をインストールし、MySQL と ApsaraMQ for RocketMQ の両方に接続します。
パイプラインの検証:MySQL に行を挿入し、対応するメッセージが ApsaraMQ for RocketMQ コンソールに表示されることを確認します。
ユースケース
Canal と ApsaraMQ for RocketMQ で構築された CDC パイプラインは、一般的に以下の目的で使用されます:
データベースミラーリング:レプリカデータベースをソースと同期させます。
リアルタイムバックアップ:完全なダンプを行わずに、増分変更をバックアップストアにストリーミングします。
インデックス構築とリアルタイムメンテナンス:シャーディングされた異種インデックスや転置インデックスなど、インデックスをリアルタイムで構築・維持します。
キャッシュの無効化:基になるデータが変更されたときに、ビジネスキャッシュをリフレッシュまたは無効化します。
増分データの処理:特定のデータ変更に基づいてダウンストリームのワークフローをトリガーします。
前提条件
Canal は、サーバーレスの ApsaraMQ for RocketMQ 5.x インスタンスをサポートしていません。標準 (非サーバーレス) インスタンスを使用してください。
リソース
開始する前に、以下が揃っていることを確認してください:
| リソース | 要件 | リファレンス |
|---|---|---|
| ApsaraMQ for RocketMQ インスタンス | トピックとグループが作成された実行中の 5.x インスタンス | インスタンスの作成 |
| MySQL インスタンス | 実行中の MySQL インスタンス (ApsaraDB RDS for MySQL またはセルフマネージド) | ApsaraDB RDS for MySQL インスタンスの作成 |
| ECS インスタンス | Canal Deployer をデプロイするためのマシン。MySQL と ApsaraMQ for RocketMQ インスタンスの両方にネットワークアクセスできる必要があります | ECS インスタンスの作成と管理 |
ネットワーク
Canal Deployer を実行している ECS インスタンスまたはコンテナは、MySQL インスタンスと ApsaraMQ for RocketMQ インスタンスの両方にネットワーク経由で到達できる必要があります。最も簡単な設定にするには、すべてのリソースを同じ VPC にデプロイしてください。
サポートされているバージョン
| サービス | バージョン | 注 |
|---|---|---|
| Canal | 1.1.6 | 他のバージョンについては、「Canal リリース」をご参照ください。 |
| MySQL | 8.0 | Canal は MySQL 5.1.x、5.5.x、5.6.x、5.7.x もサポートしています。 |
| ApsaraMQ for RocketMQ | 5.x | Canal は 4.x と 5.x の両方のインスタンスをサポートしています。5.x を推奨します。 |
手順1:MySQL の設定
1.1 バイナリロギングの有効化
ApsaraDB RDS for MySQL
ApsaraDB RDS for MySQL インスタンスでは、バイナリロギングはデフォルトで有効になっており、必要なレプリケーション権限は自動的に付与されます。1.2 データベースの作成 に進んでください。
セルフマネージド MySQL
my.cnfファイルを編集して、ROW フォーマットでバイナリロギングを有効にします:[mysqld] log-bin=mysql-bin # バイナリロギングを有効化 binlog-format=ROW # Canal に必要なフォーマット server_id=1 # Canal サーバー ID とは異なる必要がありますレプリケーション権限を持つ専用の MySQL ユーザーを作成します:
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
1.2 データベースの作成
canal という名前のデータベースを作成します:
CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;1.3 テーブルの作成
canal データベースに students という名前のテーブルを作成します:
CREATE TABLE students (
id INT AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
age INT,
gender VARCHAR(10),
PRIMARY KEY (id)
);手順2:Canal のデプロイ
Canal Deployer をホストする予定の ECS インスタンスで次のコマンドを実行します。
2.1 JDK のインストール
sudo yum install java-1.8.0-openjdk2.2 Canal Deployer のダウンロードと展開
# Canal Deployer 1.1.6 をダウンロード
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
# インストールディレクトリを作成し、アーカイブを展開
sudo mkdir -p /usr/local/canal-server
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server2.3 canal.properties の設定
メインの設定ファイルを開きます:
sudo vi /usr/local/canal-server/conf/canal.properties次のパラメーターを設定します。プレースホルダーの値を、ご利用の ApsaraMQ for RocketMQ インスタンスの実際の詳細に置き換えます。
# サーバーモード -- 変更を ApsaraMQ for RocketMQ に配信
canal.serverMode = rocketMQ
# ApsaraMQ for RocketMQ の認証情報
# ApsaraMQ for RocketMQ コンソールの [アクセス制御] ページの [インテリジェント認証] タブで確認できます。
canal.aliyun.accessKey = <your-access-key>
canal.aliyun.secretKey = <your-secret-key>
# アクセスチャネル -- Alibaba Cloud マネージドインスタンスの場合は "cloud" に設定
canal.mq.accessChannel = cloud
# メッセージフォーマット -- ApsaraMQ for RocketMQ の場合は false にする必要があります。
# ApsaraMQ for RocketMQ では、複数の変更レコードを 1 つのメッセージにカプセル化することはできません。
# false に設定した場合、コンシューマーはメッセージ本文を逆シリアル化する必要があります。
# Java の場合は、com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) を使用します。
# 他のプログラミング言語で記述されたアプリケーションの場合は、前述の
# Java メソッドを参照して逆シリアル化を実装してください。
canal.mq.flatMessage = false
# プロデューサーグループ名
rocketmq.producer.group = canal_test
# メッセージトレース (トレースが必要な場合を除き無効化)
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
# 名前空間 -- ApsaraMQ for RocketMQ 5.x インスタンスの場合は空白のままにします
rocketmq.namespace =
# ApsaraMQ for RocketMQ エンドポイント
# ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページで確認できます。
rocketmq.namesrv.addr = <your-rocketmq-endpoint>
# リトライとチャネル設定
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =次のプレースホルダーを実際の値に置き換えます:
| プレースホルダー | 説明 | 確認場所 |
|---|---|---|
<your-access-key> | ApsaraMQ for RocketMQ インスタンスのユーザー名 | [アクセス制御] ページの [インテリジェント認証] タブ |
<your-secret-key> | ApsaraMQ for RocketMQ インスタンスのパスワード | [アクセス制御] ページの [インテリジェント認証] タブ |
<your-rocketmq-endpoint> | ApsaraMQ for RocketMQ インスタンスのエンドポイント (例:rmq-cn-xxx.cn-hangzhou.rmq.aliyuncs.com:8080) | [インスタンス詳細] ページ |
2.4 instance.properties の設定
インスタンス設定ファイルを開きます:
sudo vi /usr/local/canal-server/conf/example/instance.properties次のパラメーターを設定して、Canal が MySQL インスタンスとターゲットトピックを指すようにします:
# MySQL 接続
canal.instance.master.address = <your-mysql-endpoint>:3306
canal.instance.dbUsername = <your-mysql-username>
canal.instance.dbPassword = <your-mysql-password>
# テーブルフィルター -- canal スキーマ内のすべてのテーブルをリッスン
canal.instance.filter.regex = canal\\..*
# ApsaraMQ for RocketMQ インスタンスのターゲットトピック
canal.mq.topic = canal_topic次のプレースホルダーを実際の値に置き換えます:
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-mysql-endpoint> | MySQL インスタンスのエンドポイント | rm-uf62****.rwlb.rds.aliyuncs.com |
<your-mysql-username> | MySQL ユーザー名 | canal |
<your-mysql-password> | MySQL パスワード | -- |
2.5 Canal Deployer の起動
/usr/local/canal-server/bin/startup.sh2.6 Canal が正常に起動したことの確認
Canal サーバーのログを確認します:正常に起動すると、以下のような出力が生成されます:
sudo cat /usr/local/canal-server/logs/canal/canal.log2024-07-15 17:24:12.154 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler 2024-07-15 17:24:12.202 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2024-07-15 17:24:12.497 [main] INFO c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer## 2024-07-15 17:24:12.799 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server. 2024-07-15 17:24:12.984 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111] 2024-07-15 17:24:16.208 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......Canal インスタンスのログを確認します:
start successfulメッセージを探します:sudo cat /usr/local/canal-server/logs/example/example.log2024-07-15 18:22:15.667 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$ 2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$ 2024-07-15 18:22:16.030 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
手順3:パイプラインの検証
3.1 MySQL への行の挿入
手順1.3 で作成した students テーブルに行を挿入するために、次の SQL ステートメントを実行します:
INSERT INTO students (name, age, gender) VALUES ('Tome', 18, 'male');3.2 ApsaraMQ for RocketMQ コンソールでのメッセージの確認
ApsaraMQ for RocketMQ コンソールにログインします。
左側のナビゲーションウィンドウで、[インスタンス] をクリックします。
手順2.3 で設定したインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、[メッセージクエリ] をクリックします。
Canal によって送信されたメッセージを見つけ、
INSERT文からの変更データが含まれていることを確認します。

メッセージが表示されれば、CDC パイプラインは正常に機能しています。Canal は、canal\\..* フィルターに一致するテーブルに対する INSERT、UPDATE、または DELETE 操作をキャプチャし続け、設定されたトピックに配信します。
設定リファレンス
次の表は、canal.properties 内のすべての ApsaraMQ for RocketMQ 関連パラメーターをリストしています。
| パラメーター | 説明 | 必須 |
|---|---|---|
canal.serverMode | 配信先。rocketMQ に設定します。 | はい |
canal.aliyun.accessKey | ApsaraMQ for RocketMQ インスタンスのユーザー名。 | はい |
canal.aliyun.secretKey | ApsaraMQ for RocketMQ インスタンスのパスワード。 | はい |
canal.mq.accessChannel | アクセス方法。Alibaba Cloud インスタンスの場合は cloud に設定します。 | はい |
canal.mq.flatMessage | メッセージのシリアル化フォーマット。ApsaraMQ for RocketMQ の場合は false にする必要があります。 | はい |
rocketmq.namesrv.addr | ApsaraMQ for RocketMQ エンドポイント。 | はい |
rocketmq.producer.group | ApsaraMQ for RocketMQ インスタンスのプロデューサーグループ名。 | はい |
rocketmq.namespace | インスタンスの名前空間。5.x インスタンスの場合は空白のままにします。 | いいえ |
rocketmq.enable.message.trace | メッセージトレースを有効にします。 | いいえ |
rocketmq.customized.trace.topic | メッセージトレースを保存するためのトピック。 | いいえ |
rocketmq.retry.times.when.send.failed | 失敗時の送信リトライ回数。 | いいえ |
rocketmq.vip.channel.enabled | VIP Netty チャネルを使用します。 | いいえ |
rocketmq.tag | メッセージタグ。 | いいえ |
次のステップ
CDC メッセージフローをモニターするには、
rocketmq.enable.message.traceをtrueに設定してメッセージトレースを有効にします。追加のデータベースまたはテーブルからの変更をキャプチャするには、
instance.propertiesのcanal.instance.filter.regexパラメーターを目的の正規表現パターンで更新します。Canal の設定と機能の詳細については、「Canal ドキュメント」をご参照ください。