すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:Canal を使用した MySQL 変更データの ApsaraMQ for RocketMQ への同期

最終更新日:Mar 11, 2026

レプリカの同期、キャッシュのリフレッシュ、検索インデックスの更新など、ご利用のアプリケーションがデータベースの変更にリアルタイムで対応する必要がある場合、それらの変更を確実にキャプチャし、ダウンストリームに配信する方法が必要です。Canal は、軽量なオープンソースの変更データキャプチャ (CDC) ツールであり、MySQL のバイナリログを解析し、行レベルの各変更をメッセージとして ApsaraMQ for RocketMQ に配信します。その後、コンシューマーはこれらのメッセージを処理して、データ同期、キャッシュの無効化、イベント駆動型ワークフローなどのビジネスロジックを駆動します。

このトピックでは、Elastic Compute Service (ECS) インスタンスに Canal をデプロイし、MySQL と ApsaraMQ for RocketMQ の両方に接続し、エンドツーエンドのメッセージ配信を検証する手順について説明します。

仕組み

Canal はレプリケーションクライアントとして MySQL に接続し、バイナリログを読み取り、各変更レコードをメッセージとして ApsaraMQ for RocketMQ に転送します。その後、ダウンストリームのコンシューマーがこれらのメッセージをリアルタイムで処理します。

CDC solution

設定には 3 つの手順が含まれます:

  1. MySQL の設定:バイナリロギングを有効にし、テスト用のデータベースとテーブルを作成します。

  2. Canal のデプロイ:ECS インスタンスに Canal Deployer をインストールし、MySQL と ApsaraMQ for RocketMQ の両方に接続します。

  3. パイプラインの検証:MySQL に行を挿入し、対応するメッセージが ApsaraMQ for RocketMQ コンソールに表示されることを確認します。

ユースケース

Canal と ApsaraMQ for RocketMQ で構築された CDC パイプラインは、一般的に以下の目的で使用されます:

  • データベースミラーリング:レプリカデータベースをソースと同期させます。

  • リアルタイムバックアップ:完全なダンプを行わずに、増分変更をバックアップストアにストリーミングします。

  • インデックス構築とリアルタイムメンテナンス:シャーディングされた異種インデックスや転置インデックスなど、インデックスをリアルタイムで構築・維持します。

  • キャッシュの無効化:基になるデータが変更されたときに、ビジネスキャッシュをリフレッシュまたは無効化します。

  • 増分データの処理:特定のデータ変更に基づいてダウンストリームのワークフローをトリガーします。

前提条件

重要

Canal は、サーバーレスの ApsaraMQ for RocketMQ 5.x インスタンスをサポートしていません。標準 (非サーバーレス) インスタンスを使用してください。

リソース

開始する前に、以下が揃っていることを確認してください:

リソース要件リファレンス
ApsaraMQ for RocketMQ インスタンストピックとグループが作成された実行中の 5.x インスタンスインスタンスの作成
MySQL インスタンス実行中の MySQL インスタンス (ApsaraDB RDS for MySQL またはセルフマネージド)ApsaraDB RDS for MySQL インスタンスの作成
ECS インスタンスCanal Deployer をデプロイするためのマシン。MySQL と ApsaraMQ for RocketMQ インスタンスの両方にネットワークアクセスできる必要がありますECS インスタンスの作成と管理

ネットワーク

Canal Deployer を実行している ECS インスタンスまたはコンテナは、MySQL インスタンスと ApsaraMQ for RocketMQ インスタンスの両方にネットワーク経由で到達できる必要があります。最も簡単な設定にするには、すべてのリソースを同じ VPC にデプロイしてください。

サポートされているバージョン

サービスバージョン
Canal1.1.6他のバージョンについては、「Canal リリース」をご参照ください。
MySQL8.0Canal は MySQL 5.1.x、5.5.x、5.6.x、5.7.x もサポートしています。
ApsaraMQ for RocketMQ5.xCanal は 4.x と 5.x の両方のインスタンスをサポートしています。5.x を推奨します。

手順1:MySQL の設定

1.1 バイナリロギングの有効化

ApsaraDB RDS for MySQL

ApsaraDB RDS for MySQL インスタンスでは、バイナリロギングはデフォルトで有効になっており、必要なレプリケーション権限は自動的に付与されます。1.2 データベースの作成 に進んでください。

セルフマネージド MySQL

  1. my.cnf ファイルを編集して、ROW フォーマットでバイナリロギングを有効にします:

        [mysqld]
        log-bin=mysql-bin           # バイナリロギングを有効化
        binlog-format=ROW           # Canal に必要なフォーマット
        server_id=1                 # Canal サーバー ID とは異なる必要があります
  2. レプリケーション権限を持つ専用の MySQL ユーザーを作成します:

        CREATE USER canal IDENTIFIED BY 'canal';
        GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
        FLUSH PRIVILEGES;

1.2 データベースの作成

canal という名前のデータベースを作成します:

CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

1.3 テーブルの作成

canal データベースに students という名前のテーブルを作成します:

CREATE TABLE students (
    id INT AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    age INT,
    gender VARCHAR(10),
    PRIMARY KEY (id)
);

手順2:Canal のデプロイ

Canal Deployer をホストする予定の ECS インスタンスで次のコマンドを実行します。

2.1 JDK のインストール

sudo yum install java-1.8.0-openjdk

2.2 Canal Deployer のダウンロードと展開

# Canal Deployer 1.1.6 をダウンロード
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

# インストールディレクトリを作成し、アーカイブを展開
sudo mkdir -p /usr/local/canal-server
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server

2.3 canal.properties の設定

メインの設定ファイルを開きます:

sudo vi /usr/local/canal-server/conf/canal.properties

次のパラメーターを設定します。プレースホルダーの値を、ご利用の ApsaraMQ for RocketMQ インスタンスの実際の詳細に置き換えます。

# サーバーモード -- 変更を ApsaraMQ for RocketMQ に配信
canal.serverMode = rocketMQ

# ApsaraMQ for RocketMQ の認証情報
# ApsaraMQ for RocketMQ コンソールの [アクセス制御] ページの [インテリジェント認証] タブで確認できます。
canal.aliyun.accessKey = <your-access-key>
canal.aliyun.secretKey = <your-secret-key>

# アクセスチャネル -- Alibaba Cloud マネージドインスタンスの場合は "cloud" に設定
canal.mq.accessChannel = cloud

# メッセージフォーマット -- ApsaraMQ for RocketMQ の場合は false にする必要があります。
# ApsaraMQ for RocketMQ では、複数の変更レコードを 1 つのメッセージにカプセル化することはできません。
# false に設定した場合、コンシューマーはメッセージ本文を逆シリアル化する必要があります。
# Java の場合は、com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) を使用します。
# 他のプログラミング言語で記述されたアプリケーションの場合は、前述の
# Java メソッドを参照して逆シリアル化を実装してください。
canal.mq.flatMessage = false

# プロデューサーグループ名
rocketmq.producer.group = canal_test

# メッセージトレース (トレースが必要な場合を除き無効化)
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =

# 名前空間 -- ApsaraMQ for RocketMQ 5.x インスタンスの場合は空白のままにします
rocketmq.namespace =

# ApsaraMQ for RocketMQ エンドポイント
# ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページで確認できます。
rocketmq.namesrv.addr = <your-rocketmq-endpoint>

# リトライとチャネル設定
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

次のプレースホルダーを実際の値に置き換えます:

プレースホルダー説明確認場所
<your-access-key>ApsaraMQ for RocketMQ インスタンスのユーザー名[アクセス制御] ページの [インテリジェント認証] タブ
<your-secret-key>ApsaraMQ for RocketMQ インスタンスのパスワード[アクセス制御] ページの [インテリジェント認証] タブ
<your-rocketmq-endpoint>ApsaraMQ for RocketMQ インスタンスのエンドポイント (例:rmq-cn-xxx.cn-hangzhou.rmq.aliyuncs.com:8080)[インスタンス詳細] ページ

2.4 instance.properties の設定

インスタンス設定ファイルを開きます:

sudo vi /usr/local/canal-server/conf/example/instance.properties

次のパラメーターを設定して、Canal が MySQL インスタンスとターゲットトピックを指すようにします:

# MySQL 接続
canal.instance.master.address = <your-mysql-endpoint>:3306
canal.instance.dbUsername = <your-mysql-username>
canal.instance.dbPassword = <your-mysql-password>

# テーブルフィルター -- canal スキーマ内のすべてのテーブルをリッスン
canal.instance.filter.regex = canal\\..*

# ApsaraMQ for RocketMQ インスタンスのターゲットトピック
canal.mq.topic = canal_topic

次のプレースホルダーを実際の値に置き換えます:

プレースホルダー説明
<your-mysql-endpoint>MySQL インスタンスのエンドポイントrm-uf62****.rwlb.rds.aliyuncs.com
<your-mysql-username>MySQL ユーザー名canal
<your-mysql-password>MySQL パスワード--

2.5 Canal Deployer の起動

/usr/local/canal-server/bin/startup.sh

2.6 Canal が正常に起動したことの確認

  1. Canal サーバーのログを確認します:正常に起動すると、以下のような出力が生成されます:

        sudo cat /usr/local/canal-server/logs/canal/canal.log
        2024-07-15 17:24:12.154 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
        2024-07-15 17:24:12.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
        2024-07-15 17:24:12.497 [main] INFO  c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
        2024-07-15 17:24:12.799 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
        2024-07-15 17:24:12.984 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
        2024-07-15 17:24:16.208 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  2. Canal インスタンスのログを確認します:start successful メッセージを探します:

        sudo cat /usr/local/canal-server/logs/example/example.log
        2024-07-15 18:22:15.667 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
        2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
        2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
        2024-07-15 18:22:16.030 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

手順3:パイプラインの検証

3.1 MySQL への行の挿入

手順1.3 で作成した students テーブルに行を挿入するために、次の SQL ステートメントを実行します:

INSERT INTO students (name, age, gender) VALUES ('Tome', 18, 'male');

3.2 ApsaraMQ for RocketMQ コンソールでのメッセージの確認

  1. ApsaraMQ for RocketMQ コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[インスタンス] をクリックします。

  3. 手順2.3 で設定したインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、[メッセージクエリ] をクリックします。

  5. Canal によって送信されたメッセージを見つけ、INSERT 文からの変更データが含まれていることを確認します。

Message query result

メッセージが表示されれば、CDC パイプラインは正常に機能しています。Canal は、canal\\..* フィルターに一致するテーブルに対する INSERTUPDATE、または DELETE 操作をキャプチャし続け、設定されたトピックに配信します。

設定リファレンス

次の表は、canal.properties 内のすべての ApsaraMQ for RocketMQ 関連パラメーターをリストしています。

パラメーター説明必須
canal.serverMode配信先。rocketMQ に設定します。はい
canal.aliyun.accessKeyApsaraMQ for RocketMQ インスタンスのユーザー名。はい
canal.aliyun.secretKeyApsaraMQ for RocketMQ インスタンスのパスワード。はい
canal.mq.accessChannelアクセス方法。Alibaba Cloud インスタンスの場合は cloud に設定します。はい
canal.mq.flatMessageメッセージのシリアル化フォーマット。ApsaraMQ for RocketMQ の場合は false にする必要があります。はい
rocketmq.namesrv.addrApsaraMQ for RocketMQ エンドポイント。はい
rocketmq.producer.groupApsaraMQ for RocketMQ インスタンスのプロデューサーグループ名。はい
rocketmq.namespaceインスタンスの名前空間。5.x インスタンスの場合は空白のままにします。いいえ
rocketmq.enable.message.traceメッセージトレースを有効にします。いいえ
rocketmq.customized.trace.topicメッセージトレースを保存するためのトピック。いいえ
rocketmq.retry.times.when.send.failed失敗時の送信リトライ回数。いいえ
rocketmq.vip.channel.enabledVIP Netty チャネルを使用します。いいえ
rocketmq.tagメッセージタグ。いいえ

次のステップ

  • CDC メッセージフローをモニターするには、rocketmq.enable.message.tracetrue に設定してメッセージトレースを有効にします。

  • 追加のデータベースまたはテーブルからの変更をキャプチャするには、instance.propertiescanal.instance.filter.regex パラメーターを目的の正規表現パターンで更新します。

  • Canal の設定と機能の詳細については、「Canal ドキュメント」をご参照ください。