Canal是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。当您需要将MySQL中的增量数据同步至阿里云Elasticsearch时,可通过Canal来实现。本文以阿里云RDS MySQL为例,介绍具体的实现方法。

背景信息

Canal是Github中开源的ETL(Extract Transform Load)软件,其功能原理及详细说明请参见Canal

操作流程

  1. 准备工作
    创建同一专有网络下的RDS MySQL实例、阿里云Elasticsearch实例和ECS实例,了解Canal。各模块的作用如下:
    • RDS MySQL:存放源数据和增量数据。
    • Canal:解析数据库日志,同步获取到的增量变更。
    • 阿里云Elasticsearch:接收增量数据。
    • 阿里云ECS:部署Canal-server和Canal-adapter。
  2. 步骤一:准备MySQL数据源
    在RDS MySQL中,准备待同步的数据。
  3. 步骤二:创建索引
    在阿里云Elasticsearch实例中,创建索引。要求Mapping中定义的字段名称和类型与待同步数据保持一致。
  4. 步骤三:安装JDK
    在使用Canal前,必须先安装JDK,要求版本大于等于1.8.0。
  5. 步骤四:安装并启动Canal-server
    安装Canal-server,然后修改配置文件关联RDS MySQL。Canal-server模拟MySQL集群的一个slave,获取MySQL集群Master节点的二进制日志(binary log),并将日志推送给Canal-adapter。
  6. 步骤五:安装并启动Canal-adapter
    安装Canal-adapter,然后修改配置文件关联RDS MySQL和Elasticsearch,以及定义MySQL数据到Elasticsearch数据的映射字段,用来将数据同步到Elasticsearch。
  7. 步骤六:验证增量数据同步
    在RDS MySQL中新增、修改或删除数据,查看数据同步结果。

准备工作

步骤一:准备MySQL数据源

进入RDS控制台,创建RDS MySQL数据库和表。具体操作,请参见RDS MySQL快速入门

本文创建的表名称为es_test,包含的字段如下所示。es_test字段及索引

步骤二:创建索引

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。
    登录Kibana控制台的具体操作,请参见登录Kibana控制台
    说明 本文以阿里云Elasticsearch 6.7.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
  2. 在左侧导航栏,单击Dev Tools
  3. Console中,执行以下命令创建索引。
    以下示例创建的索引名称为es_test,包含countidnamecolor字段。
    注意 mappings中的字段需要与步骤一:准备MySQL数据源中创建的字段(名称和类型)保持一致。
    PUT es_test?include_type_name=true
    {
    
        "settings" : {
          "index" : {
            "number_of_shards" : "5",
            "number_of_replicas" : "1"
          }
        },
        "mappings" : {
            "_doc" : {
                "properties" : {
                  "count": {          
                       "type": "text"       
                   },
                  "id": {
                       "type": "integer"
                   },
                   "name": {
                        "type" : "text",
                        "analyzer": "ik_smart"                   
                    },
                    "color" : {
                        "type" : "text"                    
                    }
                }
            }
        }
    }
    创建成功后,返回如下结果。
    {
      "acknowledged" : true,
      "shards_acknowledged" : true,
      "index" : "es_test"
    }

步骤三:安装JDK

  1. 参见连接ECS实例,连接阿里云ECS实例,查看可用的JDK软件包列表。
    yum search java | grep -i --color JDK
  2. 选择合适的版本,安装JDK。
    本文选择java-1.8.0-openjdk-devel.x86_64
    yum install java-1.8.0-openjdk-devel.x86_64
  3. 配置环境变量。
    1. 打开etc文件夹下的profile文件。
      vi /etc/profile
    2. 在文件内添加如下的环境变量。
      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.71-2.b15.el7_2.x86_64
      export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
      export PATH=$PATH:$JAVA_HOME/bin
      注意 JAVA_HOME需要替换为您JDK的安装路径,可通过find / -name 'java'命令查看。
    3. 按下Esc键,然后使用:wq保存文件并退出vi模式,随后执行以下命令使配置生效。
      source /etc/profile
  4. 执行以下任意命令,验证JDK是否安装成功。
    • java
    • javac
    • java -version
    显示如下结果说明JDK安装成功。jdk安装成功

步骤四:安装并启动Canal-server

  1. 下载Canal-server。
    本文使用1.1.4版本。
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
    说明 目前Canal 1.1.5版本已支持Elasticsearch 7.0版本,如果您使用的是Elasticsearch 7.0,需要下载Canal 1.1.5版本。详细信息请参见Canal release note
  2. 解压。
    tar -zxvf canal.deployer-1.1.4.tar.gz
  3. 修改conf/example/instance.properties文件。
    vi conf/example/instance.properties
    修改conf/example/instance.properties文件
    配置项 说明
    canal.instance.master.address 需要设置为<RDS MySQL数据库的内网地址>:<内网端口>,相关信息可在RDS MySQL实例的基本信息页面获取。例如rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306
    canal.instance.dbUsername RDS MySQL数据库的账号名称,可在实例的账号管理页面获取。
    canal.instance.dbPassword RDS MySQL数据库的密码。
  4. 按下Esc键,然后使用:wq命令保存文件并退出vi模式。
  5. 启动Canal-server,并查看日志。
    ./bin/startup.sh
    cat logs/canal/canal.log
    启动canal-server

步骤五:安装并启动Canal-adapter

  1. 下载Canal-adapter。
    本文使用1.1.4版本。
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
    说明 目前Canal 1.1.5版本已支持Elasticsearch 7.0版本,如果您使用的是Elasticsearch 7.0,需要下载Canal 1.1.5版本。详细信息请参见Canal release note
  2. 解压。
    tar -zxvf canal.adapter-1.1.4.tar.gz
  3. 修改conf/application.yml文件。
    vi conf/application.yml
    修改conf/application.yml文件
    配置项 说明
    canal.conf.canalServerHost canalDeployer访问地址。保持默认(127.0.0.1:11111)即可。
    canal.conf.srcDataSources.defaultDS.url 需要设置为jdbc:mysql://<RDS MySQL内网地址>:<内网端口>/<数据库名称>?useUnicode=true,相关信息可在RDS MySQL实例的基本信息页面获取。例如jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true
    canal.conf.srcDataSources.defaultDS.username RDS MySQL数据库的账号名称,可在RDS MySQL实例的账号管理页面获取。
    canal.conf.srcDataSources.defaultDS.password RDS MySQL数据库的密码。
    canal.conf.canalAdapters.groups.outerAdapters.hosts 定位到name:es的位置,将hosts替换为<Elasticsearch实例的内网地址>:<内网端口>,相关信息可在Elasticsearch实例的基本信息页面获取。例如es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200
    canal.conf.canalAdapters.groups.outerAdapters.mode 必须设置为rest。
    canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth 需要设置为<Elasticsearch实例的账号>:<密码>。例如elastic:es_password
    canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name Elasticsearch实例的ID,可在实例的基本信息页面获取。例如es-cn-v64xxxxxxxxx3medp
  4. 按下Esc键,然后使用:wq命令保存文件并退出vi模式。
  5. 同样的方式,修改conf/es/*.yml文件,定义MySQL数据到Elasticsearch数据的映射字段。
    修改conf/es/*.yml文件
    配置项 说明
    esMapping._index 步骤二:创建索引章节中,在Elasticsearch实例中所创建的索引的名称。本文使用es_test
    esMapping._type 步骤二:创建索引章节中,在Elasticsearch实例中所创建的索引的类型。本文使用_doc
    esMapping._id 需要同步到Elasticsearch实例的文档的id,可自定义。本文使用_id
    esMapping.sql SQL语句,用来查询需要同步到Elasticsearch中的字段。本文使用select t.id as _id,t.id,t.count,t.name,t.color from es_test t
  6. 启动Canal-adapter服务,并查看日志。
    ./bin/startup.sh
    cat logs/adapter/adapter.log
    服务启动正常时,结果如下所示。Canal-adapter服务日志

步骤六:验证增量数据同步

  1. 在RDS MySQL数据库中,新增、修改或删除数据库中es_test表的数据。
    insert `elasticsearch`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
  2. 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。
    登录Kibana控制台的具体操作,请参见登录Kibana控制台
    说明 本文以阿里云Elasticsearch 6.7.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
  3. 在左侧导航栏,单击Dev Tools
  4. Console中,执行以下命令查询同步成功的数据。
    GET /es_test/_search
    预期结果如下。数据同步成功结果