本文介绍如何使用阿里云Jindo DistCp从HDFS迁移数据到OSS。

背景信息

在传统大数据领域,HDFS经常作为大规模数据的底层存储。在进行数据迁移、数据拷贝的场景中,最常用的是Hadoop自带的DistCp工具。但是该工具不能很好利用对象存储OSS的特性,导致效率低下并且不能保证数据一致性。此外,该工具提供的功能选项较单一,无法很好地满足用户的需求。

阿里云Jindo DistCp(分布式文件拷贝工具)用于大规模集群内部或集群之间拷贝文件。Jindo DistCp使用MapReduce实现文件分发,错误处理和恢复,把文件和目录的列表作为MapReduce任务的输入,每个任务会完成源列表中部分文件的拷贝。全量支持HDFS之间、HDFS与OSS之间、以及OSS之间的数据拷贝场景,提供多种个性化拷贝参数和多种拷贝策略。

相对于Hadoop DistCp,使用阿里云Jindo DistCp从HDFS迁移数据到OSS具有以下优势:
  • 效率高,在测试场景中最高可达到1.59倍的加速。
  • 基本功能丰富,提供多种拷贝方式和场景优化策略。
  • 深度结合OSS,对文件提供归档、压缩等操作。
  • 实现No-Rename拷贝,保证数据一致性。
  • 场景全面,可完全替代Hadoop DistCp,目前支持Hadoop2.7+和Hadoop3.x。

前提条件

  • 如果您使用的是自建ECS集群,需要具备Hadoop2.7+或Hadoop3.x环境以及进行MapReduce作业的能力。
  • 如果您使用的是阿里云E-MapReduce:
    • 对于EMR3.28.0/bigboot2.7.0及以上的版本,可以通过Shell命令的方式使用Jindo DistCp。更多信息,请参见Jindo DistCp使用说明
    • 对于EMR3.28.0/bigboot2.7.0以下的版本,可能会存在一定的兼容性问题,您可以通过提交工单申请处理。

步骤一:下载JAR包

步骤二:配置OSS的AccessKey

您可以通过以下任意方式配置AccessKey:
  • 在示例命令中配置AccessKey

    例如,在将HDFS中的目录拷贝到OSS指定路径的示例命令中结合--ossKey--ossSecret--ossEndPoint选项配置AccessKey。

    hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming/examplefile --dest oss://examplebucket/example_file --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com
  • 通过配置文件预先配置AccessKey

    将OSS的--ossKey--ossSecret--ossEndPoint预先配置在Hadoop的core-site.xml文件里。示例命令如下:

    <configuration>
        <property>
            <name>fs.oss.accessKeyId</name>
            <value>xxx</value>
        </property>
    
        <property>
            <name>fs.oss.accessKeySecret</name>
            <value>xxx</value>
        </property>
    
        <property>
            <name>fs.oss.endpoint</name>
            <!-- 阿里云ECS环境下推荐使用内网OSS Endpoint,即oss-cn-xxx-internal.aliyuncs.com -->
            <value>oss-cn-xxx.aliyuncs.com</value>
        </property>
    </configuration>
  • 配置免密功能

    配置免密功能,避免明文保存AccessKey,提高安全性。具体操作,请参见使用JindoFS SDK免密功能

步骤三:迁移或拷贝数据

以下以Jindo DistCp 3.7.3版本为例,您可以根据实际环境替换对应的版本号。

  • 全量迁移或拷贝数据

    将HDFS指定目录/data/incoming下的数据全量迁移或拷贝到OSS目标路径oss://examplebucket/incoming/,示例命令如下:

    hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com --parallelism 10

    示例中涉及的各参数或选项说明如下:

    参数及选项 说明 示例
    --src 待迁移或拷贝的HDFS数据所在的路径。 /data/incoming
    --dest OSS中存放迁移或拷贝数据的目标路径。 oss://examplebucket/incoming
    --ossKey 访问OSS的AccessKey ID。关于获取AccessKey ID的具体操作,请参见获取AccessKey LTAI5t7h6SgiLSganP2m****
    --ossSecret 访问OSS的AccessKey Secret。关于获取AccessKey Secret的具体操作,请参见获取AccessKey KZo149BD9GLPNiDIEmdQ7dyNKG****
    --ossEndPoint Bucket所在地域(Region)对应的访问域名(Endpoint)。关于OSS支持的地域和对应的访问域名列表信息,请参见访问域名和数据中心
    注意 ECS环境下推荐使用内网 ossEndPoint,即oss-cn-xxx-internal.aliyuncs.com
    oss-cn-hangzhou.aliyuncs.com
    --parallelism 根据集群资源调整任务并发数。 10
  • 增量迁移或拷贝数据

    如果您仅希望拷贝在上一次全量迁移或拷贝后源路径下新增的数据,此时您可以结合--update选项完成数据的增量迁移或拷贝。

    将HDFS指定目录/data/incoming下的数据增量迁移或拷贝到OSS目标路径oss://examplebucket/incoming,示例命令如下:

    hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com --update --parallelism 10

    使用--update选项时,默认开启校验和Checksum。开启后,DistCp将对源路径和目标路径的文件名称、文件大小以及文件的Checksum进行比较。如果源路径或目标路径下的文件名称、文件大小或者文件的Checksum不一致时,将自动触发增量迁移或拷贝任务。

    如果您不需要对源路径和目标路径的文件的Checksum进行比较,请增加--disableChecksum选项关闭Checksum校验,示例命令如下:

    hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com --update --disableChecksum --parallelism 10

附录一:Jindo DistCp支持的参数及选项

Jindo DistCp提供一系列的参数及选项。您可以通过以下命令获取各参数及选项的具体用法。

hadoop jar jindo-distcp-3.7.3.jar --help

各参数及选项的含义及其示例如下表所示。

参数及选项 说明 示例
--src 指定拷贝的源路径。 --src oss://exampleBucket/sourceDir
--dest 指定拷贝的目标路径。 --dest oss://exampleBucket/destDir
--parallelism 指定拷贝的任务并发数,可根据集群资源调节。 --parallelism 10
--policy 指定拷贝到OSS后的文件类型。取值:
  • ia:低频访问
  • archive:归档存储
  • coldArchive:冷归档存储
--policy archive
--srcPattern 指定通过正则表达式来选择或者过滤需要拷贝的文件,正则表达式必须为全路径正则匹配。 --srcPattern .*\.log
--deleteOnSuccess 指定在拷贝任务完成后删除源路径下的文件。 --deleteOnSuccess
--outputCodec 指定文件的压缩方式。当前版本支持编解码器gzip、gz、lzo、lzop和snappy,以及关键字none和keep。关键字含义如下:
  • none:保存为未压缩的文件。如果文件已压缩,则Jindo DistCp会将其解压缩。
  • keep(默认值):不更改文件压缩形态。
说明 如果您需要在开源Hadoop集群环境中使用lzo的压缩方式,请确保已安装gplcompression的native库和hadoop-lzo包。如果缺少相关环境,建议使用其他压缩方式进行压缩。
--outputCodec gzip
--srcPrefixesFile 指定需要拷贝的文件列表,列表里文件以src路径作为前缀。 --srcPrefixesFile file:///opt/folders.txt
--outputManifest 指定在dest目录下生成一个gzip压缩的文件,记录已完成拷贝的文件信息。 --outputManifest=manifest-2020-04-17.gz
--requirePreviousManifest 指定本次拷贝操作是否需要读取之前已拷贝的文件信息。取值如下:
  • false:不读取已拷贝的文件信息,直接拷贝全量数据。
  • true:读取已拷贝的文件信息,仅拷贝增量数据。
--requirePreviousManifest=false
--previousManifest 指定本次拷贝需要读取已拷贝文件信息所在的路径,完成增量更新。 --previousManifest=oss://exampleBucket/manifest-2020-04-16.gz
--copyFromManifest 从已完成的Manifest文件中进行拷贝,通常与--previousManifest选项配合使用。 --previousManifest oss://exampleBucket/manifest-2020-04-16.gz --copyFromManifest
--groupBy 通过正则表达式将符合规则的文件进行聚合。 --groupBy='.*/([a-z]+).*.txt'
--targetSize 指定聚合后的文件大小阈值,单位为MB。 --targetSize=10
--enableBalancePlan 适用于拷贝任务中数据量差异不大的场景,例如均为大于10 GB或者均为小于10 KB的文件。 --enableBalancePlan
--enableDynamicPlan 适用于拷贝任务中数据量差异较大的场景,例如10 GB大文件和10 KB小文件混合的场景。 --enableDynamicPlan
--enableTransaction 保证Job级别的一致性,默认是Task级别。 --enableTransaction
--diff 查看本次拷贝是否完成全部文件拷贝,并对未完成拷贝的文件生成文件列表。 --diff
--ossKey 访问OSS的AccessKey ID。 --ossKey LTAI5t7h6SgiLSganP2m****
--ossSecret 访问OSS的AccessKey Secret。 --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG****
--ossEndPoint Bucket所在地域对应的Endpoint。 --ossEndPoint oss-cn-hangzhou.aliyuncs.com
--cleanUpPending 清理OSS残留文件,清理过程需要消耗一定的时间。 --cleanUpPending
--queue Yarn队列名称。 --queue examplequeue1
--bandwidth 指定本次DistCp任务所用的单机带宽,单位为 MB。 --bandwidth 6
--disableChecksum 关闭Checksum校验。 --disableChecksum
--enableCMS 开启云监控告警功能。 --enableCMS
--update 使用增量同步功能,即仅同步上一次全量迁移或拷贝后源路径下新增的数据到目标路径。 --update
--filters 通过filters参数指定一个文件路径。在这个文件中,每一行配置一个正则表达式,对应DistCp任务中不需要拷贝或比对的文件。 --filters /path/to/filterfile.txt
--tmp 指定在使用DistCp工具的过程中,用于存放临时文件的目录。 --tmp /data
--overwrite 使用覆盖同步功能,即使用源路径完全覆盖目标路径。 --overwrite
--ignore 忽略数据迁移期间发生的异常,相关报错不会中断任务,并最终以DistCp Counter的形式透出。如果使用了--enableCMS,也会通过指定方式进行通知。 --ignore

附录二:场景示例

场景一:使用JindoDistCp成功传输数据后,如何验证数据完整性?

JindoDistCp提供了以下两种方式用于验证数据的完整性。

  • 方式一:DistCp Counters

    通过Distcp Counters信息中包含的BYTES_EXPECTED、FILES_EXPECTED等参数验证数据完整性。

    示例
        JindoDistcpCounter
            BYTES_COPIED=10000
            BYTES_EXPECTED=10000
            FILES_COPIED=11
            FILES_EXPECTED=11
            ...
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0

    示例中可能包含的Counter参数如下:

    参数 说明
    BYTES_COPIED 拷贝成功的字节数。
    BYTES_EXPECTED 预期拷贝的字节数。
    FILES_COPIED 拷贝成功的文件数。
    FILES_EXPECTED 预期拷贝的文件数。
    FILES_SKIPPED 增量拷贝时跳过的文件数。
    BYTES_SKIPPED 增量拷贝时跳过的字节数。
    COPY_FAILED 拷贝失败的文件数,不为0时触发告警。
    BYTES_FAILED 拷贝失败的字节数。
    DIFF_FILES 源路径与目标路径下不相同的文件数,不为0时触发告警。
    DIFF_FAILED 文件比较操作异常的文件数,并计入DIFF_FILE。
    SRC_MISS 源路径下不存在的文件数,并计入DIFF_FILES。
    DST_MISS 目标路径下不存在的文件数,并计入DIFF_FILES。
    LENGTH_DIFF 源文件和目标文件大小不一致的数量,并计入DIFF_FILES。
    CHECKSUM_DIFF Checksum校验失败的文件数,并计入COPY_FAILED。
    SAME_FILES 源路径与目标路径下完全相同的文件数。
  • 方式二:通过--diff选项

    您可以在示例中结合--diff选项对源路径和目标路径下文件名和文件大小进行比较。

    hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com --diff

场景二:从HDFS迁移数据到OSS过程中,迁移任务可能随时失败,要想支持断点续传,该使用哪些参数?

  1. 您可以在示例中结合--diff选项查看HDFS指定路径下的文件是否都已迁移至OSS。
    hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com --diff

    如果所有文件都已迁移完成,则提示如下信息,否则在执行目录下会生成一个manifest文件。

    INFO distcp.JindoDistCp: Jindo DistCp job exit with 0
  2. 对于生成的manifest文件,您可以使用--copyFromManifest--previousManifest选项迁移剩余文件。
    hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --previousManifest=file:///opt/manifest-2020-04-17.gz --copyFromManifest --parallelism 20

    其中,--previousManifest选项后指定的file:///opt/manifest-2020-04-17.gz为当前执行命令的本地路径。

场景三:如果要以低频访问、归档或者冷归档的方式存储写入OSS的文件,该使用哪些参数?

您可以在示例中添加--policy选项来指定写入OSS文件的存储类型。以下以指定为低频访问类型为例:
hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com --policy ia --parallelism 10

如果需要指定为归档存储类型,请将--policy ia替换为--policy archive。如需指定为冷归档存储类型,请将--policy ia替换为--policy coldArchive。此外,目前冷归档存储仅支持部分地域,更多信息,请参见冷归档存储(Cold Archive)

场景四:了解待迁移或拷贝的源路径下数据的分布情况后,例如大文件和小文件的占比,该使用哪些参数来优化数据传输速度?

  • 小文件较多,大文件较大

    例如HDFS源路径下包含50万个大小为100 KB左右的文件,10个5 TB大小的文件,此时您可以结合--enableDynamicPlan选项优化数据传输速度。

    hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com --enableDynamicPlan --parallelism 10
  • 文件大小无明显差异

    例如HDFS源路径下包含100个大小为200 KB的文件,此时您可以结合--enableBalancePlan选项优化数据传输速度。

    hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com --enableBalancePlan --parallelism 10
说明 不支持在同一个示例中同时使用--enableDynamicPlan以及--enableBalancePlan选项。

场景五:迁移或者拷贝任务完成后,希望仅保留目标路径下的数据,且删除源路径下的指定数据,该使用哪些参数?

您可以结合--deleteOnSuccess选项,在迁移或者拷贝任务完成后,仅保留OSS目标路径下的数据,并删除HDFS源路径下的指定数据。

hadoop jar jindo-distcp-3.7.3.jar --src /data/incoming --dest oss://examplebucket/incoming --ossKey LTAI5t7h6SgiLSganP2m**** --ossSecret KZo149BD9GLPNiDIEmdQ7dyNKG**** --ossEndPoint oss-cn-hangzhou.aliyuncs.com --deleteOnSuccess --parallelism 10