本文为您介绍如何通过DataWorks数据同步功能,迁移HDFS数据至MaxCompute,或从MaxCompute迁移数据至HDFS。无论您使用Hadoop还是Spark,均可以与MaxCompute进行双向同步。
前提条件
开通MaxCompute并创建项目。
本文以在华东1(杭州)地域创建项目bigdata_DOC为例。详情请参见开通MaxCompute和DataWorks。
搭建Hadoop集群。
进行数据迁移前,您需要保证Hadoop集群环境正常。本文使用阿里云EMR服务自动化搭建Hadoop集群,详情请参见创建集群。
本文使用的EMR Hadoop版本信息如下:
EMR版本:EMR-3.11.0
集群类型:HADOOP
软件信息:HDFS2.7.2/YARN2.7.2/Hive2.3.3/Ganglia3.7.2/Spark2.2.1/HUE4.1.0/Zeppelin0.7.3/Tez0.9.1/Sqoop1.4.6/Pig0.14.0/ApacheDS2.0.0/Knox0.13.0
Hadoop集群使用经典网络,地域为华东1(杭州),主实例组ECS计算资源配置公网及内网IP,高可用选择为否(非HA模式)。
步骤一:数据准备
Hadoop集群创建测试数据。
通过阿里云账号登录阿里云E-MapReduce控制台。
在EMR控制台界面,选择目标项目并新建作业doc。本例中Hive建表语句如下。 关于EMR上新建作业更多信息请参见作业编辑。
CREATE TABLE IF NOT EXISTS hive_doc_good_sale( create_time timestamp, category STRING, brand STRING, buyer_id STRING, trans_num BIGINT, trans_amount DOUBLE, click_cnt BIGINT ) PARTITIONED BY (pt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' lines terminated by '\n';
单击运行,出现
Query executed successfully
提示,则说明成功在EMR Hadoop集群上创建了表hive_doc_good_sale。插入测试数据。您可以选择从OSS或其他数据源导入测试数据,也可以手动插入少量的测试数据。本文中手动插入数据如下。
insert into hive_doc_good_sale PARTITION(pt =1 ) values('2018-08-21','外套','品牌A','lilei',3,500.6,7),('2018-08-22','生鲜','品牌B','lilei',1,303,8),('2018-08-22','外套','品牌C','hanmeimei',2,510,2),(2018-08-22,'卫浴','品牌A','hanmeimei',1,442.5,1),('2018-08-22','生鲜','品牌D','hanmeimei',2,234,3),('2018-08-23','外套','品牌B','jimmy',9,2000,7),('2018-08-23','生鲜','品牌A','jimmy',5,45.1,5),('2018-08-23','外套','品牌E','jimmy',5,100.2,4),('2018-08-24','生鲜','品牌G','peiqi',10,5560,7),('2018-08-24','卫浴','品牌F','peiqi',1,445.6,2),('2018-08-24','外套','品牌A','ray',3,777,3),('2018-08-24','卫浴','品牌G','ray',3,122,3),('2018-08-24','外套','品牌C','ray',1,62,7) ;
完成插入数据后,您可以执行
select * from hive_doc_good_sale where pt =1;
语句,检查Hadoop集群表中是否已存在数据可以用于迁移。
利用DataWorks新建目标表。
登录DataWorks控制台,单击左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入数据开发。
在数据开发页面,右键单击目标工作流程,选择 。
在弹出的新建表对话框中,填写名称,并单击新建。
说明如果绑定多个实例,则需要选择MaxCompute引擎实例。
在表的编辑页面,选择DDL。
在DDL对话框中输入建表语句,单击生成表结构,并确认操作。本示例的建表语句如下所示。
CREATE TABLE IF NOT EXISTS hive_doc_good_sale( create_time string, category STRING, brand STRING, buyer_id STRING, trans_num BIGINT, trans_amount DOUBLE, click_cnt BIGINT ) PARTITIONED BY (pt string) ;
在建表过程中,需要考虑Hive数据类型与MaxCompute数据类型的映射,当前数据映射关系请参见数据类型映射表。
上述步骤同样可通过odpscmd命令行工具完成,命令行工具安装和配置请参见安装并配置MaxCompute客户端。
说明考虑到部分Hive与MaxCompute数据类型的兼容问题,建议在odpscmd客户端上执行以下命令。
set odps.sql.type.system.odps2=true; set odps.sql.hive.compatible=true;
单击提交到生产环境,完成表的创建。
完成建表后,单击左侧导航栏中的表管理,即可查看当前创建的MaxCompute表。
步骤二:数据同步
新建自定义资源组。
由于MaxCompute项目所处的网络环境与Hadoop集群中的数据节点(data node)网络通常不可达,您可以通过自定义资源组的方式,将DataWorks的同步任务运行在Hadoop集群的Master节点上(Hadoop集群内Master节点和数据节点通常可达)。
查看Hadoop集群数据节点 。
登录EMR控制台,单击EMR on ECS。
选择集群名称,在节点管理页签查看主机信息。
您也可单击Master节点的ECS ID,进入ECS实例详情页。然后单击远程连接进入ECS,执行
hadoop dfsadmin -report
命令查看data node。说明本示例的data node只具有内网地址,很难与DataWorks默认资源组互通,所以需要设置自定义资源组,将master node设置为执行DataWorks数据同步任务的节点。
新建任务资源组。
在DataWorks控制台上,进入
页面,单击右上角的新增自定义资源组。说明目前仅专业版及以上版本方可使用此入口。
添加服务器时,需要输入ECS UUID和机器IP等信息(对于经典网络类型,需要输入服务器名称。对于专有网络类型,需要输入服务器UUID)。目前仅DataWorks V2.0华东2(上海)支持添加经典网络类型的调度资源,对于其他地域,无论您使用的是经典网络还是专有网络类型,在添加调度资源组时都请选择专有网络类型。
机器IP需要填写master node公网IP(内网IP有可能不可达)。ECS的UUID需要进入master node管理终端,通过命令dmidecode | grep UUID获取(如果您的hadoop集群并非搭建在EMR环境上,也可以通过该命令获取)。
添加服务器后,需要保证master node与DataWorks网络可达。如果您使用的是ECS服务器,需要设置服务器安全组。
如果您使用的内网IP互通,请参见附录:ECS自建数据库的安全组配置。
如果您使用的是公网IP,可以直接设置安全组公网出入方向规则。本文中设置公网入方向放通所有端口(实际应用场景中,为了您的数据安全,强烈建议设置详细的放通规则)。
完成上述步骤后,按照提示安装自定义资源组agent。当前状态显示为可用时,则新增自定义资源组成功。
如果状态为不可用,您可以登录master node,执行
tail -f /home/admin/alisatasknode/logs/heartbeat.log
命令查看DataWorks与master node之间心跳报文是否超时。
新建数据源。
DataWorks新建工作空间后,默认数据源odps_first。因此只需要添加Hadoop集群数据源。更多详情请参见配置HDFS数据源。
进入数据集成页面,单击左侧导航栏中的数据源。
在数据源列表页面,单击新增数据源 。
在新增数据源页面中,选择数据源类型为HDFS。
填写HDFS数据源的各配置项。
配置
说明
数据源名称
数据源名称必须以字母、数字、下划线组合,且不能以数字和下划线开头。
数据源描述
对数据源进行简单描述,不得超过80个字符。
DefaultFS
对于EMR Hadoop集群而言,如果Hadoop集群为HA集群,则此处地址为
hdfs://emr-header-1的IP:8020
。如果Hadoop集群为非HA集群,则此处地址为hdfs://emr-header-1的IP:9000
。本实验中的emr-header-1与DataWorks通过公网连接,因此此处填写公网IP并放通安全组。
完成配置后,单击测试连通性。
测试连通性通过后,单击完成。
说明如果EMR Hadoop集群设置网络类型为专有网络,则不支持连通性测试。
配置数据同步任务 。
在数据开发页面的左侧菜单栏顶部,单击
图标,选择 。
在新建节点对话框中,输入名称和路径,单击确认。
成功创建数据同步节点后,按照界面指引选择数据来源、数据去向及资源组,确保其网络连通,然后单击工具栏中的转换脚本按钮。
单击提示对话框中的确认,即可进入脚本模式进行开发。
单击工具栏中的导入模板按钮。
在导入模板对话框中,选择来源类型、数据源、目标类型及数据源,单击确认。
新建同步任务完成后,通过导入模板已生成了基本的读取端配置。
此时您可以继续手动配置数据同步任务的读取端数据源,以及需要同步的表信息等。本示例的代码如下所示,更多详情请参见HDFS Reader。
{ "configuration": { "reader": { "plugin": "hdfs", "parameter": { "path": "/user/hive/warehouse/hive_doc_good_sale/", "datasource": "HDFS1", "column": [ { "index": 0, "type": "string" }, { "index": 1, "type": "string" }, { "index": 2, "type": "string" }, { "index": 3, "type": "string" }, { "index": 4, "type": "long" }, { "index": 5, "type": "double" }, { "index": 6, "type": "long" } ], "defaultFS": "hdfs://47.100.XX.XXX:9000", "fieldDelimiter": ",", "encoding": "UTF-8", "fileType": "text" } }, "writer": { "plugin": "odps", "parameter": { "partition": "pt=1", "truncate": false, "datasource": "odps_first", "column": [ "create_time", "category", "brand", "buyer_id", "trans_num", "trans_amount", "click_cnt" ], "table": "hive_doc_good_sale" } }, "setting": { "errorLimit": { "record": "1000" }, "speed": { "throttle": false, "concurrent": 1, "mbps": "1",//此处1mbps = 1MB/s。 } } }, "type": "job", "version": "1.0" }
其中,path参数为数据在Hadoop集群中存放的位置。您可以在登录Master Node后,执行
hdfs dfs -ls /user/hive/warehouse/hive_doc_good_sale
命令确认。对于分区表,您可以不指定分区,DataWorks数据同步会自动递归到分区路径。完成配置后,单击运行。如果提示任务运行成功,则说明同步任务已完成。如果运行失败,可以通过日志进行排查。
步骤三:查看结果
在DataStudio页面的左侧导航栏,单击临时查询。
选择 。
编写并执行SQL语句,查看导入hive_doc_good_sale的数据。
SQL语句如下所示:
--查看是否成功写入MaxCompute。 select * from hive_doc_good_sale where pt=1;
说明您也可以在odpscmd命令行工具中输入
select * FROM hive_doc_good_sale where pt =1;
,查询表结果。如果您想实现MaxCompute数据迁移至Hadoop,步骤与上述步骤类似,不同的是同步脚本内的reader和writer对象需要对调,具体实现脚本如下。
{ "configuration": { "reader": { "plugin": "odps", "parameter": { "partition": "pt=1", "isCompress": false, "datasource": "odps_first", "column": [ "create_time", "category", "brand", "buyer_id", "trans_num", "trans_amount", "click_cnt" ], "table": "hive_doc_good_sale" } }, "writer": { "plugin": "hdfs", "parameter": { "path": "/user/hive/warehouse/hive_doc_good_sale", "fileName": "pt=1", "datasource": "HDFS_data_source", "column": [ { "name": "create_time", "type": "string" }, { "name": "category", "type": "string" }, { "name": "brand", "type": "string" }, { "name": "buyer_id", "type": "string" }, { "name": "trans_num", "type": "BIGINT" }, { "name": "trans_amount", "type": "DOUBLE" }, { "name": "click_cnt", "type": "BIGINT" } ], "defaultFS": "hdfs://47.100.XX.XX:9000", "writeMode": "append", "fieldDelimiter": ",", "encoding": "UTF-8", "fileType": "text" } }, "setting": { "errorLimit": { "record": "1000" }, "speed": { "throttle": false, "concurrent": 1, "mbps": "1",//此处1mbps = 1MB/s。 } } }, "type": "job", "version": "1.0" }
说明您需要参见HDFS Writer,在运行上述同步任务前,对Hadoop集群进行设置。