This topic describes how to use the data copy tool Jindo DistCp.

Prerequisites

  • Java Development Kit (JDK) 8 is installed on your computer.
  • An E-MapReduce (EMR) cluster is created. For more information, see Create a cluster.

Use Jindo DistCp

  1. Connect to the master node of the EMR cluster in SSH mode.

    For more information, see Connect to the master node of an EMR cluster in SSH mode.

  2. Run the following command to obtain help information:
    jindo distcp --help
    The following information is returned:
         --help           - Print help text
         --src=VALUE          - Directory to copy files from
         --dest=VALUE              - Directory to copy files to
         --parallelism=VALUE         - Copy task parallelism
         --outputManifest=VALUE       - The name of the manifest file
         --previousManifest=VALUE   -   The path to an existing manifest file
         --requirePreviousManifest=VALUE   -   Require that a previous manifest is present if specified
         --copyFromManifest   -   Copy from a manifest instead of listing a directory
         --srcPrefixesFile=VALUE   -   File containing a list of source URI prefixes
         --srcPattern=VALUE   -   Include only source files matching this pattern
         --deleteOnSuccess   -   Delete input files after a successful copy
         --outputCodec=VALUE   -   Compression codec for output files
         --groupBy=VALUE   -   Pattern to group input files by
         --targetSize=VALUE   -   Target size for output files
         --enableBalancePlan   -   Enable plan copy task to make balance
         --enableDynamicPlan   -   Enable plan copy task dynamically
         --enableTransaction   -   Enable transation on Job explicitly
         --diff   -   show the difference between src and dest filelist
         --ossKey=VALUE   -   Specify your oss key if needed
         --ossSecret=VALUE   -   Specify your oss secret if needed
         --ossEndPoint=VALUE   -   Specify your oss endPoint if needed
         --policy=VALUE   -   Specify your oss storage policy
         --cleanUpPending   -   clean up the incomplete upload when distcp job finish
         --queue=VALUE   -   Specify yarn queuename if needed
         --bandwidth=VALUE   -   Specify bandwidth per map/reduce in MB if needed
         --s3Key=VALUE   -   Specify your s3 key
         --s3Secret=VALUE   -   Specify your s3 Sercet
         --s3EndPoint=VALUE   -   Specify your s3 EndPoint
         --enableCMS  -   Enable CMS
         --update   -   Update target, copying only missing files or directories
         --filters=VALUE   -   Specify a path of file containing patterns to exlude source files

--src and --dest

--src specifies the source directory. --dest specifies the destination directory.

By default, Jindo DistCp copies all files in the directory specified by --src to the directory specified by --dest. If you do not specify a root directory, Jindo DistCp automatically creates a root directory.

For example, you can run the following command to copy files in the /opt/tmp directory of HDFS to an OSS bucket:
jindo distcp --src /opt/tmp --dest oss://<yourBucketName>/tmp
Note: yourBucketName is the name of your OSS bucket.

--parallelism

--parallelism specifies the mapreduce.job.reduces parameter for the MapReduce job that is run to copy files. The default value is 7. You can customize --parallelism based on your available cluster resources. This allows you to determine how many reduce tasks can be run in parallel.

For example, you can run the following command to copy files in the /opt/tmp directory of HDFS to an OSS bucket with 20 reduce tasks:
jindo distcp --src /opt/tmp --dest oss://<yourBucketName>/tmp --parallelism 20

--srcPattern

--srcPattern specifies a regular expression that filters files for the copy operation. The regular expression must match a full path.

For example, if you need to copy all log files in the /data/incoming/hourly_table/2017-02-01/03 directory, set --srcPattern to .*\.log.

Run the following command to view the files in the /data/incoming/hourly_table/2017-02-01/03 directory:
hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
The following information is returned:
Found 6 items
-rw-r-----   2 root hadoop       2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r-----   2 root hadoop       4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r-----   2 root hadoop       4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r-----   2 root hadoop       4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r-----   2 root hadoop       1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r-----   2 root hadoop       1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
Run the following command to copy the log files:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --srcPattern '.*\.log' --parallelism 20
Run the following command to view files in the destination OSS bucket:
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03
The following information is returned. Only log files in the source directory are copied.
Found 2 items
-rw-rw-rw-   1       4891 2020-04-17 20:52 oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log
-rw-rw-rw-   1       4891 2020-04-17 20:52 oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log
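Because the pattern must match the full path, it can help to try it against a few sample paths before you start a job. The following sketch uses grep -E -x as a local stand-in for the matcher. This is an assumption for illustration only: Jindo DistCp evaluates the pattern itself, and its regular expression dialect may differ from POSIX extended syntax in corner cases. The sample paths are taken from the listing above.

```shell
# Sample paths, taken from the directory listing above.
paths='/data/incoming/hourly_table/2017-02-01/03/000151.sst
/data/incoming/hourly_table/2017-02-01/03/1.log
/data/incoming/hourly_table/2017-02-01/03/2.log
/data/incoming/hourly_table/2017-02-01/03/emp01.txt'

# The same pattern that is passed to --srcPattern.
pattern='.*\.log'

# -x requires the whole line to match, which approximates a full-path match.
matched=$(printf '%s\n' "$paths" | grep -E -x "$pattern")
printf '%s\n' "$matched"
```

Only the two .log paths are printed. A pattern such as \.log without the leading .* would print nothing here, because it does not cover the whole path.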

--deleteOnSuccess

--deleteOnSuccess enables Jindo DistCp to delete the copied files from the source directory after a copy operation succeeds.

For example, you can run the following command to copy files in /data/incoming/hourly_table to an OSS bucket and delete the copied files from the source directory:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --deleteOnSuccess --parallelism 20

--outputCodec

--outputCodec specifies the compression codec that is used to compress copied files online. Example:

jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputCodec=gz --parallelism 20
Run the following command to view files in the destination directory:
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03
The following information is returned. Files in the destination directory are compressed in the GZ format.
Found 6 items
-rw-rw-rw-   1        938 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/000151.sst.gz
-rw-rw-rw-   1       1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log.gz
-rw-rw-rw-   1       1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log.gz
-rw-rw-rw-   1       1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/OPTIONS-000109.gz
-rw-rw-rw-   1        506 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp01.txt.gz
-rw-rw-rw-   1        506 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp06.txt.gz
You can set this parameter to gzip, gz, lzo, lzop, snappy, none, or keep. Default value: keep. Descriptions of none and keep:
  • none: Jindo DistCp does not compress copied files. If the files have been compressed, Jindo DistCp decompresses them.
  • keep: Jindo DistCp copies files with no change in the compression.
Note: If you want to use the LZO codec in an open source Hadoop cluster, you must install the native library of gplcompression and the hadoop-lzo package.

--outputManifest and --requirePreviousManifest

--outputManifest generates a manifest file that contains the information about all the files copied by Jindo DistCp. The information includes the destination files, source files, and file sizes.

If you want to generate a manifest file, set --requirePreviousManifest to false. By default, the file is compressed in the GZ format. This is the only supported format.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputManifest=manifest-2020-04-17.gz --requirePreviousManifest=false --parallelism 20
Run the following command to view the content of the file:
hadoop fs -text oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz > before.lst
cat before.lst 
The following information is returned:
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/000151.sst","baseName":"2017-02-01/03/000151.sst","srcDir":"oss://<yourBucketName>/hourly_table","size":2252}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log","baseName":"2017-02-01/03/2.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/OPTIONS-000109","baseName":"2017-02-01/03/OPTIONS-000109","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/emp01.txt","baseName":"2017-02-01/03/emp01.txt","srcDir":"oss://<yourBucketName>/hourly_table","size":1016}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/emp06.txt","baseName":"2017-02-01/03/emp06.txt","srcDir":"oss://<yourBucketName>/hourly_table","size":1016}
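Each manifest line is one JSON object, so the file can be summarized with standard text tools. The following is a minimal sketch, assuming the line layout shown above with size as a plain integer field. The file name manifest-sample.lst is hypothetical, and its three lines are a subset of the output above.

```shell
# Sample manifest lines, copied from the output above (a subset, for brevity).
cat > manifest-sample.lst <<'EOF'
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/000151.sst","baseName":"2017-02-01/03/000151.sst","srcDir":"oss://<yourBucketName>/hourly_table","size":2252}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/emp01.txt","baseName":"2017-02-01/03/emp01.txt","srcDir":"oss://<yourBucketName>/hourly_table","size":1016}
EOF

# Count the entries (one JSON object per line).
count=$(wc -l < manifest-sample.lst | tr -d ' ')

# Pull the numeric "size" field out of each line and add the values up.
total=$(sed -E 's/.*"size":([0-9]+).*/\1/' manifest-sample.lst | awk '{ sum += $1 } END { print sum }')

echo "files=$count bytes=$total"
```

The totals can be compared with the source directory as a quick sanity check on file count and data volume.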

--outputManifest and --previousManifest

--outputManifest generates a manifest file that contains a list of both previously and newly copied files. --previousManifest specifies the path to an existing manifest file that lists previously copied files. This way, you can recreate the full history of operations and see which files are copied by the current job.

For example, two files are added to the source directory. Run the following command to copy the newly added files:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputManifest=manifest-2020-04-18.gz --previousManifest=oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz --parallelism 20
Run the following command to view the copied files:
hadoop fs -text oss://<yourBucketName>/hourly_table/manifest-2020-04-18.gz > current.lst
diff before.lst current.lst 
The following information is returned:
3a4,5
> {"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/5.log","baseName":"2017-02-01/03/5.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
> {"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/6.log","baseName":"2017-02-01/03/6.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}

--copyFromManifest

You can use --copyFromManifest together with --previousManifest to copy the files listed in a manifest file that was previously generated by --outputManifest to the destination directory. Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --previousManifest=oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz --copyFromManifest --parallelism 20

--srcPrefixesFile

--srcPrefixesFile enables Jindo DistCp to copy files in multiple folders at a time.

Run the following command to view the sub-folders under hourly_table:
hdfs dfs -ls oss://<yourBucketName>/hourly_table
The following information is returned:
Found 4 items
drwxrwxrwx   -          0 1970-01-01 08:00 oss://<yourBucketName>/hourly_table/2017-02-01
drwxrwxrwx   -          0 1970-01-01 08:00 oss://<yourBucketName>/hourly_table/2017-02-02
Run the following command to copy all the files under hourly_table to the destination folder. The folders.txt file that is passed to --srcPrefixesFile lists one source prefix for each of the sub-folders under hourly_table.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --srcPrefixesFile file:///opt/folders.txt --parallelism 20
Run the following command to view the content of the folders.txt file:
cat folders.txt 
The following information is returned:
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-01
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-02
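A prefix file like this can be written by hand or generated. The following is a minimal sketch, assuming the NameNode address and sub-folder names from the sample output above. The output file name folders-sample.txt is hypothetical.

```shell
# NameNode URI and parent directory, taken from the sample output above.
namenode='hdfs://emr-header-1.cluster-50466:9000'
parent='/data/incoming/hourly_table'
prefix_file='folders-sample.txt'

# Start with an empty file, then append one full URI prefix per sub-folder.
: > "$prefix_file"
for day in 2017-02-01 2017-02-02; do
  printf '%s%s/%s\n' "$namenode" "$parent" "$day" >> "$prefix_file"
done

cat "$prefix_file"
```

The resulting file can then be passed to --srcPrefixesFile with a file:// URI, as in the command above.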

--groupBy and --targetSize

Reading a large number of small files from HDFS affects the data processing performance. Therefore, we recommend that you use Jindo DistCp to merge small files into large files of a specified size. This optimizes analysis performance and reduces costs.

Run the following command to view the files in the specified folder:
hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
The following information is returned:
Found 8 items
-rw-r-----   2 root hadoop       2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r-----   2 root hadoop       4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r-----   2 root hadoop       4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r-----   2 root hadoop       4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/5.log
-rw-r-----   2 root hadoop       4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/6.log
-rw-r-----   2 root hadoop       4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r-----   2 root hadoop       1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r-----   2 root hadoop       1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
Run the following command to merge the TXT files in the folder into files each with a size of no more than 10 MB:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --targetSize=10 --groupBy='.*/([a-z]+).*.txt' --parallelism 20
Run the following command to view the files in the destination directory. The two TXT files are merged into one.
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03/
Found 1 items
-rw-rw-rw-   1       2032 2020-04-17 21:18 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp2
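To see why the two TXT files land in the same group, it can help to look at what the capture group in the --groupBy pattern extracts from each path. The following sketch uses sed -E as a local stand-in. This is an assumption for illustration: Jindo DistCp applies the pattern itself, and its regular expression dialect may differ from POSIX extended syntax.

```shell
# The same pattern that is passed to --groupBy.
group_pattern='.*/([a-z]+).*.txt'

for path in \
  /data/incoming/hourly_table/2017-02-01/03/emp01.txt \
  /data/incoming/hourly_table/2017-02-01/03/emp06.txt \
  /data/incoming/hourly_table/2017-02-01/03/1.log
do
  # Print the first capture group when the whole path matches; print nothing otherwise.
  key=$(printf '%s\n' "$path" | sed -E -n "s#^${group_pattern}\$#\\1#p")
  echo "$path -> ${key:-<not grouped>}"
done
```

Both TXT paths yield the same captured text, so they fall into one group, while the .log path does not match the pattern at all.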

--enableBalancePlan

If the files to be copied include both small and large files, but file sizes vary little among the small files and among the large files, you can use --enableBalancePlan to optimize the job allocation plan. This improves the copy performance of Jindo DistCp. If you do not specify an allocation plan, files are randomly allocated.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableBalancePlan --parallelism 20
Note: You cannot use this option in the same command as --groupBy or --targetSize.

--enableDynamicPlan

If the files to be copied are significantly different in size and most of the files are small files, you can use --enableDynamicPlan to optimize the job allocation plan. This improves the copy performance of Jindo DistCp.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableDynamicPlan --parallelism 20
Note: You cannot use this option in the same command as --groupBy or --targetSize.

--enableTransaction

--enableTransaction ensures job-level integrity and provides transaction support across jobs. Example:

jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableTransaction --parallelism 20

--diff

After files are copied, you can use --diff to check the differences between files in the source and destination directories.

Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --diff
If all files are copied, the following information is returned:
INFO distcp.JindoDistCp: Jindo DistCp job exit with 0
If some files are not copied, a manifest file that contains a list of these files is generated in the destination directory. Then, you can use --copyFromManifest and --previousManifest to copy the files in the list to the destination directory. This way, the data volume and file quantity are verified. If Jindo DistCp has performed compression or decompression operations during the copy process, --diff does not return accurate file size differences.
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --previousManifest=file:///opt/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
Note: If your destination directory is an HDFS directory, you must specify --dest in the /path, hdfs://hostname:port/path, or hdfs://headerIp:port/path format. Other formats, such as hdfs:///path and hdfs:/path, are not supported.
You can also use --update to incrementally update files that have differences.
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --update --parallelism 20

--queue

--queue specifies the name of the YARN queue to which the current DistCp task is submitted.

Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --queue yarnqueue

--bandwidth

--bandwidth specifies the bandwidth allocated to a single node of the current DistCp task. This prevents the task from occupying excessive bandwidth. Unit: MB/s.
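A rough way to choose a value is to work backward from the total bandwidth that the task may use. The following is a minimal sketch, assuming the limit applies to each map or reduce task, as the help text states, and that all reduce tasks run at the same time. The values 6 and 20 are illustrative only.

```shell
bandwidth_mb=6     # per-task limit in MB/s (illustrative value)
parallelism=20     # number of reduce tasks (illustrative value)

# Rough upper bound on total throughput if every reduce task runs concurrently.
aggregate_mb=$((bandwidth_mb * parallelism))
echo "Aggregate ceiling: ${aggregate_mb} MB/s"

# Print the matching command for review; run it on the master node of the EMR cluster.
cmd="jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --bandwidth ${bandwidth_mb} --parallelism ${parallelism}"
echo "$cmd"
```

If fewer tasks run concurrently than --parallelism allows, the actual ceiling is lower than this estimate.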

--update

--update enables Jindo DistCp to incrementally update files in a single command. Jindo DistCp skips the files and directories that are the same as those in the destination and directly synchronizes new and updated files and directories from the source to the destination.

Example:
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --update --parallelism 20

--filters

--filters specifies the path of a file that contains one regular expression on each line. The regular expressions are used to filter out the files that you do not want to copy or compare for differences in the current DistCp task.

Example:
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --filters /path/to/filterfile.txt --parallelism 20
Content of a sample file:
.*\.tmp.*
.*\.staging.*

If the preceding sample file is used, the DistCp task filters out the files whose names contain .tmp or .staging in the hdfs://data/incoming/hourly_tabl directory and skips these files during copy and --diff operations.
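Before you run a job, you can preview which paths a filter file would exclude. The following sketch uses grep as a local stand-in. Whole-path matching and POSIX extended syntax are assumptions here, and Jindo DistCp may evaluate the patterns differently. The file name filterfile-sample.txt and the sample paths are hypothetical.

```shell
# Hypothetical filter file in the style of the sample above: one pattern per line.
cat > filterfile-sample.txt <<'EOF'
.*\.tmp.*
.*\.staging.*
EOF

# Hypothetical source paths used only for this check.
paths='/data/incoming/hourly_table/2017-02-01/03/1.log
/data/incoming/hourly_table/2017-02-01/03/2.log.tmp
/data/incoming/hourly_table/.staging/job_0001/part-00000'

# -f reads the patterns, -x matches whole lines, -v keeps the paths that match none of them.
kept=$(printf '%s\n' "$paths" | grep -E -x -v -f filterfile-sample.txt)
printf '%s\n' "$kept"
```

Only the path that matches neither pattern is printed, which corresponds to the files that the task would still copy.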

Use an AccessKey pair to access OSS

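If you want to access OSS from an instance outside EMR, or if AccessKey-free access is not supported, you can use an AccessKey pair to access OSS. Set the --key, --secret, and --endPoint parameters in the command to specify the AccessKey pair and the endpoint. The help output in this topic lists these options as --ossKey, --ossSecret, and --ossEndPoint. Use the names that jindo distcp --help reports for your version.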

Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --key <yourAccessKeyId> --secret <yourAccessKeySecret> --endPoint oss-cn-hangzhou.aliyuncs.com --parallelism 20

yourAccessKeyId is the AccessKey ID of your Alibaba Cloud account. yourAccessKeySecret is the AccessKey secret of your Alibaba Cloud account.

Write data to OSS Cold Archive, Archive, or IA storage

When you use a DistCp task to write data to Object Storage Service (OSS), you can use --policy to specify a storage class.
  • Example of the Cold Archive storage class (--policy coldArchive)
    hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy coldArchive --parallelism 20
    Note: The Cold Archive storage class is available only in some regions. For more information about the supported regions, see Overview.
  • Example of the Archive storage class (--policy archive)
    jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy archive --parallelism 20
  • Example of the Infrequent Access (IA) storage class (--policy ia)
    jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy ia --parallelism 20

Use CloudMonitor

You can use CloudMonitor to collect the metrics of Alibaba Cloud resources and your custom metrics, detect service availability, and configure alerts for metrics. This helps you obtain the Alibaba Cloud resource usage and the status of applications and handle exceptions at the earliest opportunity to ensure the normal running of your applications.

You can specify whether CloudMonitor reports a failure if the current DistCp task fails. You can perform the following steps to configure the alerting feature in the CloudMonitor console:

  1. Create an alert contact or an alert contact group. For more information, see Create an alert contact or alert group.
  2. Obtain an alert token.
    1. In the left-side navigation pane, choose Alerts > Alert Contacts.
    2. On the Alert Contacts page, click the Alert Contact Group tab.
    3. Find your alert contact group and click Access External alert.

      Record the alert token that is displayed in the panel that appears.

  3. In the panel, click Test Command to configure the environment variables described in the following table.
    Parameter        Description
    cmsAccessKeyId   The AccessKey ID of your Alibaba Cloud account.
    cmsAccessSecret  The AccessKey secret of your Alibaba Cloud account.
    cmsRegion        The ID of the region where the cluster resides, such as cn-hangzhou.
    cmsToken         The alert token that is obtained in Step 2.
    cmsLevel         The alert level. The following levels are supported:
                     • INFO: email and DingTalk chatbot
                     • WARN: text message, email, and DingTalk chatbot
                     • CRITICAL: phone call, text message, email, and DingTalk chatbot
    Example:
    export cmsAccessKeyId=<your_key_id>
    export cmsAccessSecret=<your_key_secret>
    export cmsRegion=cn-hangzhou
    export cmsToken=<your_cms_token>
    export cmsLevel=WARN
    
    hadoop jar jindo-distcp-3.5.0.jar \
    --src /data/incoming/hourly_table \
    --dest oss://<yourBucketName>/hourly_table \
    --enableCMS
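A missing or misspelled variable is easy to overlook, so a short check before you start the task can help. The following is a minimal sketch that verifies the five variables from the table above are set and that cmsLevel is one of the listed levels. The function name check_cms_env is hypothetical and is not part of Jindo DistCp.

```shell
# Return 0 when every variable from the table above is set and cmsLevel is valid.
check_cms_env() {
  missing=''
  for name in cmsAccessKeyId cmsAccessSecret cmsRegion cmsToken cmsLevel; do
    # Indirect lookup of the variable called $name; empty when unset.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      missing="$missing $name"
    fi
  done
  if [ -n "$missing" ]; then
    echo "Missing variables:$missing" >&2
    return 1
  fi
  case "$cmsLevel" in
    INFO|WARN|CRITICAL) return 0 ;;
    *) echo "Unsupported cmsLevel: $cmsLevel" >&2; return 1 ;;
  esac
}

if check_cms_env; then
  echo "CloudMonitor settings look complete; --enableCMS can be added to the command."
else
  echo "Fix the settings above before you run the task with --enableCMS."
fi
```

The check only confirms that values are present and well formed. It does not verify that the AccessKey pair or the token is valid.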

Clean up residual files

When you run a DistCp task, files that are not correctly uploaded may be left in your destination directory. OSS manages these files based on uploadId, and they may be invisible to users. In this case, you can specify the --cleanUpPending parameter in the command. This way, the system automatically cleans up the residual files after the task is complete. Alternatively, you can clean up the files in the OSS console.

Example:
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --cleanUpPending --parallelism 20

Use Amazon S3 as a data source

You can use the --s3Key, --s3Secret, and --s3EndPoint parameters in a command to specify information related to Amazon S3.

Example:
jindo distcp --src s3a://yourbucket/ --dest oss://<yourBucketName>/hourly_table --s3Key yourkey --s3Secret yoursecret --s3EndPoint s3-us-west-1.amazonaws.com
You can configure the s3Key, s3Secret, and s3EndPoint parameters in the core-site.xml file of Hadoop. This way, you do not need to specify an AccessKey pair each time you run a command.
<configuration>
    <property>
        <name>fs.s3a.access.key</name>
        <value>xxx</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>xxx</value>
    </property>

    <property>
        <name>fs.s3.endpoint</name>
        <value>s3-us-west-1.amazonaws.com</value>
    </property>
</configuration>
Example:
jindo distcp --src s3://smartdata1/ --dest s3://smartdata1/tmp --s3EndPoint s3-us-west-1.amazonaws.com

Check DistCp counters

DistCp counters are included in the counter information of the MapReduce job. Example:
JindoDistcpCounter
  BYTES_EXPECTED=10000
  BYTES_SKIPPED=10000
  FILES_EXPECTED=11
  FILES_SKIPPED=11
Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0    
Task counter    Description
COPY_FAILED     The number of files that fail to be copied. An alert is reported when this counter is not 0.
CHECKSUM_DIFF   The number of files that fail to pass the checksum verification. The number is added to the value of COPY_FAILED.
FILES_EXPECTED  The number of files that need to be copied.
FILES_COPIED    The number of files that are copied.
FILES_SKIPPED   The number of files that are skipped during incremental updates.
BYTES_SKIPPED   The number of bytes that are skipped during incremental updates.
DIFF_FILES      The number of files that have differences. The files are obtained by using --diff. An alert is reported when this counter is not 0.
SAME_FILES      The number of files that have no differences. The files are obtained by using --diff.
DST_MISS        The number of files that do not exist in the destination directory. The number is added to the value of DIFF_FILES.
LENGTH_DIFF     The number of files that are of different sizes in the source and destination directories. The number is added to the value of DIFF_FILES.
CHECKSUM_DIFF   The number of files that fail to pass the checksum verification. The number is added to the value of DIFF_FILES.
DIFF_FAILED     The number of files for which --diff-related errors are reported. You can view details about the errors in log files.
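The alert conditions in the table can also be checked in a script once the counter output is captured as text. The following is a minimal sketch, assuming the NAME=VALUE layout shown above and assuming that a counter missing from the output counts as 0. The helper name get_counter is hypothetical.

```shell
# Counter output captured as text; the values are copied from the sample above.
counters='JindoDistcpCounter
  BYTES_EXPECTED=10000
  BYTES_SKIPPED=10000
  FILES_EXPECTED=11
  FILES_SKIPPED=11'

# Print the value of one counter, or 0 when the counter does not appear (assumption).
get_counter() {
  printf '%s\n' "$counters" |
    awk -F= -v k="$1" '{ gsub(/^[ \t]+/, "", $1) } $1 == k { print $2; found = 1 } END { if (!found) print 0 }'
}

failed=$(get_counter COPY_FAILED)
diff_files=$(get_counter DIFF_FILES)

# The table above reports an alert when either counter is not 0.
if [ "$failed" -eq 0 ] && [ "$diff_files" -eq 0 ]; then
  status=ok
else
  status=alert
fi
echo "COPY_FAILED=$failed DIFF_FILES=$diff_files status=$status"
```

In this sample, all 11 expected files were skipped by an incremental update and nothing failed, so the status is ok.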