阿里云上拥有丰富的云存储、云数据库产品。当您需要对这些产品中的数据进行分析和搜索时,可以通过DataWorks的数据集成服务实现最快5分钟一次的离线数据采集,并同步到阿里云Elasticsearch中。本文以阿里云大数据计算服务MaxCompute(原名ODPS)为例。
背景信息
阿里云Elasticsearch支持同步的离线数据源包括:
- 阿里云云数据库(MySQL、PostgreSQL、SQL Server、PPAS、MongoDB、HBase)
- 阿里云PolarDB-X(原DRDS升级版)
- 阿里云MaxCompute
- 阿里云OSS
- 阿里云Tablestore
- 自建HDFS、Oracle、FTP、DB2,及以上数据库的自建版本
操作流程
- 准备工作
创建DataWorks工作空间并开通MaxCompute服务、准备MaxCompute数据源、创建阿里云Elasticsearch实例。
- 步骤一:购买并创建独享资源组
购买并创建一个数据集成独享资源组,并为该资源组绑定专有网络和工作空间。独享资源组可以保障数据快速、稳定地传输。
- 步骤二:添加数据源
将MaxCompute和Elasticsearch数据源接入DataWorks的数据集成服务中。
- 步骤三:配置并运行数据同步任务
配置一个数据同步的脚本,将数据集成系统同步成功的数据存储到Elasticsearch中。将独享资源组作为一个可以执行任务的资源,注册到DataWorks的数据集成服务中。这个资源组将获取数据源的数据,并执行将数据写入Elasticsearch中的任务(该任务将由数据集成系统统一下发)。
- 步骤四:验证数据同步结果
在Kibana控制台中,查看同步成功的数据,并按条件查询数据。
准备工作
- 创建DataWorks工作空间。创建时选择MaxCompute计算引擎服务。
- 创建MaxCompute表并导入测试数据。
具体操作步骤请参见
创建和查看表、
导入数据。
本文使用的表结构和部分数据如下。
图 1. 表结构
图 2. 表数据
- 创建阿里云Elasticsearch实例,并开启实例的自动创建索引功能。
步骤一:购买并创建独享资源组
- 登录DataWorks控制台。
- 选择相应地域后,在左侧导航栏,单击资源组列表。
- 参见购买独享数据集成资源组,购买独享数据集成资源。
注意 购买时,所选地域需要与目标工作空间保持一致。
- 参见新增独享数据集成资源组,创建一个独享数据集成资源。
本文使用的配置如下,其中
资源组类型选择
独享数据集成资源组。

- 单击已创建的独享资源组右侧的专有网络绑定,参见绑定专有网络,为该独享资源组绑定专有网络。
独享资源部署在DataWorks托管的专有网络中。DataWorks需要与Elasticsearch实例的专有网络连通才能同步数据,因此在绑定专有网络时,需要选择Elasticsearch实例所在
专有网络和
交换机。

- 单击已创建的独享资源组右侧的修改归属工作空间,参见修改归属工作空间,为该独享资源组绑定目标工作空间。
步骤二:添加数据源
- 进入DataWorks的数据集成页面。
- 在DataWorks控制台的左侧导航栏,单击工作空间列表。
- 找到目标工作空间,单击其右侧操作列下的进入数据集成。
- 在左侧导航栏,单击数据源。
- 在数据源管理页面,单击新增数据源。
- 在新增数据源对话框中,单击MaxCompute(ODPS),进入新增MaxCompute(ODPS)数据源页面,填写数据源信息。

参数 |
说明 |
ODPS Endpoint |
MaxCompute服务的访问地址,不同区域的访问地址不同,详情请参见配置Endpoint。
|
ODPS项目名称 |
进入DataWorks控制台,在左侧导航栏,单击计算引擎列表下的MaxCompute获取。
|
AccessKey ID |
鼠标移至您的用户头像上,单击AccessKey 管理获取。
|
AccessKey Secret |
鼠标移至您的用户头像上,单击AccessKey 管理获取。
|
配置完成后,可与独享资源组进行连通性测试。
连通状态显示为
可连通时,表示连通成功。

- 单击完成。
- 使用同样的方式添加Elasticsearch数据源。

参数 |
说明 |
Endpoint |
阿里云Elasticsearch的访问地址,格式为:http://<实例的内网或公网地址>:9200 。实例的内网或公网地址可在基本信息页面获取,详情请参见查看实例的基本信息。
|
用户名 |
访问阿里云Elasticsearch实例的用户名,默认为elastic。 |
密码 |
对应用户的密码。elastic用户的密码在创建实例时设定,如果忘记可重置,重置密码的注意事项和操作步骤请参见重置实例访问密码。
|
步骤三:配置并运行数据同步任务
- 在DataWorks的数据开发页面,新建一个业务流程。
- 展开新建的业务流程,右键单击数据集成,选择。
- 在新建节点对话框中,输入节点名称,单击提交。
- 在页面上方工具栏,单击
图标。
- 确认后,配置数据同步脚本。
详细配置说明请参见
通过脚本模式配置任务。
说明 您可以单击页面上方工具栏中的

图标,导入对应的脚本配置模板,并在模板的基础上进行修改,完成数据同步脚本配置。
示例脚本如下。
{
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 1,
"throttle": false
}
},
"steps": [
{
"category": "reader",
"name": "Reader",
"parameter": {
"column": [
"create_time",
"category",
"brand",
"buyer_id",
"trans_num",
"trans_amount",
"click_cnt"
],
"datasource": "odps_es",
"partition": "pt=1",
"table": "hive_doc_good_sale"
},
"stepType": "odps"
},
{
"category": "writer",
"name": "Writer",
"parameter": {
"batchSize": 1000,
"cleanup": true,
"column": [
{
"name": "create_time",
"type": "id"
},
{
"name": "category",
"type": "text"
},
{
"name": "brand",
"type": "text"
},
{
"name": "buyer_id",
"type": "text"
},
{
"name": "trans_num",
"type": "integer"
},
{
"name": "trans_amount",
"type": "double"
},
{
"name": "click_cnt",
"type": "integer"
}
],
"datasource": "ES_data_source",
"discovery": false,
"index": "odps_index",
"indexType": "_doc",
"splitter": ","
},
"stepType": "elasticsearch"
}
],
"type": "job",
"version": "2.0"
}
同步脚本的配置分为三个部分。
配置 |
说明 |
setting |
用来配置同步中的一些丢包和最大并发等参数。
说明 如果待同步的数据量较大,可适当增大最大并发数。
|
Reader |
用来配置Maxcompute Reader。如果您的MaxCompute表是一个分区表,需要在partition 字段中设置分区信息,详情请参见MaxCompute Reader。本案例中的分区信息为pt=1 。
说明 如果待同步的数据量较大,可使用分区拆分数据进行同步。
|
Writer |
用来配置Elasticsearch Writer,详情请参见Elasticsearch Writer。
index :索引名称。
indexType :索引类型,7.0及以上版本的Elasticsearch必须使用_doc 。
|
- 保存配置脚本,单击右侧的调度配置,按照需求配置相应的调度参数。
各配置的详细说明请参见
调度配置章节。
注意
- 在提交任务前,必须配置任务调度依赖的上游节点,详情请参见依赖关系。
- 如果您希望对任务进行周期性调度,需要配置任务的时间属性,包括任务的具体执行时间、调度周期、生效周期、重跑属性等。
- 周期任务将于配置任务开始的第二天00:00,按照您的配置规则生效执行。
- 配置执行同步任务所使用的资源组。
- 在脚本配置页面右侧,单击数据集成资源组配置。
- 选择方案为独享数据集成资源组。
- 选择独享数据集成资源组为您创建的独享资源组。
- 提交任务。
- 保存当前配置,单击
图标。
- 在提交新版本对话框中,填入备注。
- 单击确认。
- 单击
图标,运行任务。任务运行过程中,可查看运行日志。运行成功后,显示如下结果。

步骤四:验证数据同步结果
- 登录目标阿里云Elasticsearch实例的Kibana控制台。
- 在左侧导航栏,单击Dev Tools(开发工具)。
- 在Console中,执行如下命令查看同步的数据。
POST /odps_index/_search?pretty
{
"query": { "match_all": {}}
}
说明 odps_index
为您在数据同步脚本中设置的index
字段的值。
数据同步成功后,返回如下结果。

- 执行如下命令,搜索文档中的
category
和brand
字段。 POST /odps_index/_search?pretty
{
"query": { "match_all": {} },
"_source": ["category", "brand"]
}
- 执行如下命令,搜索
category
为生鲜
的文档。POST /odps_index/_search?pretty
{
"query": { "match": {"category":"生鲜"} }
}
- 执行如下命令,按照
trans_num
字段对文档进行排序。POST /odps_index/_search?pretty
{
"query": { "match_all": {} },
"sort": { "trans_num": { "order": "desc" } }
}
更多命令和访问方式,请参见Elastic.co官方帮助中心。