canal は、Alibaba Group が提供するオープンソースツールであり、MySQL のバイナリログを解析し、増分変更をダウンストリームのコンシューマーにストリーミングします。本ガイドでは、Elastic Compute Service (ECS) インスタンス上に canal サーバーおよび canal アダプターをデプロイし、ApsaraDB RDS for MySQL の増分データをリアルタイムで Alibaba Cloud Elasticsearch クラスターへ同期する手順について説明します。
仕組み
canal は、ApsaraDB RDS for MySQL インスタンスのセカンダリノードとして動作し、そのバイナリログをサブスクライブします。MySQL で行が挿入、更新、または削除されると、canal は各変更イベントをキャプチャし、RESTful API 経由で Elasticsearch へ書き込みます。
前提条件
開始する前に、以下のリソースが用意されていることを確認してください。
-
ApsaraDB RDS for MySQL インスタンス。詳細については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。本ガイドでは MySQL 5.7 インスタンスを使用します。
-
Alibaba Cloud Elasticsearch クラスター。次を参照してください。Alibaba Cloud Elasticsearch クラスターの作成。このガイドでは、Elasticsearch V6.7 クラスター(カーネル強化エディション)を使用します。
-
CentOS 7.6(64 ビット)を実行中の ECS インスタンス(canal サーバーおよび canal アダプターのホストとして使用)。詳細については、「ウィザードを使用したインスタンスの作成」をご参照ください。
-
上記 3 つのリソースが同一の VPC 内にあること。
-
ECS インスタンスの IP アドレスを Elasticsearch クラスターの IP アドレスホワイトリストに追加します。 詳細については、「Elasticsearch クラスターのパブリックまたは非公開 IP アドレスホワイトリストを設定する」をご参照ください。
制限事項
-
canal は 増分データのみ を同期します。フルテーブルエクスポートはサポートされていません。
-
canal アダプターは、HTTPS 経由で Elasticsearch クラスターに接続できません。
-
Elasticsearch インデックスのマッピングにおけるフィールド名およびデータの型は、対応する MySQL テーブルのカラムと完全に一致している必要があります。
-
Java 開発キット(JDK)のバージョンは 1.8.0 以降である必要があります。
-
canal と Elasticsearch のバージョン互換性:
説明 canal 1.1.4 を使用して Elasticsearch V7.X クラスターへデータを同期することはできません。Elasticsearch V7.x クラスターには canal 1.1.5、Elasticsearch V8.x クラスターには canal 1.1.7 を使用してください。本ガイドでは、Elasticsearch V6.7 クラスターと canal 1.1.4 を併用します。canal バージョン 対応する Elasticsearch バージョン 1.1.4 6.x 1.1.5 7.x 1.1.7 8.x
ステップ 1:MySQL データソースの準備
ApsaraDB RDS コンソールにログインし、データベースおよびテーブルを作成します。本ガイドでは、以下のテーブルスキーマを使用します。
CREATE TABLE `es_test` (
`id` bigint(32) NOT NULL,
`name` text NOT NULL,
`count` text NOT NULL,
`color` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8;
一般的な手順については、「ApsaraDB RDS for MySQL の一般的なワークフロー」をご参照ください。
ステップ 2:マッピングを含む Elasticsearch インデックスの作成
インデックスのマッピングにおけるフィールド名およびデータの型は、MySQL テーブルと完全に一致させる必要があります。
-
Alibaba Cloud Elasticsearch クラスターの Kibana コンソールにログインします。詳細については、「Kibana コンソールへのログイン」をご参照ください。
説明 本ガイドでは Elasticsearch V6.7.0 を使用します。他のバージョンでは操作内容が異なる場合があります。 -
左側のナビゲーションウィンドウで、Dev Tools をクリックします。
-
Console タブで、以下のコマンドを実行して
es_testインデックスを作成します。PUT es_test?include_type_name=true { "settings": { "index": { "number_of_shards": "5", "number_of_replicas": "1" } }, "mappings": { "_doc": { "properties": { "count": { "type": "text" }, "id": { "type": "integer" }, "name": { "type": "text", "analyzer": "ik_smart" }, "color": { "type": "text" } } } } }成功時の応答例:
{ "acknowledged": true, "shards_acknowledged": true, "index": "es_test" }
ステップ 3:JDK のインストール
-
ECS インスタンスに接続します。詳細については、「パスワードまたはキーを使用した Linux インスタンスへの接続」をご参照ください。
説明 本例では、一般ユーザを使用します。 -
利用可能な JDK パッケージを検索します。
sudo yum search java | grep -i --color JDK -
JDK をインストールします。本ガイドでは
java-1.8.0-openjdk-devel.x86_64を使用します。sudo yum install java-1.8.0-openjdk-devel.x86_64 -
環境変数を構成します。
-
~/.bash_profileを開きます。vim ~/.bash_profile -
以下の行を追加します。
JAVA_HOMEは、実際の JDK インストールパスに置き換えてください。「find / -name 'java'」を実行してパスを特定できます。export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin -
Esc キーを押下し、
:wqを入力して保存後、設定を適用します。source ~/.bash_profile
-
-
インストールを確認します。
java -version期待される出力:
openjdk version "1.8.0_362" OpenJDK Runtime Environment (build 1.8.0_362-b08) OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
ステップ 4:canal サーバーのインストールおよび起動
-
canal サーバー 1.1.4 をダウンロードします。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gzその他のバージョンについては、「canal のリリース」をご参照ください。
-
パッケージを解凍します。
tar -zxvf canal.deployer-1.1.4.tar.gz -
conf/example/instance.propertiesを編集します。パラメーター 説明 例 canal.instance.master.addressApsaraDB RDS for MySQL インスタンスの内部エンドポイントおよびポート。RDS インスタンスの 基本情報 ページから取得します。 rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306canal.instance.dbUsernameデータベースのユーザー名。RDS インスタンスの アカウント ページから取得します。 canal_usercanal.instance.dbPasswordデータベースのパスワード。 <your-password>vi conf/example/instance.properties以下のパラメーターを設定します。

-
Esc キーを押下し、
:wqを入力して保存します。 -
canal サーバーを起動し、ログを確認します。
./bin/startup.sh cat logs/canal/canal.log
ステップ 5:canal アダプターのインストールおよび起動
-
canal アダプター 1.1.4 をダウンロードします。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz -
パッケージを解凍します。
tar -zxvf canal.adapter-1.1.4.tar.gz -
conf/application.ymlを編集します。パラメーター 説明 例 canal.conf.canalServerHostcanal サーバーのアドレス。デフォルト値のままにしてください。 127.0.0.1:11111canal.conf.srcDataSources.defaultDS.urlApsaraDB RDS for MySQL インスタンスの JDBC 接続 URL。形式は jdbc:mysql://<endpoint>:<port>/<database>?useUnicode=trueです。エンドポイントおよびポートは、基本情報 ページから取得します。jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=truecanal.conf.srcDataSources.defaultDS.usernameデータベースユーザー名。これは、[アカウント]ページから取得します。 canal_usercanal.conf.srcDataSources.defaultDS.passwordデータベースのパスワード。 <your-password>canal.conf.canalAdapters.groups.outerAdapters.hostsname: es(`name: es` の下)Elasticsearch クラスターの内部エンドポイントおよびポート。クラスターの 基本情報 ページから取得します。 es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200canal.conf.canalAdapters.groups.outerAdapters.moderestトランスポートモード。`rest` に設定します。restcanal.conf.canalAdapters.groups.outerAdapters.properties.security.authElasticsearch の認証情報( <username>:<password>形式)。elastic:es_passwordcanal.conf.canalAdapters.groups.outerAdapters.properties.cluster.nameElasticsearch クラスター ID。クラスターの 基本情報 ページから取得します。 es-cn-v64xxxxxxxxx3medpvi conf/application.yml以下のパラメーターを設定します。

-
Esc キーを押下し、
:wqを入力して保存します。 -
MySQL カラムと Elasticsearch フィールドのマッピングを定義するため、フィールドマッピングファイル
conf/es/*.ymlを編集します。パラメーター 説明 本ガイドで使用する値 esMapping._indexステップ 2 で作成した Elasticsearch インデックスの名前。 es_testesMapping._typeElasticsearch インデックスのタイプ。 _docesMapping._idドキュメント ID フィールド。 _idesMapping.sql同期対象のカラムを選択する SQL クエリ。 select t.id as _id, t.id, t.count, t.name, t.color from es_test t
-
canal アダプターを起動し、ログを確認します。
説明 canal アダプター 1.1.4 には MySQL ドライバー 5.1.40 がバンドルされています。ApsaraDB RDS インスタンスが MySQL 8.x を実行している場合、アダプターの起動に失敗します。詳細については、トラブルシューティングセクションの「MySQL 8.x 向け MySQL ドライバーの置き換え」をご参照ください。./bin/startup.sh cat logs/adapter/adapter.log
ステップ 6:同期の確認
-
ApsaraDB RDS for MySQL データベースで、
es_testテーブルに 1 行を挿入します。INSERT INTO `ES`.`es_test` (`count`, `id`, `name`, `color`) VALUES ('11', 2, 'canal_test2', 'red');UPDATE文またはDELETE文でもテスト可能です。 -
Alibaba Cloud Elasticsearch クラスターの Kibana コンソールにログインします。詳細については、「Kibana コンソールへのログイン」をご参照ください。
-
左側のナビゲーションウィンドウで、Dev Tools をクリックします。
-
Console タブで、以下のクエリを実行します。
重要canal は増分変更のみを同期します。
GET /es_test/_search同期が正常に機能している場合、応答には挿入した行が含まれます。

トラブルシューティング
以下の例では、root ユーザを使用しています。
canal アダプター 1.1.5 の起動時に ClassCastException が発生する
エラー:
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException:
com.alibaba.druid.pool.DruidDataSource cannot be cast to
com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54)
~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
client-adapter.es7x-1.1.5-jar-with-dependencies.jar を、canal.adapter-1.1.5\plugin ディレクトリ内から、canal-1.1.5-alpha-2 リリース版のものに置き換えます。背景情報については、「canal issue #3534」をご参照ください。
-
canal-1.1.5-alpha-2 アダプターをダウンロードします。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz -
パッケージを解凍します。
tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -
修正済みプラグインファイルをアダプターの plugin ディレクトリにコピーします。
説明 ソースパスは、パッケージを解凍した場所によって異なります。cp canal.adapter-1.1.5-SNAPSHOT/plugin/client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar canal/canal.adapter/plugin -
元のプラグインファイルを削除します。
rm -rf client-adapter.es7x-1.1.5-jar-with-dependencies.jar -
新しいファイルを、期待されるファイル名にリネームします。
mv client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar
MySQL 8.x 向け MySQL ドライバーの置き換え
canal アダプター 1.1.4 には MySQL ドライバー 5.1.40 がバンドルされていますが、これは MySQL 8.x と互換性がありません。アダプター起動時に Unknown system variable 'query_cache_size' というエラーが表示された場合は、MySQL 8.0 コネクタでドライバーを置き換えてください。
-
MySQL 8.0 コネクタをダウンロードします。
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zip -
パッケージを解凍します。
unzip mysql-connector-java-8.0.29.zip -
コネクタ JAR をアダプターの lib ディレクトリにコピーします。
mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar lib/ -
必要な権限を設定します。
chmod 777 lib/mysql-connector-java-8.0.29.jar chmod +st lib/mysql-connector-java-8.0.29.jar -
古い MySQL 5.x ドライバーを削除します。
rm -rf lib/mysql-connector-java-5.1.40.jar