全部产品
Search
文档中心

检索分析服务Elasticsearch版:通过DataWorks将MaxCompute数据同步至Elasticsearch

更新时间:Apr 23, 2023

阿里云上拥有丰富的云存储、云数据库产品。当您需要对这些产品中的数据进行分析和搜索时,可以通过DataWorks的数据集成服务实现最快5分钟一次的离线数据采集,并同步到阿里云Elasticsearch中。本文以阿里云大数据计算服务MaxCompute(原名ODPS)为例。

背景信息

阿里云Elasticsearch支持同步的离线数据源包括:

  • 阿里云云数据库(MySQL、PostgreSQL、SQL Server、MongoDB、HBase)

  • 阿里云PolarDB-X(原DRDS升级版)

  • 阿里云MaxCompute

  • 阿里云OSS

  • 阿里云Tablestore

  • 自建HDFS、Oracle、FTP、DB2,及以上数据库的自建版本

操作流程

  1. 步骤一:环境准备

    创建DataWorks工作空间并开通MaxCompute服务、准备MaxCompute数据源、创建阿里云Elasticsearch实例。

  2. 步骤二:购买并创建独享资源组

    购买并创建一个数据集成独享资源组,并为该资源组绑定专有网络和工作空间。独享资源组可以保障数据快速、稳定地传输。

  3. 步骤三:添加数据源

    将MaxCompute和Elasticsearch数据源接入DataWorks的数据集成服务中。

  4. 步骤四:配置并运行数据同步任务

    通过向导模式配置数据同步任务,将数据集成系统同步成功的数据存储到Elasticsearch中。将独享资源组作为一个可以执行任务的资源,注册到DataWorks的数据集成服务中。这个资源组将获取数据源的数据,并执行将数据写入Elasticsearch中的任务(该任务将由数据集成系统统一下发)。
  5. 步骤五:验证数据同步结果

    在Kibana控制台中,查看同步成功的数据,并按条件查询数据。

步骤一:环境准备

  1. 创建DataWorks工作空间。创建时选择MaxCompute计算引擎服务。

    具体操作,请参见创建MaxCompute项目

  2. 创建MaxCompute表并导入测试数据。

    具体操作,请参见创建表导入数据

    本文使用的表结构和部分数据如下。

    图 1. 表结构表结构

    图 2. 表数据表数据

    说明

    本文提供的数据仅供测试。实际情况中,您可以将Hadoop数据迁移到MaxCompute后再进行同步。

  3. 创建阿里云Elasticsearch实例,并开启实例的自动创建索引功能。

    具体操作,请参见创建阿里云Elasticsearch实例配置YML参数。创建实例时,所选地域与DataWorks工作空间所在地域保持一致。

步骤二:购买并创建独享资源组

  1. 登录DataWorks控制台

  2. 选择相应地域后,在左侧导航栏,单击资源组列表

  3. 购买独享数据集成资源。

    具体操作,请参见新增和使用独享数据集成资源组

    重要

    购买时,所选地域需要与目标工作空间保持一致。

  4. 创建一个独享数据集成资源。

    具体操作,请参见新增和使用独享数据集成资源组。本文使用的配置如下,其中资源组类型选择独享数据集成资源组创建独享资源组

  5. 单击已创建的独享资源组右侧的网络设置,为该独享资源组绑定专有网络。

    具体操作,请参见绑定专有网络

    独享资源部署在DataWorks托管的专有网络中。DataWorks需要与Elasticsearch实例的专有网络连通才能同步数据,因此在绑定专有网络时,需要选择Elasticsearch实例所在专有网络交换机绑定专有网络

  6. 单击已创建的独享资源组右侧的修改归属工作空间,为该独享资源组绑定目标工作空间。

步骤三:添加数据源

  1. 进入DataWorks的数据集成页面。

    1. 在DataWorks控制台的左侧导航栏,单击工作空间列表

    2. 找到目标工作空间,单击其右侧操作列下的数据集成

  2. 在左侧导航栏,单击数据源

  3. 数据源管理页面,单击新增数据源

  4. 新增数据源对话框中,单击MaxCompute,进入新增MaxCompute数据源页面,填写数据源信息。

    新增MaxCompute (ODPS)数据源

    参数

    说明

    ODPS Endpoint

    MaxCompute服务的访问地址,不同区域的访问地址不同,详情请参见Endpoint

    ODPS项目名称

    进入DataWorks控制台,在左侧导航栏,单击计算引擎列表下的MaxCompute获取。

    AccessKey ID

    鼠标移至您的用户头像上,单击AccessKey 管理获取。

    AccessKey Secret

    鼠标移至您的用户头像上,单击AccessKey 管理获取。

    说明

    以上未提及的配置请自定义输入或保持默认。

    配置完成后,可与独享资源组进行连通性测试。连通状态显示为可连通时,表示连通成功。测试连通成功

  5. 单击完成

  6. 使用同样的方式添加Elasticsearch数据源。

    本示例选择数据源类型连接串模式,需要配置的参数说明如下。ES数据源配置

    参数

    说明

    Endpoint

    阿里云Elasticsearch的访问地址,格式为:http://<实例的内网或公网地址>:9200。实例的内网或公网地址可在基本信息页面获取,详细信息,请参见查看实例的基本信息

    重要

    如果您使用的是公网地址,需要将独享资源组的EIP地址添加到阿里云Elasticsearch的公网地址访问白名单中,详情请参见配置实例公网或私网访问白名单使用独享数据集成资源组执行任务需要在数据库添加的IP白名单

    用户名

    访问阿里云Elasticsearch实例的用户名,默认为elastic。

    密码

    对应用户的密码。elastic用户的密码在创建实例时设定,如果忘记可重置,重置密码的注意事项和操作步骤,请参见重置实例访问密码

    说明

    其他未提及的参数请自定义输入。

步骤四:配置并运行数据同步任务

  1. 在DataWorks的数据开发页面,新建一个业务流程。
    具体操作步骤请参见创建业务流程
  2. 新建一个离线同步任务。
    1. 展开新建的业务流程,右键单击数据集成,选择新建 > 离线同步
    2. 新建节点对话框中,输入节点名称,单击提交
  3. 选择数据源区域中,将数据来源指定为ODPS数据源,并选择待同步的表名称;将数据去向指定为Elasticsearch数据源,并填入索引名和索引类型。

    选择ODPS数据源
    说明
  4. 字段映射区域中,设置来源字段目标字段的映射关系。
  5. 通道控制区域中,配置执行任务的相关参数。
  6. 配置任务调度属性。
    在页面右侧,单击调度配置,按照需求配置相应的调度参数。各配置的详细说明请参见调度配置章节。
    重要
    • 在提交任务前,必须配置任务调度依赖的上游节点,详情请参见配置同周期调度依赖
    • 如果您希望对任务进行周期性调度,需要配置任务的时间属性,包括任务的具体执行时间、调度周期、生效周期、重跑属性等。
    • 周期任务将于配置任务开始的第二天00:00,按照您的配置规则生效执行。
  7. 配置执行同步任务所使用的资源组。
    选择资源组
    1. 在页面右侧,单击数据集成资源组配置
    2. 选择独享数据集成资源组为您创建的独享资源组。
  8. 提交任务。
    1. 保存当前配置,单击提交图标图标。
    2. 提交新版本对话框中,填入备注
    3. 单击确认
  9. 单击运行图标图标,运行任务。
    任务运行过程中,可查看运行日志。运行成功后,显示如下结果。任务运行成功

步骤五:验证数据同步结果

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台。

    具体操作,请参见登录Kibana控制台

  2. 在左侧导航栏,单击Dev Tools(开发工具)。

  3. Console中,执行如下命令查看同步的数据。

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {}}
    }
    说明

    odps_index为您在数据同步脚本中设置的index字段的值。

    数据同步成功后,返回如下结果。查看同步的数据

  4. 执行如下命令,搜索文档中的categorybrand字段。

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {} },
    "_source": ["category", "brand"]
    }
  5. 执行如下命令,搜索category生鲜的文档。

    POST /odps_index/_search?pretty
    {
    "query": { "match": {"category":"生鲜"} }
    }
  6. 执行如下命令,按照trans_num字段对文档进行排序。

    POST /odps_index/_search?pretty
    {
    "query": { "match_all": {} },
    "sort": { "trans_num": { "order": "desc" } }
    }

    更多命令和访问方式,请参见Elastic.co官方帮助中心