MySQL データベースから Alibaba Cloud Elasticsearch クラスターに増分データを同期し、リアルタイムのパフォーマンスに高い要件がある場合は、Canal を使用してデータを同期できます。
背景情報
Canal は、Alibaba グループによって提供されるオープンソース製品です。 Canal は MySQL の増分ログデータを解析し、増分データのサブスクライブと消費を可能にします。 Canal の動作原理と概要については、Canal をご参照ください。この例では、Canal は ApsaraDB RDS for MySQL インスタンスのセカンダリノードとして使用され、インスタンスの増分データに対して生成されたバイナリログを受信します。次に、RESTful API を使用して、Alibaba Cloud Elasticsearch クラスターにデータを書き込みます。このトピックで説明されているソリューションは、データ同期のリアルタイムパフォーマンスに高い要件を持つユーザーのシナリオに適しています。
前提条件
ApsaraDB RDS for MySQL インスタンス、Alibaba Cloud Elasticsearch クラスター、および Elastic Compute Service (ECS) インスタンスが作成されています。同じ仮想プライベートクラウド (VPC) に作成することをお勧めします。
ApsaraDB RDS for MySQL インスタンスの作成方法については、ApsaraDB RDS for MySQL インスタンスの作成をご参照ください。この例では、MySQL 5.7 を実行する ApsaraDB RDS インスタンスが作成されます。
Alibaba Cloud Elasticsearch クラスターの作成方法については、Alibaba Cloud Elasticsearch クラスターの作成をご参照ください。この例では、カーネル強化版の Elasticsearch V6.7 クラスターが作成されます。
説明Canal を使用して Elasticsearch クラスターにデータを書き込む前に、Canal がデプロイされている ECS インスタンスの IP アドレスを Elasticsearch クラスターの IP アドレスホワイトリストに追加する必要があります。詳細については、Elasticsearch クラスターのパブリックまたはプライベート IP アドレスホワイトリストの設定をご参照ください。
ECS インスタンスは、Canal サーバーと Canal アダプターをデプロイするために使用されます。 ECS インスタンスの作成方法については、ウィザードを使用してインスタンスを作成するをご参照ください。この例では、[イメージ] [centos 7.6 64 ビット] を実行する ECS インスタンスが作成されます。
制限事項
このトピックで説明されているソリューションは、MySQL の増分データのみを Alibaba Cloud Elasticsearch クラスターに同期するために使用できます。
インストールする Java Development Kit (JDK) のバージョンは 1.8.0 以降である必要があります。
Canal 1.1.4 を使用して Elasticsearch V7.X クラスターにデータを同期することはできません。
Elasticsearch V7.X クラスターにデータを同期するには Canal 1.1.5 を使用し、Elasticsearch V8.X クラスターにデータを同期するには Canal 1.1.7 を使用する必要があります。 Logstash や Data Transmission Service (DTS) などの方法を使用して MySQL データを同期することもできます。
データを同期するための設定を行う場合は、必要なインデックスのマッピングをカスタマイズできます。インデックスのマッピングで定義されているフィールドの名前とデータ型が、目的の ApsaraDB RDS for MySQL データベースのフィールドの名前とデータ型と同じであることを確認する必要があります。
このトピックで説明されているソリューションを使用してデータを同期する場合は、Canal が使用可能であることを確認する必要があります。そうでない場合、障害が発生したり、サービスが中断されたりする可能性があります。たとえば、ECS インスタンスの再起動や例外による Canal の終了などのシナリオで、データ同期が中断されないようにする必要があります。
Canal アダプターは HTTPS 経由で Alibaba Cloud Elasticsearch クラスターに接続できません。
手順
手順 1: MySQL データソースを準備する
ApsaraDB RDS コンソールにログインし、ApsaraDB RDS for MySQL データベースとテーブルを作成します。詳細については、ApsaraDB RDS for MySQL を使用する一般的なワークフローをご参照ください。この例では、次のステートメントを使用してテーブルを作成します。
-- create table テーブルを作成する
CREATE TABLE `es_test` (
`id` bigint(32) NOT NULL,
`name` text NOT NULL,
`count` text NOT NULL,
`color` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8;手順 2: インデックスを作成し、インデックスのマッピングを設定する
Elasticsearch クラスターの Kibana コンソールにログインします。詳細については、Kibana コンソールにログインするをご参照ください。
説明この例では、Elasticsearch V6.7.0 クラスターを使用しています。他のバージョンのクラスターでの操作は異なる場合があります。
表示されるページの左側のナビゲーションペインで、[dev Tools] をクリックします。
[console] タブで、次のコマンドを実行してインデックスを作成します。
この例では、es_test という名前のインデックスが作成されます。インデックスには、count、id、name、color の各フィールドが含まれています。
重要インデックスのマッピングで定義されているフィールドの名前とデータ型が、手順 1: MySQL データソースを準備するで準備したフィールドの名前とデータ型と同じであることを確認する必要があります。
PUT es_test?include_type_name=true { "settings" : { "index" : { "number_of_shards" : "5", "number_of_replicas" : "1" } }, "mappings" : { "_doc" : { "properties" : { "count": { // count フィールド "type": "text" // タイプを text に設定 }, "id": { // id フィールド "type": "integer" // タイプを integer に設定 }, "name": { // name フィールド "type" : "text", // タイプを text に設定 "analyzer": "ik_smart" // ik_smart アナライザーを使用 }, "color" : { // color フィールド "type" : "text" // タイプを text に設定 } } } } }インデックスが作成され、マッピングが設定されると、次の結果が返されます。
{ "acknowledged" : true, "shards_acknowledged" : true, "index" : "es_test" }
手順 3: JDK をインストールする
ECS インスタンスに接続します。
詳細については、パスワードまたはキーを使用して Linux インスタンスに接続するをご参照ください。
説明この例では、通常のユーザーを使用しています。
使用可能な JDK パッケージを表示します。
sudo yum search java | grep -i --color JDK必要なバージョンの JDK をインストールします。
この例では、[java-1.8.0-openjdk-devel.x86_64] を使用しています。
sudo yum install java-1.8.0-openjdk-devel.x86_64環境変数を設定します。
etc フォルダーにある profile ファイルを開きます。
vim ~/.bash_profile次の環境変数をファイルに追加します。
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin重要環境変数を使用する場合は、JAVA_HOME を JDK のインストールパスに置き換える必要があります。
find / -name 'java'コマンドを実行して、JDK のインストールパスを照会できます。[esc] を押し、
:wqコマンドを実行してファイルを保存し、vi モードを終了します。次に、次のコマンドを実行して設定を適用します。source ~/.bash_profile
次のコマンドを実行して、JDK がインストールされているかどうかを確認します。
java -version次の結果が返された場合、JDK はインストールされています。
openjdk version "1.8.0_362" OpenJDK Runtime Environment (build 1.8.0_362-b08) OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
手順 4: Canal サーバーをインストールして起動する
Canal サーバーパッケージをダウンロードします。
この例では、Canal 1.1.4 サーバーを使用しています。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz説明Canal 1.1.5 は Elasticsearch V7.0 クラスターをサポートしています。 Elasticsearch V7.0 クラスターを使用している場合は、Canal 1.1.5 パッケージをダウンロードする必要があります。詳細については、Canal リリースノートをご参照ください。
Canal サーバーと Canal アダプターのパッケージはインターネット経由でダウンロードする必要があります。 ECS インスタンスがインターネットにアクセスできることを確認してください。
次のコマンドを実行してパッケージを解凍します。
tar -zxvf canal.deployer-1.1.4.tar.gz次のコマンドを実行して、conf/example/ ディレクトリにある
instance.propertiesファイルを変更します。vi conf/example/instance.properties
パラメーター
説明
canal.instance.master.address
このパラメーターは、<ApsaraDB RDS for MySQL インスタンスの内部エンドポイント>:<内部ポート> の形式の値に設定する必要があります。必要な情報は、ApsaraDB RDS for MySQL インスタンスの [基本情報] ページで取得できます。例: rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306。
canal.instance.dbUsername
ApsaraDB RDS for MySQL データベースにログインするために使用されるユーザー名。ユーザー名は、ApsaraDB RDS for MySQL インスタンスの [アカウント] ページで取得できます。
canal.instance.dbPassword
ApsaraDB RDS for MySQL データベースにログインするために使用されるパスワード。
[esc] を押し、
:wqコマンドを実行してファイルを保存し、vi モードを終了します。Canal サーバーを起動し、ログをクエリします。
./bin/startup.sh cat logs/canal/canal.log
手順 5: Canal アダプターをインストールして起動する
Canal アダプターパッケージをダウンロードします。
この例では、Canal 1.1.4 アダプターを使用しています。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz説明Canal 1.1.5 は Elasticsearch V7.0 クラスタをサポートしています。Elasticsearch V7.0 クラスタを使用している場合は、Canal 1.1.5 パッケージをダウンロードする必要があります。詳細については、Canal リリースノート をご参照ください。
Canal サーバーと Canal アダプターのパッケージはインターネット経由でダウンロードする必要があります。 ECS インスタンスがインターネットにアクセスできることを確認してください。
次のコマンドを実行してパッケージを解凍します。
tar -zxvf canal.adapter-1.1.4.tar.gz次のコマンドを実行して、conf/ ディレクトリにある
application.ymlファイルを変更します。vi conf/application.yml
パラメーター
説明
canal.conf.canalServerHost
Canal デプロイヤのアドレス。デフォルト値 127.0.0.1:11111 を保持します。
canal.conf.srcDataSources.defaultDS.url
このパラメーターは、jdbc:mysql://<ApsaraDB RDS for MySQL インスタンスの内部エンドポイント>:<内部ポート>/<データベース名>?useUnicode=true の形式の値に設定する必要があります。必要な情報は、ApsaraDB RDS for MySQL インスタンスの [基本情報] ページで取得できます。例: jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true。
canal.conf.srcDataSources.defaultDS.username
ApsaraDB RDS for MySQL データベースにログインするために使用されるユーザー名。ユーザー名は、ApsaraDB RDS for MySQL インスタンスの [アカウント] ページで取得できます。
canal.conf.srcDataSources.defaultDS.password
ApsaraDB RDS for MySQL データベースにログインするために使用されるパスワード。
canal.conf.canalAdapters.groups.outerAdapters.hosts
name:es を見つけ、hosts を <Elasticsearch クラスターの内部エンドポイント>:<内部ポート> の形式の値に設定します。必要な情報は、Elasticsearch クラスターの [基本情報] ページで取得できます。例: es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200。
canal.conf.canalAdapters.groups.outerAdapters.mode
このパラメーターを rest に設定します。
canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth
このパラメーターは、<Elasticsearch クラスター>:<ユーザー名に対応するパスワード> の形式の値に設定する必要があります。例: elastic:es_password。
canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name
Elasticsearch クラスター。 ID は、Elasticsearch クラスターの [基本情報] ページで取得できます。例: es-cn-v64xxxxxxxxx3medp。
[esc] を押し、
:wqコマンドを実行してファイルを保存し、vi モードを終了します。前の手順を繰り返して、conf/es/ ディレクトリにある
*.ymlファイルを変更し、ApsaraDB RDS for MySQL データベースから Elasticsearch クラスターにマップするフィールドを指定します。
パラメーター
説明
esMapping._index
手順 2: インデックスを作成し、インデックスのマッピングを設定するで Elasticsearch クラスターに作成したインデックスの名前に値を設定します。この例では、[es_test] を使用しています。
esMapping._type
手順 2: インデックスを作成し、インデックスのマッピングを設定するで Elasticsearch クラスターに作成したインデックスのタイプに値を設定します。この例では、[_doc] を使用しています。
esMapping._id
Elasticsearch クラスターに同期するフィールドに対して生成されるドキュメントの ID。カスタム ID を指定できます。この例では、[_id] を使用しています。
esMapping.sql
Elasticsearch クラスターに同期するフィールドをクエリするために使用される SQL ステートメント。この例では、
select t.id as _id,t.id,t.count,t.name,t.color from es_test tステートメントを使用しています。 // es_test テーブルからすべてのフィールドをクエリするCanal アダプターを起動し、ログをクエリします。
./bin/startup.sh cat logs/adapter/adapter.log説明この例では、MySQL 5.7 を実行する ApsaraDB RDS for MySQL インスタンスを使用しています。別のバージョンの MySQL を実行する ApsaraDB RDS for MySQL インスタンスを使用する場合は、Canal アダプターの MySQL ドライバーのバージョンが、接続する ApsaraDB RDS for MySQL インスタンスの MySQL バージョンと一致していることを確認する必要があります。そうでない場合、Canal アダプターは起動に失敗します。詳細については、このトピックの FAQ セクションをご参照ください。
次の図に示す結果が返された場合、Canal アダプターは起動されています。

手順 6: 増分データの同期結果を確認する
ApsaraDB RDS for MySQL データベースで、[es_test] テーブルにデータを追加、変更、または削除します。
insert `ES`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');Elasticsearch クラスターの Kibana コンソールにログインします。詳細については、Kibana コンソールにログインするをご参照ください。
表示されるページの左側のナビゲーションペインで、[dev Tools] をクリックします。
[console] タブで、次のコマンドを実行して同期されたデータをクエリします。
GET /es_test/_searchデータ同期が成功すると、次の図に示す結果が返されます。
重要Canal は増分データのみ同期します。
FAQ
Q: アダプターの起動時に、Canal アダプターに対して生成されたログに次のエラーメッセージが返される場合はどうすればよいですか: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
A: canal.adapter-1.1.5\plugin ディレクトリにある client-adapter.es7x-1.1.5-jar-with-dependencies.jar を canal-1.1.5-alpha-2 の関連ファイルに置き換えます。
詳細については、Canal の問題 をご参照ください。
この例では、root ユーザーを使用しています。
canal-1.1.5-alpha-2 のパッケージをダウンロードします。詳細については、Canal リリースノート をご参照ください。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gzパッケージを解凍します。
tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gzcanal-1.1.5-alpha-2 の plugin フォルダーにある client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar を canal.adapter-1.1.5\plugin ディレクトリにコピーします。
説明コピーする必要があるファイルのディレクトリは異なる場合があります。実際のディレクトリが優先されます。
cp canal.adapter-1.1.5-SNAPSHOT/plugin/client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar canal/canal.adapter/plugincanal.adapter-1.1.5\plugin ディレクトリから client-adapter.es7x-1.1.5-jar-with-dependencies.jar を削除します。
rm -rf client-adapter.es7x-1.1.5-jar-with-dependencies.jarファイル名を変更します。
mv client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar client-adapter.es7x-1.1.5-jar-with-dependencies.jar
Q: アダプターの起動時に、Canal アダプターに対して生成されたログに次のエラーメッセージが返される場合はどうすればよいですか: java.sql.SQLException: Unknown system variable 'query_cache_size'?
A: 考えられる原因は、Canal アダプターの MySQL ドライバーのバージョンが、接続する ApsaraDB RDS for MySQL データベースの MySQL バージョンと一致していないことです。たとえば、Canal アダプター 1.1.4 の MySQL ドライバーのバージョンは 5.1.40 です。このバージョンの Canal アダプターを使用する Canal を使用して、MySQL 8.x を実行する ApsaraDB RDS for MySQL データベースに接続すると、エラーメッセージが返されます。 Canal アダプターの MySQL ドライバーを変更し、MySQL ドライバーのバージョンが ApsaraDB RDS for MySQL データベースの MySQL バージョンと一致していることを確認してください。
Q: MySQL 8.0 を実行する ApsaraDB RDS for MySQL インスタンスから Elasticsearch クラスターにデータを同期したいと考えています。既存の MySQL ドライバーを MySQL 8.0 ドライバーに変更するにはどうすればよいですか?
A: この例では、root ユーザーを使用しています。
MySQL 8.0 ドライバーのパッケージをダウンロードします。
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.29.zipパッケージを解凍します。
unzip mysql-connector-java-8.0.29.zip取得したファイルを Canal アダプターの lib ディレクトリにコピーします。
mv mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar lib/権限を追加します。
chmod 777 lib/mysql-connector-java-8.0.29.jar chmod +st lib/mysql-connector-java-8.0.29.jarMySQL 5.x ドライバーを削除します。
rm -rf lib/mysql-connector-java-5.1.40.jar