Jindo DistCp is a distributed data copy tool built for E-MapReduce (EMR) clusters. It uses MapReduce for parallel execution, error handling, and reporting, and is optimized for copying data between Hadoop Distributed File System (HDFS) and Object Storage Service (OSS).
Prerequisites
Before you begin, make sure you have:
Java Development Kit (JDK) 8 installed on your computer
An EMR cluster of version 3.28.0 or later. For details, see Create a cluster.
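
To check the JDK requirement from a shell, the sketch below looks for a 1.8 version string in the first line of java -version output. The function name is_jdk8 is hypothetical, and the parsing assumes the usual version "1.8.0_..." format printed by Oracle JDK and OpenJDK 8.

```shell
# Hypothetical helper: succeed if the "java -version" text on stdin reports JDK 8 (1.8.x).
# Assumes the usual first line, for example: openjdk version "1.8.0_292"
is_jdk8() {
  head -n 1 | grep -q 'version "1\.8\.'
}

# Usage (java -version writes to stderr, so redirect it into the pipe):
#   java -version 2>&1 | is_jdk8 && echo "JDK 8 found"
```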
Parameters
Run the following command on the master node of your EMR cluster to view all available parameters:
jindo distcp --help
The following table describes the key parameters. The Constraint column lists restrictions on how a parameter can be used, such as other parameters it cannot be combined with.
| Parameter | Description | Default | Constraint |
|---|---|---|---|
--src=VALUE | Source directory | — | — |
--dest=VALUE | Destination directory | — | — |
--parallelism=VALUE | Number of parallel reduce tasks (maps to mapreduce.job.reduces) | 7 | — |
--srcPattern=VALUE | Regular expression to filter source files. Must match the full path. | — | — |
--srcPrefixesFile=VALUE | File containing source URI prefixes; copies from multiple directories at once | — | — |
--deleteOnSuccess | Deletes source files after a successful copy | — | — |
--outputCodec=VALUE | Compression codec for output files. Values: gzip, gz, lzo, lzop, snappy, none, keep | keep | — |
--outputManifest=VALUE | Name of the manifest file (GZ format) to generate after copying | — | — |
--requirePreviousManifest=VALUE | Set to false to generate a new manifest when no previous manifest exists | — | — |
--previousManifest=VALUE | Path to an existing manifest file; skips files already listed | — | — |
--copyFromManifest | Copies only files listed in the manifest specified by --previousManifest | — | — |
--groupBy=VALUE | Pattern to group input files for merging | — | Cannot be used with --enableBalancePlan or --enableDynamicPlan |
--targetSize=VALUE | Target size (MB) for merged output files | — | Cannot be used with --enableBalancePlan or --enableDynamicPlan |
--enableBalancePlan | Distributes tasks evenly; use when file sizes vary moderately | — | Cannot be used with --groupBy or --targetSize |
--enableDynamicPlan | Dynamically assigns tasks to faster workers; use when file sizes vary significantly and most files are small | — | Cannot be used with --groupBy or --targetSize |
--enableTransaction | Enables job-level transaction support | — | — |
--diff | Reports differences between source and destination directories after copying | — | Does not return accurate file size differences if compression or decompression was performed |
--ossKey=VALUE | OSS AccessKey ID | — | — |
--ossSecret=VALUE | OSS AccessKey Secret | — | — |
--ossEndPoint=VALUE | OSS endpoint | — | — |
--policy=VALUE | OSS storage class for written data. Values: archive, ia | — | — |
--cleanUpPending | Cleans up incomplete multipart uploads (identified by uploadId) after the job finishes | — | — |
--queue=VALUE | YARN queue name for the DistCp job | — | — |
--bandwidth=VALUE | Bandwidth cap per map or reduce task, in MB/s | — | — |
--s3Key=VALUE | Amazon S3 access key | — | — |
--s3Secret=VALUE | Amazon S3 secret key | — | — |
--s3EndPoint=VALUE | Amazon S3 endpoint | — | — |
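
The combination rules in the Constraint column can be checked before a job is submitted. The sketch below is a hypothetical shell function (check_distcp_args is not part of Jindo DistCp) that scans an argument list for the documented conflict between the merge options (--groupBy, --targetSize) and the allocation-plan options (--enableBalancePlan, --enableDynamicPlan).

```shell
# Hypothetical pre-flight check based on the Constraint column:
# --groupBy/--targetSize cannot be combined with --enableBalancePlan/--enableDynamicPlan.
# Pass the same arguments you intend to give to "jindo distcp"; returns 1 on a conflict.
check_distcp_args() {
  merge=0
  plan=0
  for arg in "$@"; do
    case "$arg" in
      --groupBy|--groupBy=*|--targetSize|--targetSize=*) merge=1 ;;
      --enableBalancePlan|--enableDynamicPlan) plan=1 ;;
    esac
  done
  if [ "$merge" -eq 1 ] && [ "$plan" -eq 1 ]; then
    echo "error: --groupBy/--targetSize cannot be used with --enableBalancePlan/--enableDynamicPlan" >&2
    return 1
  fi
  return 0
}

# Usage:
#   set -- --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableBalancePlan --parallelism 20
#   check_distcp_args "$@" && jindo distcp "$@"
```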
Copy files from HDFS to OSS
Log on to the master node of your EMR cluster in SSH mode. For details, see Connect to the master node of an EMR cluster in SSH mode.
Run a copy command. The following example copies all files from /opt/tmp in HDFS to an OSS bucket:
jindo distcp --src /opt/tmp --dest oss://yang-hhht/tmp
Adjust --parallelism based on your cluster resources. The following example runs 20 parallel reduce tasks:
jindo distcp --src /opt/tmp --dest oss://yang-hhht/tmp --parallelism 20
Filter files by pattern
Use --srcPattern to copy only files matching a regular expression. The expression must match the full file path.
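
Before you run a filtered copy, you can preview which files a pattern selects. The sketch below is a hypothetical helper that applies a whole-line match (grep -x) to a list of paths, mirroring the full-path rule. It assumes POSIX extended regular expressions are close enough to the Java-style regex that Jindo DistCp presumably uses, which holds for simple patterns such as .*\.log but not for every construct. hdfs dfs -ls -R also prints paths without the hdfs:// prefix, whereas Jindo DistCp may match against the fully qualified URI, so patterns that start with .* are the safest to preview this way.

```shell
# Hypothetical helper: print the paths on stdin that a --srcPattern regex would select.
# grep -x requires the whole line to match, like the "must match the full path" rule.
# Assumption: POSIX ERE approximates the Java regex syntax; treat the output as a preview.
preview_src_pattern() {
  grep -xE -- "$1"
}

# Usage: list files recursively, keep the path column of regular files, then filter.
#   hdfs dfs -ls -R /data/incoming/hourly_table | awk '$1 ~ /^-/ {print $NF}' \
#     | preview_src_pattern '.*\.log'
```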
The following example copies only .log files from /data/incoming/hourly_table:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --srcPattern '.*\.log' --parallelism 20
Quote the pattern so that the shell does not expand the asterisk or strip the backslash. To verify the result, list the destination directory:
hdfs dfs -ls oss://yang-hhht/hourly_table/2017-02-01/03
Expected output (only .log files are copied):
Found 2 items
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://yang-hhht/hourly_table/2017-02-01/03/1.log
-rw-rw-rw- 1 4891 2020-04-17 20:52 oss://yang-hhht/hourly_table/2017-02-01/03/2.log
Copy from multiple source directories
Use --srcPrefixesFile to copy files from multiple directories in a single job. Create a text file listing one source URI prefix per line:
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-01
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-02
Then pass the file path to --srcPrefixesFile:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --srcPrefixesFile file:///opt/folders.txt --parallelism 20
Merge small files
Reading many small files from HDFS reduces analysis performance. Use --groupBy and --targetSize together to merge small files into larger output files.
The following example merges .txt files into output files of up to 10 MB each:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --targetSize=10 --groupBy='.*/([a-z]+).*.txt' --parallelism 20
After the job, the two .txt files in the source are merged into a single output file:
hdfs dfs -ls oss://yang-hhht/hourly_table/2017-02-01/03/
Found 1 items
-rw-rw-rw- 1 2032 2020-04-17 21:18 oss://yang-hhht/hourly_table/2017-02-01/03/emp2
--groupBy and --targetSize cannot be used with --enableBalancePlan or --enableDynamicPlan.
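
The merge is driven by the capture group in the --groupBy pattern. Assuming Jindo DistCp follows the same convention as the --groupBy option of Amazon S3DistCp (files whose paths yield the same captured value are concatenated into one output file), the sketch below previews the groups a pattern would form. preview_group_by is a hypothetical helper, and sed -E only approximates the regex engine Jindo DistCp uses.

```shell
# Hypothetical helper: show the --groupBy group each path on stdin would fall into.
# Non-matching paths are dropped; matches are printed as "<group> <path>", sorted so
# that members of one group are adjacent.
# The pattern is wrapped as ^(...)$ to force a full-path match, so the pattern's own
# (single) capture group becomes \2. "#" is the sed delimiter: avoid it in the pattern.
preview_group_by() {
  sed -nE "s#^($1)\$#\\2 &#p" | sort
}

# Usage:
#   hdfs dfs -ls -R /data/incoming/hourly_table | awk '$1 ~ /^-/ {print $NF}' \
#     | preview_group_by '.*/([a-z]+).*.txt'
```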
Optimize the task allocation plan
When files vary in size, choosing the right allocation plan improves copy throughput.
| Scenario | Parameter |
|---|---|
| File sizes vary moderately (mix of small and large, but not dramatically different within each group) | --enableBalancePlan |
| File sizes vary significantly and most files are small | --enableDynamicPlan |
| No specific optimization needed (default behavior) | Neither (files are allocated randomly) |
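
Picking a row in this table requires a rough picture of the file-size distribution. The sketch below summarizes sizes from an hdfs dfs -ls -R listing; summarize_sizes is a hypothetical helper, and it reports numbers rather than choosing a plan, because no numeric thresholds are defined for "moderately" or "significantly". It takes the size from the fourth field from the end, which fits both the usual HDFS listing format and the OSS listings shown in this topic, and assumes paths contain no spaces.

```shell
# Hypothetical helper: summarize file sizes (bytes) from "hdfs dfs -ls -R" output on stdin.
# Only regular-file lines (permissions starting with "-") are counted.
# The size is the 4th field from the end: <size> <date> <time> <path>.
summarize_sizes() {
  awk '$1 ~ /^-/ {
         s = $(NF - 3) + 0
         n++; total += s
         if (n == 1 || s < lo) lo = s
         if (s > hi) hi = s
       }
       END {
         if (n == 0) { print "files=0"; exit }
         printf "files=%d total=%.0f min=%.0f max=%.0f mean=%.0f\n", n, total, lo, hi, total / n
       }'
}

# Usage:
#   hdfs dfs -ls -R /data/incoming/hourly_table | summarize_sizes
```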
Example using --enableBalancePlan:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableBalancePlan --parallelism 20
Example using --enableDynamicPlan:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableDynamicPlan --parallelism 20
Neither option can be used with --groupBy or --targetSize.
Delete source files after copying
Use --deleteOnSuccess to remove source files after a successful copy:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --deleteOnSuccess --parallelism 20
Compress output files
Use --outputCodec to compress files as they are copied. Supported values: gzip, gz, lzo, lzop, snappy, none, keep (default).
none: Copies files without compression. If the source files are compressed, they are decompressed.
keep: Copies files without changing the compression.
The following example compresses output files in GZ format:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --outputCodec=gz --parallelism 20
After the job, all files in the destination directory have the .gz extension:
Found 6 items
-rw-rw-rw- 1 938 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/000151.sst.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/1.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/2.log.gz
-rw-rw-rw- 1 1956 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/OPTIONS-000109.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/emp01.txt.gz
-rw-rw-rw- 1 506 2020-04-17 20:58 oss://yang-hhht/hourly_table/2017-02-01/03/emp06.txt.gz
To use the LZO codec in an open source Hadoop cluster, install the native gplcompression library and the hadoop-lzo package.
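
Once files are compressed during the copy, source and destination sizes no longer correspond, so comparing decompressed content is a more direct check. The sketch below is a hypothetical helper for gzip output that compares a plain file with a .gz file; it assumes both objects have first been downloaded to local disk, for example with hadoop fs -get.

```shell
# Hypothetical helper: succeed if GZ_FILE ($2) decompresses to exactly the bytes of PLAIN_FILE ($1).
same_after_gunzip() {
  gzip -dc -- "$2" | cmp -s -- "$1" -
}

# Usage, after downloading one source file and its compressed copy:
#   hadoop fs -get /data/incoming/hourly_table/2017-02-01/03/1.log /tmp/1.log
#   hadoop fs -get oss://yang-hhht/hourly_table/2017-02-01/03/1.log.gz /tmp/1.log.gz
#   same_after_gunzip /tmp/1.log /tmp/1.log.gz && echo "content matches"
```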
Track copied files with a manifest
A manifest file records all files copied by a job, including their source paths, destination paths, and sizes. Use manifests to track incremental copies and resume interrupted jobs.
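
Once decompressed (for example with hadoop fs -text), a manifest is plain text with one JSON record per line, each carrying path, baseName, srcDir, and size fields. The hypothetical helpers below work on that text with standard tools: manifest_totals counts files and bytes, and manifest_new_paths lists paths present in a newer manifest but not in an older one. They assume the compact one-record-per-line layout shown in this topic and no escaped quotes inside paths; use a real JSON parser if either assumption does not hold.

```shell
# Hypothetical helper: print "files=N bytes=M" for decompressed manifest text on stdin.
manifest_totals() {
  grep -o '"size":[0-9]*' | cut -d: -f2 |
    awk '{ n++; total += $1 } END { printf "files=%d bytes=%.0f\n", n, total }'
}

# Hypothetical helper: print the sorted "path" values from a manifest text file.
manifest_paths() {
  grep -o '"path":"[^"]*"' "$1" | cut -d'"' -f4 | sort
}

# Hypothetical helper: print paths that appear in manifest NEW ($2) but not in OLD ($1).
manifest_new_paths() {
  old_sorted=$(mktemp) || return 1
  new_sorted=$(mktemp) || { rm -f "$old_sorted"; return 1; }
  manifest_paths "$1" > "$old_sorted"
  manifest_paths "$2" > "$new_sorted"
  comm -13 "$old_sorted" "$new_sorted"   # lines only in the second file
  rm -f "$old_sorted" "$new_sorted"
}

# Usage:
#   hadoop fs -text oss://yang-hhht/hourly_table/manifest-2020-04-17.gz > before.lst
#   manifest_totals < before.lst
#   manifest_new_paths before.lst current.lst
```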
Generate a manifest
Set --requirePreviousManifest=false to create a new manifest file for the current job:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --outputManifest=manifest-2020-04-17.gz --requirePreviousManifest=false --parallelism 20
Manifest files are always compressed in GZ format. To inspect the contents:
hadoop fs -text oss://yang-hhht/hourly_table/manifest-2020-04-17.gz > before.lst
cat before.lst
Each line is a JSON record:
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/000151.sst","baseName":"2017-02-01/03/000151.sst","srcDir":"oss://yang-hhht/hourly_table","size":2252}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/2.log","baseName":"2017-02-01/03/2.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/OPTIONS-000109","baseName":"2017-02-01/03/OPTIONS-000109","srcDir":"oss://yang-hhht/hourly_table","size":4891}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/emp01.txt","baseName":"2017-02-01/03/emp01.txt","srcDir":"oss://yang-hhht/hourly_table","size":1016}
{"path":"oss://yang-hhht/hourly_table/2017-02-01/03/emp06.txt","baseName":"2017-02-01/03/emp06.txt","srcDir":"oss://yang-hhht/hourly_table","size":1016}
Copy only new files using a previous manifest
Pass a previous manifest with --previousManifest to skip already-copied files and copy only new additions:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --outputManifest=manifest-2020-04-18.gz --previousManifest=oss://yang-hhht/hourly_table/manifest-2020-04-17.gz --parallelism 20
To see which files were added in the current run, compare the two manifests:
hadoop fs -text oss://yang-hhht/hourly_table/manifest-2020-04-18.gz > current.lst
diff before.lst current.lst
Output showing the two newly copied files:
3a4,5
> {"path":"oss://yang-hhht/hourly_table/2017-02-01/03/5.log","baseName":"2017-02-01/03/5.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
> {"path":"oss://yang-hhht/hourly_table/2017-02-01/03/6.log","baseName":"2017-02-01/03/6.log","srcDir":"oss://yang-hhht/hourly_table","size":4891}
Replay a manifest
Use --copyFromManifest with --previousManifest to copy exactly the files listed in a specific manifest:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --previousManifest=oss://yang-hhht/hourly_table/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
Verify copy results
After copying, use --diff to check whether the source and destination directories are in sync:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --diff
If all files are copied successfully:
INFO distcp.JindoDistCp: distcp has been done completely
If some files are missing, a manifest is generated in the destination directory listing the missing files. Use --copyFromManifest and --previousManifest to copy those files:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --previousManifest=file:///opt/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
If Jindo DistCp compressed or decompressed files during the copy, --diff does not return accurate file size differences.
If the destination is an HDFS directory, specify --dest in one of the following formats: /path, hdfs://hostname:port/path, or hdfs://headerIp:port/path. Formats such as hdfs:///path and hdfs:/path are not supported.
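
The HDFS --dest format rule can be checked before a job is submitted. is_supported_hdfs_dest is a hypothetical shell function that accepts a bare absolute path or an hdfs:// URI with an explicit host and port, and rejects the hdfs:///path and hdfs:/path forms. It only checks the shape of the string, not whether the host, port, or directory exists, and it is meant for HDFS destinations only (other schemes return 1).

```shell
# Hypothetical helper: succeed if $1 has one of the supported HDFS --dest shapes.
is_supported_hdfs_dest() {
  case "$1" in
    hdfs:///*|hdfs:/[!/]*) return 1 ;;   # hdfs:///path and hdfs:/path are not supported
    hdfs://?*:[0-9]*/*)    return 0 ;;   # hdfs://hostname:port/path or hdfs://headerIp:port/path
    /*)                    return 0 ;;   # /path
    *)                     return 1 ;;   # anything else is not an HDFS destination of a supported shape
  esac
}

# Usage:
#   is_supported_hdfs_dest "$DEST" || echo "unsupported --dest format: $DEST" >&2
```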
Enable job-level transactions
Use --enableTransaction to ensure data integrity across the entire job:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --enableTransaction --parallelism 20
Control resource usage
Limit bandwidth
Use --bandwidth to cap the bandwidth used per map or reduce task (unit: MB/s). This prevents the job from saturating the network:
jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table --bandwidth 100 --parallelism 20
Assign to a YARN queue
Use --queue to route the job to a specific YARN queue:
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --queue yarnqueue
Access OSS with an AccessKey pair
By default, EMR clusters use AccessKey-free access to OSS. If you are running Jindo DistCp outside an EMR cluster, or if AccessKey-free access is not available, specify credentials directly in the command:
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --ossKey <your_access_key_id> --ossSecret <your_access_key_secret> --ossEndPoint oss-cn-hangzhou.aliyuncs.com --parallelism 20
Replace the following placeholders:
| Placeholder | Description |
|---|---|
<your_access_key_id> | Your Alibaba Cloud AccessKey ID |
<your_access_key_secret> | Your Alibaba Cloud AccessKey Secret |
oss-cn-hangzhou.aliyuncs.com | OSS endpoint for your region |
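
Typing the AccessKey pair directly into a command also stores it in your shell history. One common alternative, sketched below under the assumption that you load the values from a file that only you can read, is to keep them in environment variables and reference those in the command. The variable names and the require_oss_env function are hypothetical; Jindo DistCp does not read these variables itself, and because the values are still passed as command-line arguments they may be visible in the process list.

```shell
# Hypothetical helper: fail with a message if any required OSS variable is empty or unset.
# The variable names are illustrative only; Jindo DistCp does not read them on its own.
require_oss_env() {
  rc=0
  [ -n "${OSS_ACCESS_KEY_ID:-}" ] || { echo "OSS_ACCESS_KEY_ID is not set" >&2; rc=1; }
  [ -n "${OSS_ACCESS_KEY_SECRET:-}" ] || { echo "OSS_ACCESS_KEY_SECRET is not set" >&2; rc=1; }
  [ -n "${OSS_ENDPOINT:-}" ] || { echo "OSS_ENDPOINT is not set" >&2; rc=1; }
  return "$rc"
}

# Usage, for example after ". ~/.oss_credentials" (a mode-600 file that exports the three variables):
#   require_oss_env && jindo distcp --src /data/incoming/hourly_table \
#     --dest oss://<your_bucket>/hourly_table --ossKey "$OSS_ACCESS_KEY_ID" \
#     --ossSecret "$OSS_ACCESS_KEY_SECRET" --ossEndPoint "$OSS_ENDPOINT" --parallelism 20
```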
Write to OSS Archive or Infrequent Access storage
Use --policy to write data directly to a lower-cost OSS storage class:
Archive mode:
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --policy archive --parallelism 20
Infrequent Access (IA) mode:
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --policy ia --parallelism 20
Clean up incomplete uploads
A DistCp job can leave files that were not uploaded correctly in the destination directory. OSS tracks these incomplete uploads by uploadId, and they may not be visible to users. Use --cleanUpPending to delete them automatically when the job finishes:
jindo distcp --src /data/incoming/hourly_table --dest oss://<your_bucket>/hourly_table --cleanUpPending --parallelism 20
Alternatively, delete incomplete uploads from the OSS console.
Copy from Amazon S3
Specify S3 credentials and endpoint with --s3Key, --s3Secret, and --s3EndPoint:
jindo distcp --src s3a://yourbucket/ --dest oss://<your_bucket>/hourly_table --s3Key <your_s3_key> --s3Secret <your_s3_secret> --s3EndPoint s3-us-west-1.amazonaws.com
To avoid specifying credentials in every command, add them to the core-site.xml file of Hadoop:
<configuration>
<property>
<name>fs.s3a.access.key</name>
<value>xxx</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>xxx</value>
</property>
<property>
<name>fs.s3.endpoint</name>
<value>s3-us-west-1.amazonaws.com</value>
</property>
</configuration>
After configuring core-site.xml, omit the credential flags:
jindo distcp --src s3://smartdata1/ --dest s3://smartdata1/tmp --s3EndPoint s3-us-west-1.amazonaws.com
Check DistCp counters
When a job completes, the MapReduce output includes DistCp counters that report the total bytes transferred and files copied:
Distcp Counters
Bytes Destination Copied=11010048000
Bytes Source Read=11010048000
Files Copied=1001
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
If Jindo DistCp compressed or decompressed files during the copy, the values of Bytes Destination Copied and Bytes Source Read may differ.
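
When no compression codec is involved, the two byte counters should be equal, which makes them a quick sanity check on a finished job. check_distcp_counters is a hypothetical helper that extracts Bytes Source Read, Bytes Destination Copied, and Files Copied from saved job output; it assumes the counter lines appear as name=value, as in the sample above.

```shell
# Hypothetical helper: read job output on stdin and report the DistCp counters.
# Prints "files=<n> read=<bytes> copied=<bytes> match=yes|no";
# exits 1 if either byte counter is missing.
check_distcp_counters() {
  awk -F= '
    /Bytes Source Read=/        { rd = $2 + 0; seen_r = 1 }
    /Bytes Destination Copied=/ { copied = $2 + 0; seen_c = 1 }
    /Files Copied=/             { files = $2 + 0 }
    END {
      if (!seen_r || !seen_c) exit 1
      printf "files=%.0f read=%.0f copied=%.0f match=%s\n", files, rd, copied, (rd == copied ? "yes" : "no")
    }'
}

# Usage (capture both stdout and stderr, since the client logs counters there):
#   jindo distcp --src /data/incoming/hourly_table --dest oss://yang-hhht/hourly_table 2>&1 | tee distcp.log
#   check_distcp_counters < distcp.log
```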