すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:Canal を使用して MySQL の変更データを ApsaraMQ for RocketMQ にストリーミングする

最終更新日:Mar 11, 2026

Canal は MySQL のバイナリログから行レベルの変更をキャプチャし、メッセージとして ApsaraMQ for RocketMQ に配信します。このチュートリアルでは、MySQL のバイナリロギングの設定、Canal Deployer のデプロイ、変更イベントが RocketMQ トピックに到達することの確認まで、セットアップの全手順を説明します。

仕組み

変更データキャプチャ (CDC) は、データベース内の INSERT、UPDATE、DELETE 操作を追跡し、ダウンストリームシステムにストリーミングします。Canal は、MySQL レプリカとして動作し、バイナリログイベントをリアルタイムで受信するオープンソースの CDC ツールです。ApsaraMQ for RocketMQ と組み合わせることで、Canal はデータベースの変更がダウンストリームの処理を自動的にトリガーするイベント駆動型アーキテクチャを実現します。

CDC solution architecture

Canal はレプリカとして MySQL に接続し、バイナリログイベントを受信して ApsaraMQ for RocketMQ に転送します。その後、コンシューマーは RocketMQ トピックをサブスクライブして変更イベントを処理します。

一般的なユースケース

  • データベースミラーリング:読み取り専用レプリカまたはセカンダリデータストアの同期を維持します。

  • リアルタイムバックアップ:ディザスタリカバリのために、変更を永続的なメッセージキューにストリーミングします。

  • インデックスメンテナンス:データの変更に応じて、検索インデックス、転置インデックス、またはシャーディングされた異種インデックスを再構築します。

  • キャッシュの無効化:基になるデータが更新されたときに、ビジネスキャッシュを更新します。

  • イベント駆動型処理:増分データ変更に基づいてビジネスロジックをトリガーします。

前提条件

開始する前に、以下が準備できていることを確認してください。

  • ApsaraMQ for RocketMQ インスタンスが [実行中] の状態であること。セットアップ手順については、「インスタンスの作成」をご参照ください。

  • MySQL インスタンスが [実行中] の状態であること。このチュートリアルでは ApsaraDB RDS for MySQL を使用します。セットアップ手順については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。

  • Canal Deployer を実行するための Linux サーバー。このチュートリアルでは Elastic Compute Service (ECS) インスタンスを使用します。セットアップ手順については、「ECS インスタンスの作成と管理」をご参照ください。

  • Canal サーバー、MySQL インスタンス、ApsaraMQ for RocketMQ インスタンス間のネットワーク接続が確保されていること。通常、Virtual Private Cloud (VPC) 内の ECS インスタンスとコンテナはこの要件を満たします。

  • 認証用の AccessKey ペア。手順については、「AccessKey ペアの作成」をご参照ください。

バージョンの互換性

サービス使用バージョンサポートされているバージョン
Canal1.1.6他のバージョンについては、Canal のリリースをご参照ください。
MySQL8.05.1.x、5.5.x、5.6.x、5.7.x、および 8.0.x
ApsaraMQ for RocketMQ5.x4.x および 5.x (5.x を推奨)
説明

ApsaraMQ for RocketMQ 5.x インスタンスについては、「ApsaraMQ for RocketMQ 5.x への変更データの同期」をご参照ください。

ステップ 1: MySQL の設定

バイナリロギングの有効化

ApsaraDB RDS for MySQL

バイナリロギングはデフォルトで有効になっており、ご利用の Alibaba Cloud アカウントには必要なダンプ権限が自動的に付与されています。テストデータベースの作成 に進んでください。

セルフマネージド MySQL

  1. MySQL の設定ファイル (my.cnf) を編集して、ROW フォーマットのバイナリロギングを有効にします。

        [mysqld]
        log-bin=mysql-bin   # バイナリロギングを有効化
        binlog-format=ROW   # Canal に必須のフォーマット
        server_id=1         # Canal のセカンダリノード ID とは異なる ID にする必要があります
  2. Canal 専用の MySQL ユーザーを作成し、レプリケーション権限を付与します。

        CREATE USER canal IDENTIFIED BY 'canal';
        GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
        FLUSH PRIVILEGES;

テストデータベースの作成

CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

テストテーブルの作成

canal データベースに students テーブルを作成します。

CREATE TABLE students (
    id INT AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    age INT,
    gender VARCHAR(10),
    PRIMARY KEY (id)
);

ステップ 2: Canal Deployer のデプロイ

JDK のインストール

sudo yum install java-1.8.0-openjdk

Canal Deployer のダウンロードと展開

# Canal Deployer 1.1.6 をダウンロード
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

# インストールディレクトリを作成して展開
sudo mkdir -p /usr/local/canal-server
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server

RocketMQ 向け Canal の設定

Canal の動作は 2 つの設定ファイルで制御されます:グローバル設定用の canal.properties と、インスタンスごとの設定用の instance.properties です。

グローバル設定 (canal.properties)

/usr/local/canal-server/conf/canal.properties を編集します。

sudo vi /usr/local/canal-server/conf/canal.properties

以下のパラメーターを設定します。

# 出力先:変更イベントを RocketMQ に送信
canal.serverMode = rocketMQ

# Alibaba Cloud 認証
canal.aliyun.accessKey = <your-access-key-id>
canal.aliyun.secretKey = <your-access-key-secret>

# アクセスチャネル:Alibaba Cloud マネージドの RocketMQ の場合は "cloud" に設定
canal.mq.accessChannel = cloud

# メッセージフォーマット:ApsaraMQ for RocketMQ の場合は false にする必要があります
canal.mq.flatMessage = false

# RocketMQ プロデューサー設定
rocketmq.producer.group = canal_test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = <your-rocketmq-endpoint>
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

以下のプレースホルダーを実際の値に置き換えてください。

プレースホルダー説明
<your-access-key-id>ご利用の Alibaba Cloud アカウントの AccessKey IDLTAI5tXxx
<your-access-key-secret>ご利用の Alibaba Cloud アカウントの AccessKey SecretxXxXxXx
<your-rocketmq-endpoint>ApsaraMQ for RocketMQ コンソールの [インスタンス詳細] ページにあるエンドポイントrmq-cn-xxx.cn-hangzhou.rmq.aliyuncs.com:8080
重要

本番環境の設定ファイルに AccessKey の認証情報をプレーンテキストで保存しないでください。より安全な認証のために、環境変数を使用するか、RAM ロールを ECS インスタンスにアタッチしてください。

主要なパラメーターの詳細

パラメーター説明
canal.serverMode出力先のタイプ。ApsaraMQ for RocketMQ の場合は rocketMQ に設定します。
canal.mq.accessChannelAlibaba Cloud マネージドの RocketMQ を使用する場合は cloud に設定します。
canal.mq.flatMessage必ず false に設定する必要があります。ApsaraMQ for RocketMQ は、複数の変更レコードを単一のフラットメッセージにカプセル化することをサポートしていません。コンシューマーはメッセージ本文を逆シリアル化する必要があります。Java の場合は、com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) を使用します。
rocketmq.namespaceApsaraMQ for RocketMQ 5.x インスタンスの場合は空白のままにします。
rocketmq.namesrv.addrご利用の ApsaraMQ for RocketMQ インスタンスのエンドポイント ([インスタンス詳細] ページで確認できます)。

インスタンス設定 (instance.properties)

/usr/local/canal-server/conf/example/instance.properties を編集します。

sudo vi /usr/local/canal-server/conf/example/instance.properties

以下のパラメーターを設定します。

# MySQL 接続
canal.instance.master.address = <your-mysql-endpoint>:3306
canal.instance.dbUsername = <your-db-username>
canal.instance.dbPassword = <your-db-password>

# テーブルフィルター:"canal" スキーマ内のすべてのテーブルをキャプチャ
canal.instance.filter.regex = canal\\..*

# ターゲット RocketMQ トピック
canal.mq.topic = canal_topic

以下のプレースホルダーを実際の値に置き換えてください。

プレースホルダー説明
<your-mysql-endpoint>ご利用の MySQL インスタンスのエンドポイントrm-uf62****.rwlb.rds.aliyuncs.com
<your-db-username>MySQL ユーザー名canal
<your-db-password>MySQL パスワードcanal

主要なパラメーターの詳細

パラメーター説明
canal.instance.master.addressポート付きの MySQL エンドポイント。MySQL のデフォルトポートは 3306 です。
canal.instance.filter.regexCanal がモニターするテーブルを制御する正規表現。canal\\..*canal スキーマ内のすべてのテーブルをキャプチャします。
canal.mq.topic変更イベントを受信する RocketMQ トピック。このトピックは、ご利用の ApsaraMQ for RocketMQ インスタンスで事前に作成しておく必要があります。

Canal Deployer の起動

/usr/local/canal-server/bin/startup.sh

起動の確認

Canal サーバーログをチェックして、正常に起動したことを確認します。

sudo cat /usr/local/canal-server/logs/canal/canal.log

起動に成功すると、以下のような出力が生成されます。

2024-07-15 17:24:12.154 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO  c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

最後の the canal server is running now という行は、Canal が正常に起動し、RocketMQ に接続されたことを示します。

次に、インスタンスログをチェックして、Canal が MySQL に接続し、モニタリングを開始したことを確認します。

sudo cat /usr/local/canal-server/logs/example/example.log

接続が成功したことを示す以下の主要な兆候を探してください。

2024-07-15 18:22:15.667 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
説明

テーブルフィルターの初期化に関する WARN エントリは想定内のものであり、エラーを示すものではありません。これらは、Canal が canal.instance.filter.regex の設定を正しく読み込んだことを確認するものです。

インスタンスログにエラーが表示される場合は、以下の一般的な原因を確認してください。

症状考えられる原因解決策
接続拒否またはタイムアウト不正な MySQL エンドポイントまたはファイアウォールルールcanal.instance.master.address を確認し、Canal サーバーが MySQL ポート (デフォルトは 3306) に到達できることを確認してください。
アクセス拒否不正な MySQL 認証情報または権限不足canal.instance.dbUsernamecanal.instance.dbPassword を確認してください。ユーザーに SELECT、REPLICATION SLAVE、REPLICATION CLIENT 権限があることを確認してください。
RocketMQ プロデューサーの起動失敗不正な RocketMQ エンドポイントまたは AccessKeyrocketmq.namesrv.addrcanal.aliyun.accessKey、および canal.aliyun.secretKey

ステップ 3: 結果の確認

テストデータの挿入

students テーブルに行を挿入して、変更イベントをトリガーします。

INSERT INTO `students` (`name`, `age`, `gender`) VALUES ('Tome', 18, 'male');

ApsaraMQ for RocketMQ でのメッセージの確認

  1. ApsaraMQ for RocketMQ コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[インスタンス] をクリックします。

  3. [インスタンス] ページで、設定したインスタンスの名前をクリックします。

  4. [インスタンス詳細] ページの左側のナビゲーションウィンドウで、[メッセージクエリ] をクリックします。

  5. クエリ結果に Canal からのメッセージが表示されることを確認します。

Message query result

メッセージが表示されない場合は、Canal がバイナリログイベントを処理するまで数秒待ってから、クエリを再試行してください。30 秒経ってもメッセージが表示されない場合は、/usr/local/canal-server/logs/example/example.log にある Canal インスタンスログでエラーを確認してください。

設定リファレンス

canal.properties パラメーター

パラメーター必須デフォルト説明
canal.serverModeはいtcp出力先。ApsaraMQ for RocketMQ の場合は rocketMQ に設定します。
canal.aliyun.accessKeyはい--Alibaba Cloud 認証用の AccessKey ID。
canal.aliyun.secretKeyはい--Alibaba Cloud 認証用の AccessKey Secret。
canal.mq.accessChannelはいlocalAlibaba Cloud マネージドの RocketMQ の場合は cloud に設定します。
canal.mq.flatMessageはいfalseApsaraMQ for RocketMQ の場合は false にする必要があります。
rocketmq.producer.groupはい--RocketMQ インスタンス上のプロデューサーグループ名。
rocketmq.namesrv.addrはい--RocketMQ インスタンスのエンドポイント。
rocketmq.namespaceいいえ(空)5.x インスタンスの場合は空白のままにします。
rocketmq.enable.message.traceいいえfalseメッセージトレースを有効または無効にします。
rocketmq.customized.trace.topicいいえ(空)メッセージトレースを保存するためのカスタムトピック。
rocketmq.retry.times.when.send.failedいいえ0送信失敗時のリトライ回数。
rocketmq.vip.channel.enabledいいえfalseVIP Netty チャネルを有効または無効にします。
rocketmq.tagいいえ(空)すべてのメッセージに適用されるタグ。

instance.properties パラメーター

パラメーター必須説明
canal.instance.master.addressはいhost:port 形式の MySQL エンドポイント。
canal.instance.dbUsernameはいレプリケーション権限を持つ MySQL ユーザー名。
canal.instance.dbPasswordはいMySQL パスワード。
canal.instance.filter.regexはいモニター対象のテーブルをフィルタリングするための正規表現。
canal.mq.topicはい変更イベントのターゲットとなる RocketMQ トピック。

次のステップ

  • 異なるテーブルを異なる RocketMQ トピックにルーティングするには、canal.mq.dynamicTopic を使用して動的トピックルーティングを設定します。詳細と例については、Canal のオープンソースドキュメントをご参照ください。

  • ApsaraMQ for RocketMQ 5.x インスタンスと統合するには、「ApsaraMQ for RocketMQ 5.x への変更データの同期」をご参照ください。

  • 本番環境へのデプロイでは、高可用性を実現するために複数のノードに Canal をデプロイし、一時的なネットワーク障害に対応するために rocketmq.retry.times.when.send.failed を調整することを検討してください。