全部产品
Search
文档中心

检索分析服务Elasticsearch版:通过DTS将PolarDB MySQL数据同步至阿里云Elasticsearch

更新时间:Aug 18, 2023

当您在使用PolarDB MySQL遇到查询慢的问题时,可以通过数据传输服务DTS(Data Transmission Service),将企业线上的PolarDB MySQL中的生产数据实时同步到阿里云Elasticsearch中进行搜索分析。本文介绍具体的实现方法。

背景信息

本案例需要使用以下三个云产品,相关介绍如下:
  • 数据传输服务DTS是一种集数据迁移、数据订阅及数据实时同步于一体的数据传输服务,详情请参见数据传输服务DTS。DTS支持同步的SQL操作包括:Insert、Delete、Update。
    重要 进行数据同步时,请选择DTS支持的数据源及其版本,详情请参见同步方案概览
  • PolarDB是阿里云自研的下一代关系型云数据库,有三个独立的引擎,分别可以100%兼容MySQL、100%兼容PostgreSQL、高度兼容Oracle语法。存储容量最高可达100TB,单库最多可扩展到16个节点,适用于企业多样化的数据库应用场景,详情请参见PolarDB MySQL概述
  • Elasticsearch是一个基于Lucene的实时分布式的搜索与分析引擎,它提供了一个分布式服务,可以使您快速的近乎于准实时的存储、查询和分析超大数据集,通常被用来作为构建复杂查询特性和需求强大应用的基础引擎或技术,详情请参见什么是阿里云Elasticsearch

本文适用的场景:对实时同步要求较高的关系型数据库中数据的同步场景。

注意事项

  • DTS在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的CPU负载在30%以下)。

  • 如果源库中待同步的表需要执行增加列的操作,您只需先在Elasticsearch实例中修改对应表的mapping,然后在源库中执行相应的DDL操作,最后暂停并启动DTS同步实例即可。

使用限制

  • 通过DTS将数据同步至阿里云Elasticsearch,不支持7.16和8.x版本的Elasticsearch实例。

  • DTS不支持同步DDL操作,如果源库中待同步的表在同步的过程中已经执行了DDL操作,您需要先移除同步对象,然后在Elasticsearch实例中移除该表对应的索引,最后新增同步对象。详情请参见移除同步对象新增同步对象

操作流程

  1. 步骤一:环境准备
    完成创建Elasticsearch实例、创建PolarDB MySQL集群、准备测试数据等任务。
  2. 步骤二:配置数据同步链路
    通过数据传输服务DTS,快速创建并启动PolarDB MySQL到阿里云Elasticsearch的实时同步作业。
  3. 步骤三:查看数据同步结果
    在阿里云Elasticsearch的Kibana控制台中查看同步成功的数据。
  4. 步骤四:验证增量数据同步
    验证在PolarDB MySQL数据库中新增数据时,数据的同步效果。

步骤一:环境准备

  1. 在阿里云Elasticsearch实例中开启自动创建索引功能。

    具体操作步骤请参见配置YML参数。本文使用6.7版本的实例。

    说明

    阿里云Elasticsearch为了保证用户操作数据的安全性,默认把自动创建索引的配置设置为不允许。通过DTS同步数据,使用的是提交数据的方式创建索引,而不是Create index API方式。所以在同步数据前,需要先开启集群的自动创建索引功能。

  2. 创建云数据库PolarDB MySQL集群,并开启Binlog。
    具体操作步骤请参见购买按量付费集群开启Binlog
  3. 创建PolarDB MySQL数据库和表,并插入测试数据。
    具体操作步骤请参见管理数据库。本文使用的表结构和测试数据如下。测试数据
    • 建表语句
      CREATE TABLE `product` (
          `id` bigint(32) NOT NULL AUTO_INCREMENT,
          `name` varchar(32) NULL,
          `price` varchar(32) NULL,
          `code` varchar(32) NULL,
          `color` varchar(32) NULL,
          PRIMARY KEY (`id`)
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
    • 测试数据
      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (1,'mobile phone A','2000','amp','golden');
      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (2,'mobile phone B','2200','bmp','white');
      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (3,'mobile phone C','2600','cmp','black');
      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (4,'mobile phone D','2700','dmp','red');
      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (5,'mobile phone E','2800','emp','silvery');

步骤二:配置数据同步链路

  1. 登录数据传输控制台

  2. 在左侧导航栏,选择数据传输(DTS)数据同步

    说明

    本文操作以DTS新版控制台为例,旧版控制台相关操作请参见数据同步操作指导

  3. 单击创建任务,按照页面提示创建并配置数据同步任务。
    您需要依次完成源库及目标库配置、任务对象配置、映射字段配置、高级配置和库表字段配置,本文使用的配置及相关说明如下,更多详细信息请参见PolarDB MySQL为源的数据同步PolarDB-X 2.0同步至Elasticsearch
    1. 配置源库及目标库。
      PolarDB MySQL DTS数据同步es源及目标配置
      类别配置说明
      任务名称
      • DTS为每个任务自动生成一个同步作业名称,该名称没有唯一性要求。
      • 建议配置具有业务意义的名称,便于后续的识别。
      源库信息数据库类型选择PolarDB MySQL
      接入方式选择云实例
      实例地区选择源PolarDB MySQL数据库所属地域。
      是否跨阿里云账号本场景为同一阿里云账号间同步数据,选择不跨账号
      实例ID选择源PolarDB MySQL实例ID。
      数据库账号填入源PolarDB MySQL实例的数据库账号,需具备待同步对象的读权限。
      数据库密码填入数据库账号对应的密码。
      目标库信息数据库类型选择ElasticSearch
      接入方式固定为云实例
      实例地区选择目标Elasticsearch实例所属地域,建议与源MySQL数据库保持一致。
      实例ID选择目标Elasticsearch实例ID。
      数据库账号填入连接Elasticsearch实例的账号,默认账号为elastic。
      数据库密码填入数据库账号对应的密码。
    2. 配置任务对象。
      配置说明
      任务步骤

      固定选中增量同步。默认情况下,您还需要同时选中库表结构同步全量同步。预检查完成后,DTS会将源实例中待同步对象的全量数据在目标集群中初始化,作为后续增量同步数据的基线数据。

      目标已存在表的处理模式
      • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步任务不会被启动。

        说明

        如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射

      • 忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。

        警告

        选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:

        • 表结构一致的情况下,如在目标库遇到与源库主键或唯一键的值相同的记录:

          • 全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会同步至目标数据库中。

          • 增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。

        • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败,请谨慎操作。

      索引名称
      • 表名

        选择为表名后,在目标Elasticsearch实例中创建的索引名称和表名一致,在本案例中即为product。

      • 库名_表名

        选择为库名_表名后,在目标Elasticsearch实例中创建的索引名称为库名_表名,在本案例中即为estest_product。

      同步对象

      源库对象框中单击待同步对象,然后单击向右将其移动至已选择对象框。

      映射名称更改
      • 如需更改单个同步对象在目标实例中的名称,请右击已选择对象中的同步对象,设置方式,请参见库表列名单个映射

      • 如需批量更改同步对象在目标实例中的名称,请单击已选择对象方框右上方的批量编辑,设置方式,请参见库表列名批量映射

    3. 配置映射字段,修改同步后的字段名称。
      如果您需要修改同步后的字段名称,可在已选择对象区域框中,右键单击对应的表名,设置该表在目标Elasticsearch实例中的索引名称、Type名称等信息,然后单击确定。本文使用的配置及相关说明如下,未提及的配置保持默认,更多详细信息请参见库表列名单个映射配置映射字段
      配置说明
      索引名称自定义索引名称,详情请参见基本概念
      重要 输入索引名称时,请确保Elasticsearch集群中不存在同名索引,否则报错index already exists
      Type名称自定义索引类型名称,详情请参见基本概念
      过滤条件您可以设置SQL过滤条件,过滤待同步的数据,只有满足过滤条件的数据才会被同步到目标实例,详情请参见通过SQL条件过滤任务数据
      字段参数选择所需的字段参数字段参数值,字段参数及取值介绍请参见Mapping parameters
      重要 如果添加参数时,将对应参数的index值设置为false,那么该字段将不能被查询,详情请参见index
    4. 配置高级参数。

      本文使用默认配置,相关说明如下。

      配置

      说明

      选择调度该任务的专属集群

      • DTS默认调度到共享集群上,用户不需要选择调度到指定的集群节点上。

      • 专属集群需要先购买后,运行成功才能选择使用

      • 您可以购买指定规格的专属集群来运行DTS迁移、同步、订阅等任务,实现与其他用户的DTS任务资源隔离,专属集群中的DTS实例相比共享集群的实例,实现资源独享,稳定性更好,性能更优。

      设置告警

      是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。

      源库、目标库无法连接后的重试时间

      在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。

      说明
      • 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。

      • 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

      分片配置

      根据目标Elasticsearch中索引的分片配置,设置索引的主分片和副本分片的数量。Elasticsearch 7.x以下版本的索引默认包含5个主shard,1个副shard。Elasticsearch 7.x及以上版本的索引默认包含1个主shard,1个副shard。

      重要

      shard大小和数量是影响Elasticsearch集群稳定性和性能的重要因素,您需要设置合理的shard数,shard的评估方法请参见规格容量评估

      字符串Index

      同步至目标Elasticsearch实例中的字符串编入索引的方式:

      • analyzed:先分析字符串,再写入索引。您还需要选择具体的分析器,分析器的类型及作用,请参见分析器

      • not analyzed:不分析,直接使用原始值写入索引。

      • no:不写入索引。

      时区

      DTS同步时间类型的数据(如DATETIME、TIMESTAMP)至目标Elasticsearch实例时,您可以选择所带时区。

      说明

      如目标实例中此类时间类型数据无需带有时区,则在同步前您需在目标实例中设置该时间类型数据的文档类型(type)。

      DOCID取值

      DOCID默认为表的主键,如表中无主键,则DOCID为Elasticsearch自动生成的ID列。

      配置ETL功能

      • ETL配置功能暂不支持在目标端进行表结构变更,涉及变更操作需要您在链路启动前预先在目标端完成。

      • ETL配置的二次修改可能因您的配置造成链路中断或历史数据修改,请谨慎操作。

      • ETL配置二次修改启动时仅针对启动后的增量数据进行适配,历史数据不支持修改后的配置。

      • 详情参见:在DTS迁移或同步任务中配置ETL

    5. 配置库表字段,设置待同步的表在目标Elasticsearch的_routing策略和_id取值。
      本文使用的数据库名称为estest,表名称为product,设置_routing为否,_id取值为id,相关说明如下。

      类型

      说明

      设置_routing

      设置_routing可以将文档路由存储在目标Elasticsearch实例的指定分片上,请参见_routing

      • 选择为,您可以自定义列进行路由。

      • 选择为,则用_id进行路由。

      说明

      创建的目标Elasticsearch实例为7.x版本时,您必须选择为

      _id取值

      • 表的主键列

        联合主键合并为一列。

      • 业务主键

        如果选择为业务主键,那么您还需要设置对应的业务主键列

  4. 配置完成后,根据页面提示保存任务、进行预检查、购买并启动任务。
    购买成功后,同步任务正式开始,您可在数据同步界面查看具体任务进度。待全量同步完成,增量同步进行中时,您即可在Elasticsearch中查看同步成功的数据。

步骤三:查看数据同步结果

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。
    登录Kibana控制台的具体操作,请参见登录Kibana控制台
    说明 本文以阿里云Elasticsearch 6.7.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
  2. 在左侧导航栏,单击Dev Tools
  3. Console中,执行如下命令查看全量数据同步结果。
    GET /product/_doc/_search
    预期结果如下。查询返回结果
  4. 通过控制台操作查看同步成功的数据。
    1. 为目标索引创建索引模式。
      1. 在左侧导航栏,单击Management
      2. Kibana区域,单击Index Patterns
      3. 单击Create index pattern
      4. Create index pattern页面的Index pattern name中,输入与同步成功的索引相匹配的索引模式名称(例如product、product*)。
      5. 单击Next step
      6. 单击Create index pattern
    2. 在左侧导航栏,单击Discover
    3. 选择您已创建的索引模式,查看同步成功的数据。
      查看同步成功的数据

步骤四:验证增量数据同步

  1. 进入PolarDB控制台
  2. 在PolarDB MySQL数据库中插入一条数据。
    INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (6,'mobile phone F','2750','fmp','white');
  3. 登录Kibana控制台,在左侧导航栏,单击Discover,选择您已创建的索引模式,查看同步成功的增量数据。
    查看同步成功的增量数据
    说明 您也可以使用同样的方式,在PolarDB MySQL数据库中删除或修改数据,然后查看数据同步结果。