全部产品
Search
文档中心

检索分析服务Elasticsearch版:通过DTS将MySQL数据实时同步到阿里云Elasticsearch

更新时间:Aug 18, 2023

当您需要将企业线上的RDS MySQL中的生产数据实时同步到阿里云Elasticsearch中进行搜索分析时,可通过数据传输服务DTS(Data Transmission Service),快速创建RDS MySQL到阿里云Elasticsearch的实时同步作业。本文介绍如何配置RDS MySQL到阿里云Elasticsearch的实时同步作业,并验证全量和增量数据的同步结果。

背景信息

  • 数据传输服务DTS是一种集数据迁移、数据订阅及数据实时同步于一体的数据传输服务,详细信息请参见数据传输服务DTS。DTS支持同步的SQL操作包括Insert、Delete和Update,支持同步的数据源版本要求请参见同步方案概览

  • 通过在DTS中配置从源(MySQL)同步到目标(Elasticsearch),可实现全量、增量数据同步。适用于对实时同步要求较高的关系型数据库中数据的同步场景或需要将关系型数据库中的全量或增量数据同步到阿里云Elasticsearch场景。

前提条件

已创建RDS MySQL实例、阿里云Elasticsearch实例。建议您在同一专有网络下创建相关实例。

使用限制

  • 通过DTS将数据同步至阿里云Elasticsearch,不支持7.16和8.x版本的Elasticsearch实例。

  • DTS不支持同步DDL操作,如果源库中待同步的表在同步的过程中已经执行了DDL操作,您需要先移除同步对象,然后在Elasticsearch实例中移除该表对应的索引,最后新增同步对象。详情请参见移除同步对象新增同步对象

注意事项

  • DTS在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的CPU负载在30%以下)。

  • 如果源库中待同步的表需要执行增加列的操作,您只需先在Elasticsearch实例中修改对应表的mapping,然后在源库中执行相应的DDL操作,最后暂停并启动DTS同步实例即可。

操作步骤

步骤一:环境准备

  1. 在阿里云Elasticsearch实例中开启自动创建索引功能。

    具体操作步骤请参见配置YML参数。本文使用6.7版本的实例。

    说明

    阿里云Elasticsearch为了保证用户操作数据的安全性,默认把自动创建索引的配置设置为不允许。通过DTS同步数据,使用的是提交数据的方式创建索引,而不是Create index API方式。所以在同步数据前,需要先开启集群的自动创建索引功能。

  2. 创建一个数据库和表,并插入数据。

    您可以选择使用阿里云的RDS数据库,也可以在本地服务器上的自建数据库。本文以RDS MySQL数据库为例,创建RDS MySQL数据库及表的详细步骤请参见快速入门

    重要

    建议您创建与阿里云Elasticsearch实例同一地域下的RDS MySQL实例,不同地域的同步任务不能确保互通。

    本文创建的数据库、使用的建表语句及数据如下。

    • 在RDS MySQL中创建数据库(Database)

      test_logstash

    • 建表语句

      -- create table
      CREATE TABLE `es_test` (
          `id` bigint(32) NOT NULL,
          `name` varchar(32) NULL,
          `age` bigint(32) NULL,
          `hobby` varchar(32) NULL,
          PRIMARY KEY (`id`)
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
      
      -- insert data
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (1,'user1',22,'music');
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (2,'user2',23,'sport');
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (3,'user3',43,'game');
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (4,'user4',24,'run');
      INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (5,'user5',42,'basketball');

步骤二:配置数据同步链路

  1. 登录数据传输控制台

  2. 在左侧导航栏,选择数据传输(DTS)数据同步

    说明

    本文操作以DTS新版控制台为例,旧版控制台相关操作请参见数据同步操作指导

  3. 单击创建任务,按照页面提示创建并配置数据同步任务。

    您需要依次完成源库及目标库配置、任务对象配置、映射字段配置、高级配置和库表字段配置,本文使用的配置及相关说明如下,更多详细信息请参见MySQL为源的数据同步PolarDB-X同步至Elasticsearch

    1. 配置源库及目标库。

      配置源库及目标库

      类别

      配置

      说明

      任务名称

      • DTS为每个任务自动生成一个同步作业名称,该名称没有唯一性要求。

      • 建议配置具有业务意义的名称,便于后续的识别。

      源库信息

      数据库类型

      选择MySQL

      接入方式

      选择云实例

      实例地区

      选择源MySQL数据库所属地域。

      是否跨阿里云账号

      本场景为同一阿里云账号间同步数据,选择不跨账号

      实例ID

      选择源RDS MySQL实例ID。

      数据库账号

      填入源RDS MySQL实例的数据库账号,需具备待同步对象的读权限。

      数据库密码

      填入数据库账号对应的密码。

      连接方式

      根据需求选择非加密连接SSL安全连接。如果设置为SSL安全连接,您需要提前开启RDS MySQL实例的SSL加密功能,详情请参见设置SSL加密

      目标库信息

      数据库类型

      选择ElasticSearch

      接入方式

      固定为云实例

      实例地区

      选择目标Elasticsearch实例所属地域,建议与源MySQL数据库保持一致。

      实例ID

      选择目标Elasticsearch实例ID。

      数据库账号

      填入连接Elasticsearch实例的账号,默认账号为elastic。

      数据库密码

      填入数据库账号对应的密码。

    2. 配置任务对象。

      配置

      说明

      同步类型

      固定选中增量同步。默认情况下,您还需要同时选中库表结构同步全量同步。预检查完成后,DTS会将源实例中待同步对象的全量数据在目标集群中初始化,作为后续增量同步数据的基线数据。

      目标已存在表的处理模式

      • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步任务不会被启动。

        说明

        如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射

      • 忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。

        警告

        选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:

        • 表结构一致的情况下,如在目标库遇到与源库主键或唯一键的值相同的记录:

          • 全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会同步至目标数据库中。

          • 增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。

        • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败,请谨慎操作。

      索引名称

      • 表名

        选择为表名后,在目标Elasticsearch实例中创建的索引名称和表名一致,在本案例中即为es_test。

      • 库名_表名

        选择为库名_表名后,在目标Elasticsearch实例中创建的索引名称为库名_表名,在本案例中即为test_logstash_es_test。

      源库对象

      源库对象框中单击待同步对象,然后单击向右将其移动至已选择对象框。

      已选择对象

      • 如需更改单个同步对象在目标实例中的名称,请右击已选择对象中的同步对象,设置方式,请参见库表列名单个映射

      • 如需批量更改同步对象在目标实例中的名称,请单击已选择对象方框右上方的批量编辑,设置方式,请参见库表列名批量映射

    3. 配置映射字段,修改同步后的字段名称。

      如果您需要修改同步后的字段名称,可在已选择对象区域框中,右键单击对应的表名,设置该表在目标Elasticsearch实例中的索引名称、Type名称等信息,然后单击确定。本文使用的配置及相关说明如下,未提及的配置保持默认,更多详细信息请参见库表列名单个映射

      配置映射字段

      配置

      说明

      索引名称

      自定义索引名称,详情请参见基本概念

      重要

      输入索引名称时,请确保Elasticsearch集群中不存在同名索引,否则报错index already exists

      Type名称

      自定义索引类型名称,详情请参见基本概念

      过滤条件

      您可以设置SQL过滤条件,过滤待同步的数据,只有满足过滤条件的数据才会被同步到目标实例,详情请参见通过SQL条件过滤任务数据

      字段参数

      选择所需的字段参数字段参数值,字段参数及取值介绍请参见Mapping parameters

      重要

      如果添加参数时,将对应参数的index值设置为false,那么该字段将不能被查询,详情请参见index

    4. 配置高级参数。

      本文使用默认配置,相关说明如下。

      配置

      说明

      选择调度该任务的专属集群

      • DTS默认调度到共享集群上,用户不需要选择调度到指定的集群节点上。

      • 专属集群需要先购买后,运行成功才能选择使用

      • 您可以购买指定规格的专属集群来运行DTS迁移、同步、订阅等任务,实现与其他用户的DTS任务资源隔离,专属集群中的DTS实例相比共享集群的实例,实现资源独享,稳定性更好,性能更优。

      设置告警

      是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。

      源库、目标库无法连接后的重试时间

      在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。

      说明
      • 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。

      • 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

      分片配置

      根据目标Elasticsearch中索引的分片配置,设置索引的主分片和副本分片的数量。Elasticsearch 7.x以下版本的索引默认包含5个主shard,1个副shard。Elasticsearch 7.x及以上版本的索引默认包含1个主shard,1个副shard。

      重要

      shard大小和数量是影响Elasticsearch集群稳定性和性能的重要因素,您需要设置合理的shard数,shard的评估方法请参见规格容量评估

      字符串Index

      同步至目标Elasticsearch实例中的字符串编入索引的方式:

      • analyzed:先分析字符串,再写入索引。您还需要选择具体的分析器,分析器的类型及作用,请参见分析器

      • not analyzed:不分析,直接使用原始值写入索引。

      • no:不写入索引。

      时区

      DTS同步时间类型的数据(如DATETIME、TIMESTAMP)至目标Elasticsearch实例时,您可以选择所带时区。

      说明

      如目标实例中此类时间类型数据无需带有时区,则在同步前您需在目标实例中设置该时间类型数据的文档类型(type)。

      DOCID取值

      DOCID默认为表的主键,如表中无主键,则DOCID为Elasticsearch自动生成的ID列。

      配置ETL功能

      • ETL配置功能暂不支持在目标端进行表结构变更,涉及变更操作需要您在链路启动前预先在目标端完成。

      • ETL配置的二次修改可能因您的配置造成链路中断或历史数据修改,请谨慎操作。

      • ETL配置二次修改启动时仅针对启动后的增量数据进行适配,历史数据不支持修改后的配置。

      • 详情参见:在DTS迁移或同步任务中配置ETL

    5. 配置库表字段,设置待同步的表在目标Elasticsearch的_routing策略和_id取值。

      本文使用的配置及相关说明如下。配置库表字段

      类型

      说明

      设置_routing

      设置_routing可以将文档路由存储在目标Elasticsearch实例的指定分片上,请参见_routing

      • 选择为,您可以自定义列进行路由。

      • 选择为,则用_id进行路由。

      说明

      创建的目标Elasticsearch实例为7.x版本时,您必须选择为

      _id取值

      • 表的主键列

        联合主键合并为一列。

      • 业务主键

        如果选择为业务主键,那么您还需要设置对应的业务主键列

  4. 配置完成后,根据页面提示保存任务、进行预检查、购买并启动任务。

    购买成功后,同步任务正式开始,您可在数据同步界面查看具体任务进度。待全量同步完成,增量同步进行中时,您即可在Elasticsearch中查看同步成功的数据。查看数据同步作业状态

    重要

    由于MySQL和Elasticsearch实例支持的数据类型不同,数据类型无法一一对应。所以DTS在进行结构初始化时,会根据目标库支持的数据类型进行类型映射,详情请参见结构初始化涉及的数据类型映射关系

步骤三:验证数据同步结果

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。
    登录Kibana控制台的具体操作,请参见登录Kibana控制台
    说明 本文以阿里云Elasticsearch 6.7.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
  2. 在左侧导航栏,单击Dev Tools
  3. Console中,执行如下命令查看全量数据同步结果。

    GET /es_test_index/es_test_type/_search

    预期结果如下。查询结果

  4. 在MySQL中插入一条数据,在Elasticsearch中查看增量数据同步结果。

    例如通过以下SQL语句插入一条数据。

    INSERT INTO `test_logstash`.`es_test` (`id`,`name`,`age`,`hobby`) VALUES (6,'user6',30,'dance');

    在Elasticsearch中查看结果,预期结果如下。验证增量数据同步