HDFS Reader allows you to read data stored in a Hadoop Distributed File System (HDFS). HDFS Reader connects to an HDFS, reads data from files in the HDFS, converts the data to a format that is readable by Data Integration, and then sends the converted data to a writer.

The following examples describe how HDFS Reader handles the TextFile and ORCFile storage formats:

TextFile is the default storage format for Hive tables and does not compress data. A TextFile file is essentially stored in HDFS as plain text. For Data Integration, the implementation of HDFS Reader is similar to that of OSS Reader.

Optimized Row Columnar File (ORCFile) is an optimized RCFile format. It provides an efficient method for storing Hive data. HDFS Reader uses the OrcSerde class provided by Hive to read and parse ORCFile data.

Note
  • The network connection between the default resource group and HDFS is complex. Therefore, we recommend that you use a custom resource group to run sync nodes. Make sure that your custom resource group can access the NameNode and DataNodes of HDFS over the network.
  • By default, HDFS uses a network whitelist to ensure data security. In this case, we also recommend that you use a custom resource group to run HDFS sync nodes.
  • If you configure an HDFS sync node in the code editor, the HDFS connection does not need to pass the connectivity test. You can temporarily ignore connectivity test errors.
  • To synchronize data in Data Integration, you must log on as an administrator and make sure that you have the permissions to read data from and write data to the relevant HDFS files.

Supported features

Currently, HDFS Reader supports the following features:
  • Supports the TextFile, ORCFile, RCFile, SequenceFile, CSV, and Parquet file formats. The content stored in each file must be a logical two-dimensional table.
  • Reads data of various types as strings. Supports constants and column pruning.
  • Supports recursive reading and file name wildcards, including asterisks (*) and question marks (?). For a minimal configuration sketch that illustrates these features, see the example after this list.
  • Supports ORCFile files compressed in the SNAPPY or ZLIB format.
  • Supports SequenceFile files compressed in the LZO format.
  • Reads multiple files concurrently.
  • Supports CSV files compressed in the GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, or SNAPPY format.
  • Supports Hive 1.1.1 and Hadoop 2.7.1 (compatible with Apache JDK 1.6). HDFS Reader has been tested to work properly with Hadoop 2.5.0, Hadoop 2.6.0, and Hive 1.2.0.
Note Currently, HDFS Reader cannot use concurrent threads to read a single file.
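The following minimal sketch illustrates several of these features in a single reader configuration. The path, file format, and constant value (static_tag) are hypothetical placeholders rather than values from a real node:
"parameter": {
    "path": "/hadoop/data_201704*",// A wildcard path; all matching files are read with multiple concurrent threads.
    "fileType": "text",
    "column": [
        { "index": 0, "type": "string" },// Column pruning: only the listed source columns are read.
        { "value": "static_tag", "type": "string" }// A constant column with a fixed, hypothetical value.
    ]
}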

Data types

Hive maintains the metadata of HDFS files and stores the metadata in its own metadatabase, such as a MySQL database. Currently, HDFS Reader cannot access or query the metadata in the Hive metadatabase. Therefore, you must specify the data types of the columns to read so that HDFS Reader can convert them to data types supported by Data Integration.

The following table lists the default mapping between data types in RCFile, ParquetFile, ORCFile, TextFile, and SequenceFile files in Hive and the data types supported by Data Integration.

Category | Data Integration data type | Hive data types
Integer | LONG | TINYINT, SMALLINT, INT, and BIGINT
Floating point | DOUBLE | FLOAT and DOUBLE
String | STRING | STRING, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, and BINARY
Date and time | DATE | DATE and TIMESTAMP
Boolean | BOOLEAN | BOOLEAN
The data types are described as follows:
  • LONG: data of the integer type in HDFS files, such as 123456789.
  • DOUBLE: data of the floating-point type in HDFS files, such as 3.1415.
  • BOOLEAN: data of the Boolean type in HDFS files, such as true or false. The value is case-insensitive.
  • DATE: data of the date and time type in HDFS files, such as 2014-12-31 00:00:00.
Note The TIMESTAMP data type supported by Hive can be accurate to the nanosecond. Therefore, TIMESTAMP data stored in TextFile and ORCFile files is similar to 2015-08-21 22:40:47.397898389. After data of the TIMESTAMP type in Hive is converted to data of the DATE type in Data Integration, the nanosecond part is lost. To retain the nanosecond part after conversion, set the type of the column to STRING.
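For example, the following column setting is a sketch that reads such a column as a string so that the nanosecond part is preserved. The index value 4 is an assumed position of the TIMESTAMP column in the source file:
"column": [
    {
        "index": 4,// Assumed position of the Hive TIMESTAMP column in the source file.
        "type": "string"// Read the value as a string to keep the nanosecond precision.
    }
]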

Parameters

The following parameters are supported. Each description states whether the parameter is required and provides its default value.
path: The path of the file to read. To read multiple files, use a wildcard expression such as /hadoop/data_201704*.
  • If you specify a single HDFS file, HDFS Reader uses only one thread to read the file.
  • If you specify multiple HDFS files, HDFS Reader uses multiple threads to read the files. The number of threads is limited by the specified transmission rate, in Mbit/s.
    Note The actual number of threads is determined by both the number of HDFS files to be read and the specified transmission rate.
  • When the path contains a wildcard, HDFS Reader attempts to read all files that match the path. If the path ends with a slash (/), HDFS Reader reads all files in the specified directory. For example, if you set the path to /bazhen/, HDFS Reader reads all files in the bazhen directory. Currently, HDFS Reader supports only asterisks (*) and question marks (?) as file name wildcards, with a syntax similar to that of file name wildcards on the Linux command line. For a configuration sketch, see the example after this parameter description.
Note
  • Data Integration considers all the files read by a sync node as a single table. Make sure that all these files conform to the same schema and that Data Integration has the permission to read all of them.
  • When creating Hive tables, you can specify partitions. For example, if you specify partition(day="20150820",hour="09"), a directory named /20150820 and a subdirectory named /09 are created in the corresponding table directory of HDFS.
    Therefore, if you want HDFS Reader to read the data of a partition, specify the file path of the partition. For example, if you want HDFS Reader to read all the data in the partition with the date of 20150820 in the table named mytable01, specify the path as follows:
    "path": "/user/hive/warehouse/mytable01/20150820/*"
Required: yes. Default value: none.
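The following sketch reads all the files in the 20150820 partition of the mytable01 table described above. The defaultFS endpoint is a placeholder:
"parameter": {
    "defaultFS": "hdfs://namenode-host:8020",// Placeholder NameNode endpoint.
    "fileType": "text",
    "path": "/user/hive/warehouse/mytable01/20150820/*"// All files in the 20150820 partition.
}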
defaultFS: The NameNode endpoint of HDFS. Sync nodes on the default resource group do not support the advanced parameter settings of Hadoop, such as those related to high availability. To configure these parameters, add a custom resource group. For more information, see Add a custom resource group. Required: yes. Default value: none.
fileType: The file format. Valid values: text, orc, rc, seq, csv, and parquet. HDFS Reader automatically recognizes the file format and uses the corresponding read policy. Before data synchronization, HDFS Reader checks whether all the source files match the specified format. If any source file does not match the format, the sync node fails.
The valid values of the fileType parameter are described as follows:
  • text: the TextFile format.
  • orc: the ORCFile format.
  • rc: the RCFile format.
  • seq: the SequenceFile format.
  • csv: the common CSV file format, which stores data as a logical two-dimensional table.
  • parquet: the common Parquet file format.
Note TextFile and ORCFile are different file formats, and HDFS Reader parses them in different ways. Therefore, when data of a composite Hive data type, such as MAP, ARRAY, STRUCT, or UNION, is converted to the STRING type of Data Integration, the conversion results for the two formats differ. The following example uses the conversion from the MAP type to the STRING type:
  • HDFS Reader converts MAP-type ORCFile data to a string: {job=80, team=60, person=70}.
  • HDFS Reader converts MAP-type TextFile data to a string: {job:80, team:60, person:70}.

The conversion results show that the data remains the same but the formats differ slightly. Therefore, if a column to be synchronized uses a composite Hive data type, we recommend that you use a single, uniform file format.

Recommendations:
  • To use a uniform file format, we recommend that you convert TextFile tables to ORCFile tables on the Hive client.
  • If the file format is Parquet, the parquetSchema parameter is required, which specifies the schema of the Parquet table.

Required: yes. Default value: none.
column: The columns to read. The type parameter specifies the data type of the source data. The index parameter specifies the ID of the column in the source table, starting from 0. The value parameter specifies the column value if the column is a constant column. For each column, you must specify the type parameter and one of the index and value parameters. By default, HDFS Reader reads all data as strings. In this case, set this parameter to "column": ["*"].
You can also specify each column in the following way, by using index for a source column or value for a constant column:
{
  "type": "long",
  "index": 0
  // The first INT-type column of the source file. The index starts from 0.
},
{
  "type": "string",
  "value": "alibaba"
  // The value of the current column, that is, a constant "alibaba".
}
Note We recommend that you set this parameter by specifying the type and index parameters, rather than using "column": ["*"].
Required: yes. Default value: none.
fieldDelimiter: The column delimiter. You must specify the column delimiter for text files. The default delimiter is a comma (,). You do not need to specify the column delimiter for ORC files, which use \u0001 as the default delimiter.
  • If you want HDFS Reader to read each entire row as a single column, specify a delimiter that does not appear in the data, such as \u0001. For a configuration sketch, see the example after this parameter description.
  • Do not use \n as the delimiter.
Required: no. Default value: comma (,).
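The following sketch, in which the path is a placeholder, specifies \u0001 as the delimiter so that each entire row is read as a single column:
"parameter": {
    "path": "/hadoop/raw_log/*",// Placeholder path.
    "fileType": "text",
    "fieldDelimiter": "\u0001"// A delimiter that does not appear in the data, so each row is read as one column.
}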
encoding: The encoding format of the file to read. Required: no. Default value: UTF-8.
nullFormat: The string that represents null. No standard string can represent null in text files. Therefore, Data Integration provides the nullFormat parameter to define which string represents a null pointer.

For example, if you set nullFormat to "null", Data Integration treats the string null as a null pointer. For a configuration sketch, see the example after this parameter description.

Note Pay attention to the difference between the string NULL and a null pointer.
Required: no. Default value: none.
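A minimal sketch, assuming that the source text files use the literal string null to represent missing values:
"parameter": {
    "fileType": "text",
    "nullFormat": "null"// The string null in the source data is treated as a null pointer.
}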
compress: The compression format. Supported compression formats for CSV files are GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, HADOOP-SNAPPY, and FRAMING-SNAPPY. For a configuration sketch, see the example after this parameter description.
Note
  • Do not mix up LZO with LZO_DEFLATE.
  • Snappy does not have a uniform stream format. Data Integration currently supports only the two most popular Snappy formats: HADOOP-SNAPPY (the Snappy stream format in Hadoop) and FRAMING-SNAPPY (the Snappy stream format recommended by Google).
  • rc indicates the RCFile format.
  • This parameter is not required for files of the ORCFile format.
Required: no. Default value: none.
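A minimal sketch for reading GZIP-compressed CSV files. The path is a placeholder, and the lowercase value string gzip is an assumption rather than a value confirmed in this topic:
"parameter": {
    "path": "/hadoop/csv_data/*",// Placeholder path to the compressed files.
    "fileType": "csv",
    "compress": "gzip"// Assumed value string for the GZIP compression format.
}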
parquetSchema: The description of the data schema in Parquet files. This parameter is required when you set fileType to parquet. Make sure that the value of the parquetSchema parameter complies with the JSON syntax. The parquetSchema parameter uses the following format and contains the following fields:
message messageTypeName {
required, dataType, columnName;
...................... ;
}
  • messageTypeName: the name of the MessageType object.
  • required: specifies whether the field is required. We recommend that you set all the fields to optional.
  • dataType: the type of the field. Valid values: BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY, and FIXED_LEN_BYTE_ARRAY. Set this field to BINARY if the file stores strings.
  • Note that each line, including the last one, must end with a semicolon (;).
An example is provided as follows:
"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
Required: no. Default value: none.
csvReaderConfig: The configurations for reading CSV files. The value of this parameter must be of the MAP type. A specific CSV reader is used to read data from CSV files, and it supports many configurations.
An example is provided as follows:
"csvReaderConfig":{
  "safetySwitch": false,
  "skipEmptyRecords": false,
  "useTextQualifier": false
}
You can use the following parameters and their default values:
boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true; // Specifies whether to use escape characters for CSV files.
char delimiter = 44; // The delimiter.
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true; // Specifies whether to limit the length of each column to 100,000 characters.
boolean skipEmptyRecords = true; // Specifies whether to skip empty rows.
boolean captureRawRecord = true;
Required: no. Default value: none.
hadoopConfig: The advanced parameter settings of Hadoop, such as those related to high availability. Sync nodes on the default resource group do not support these settings. To configure them, add a custom resource group. For more information, see Add a custom resource group.
"hadoopConfig":{
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "namenode1,namenode2",
"dfs.namenode.rpc-address.youkuDfs.namenode1": "",
"dfs.namenode.rpc-address.youkuDfs.namenode2": "",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
Required: no. Default value: none.
haveKerberos: Specifies whether Kerberos authentication is required. If you set this parameter to true, you must also set the kerberosKeytabFilePath and kerberosPrincipal parameters. Required: no. Default value: false.
kerberosKeytabFilePath: The absolute path of the keytab file for Kerberos authentication. This parameter is required if you set the haveKerberos parameter to true. Required: no. Default value: none.
kerberosPrincipal: The Kerberos principal to which Kerberos can assign tickets, such as ****/hadoopclient@**.***. This parameter is required if you set the haveKerberos parameter to true.
Note The absolute path of the keytab file is required for Kerberos authentication. Therefore, you can configure Kerberos authentication only on a custom resource group. An example is provided as follows:
"haveKerberos":true,
"kerberosKeytabFilePath":"/opt/datax/**.keytab",
"kerberosPrincipal":"**/hadoopclient@**. **"
Required: no. Default value: none.

Configure HDFS Reader by using the codeless UI

Currently, the codeless user interface (UI) is not supported for HDFS Reader.

Configure HDFS Reader by using the code editor

In the following code, a node is configured to read data from an HDFS. For more information about the parameters, see the preceding parameter description.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "hdfs",// The reader type.
            "parameter": {
                "path": "",// The path of the file to read.
                "datasource": "",// The connection name.
                "column": [
                    {
                        "index": 0,// The ID of the column in the source table.
                        "type": "string"// The data type.
                    },
                    {
                        "index": 1,
                        "type": "long"
                    },
                    {
                        "index": 2,
                        "type": "double"
                    },
                    {
                        "index": 3,
                        "type": "boolean"
                    },
                    {
                        "format": "yyyy-MM-dd HH:mm:ss",// The time format.
                        "index": 4,
                        "type": "date"
                    }
                ],
                "fieldDelimiter": ","// The column delimiter.
                "encoding": "UTF-8",// The encoding format.
                "fileType": ""// The file format.
            },
            "name": "Reader",
            "category": "reader"
        },
        {// The following template is used to configure Stream Writer. For more information, see the corresponding topic.
            "stepType": "stream",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// The maximum number of dirty data records allowed.
        },
        "speed": {
            "concurrent": 3,// The maximum number of concurrent threads.
            "throttle": false,// Specifies whether to enable bandwidth throttling. A value of false indicates that the bandwidth is not throttled. A value of true indicates that the bandwidth is throttled. The maximum transmission rate takes effect only if you set this parameter to true.
            "dmu": 1// The DMU value.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

The following is an example of the HDFS Reader configuration with the parquetSchema parameter.

Note
  • The fileType parameter must be set to parquet.
  • If you want HDFS Reader to read specific columns from a Parquet file, you must specify the parquetSchema parameter and specify the columns to be synchronized through the index field in the column parameter.
"reader": {
    "name": "hdfsreader",
    "parameter": {
        "path": "/user/hive/warehouse/addata.db/dw_ads_rtb_monitor_minute/thedate=20170103/hour_id=22/*",
        "defaultFS": "h10s010.07100.149:8020",
        "column": [
            {
                "index": 0,
                "type": "string"
            },
            {
                "index": 1,
                "type": "long"
            },
            {
                "index": 2,
                "type": "double"
            }
        ],
        "fileType": "parquet",
        "encoding": "UTF-8",
        "parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
    }
}