Use Jindo DistCp to copy data
Jindo DistCp is a distributed data copy tool built on E-MapReduce (EMR). Based on Apache DistCp, it is optimized for large-scale data transfers between Hadoop Distributed File System (HDFS) and Object Storage Service (OSS), with native support for OSS storage classes, Cloud Monitor alerts, and S3-compatible sources.
Prerequisites
Before you begin, ensure that you have:
Java Development Kit (JDK) 8 installed on your computer
An EMR cluster. For more information, see Create a cluster.
Parameter reference
All parameters are optional except --src and --dest.
| Parameter | Description | Default | Constraints |
|---|---|---|---|
--src | Source directory | — | Required |
--dest | Destination directory | — | Required. For HDFS destinations, use /path, hdfs://hostname:port/path, or hdfs://headerIp:port/path only. |
--parallelism | Number of parallel reduce tasks (mapreduce.job.reduces) | 7 | — |
--srcPattern | Regular expression to filter source files | — | Must match the full file path |
--deleteOnSuccess | Delete source files after a successful copy | — | — |
--outputCodec | Compression codec for copied files | keep | Valid values: gzip, gz, lzo, lzop, snappy, none, keep. To use LZO in an open source Hadoop cluster, install the gplcompression native library and the hadoop-lzo package. |
--outputManifest | Name of the manifest file to generate | — | GZ format only. Set --requirePreviousManifest=false to create a new manifest. |
--previousManifest | Path to an existing manifest file | — | Used with --outputManifest for incremental tracking, or with --copyFromManifest to replay a manifest. |
--requirePreviousManifest | Whether a previous manifest is required | — | Set to false when generating the first manifest. |
--copyFromManifest | Copy files listed in a manifest instead of listing a directory | — | Requires --previousManifest. |
--srcPrefixesFile | Path to a file containing source URI prefixes | — | One prefix per line. |
--groupBy | Regular expression to group input files for merging | — | Cannot be used with --enableBalancePlan or --enableDynamicPlan. |
--targetSize | Target size for merged output files (MB) | — | Cannot be used with --enableBalancePlan or --enableDynamicPlan. |
--enableBalancePlan | Optimize task allocation when file sizes are similar within each size group | — | Cannot be used with --groupBy or --targetSize. |
--enableDynamicPlan | Optimize task allocation when most files are small | — | Cannot be used with --groupBy or --targetSize. |
--enableTransaction | Enable job-level transaction support | — | — |
--diff | Compare source and destination file lists | — | Does not return accurate size differences if files were compressed or decompressed during the copy. |
--update | Copy only missing or changed files and directories (incremental update) | — | — |
--filters | Path to a filter file; each line is a regular expression | — | Matching files are excluded from copy and --diff operations. |
--queue | YARN queue name | — | — |
--bandwidth | Bandwidth limit per node (MB/s) | — | — |
--policy | OSS storage class for copied files | — | Valid values: coldArchive, archive, ia. Cold Archive is available only in select regions. |
--key | AccessKey ID for OSS access | — | Required when accessing OSS from outside EMR or when AccessKey-free access is not supported. |
--secret | AccessKey secret for OSS access | — | See --key. |
--endPoint | OSS endpoint | — | See --key. |
--s3Key | Amazon S3 access key | — | — |
--s3Secret | Amazon S3 secret key | — | — |
--s3EndPoint | Amazon S3 endpoint | — | — |
--enableCMS | Report task failures to Cloud Monitor | — | Requires Cloud Monitor environment variables to be set. See Use Cloud Monitor. |
--cleanUpPending | Clean up incomplete multipart uploads after the task finishes | — | Alternatively, clean up in the OSS console. |
Connect to the EMR cluster
SSH into the master node of your EMR cluster. For more information, see Connect to the master node of an EMR cluster in SSH mode.
Copy files
Basic copy
Copy all files from /opt/tmp in HDFS to an OSS bucket:
jindo distcp --src /opt/tmp --dest oss://<yourBucketName>/tmp

Replace <yourBucketName> with the name of your OSS bucket. If you do not specify a root directory in --dest, Jindo DistCp creates one automatically.
Adjust parallelism
--parallelism controls how many reduce tasks run in parallel. The default is 7. Increase it when you have more cluster resources available:
jindo distcp --src /opt/tmp --dest oss://<yourBucketName>/tmp --parallelism 20

Filter files by pattern
--srcPattern accepts a regular expression that must match the full file path. To copy only .log files from /data/incoming/hourly_table:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --srcPattern '.*\.log' --parallelism 20

Verify the result:
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03

Expected output — only .log files are copied:
Found 2 items
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log

Delete source files after copy
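Because --srcPattern must match the full file path, whole-line matching with grep -Ex approximates the behavior locally. A quick sketch for checking a pattern before running the job (the paths below are illustrative, not read from HDFS):

```shell
# Illustration only: '.*\.log' must match the ENTIRE path,
# which is why the leading '.*' is required.
# grep -Ex performs whole-line (full-path) matching.
printf '%s\n' \
  /data/incoming/hourly_table/2017-02-01/03/1.log \
  /data/incoming/hourly_table/2017-02-01/03/000151.sst \
| grep -Ex '.*\.log'
```

Only the path ending in .log survives; a bare pattern such as '\.log' without the leading '.*' would match nothing, because it does not cover the full path.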
--deleteOnSuccess removes source files once the copy succeeds:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --deleteOnSuccess --parallelism 20

Compress files during copy
--outputCodec compresses files as they are copied. Valid values: gzip, gz, lzo, lzop, snappy, none, keep (default).
- none: decompress files that are already compressed
- keep: copy files without changing their compression
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputCodec=gz --parallelism 20

Verify that files in the destination are compressed:
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03

Expected output:
Found 6 items
-rw-rw-rw- 1 938 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/000151.sst.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/OPTIONS-000109.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp01.txt.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp06.txt.gz

To use the LZO codec in an open source Hadoop cluster, install the gplcompression native library and the hadoop-lzo package.
Copy files in multiple folders
--srcPrefixesFile points to a file where each line is a source URI prefix. Jindo DistCp copies all files under the listed prefixes:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --srcPrefixesFile file:///opt/folders.txt --parallelism 20

Sample folders.txt:
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-01
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-02

Merge small files
Reading large numbers of small files from HDFS degrades query performance. Use --groupBy and --targetSize to merge them into larger files.
--groupBy is a regular expression that groups files by a captured pattern. --targetSize sets the maximum output file size in MB.
The following example merges all .txt files (grouped by name prefix) into files no larger than 10 MB:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --targetSize=10 --groupBy='.*/([a-z]+).*.txt' --parallelism 20

Verify the result:
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03/

Expected output — two .txt files merged into one:
Found 1 items
-rw-rw-rw- 1 2032 2020-04-17 21:18 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp2

--groupBy and --targetSize cannot be used with --enableBalancePlan or --enableDynamicPlan.
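To see how the capture group in --groupBy buckets files, you can run the same regular expression locally with sed. This is only a sketch of the grouping logic; the paths are made up:

```shell
# Illustration only: files whose paths yield the same captured value
# belong to the same merge group. '([a-z]+)' captures the lowercase
# name prefix after the last slash (here "emp" for both paths).
printf '%s\n' \
  /data/incoming/hourly_table/2017-02-01/03/emp01.txt \
  /data/incoming/hourly_table/2017-02-01/03/emp06.txt \
| sed -E 's|.*/([a-z]+).*\.txt|\1|'
```

Both sample paths reduce to the same key, emp, which is why the two .txt files end up in a single merged output file.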
Optimize task allocation
Use one of these options to improve copy performance based on your file size distribution. Neither option can be used with --groupBy or --targetSize.
- --enableBalancePlan: Use when both small and large files are to be copied but the file sizes are not significantly different among small files and among large files. Distributes tasks evenly across reducers.

  jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableBalancePlan --parallelism 20

- --enableDynamicPlan: Use when file sizes vary significantly and most files are small. Allocates tasks dynamically to avoid reducer imbalance.

  jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableDynamicPlan --parallelism 20
Enable transaction support
--enableTransaction ensures job-level atomicity:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableTransaction --parallelism 20

Exclude files with filters
--filters points to a file where each line is a regular expression. Files with paths matching any expression are excluded from both copy and --diff operations.
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --filters /path/to/filterfile.txt --parallelism 20

Sample filter file:
.*\.tmp.*
.*\.staging.*

Files whose paths contain .tmp or .staging are skipped.
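The exclusion logic can be previewed locally with grep, which also reads one regular expression per line via -f. A sketch using a throwaway filter file (the paths are illustrative):

```shell
# Illustration only: preview which paths a filter file would exclude.
# grep -v inverts the match, so the output is what WOULD be copied.
cat > /tmp/filterfile.txt <<'EOF'
.*\.tmp.*
.*\.staging.*
EOF
printf '%s\n' \
  /data/incoming/hourly_table/1.log \
  /data/incoming/hourly_table/part-0001.tmp \
  /data/incoming/hourly_table/.staging/job.xml \
| grep -Evf /tmp/filterfile.txt
```

Only the .log path remains; the .tmp and .staging paths match a filter pattern and are dropped.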
Limit bandwidth
--bandwidth caps the bandwidth used by each node (in MB/s), preventing the task from consuming all available network capacity:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --bandwidth 100 --parallelism 20

Specify a YARN queue
--queue assigns the DistCp job to a specific YARN queue:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --queue yarnqueue

Track changes with manifest files
A manifest file records all files copied by a job — their destination paths, source paths, and sizes. Use manifests to track incremental copies and verify completeness.
Generate a manifest
Set --requirePreviousManifest=false to create the first manifest. The file is always compressed in GZ format:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputManifest=manifest-2020-04-17.gz --requirePreviousManifest=false --parallelism 20

Read the manifest:
hadoop fs -text oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz > before.lst
cat before.lst

Each line is a JSON object describing one file:
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/000151.sst","baseName":"2017-02-01/03/000151.sst","srcDir":"oss://<yourBucketName>/hourly_table","size":2252}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
...

Copy newly added files
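Since each manifest line is a self-contained JSON object, ordinary shell tools can summarize it. For example, a sketch that totals the size fields of a manifest saved locally (the two entries mirror the sample lines above; the bucket name is a placeholder):

```shell
# Illustration: total the "size" fields across manifest entries
# to get the number of bytes the job recorded.
cat > /tmp/before.lst <<'EOF'
{"path":"oss://bucket/hourly_table/2017-02-01/03/000151.sst","baseName":"2017-02-01/03/000151.sst","srcDir":"oss://bucket/hourly_table","size":2252}
{"path":"oss://bucket/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://bucket/hourly_table","size":4891}
EOF
grep -o '"size":[0-9]*' /tmp/before.lst | awk -F: '{s += $2} END {print s}'
# → 7143 (2252 + 4891)
```

The same pattern works for counting entries (wc -l) or extracting destination paths.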
Pass the previous manifest to --previousManifest so that Jindo DistCp only copies files not already listed:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputManifest=manifest-2020-04-18.gz --previousManifest=oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz --parallelism 20

Compare manifests to see what was newly copied:
hadoop fs -text oss://<yourBucketName>/hourly_table/manifest-2020-04-18.gz > current.lst
diff before.lst current.lst

Output shows the two newly added files:
3a4,5
> {"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/5.log","baseName":"2017-02-01/03/5.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
> {"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/6.log","baseName":"2017-02-01/03/6.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}

Replay a manifest
To copy exactly the files listed in a previous manifest:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --previousManifest=oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz --copyFromManifest --parallelism 20

Verify and sync
Check for differences
--diff compares source and destination file lists. Run it after a copy to verify completeness:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --diff

If all files are copied:
INFO distcp.JindoDistCp: Jindo DistCp job exit with 0

If some files are missing, Jindo DistCp generates a manifest in the destination directory listing the missing files. Use --copyFromManifest and --previousManifest to copy them:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --previousManifest=file:///opt/manifest-2020-04-17.gz --copyFromManifest --parallelism 20

--diff does not return accurate file size differences if files were compressed or decompressed during the copy. For HDFS destinations, use /path, hdfs://hostname:port/path, or hdfs://headerIp:port/path. Formats like hdfs:///path and hdfs:/path are not supported.
Sync incrementally
--update skips the files and directories that are the same as those in the destination and copies only new or changed files and directories:
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --update --parallelism 20

Write to OSS storage classes
Use --policy to write data directly to a specific OSS storage class.
Cold Archive (available in select regions):
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy coldArchive --parallelism 20

For supported regions, see Overview, Archive, and Cold Archive.
Archive:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy archive --parallelism 20

Infrequent Access (IA):
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy ia --parallelism 20
Access OSS with an AccessKey pair
When accessing OSS from outside EMR, or when AccessKey-free access is not supported, specify credentials directly in the command:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --key <yourAccessKeyId> --secret <yourAccessKeySecret> --endPoint oss-cn-hangzhou.aliyuncs.com --parallelism 20

Replace <yourAccessKeyId> and <yourAccessKeySecret> with your Alibaba Cloud AccessKey ID and AccessKey secret.
Use Amazon S3 as a source
Specify S3 credentials with --s3Key, --s3Secret, and --s3EndPoint:
hadoop jar jindo-distcp-2.7.3.jar --src s3a://yourbucket/ --dest oss://<your_bucket>/hourly_table --s3Key yourkey --s3Secret yoursecret --s3EndPoint s3-us-west-1.amazonaws.com

To avoid passing credentials on every command, add them to core-site.xml in your Hadoop configuration:
<configuration>
<property>
<name>fs.s3a.access.key</name>
<value>xxx</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>xxx</value>
</property>
<property>
<name>fs.s3.endpoint</name>
<value>s3-us-west-1.amazonaws.com</value>
</property>
</configuration>

Then run without explicit credentials:
hadoop jar /tmp/jindo-distcp-2.7.3.jar --src s3://smartdata1/ --dest s3://smartdata1/tmp --s3EndPoint s3-us-west-1.amazonaws.com

Use Cloud Monitor
Cloud Monitor can alert you when a DistCp job fails. To configure alerting:
1. Create an alert contact or alert contact group. For more information, see Create an alert contact or alert group.
2. Obtain an alert token:
   a. In the left-side navigation pane, choose Alerts > Alert Contacts.
   b. Click the Alert Contact Group tab.
   c. Find your alert contact group and click Access External alert.
   d. Record the alert token shown in the panel.
3. In the panel, click Test Command and configure the following environment variables:

| Variable | Description |
|---|---|
| cmsAccessKeyId | AccessKey ID of your Alibaba Cloud account |
| cmsAccessSecret | AccessKey secret of your Alibaba Cloud account |
| cmsRegion | Region ID of the cluster, for example, cn-hangzhou |
| cmsToken | Alert token obtained in step 2 |
| cmsLevel | Alert level: INFO (email and DingTalk), WARN (SMS, email, and DingTalk), or CRITICAL (phone call, SMS, email, and DingTalk) |

Example:

export cmsAccessKeyId=<your_key_id>
export cmsAccessSecret=<your_key_secret>
export cmsRegion=cn-hangzhou
export cmsToken=<your_cms_token>
export cmsLevel=WARN
hadoop jar jindo-distcp-3.5.0.jar \
  --src /data/incoming/hourly_table \
  --dest oss://<yourBucketName>/hourly_table \
  --enableCMS
Clean up incomplete uploads
If a DistCp job is interrupted, partially uploaded files may remain in OSS. These files are tracked by OSS using an uploadId and are not visible in normal listings. Add --cleanUpPending to remove them automatically when the job finishes:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --cleanUpPending --parallelism 20

Alternatively, delete incomplete uploads in the OSS console.
Monitor job progress with DistCp counters
After a job completes, check the MapReduce counter output for the JindoDistcpCounter group:
JindoDistcpCounter
BYTES_EXPECTED=10000
BYTES_SKIPPED=10000
FILES_EXPECTED=11
FILES_SKIPPED=11

| Counter | Description |
|---|---|
COPY_FAILED | Files that failed to copy. An alert is triggered when this value is not 0. |
CHECKSUM_DIFF | Files that failed checksum verification. Added to COPY_FAILED. |
FILES_EXPECTED | Total files to copy |
FILES_COPIED | Files successfully copied |
FILES_SKIPPED | Files skipped during incremental updates |
BYTES_SKIPPED | Bytes skipped during incremental updates |
DIFF_FILES | Files with differences found by --diff. An alert is triggered when this value is not 0. |
SAME_FILES | Files with no differences found by --diff |
DST_MISS | Files missing from the destination. Added to DIFF_FILES. |
LENGTH_DIFF | Files with different sizes in source and destination. Added to DIFF_FILES. |
CHECKSUM_DIFF | Files that failed checksum verification. Added to DIFF_FILES. |
DIFF_FAILED | Files with errors during --diff. Check log files for details. |
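The counters are printed with the rest of the MapReduce job output, so a wrapper script can capture them and gate on COPY_FAILED. A sketch, assuming the job output was redirected to a log file (the file name and sample values here are hypothetical):

```shell
# Illustration: fail a pipeline step when COPY_FAILED is nonzero.
# The log below stands in for captured DistCp job output.
cat > /tmp/distcp.log <<'EOF'
JindoDistcpCounter
    FILES_EXPECTED=11
    FILES_COPIED=11
    COPY_FAILED=0
EOF
failed=$(grep -o 'COPY_FAILED=[0-9]*' /tmp/distcp.log | cut -d= -f2)
if [ "${failed:-0}" -eq 0 ]; then
  echo "copy ok"
else
  echo "copy failed: $failed files" >&2
  exit 1
fi
```

The same grep pattern works for DIFF_FILES after a --diff run, since both counters are expected to be 0 on success.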