HDFS Reader allows you to read data stored in a Hadoop Distributed File System (HDFS). Specifically, HDFS Reader reads data from files in the HDFS, converts the data to a transmission protocol format that is supported by Data Integration, and then sends the converted data to a writer.

Notice HDFS Reader supports only exclusive resource groups for Data Integration, but not the shared resource group or custom resource groups. For more information, see Exclusive resource groups for Data Integration, Use the default resource group, and Add a custom resource group.

Background information

HDFS Reader reads data from files in an HDFS and converts the data to a transmission protocol format that is supported by Data Integration.

For example, TextFile is the default storage format used when you create Hive tables, and it does not compress data. Essentially, a TextFile file is stored in HDFS as text. For Data Integration, the implementation of HDFS Reader is similar to that of OSS Reader.

Optimized Row Columnar File (ORCFile) is an optimized RCFile format. It provides an efficient method for storing Hive data. HDFS Reader uses the OrcSerde class provided by Hive to read and parse ORCFile data.

Take note of the following items when you use HDFS Reader:
  • Considering that a complex network connection is required between the default resource group and HDFS, we recommend that you use an exclusive resource group for Data Integration to run sync nodes. Make sure that your exclusive resource group for Data Integration can access the NameNode and DataNode of HDFS through the network.
  • By default, HDFS uses a network whitelist to guarantee data security. In this case, we recommend that you use an exclusive resource group for Data Integration to run HDFS sync nodes.
  • If you configure an HDFS sync node by using the code editor, the HDFS connection does not need to pass the connectivity test. In this case, you can temporarily ignore connectivity test errors.
  • To synchronize data in Data Integration, you must log on as an administrator. Make sure that you have the permissions to read data from and write data to relevant HDFS files.

Features

HDFS Reader supports the following features:
  • Supports the TextFile, ORCFile, RCFile, SequenceFile, CSV, and Parquet file formats. The data stored in each file must be organized as a logical two-dimensional table.
  • Reads data of various types as strings. Supports constants and column pruning.
  • Supports recursive reading. Supports wildcard characters, including asterisks (*) and question marks (?).
  • Reads ORCFile files that are compressed in the SNAPPY or ZLIB format.
  • Reads SequenceFile files that are compressed in the LZO format.
  • Reads multiple files concurrently (see the sketch after this list).
  • Reads CSV files that are compressed in the GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, or SNAPPY format.
  • Supports Hive 1.1.1 and Hadoop 2.7.1 (compatible with JDK 1.6). In tests, HDFS Reader also worked properly with Hadoop 2.5.0, Hadoop 2.6.0, and Hive 1.2.0.
Notice HDFS Reader cannot use concurrent threads to read a single file due to the internal sharding method.
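For example, a wildcard path such as the following (the warehouse directory is hypothetical) matches multiple files, so HDFS Reader can read them with multiple concurrent threads. A path that resolves to a single file is always read by one thread.
"path": "/user/hive/warehouse/mytable01/*"// All matched files must share the same schema; each matched file can be read by a separate thread.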

Data types

Hive maintains the metadata of files and stores the metadata in its own metadatabase, such as a MySQL database. HDFS Reader cannot access or query the metadata in the Hive metadatabase. Therefore, you must specify the data types to which you want the source data to be converted.

The following table describes the default mapping between data types in RCFile, ParquetFile, ORCFile, TextFile, and SequenceFile files in Hive and the data types supported by Data Integration.
Category | Data Integration data type | Hive data type
Integer | long | TINYINT, SMALLINT, INT, and BIGINT
Floating point | double | FLOAT and DOUBLE
String | string | STRING, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, and BINARY
Date and time | date | DATE and TIMESTAMP
Boolean | boolean | BOOLEAN
Data type description:
  • LONG: data of the integer type in HDFS files, such as 123456789.
  • DOUBLE: data of the floating-point type in HDFS files, such as 3.1415.
  • BOOLEAN: data of the Boolean type in HDFS files, such as true or false. The value is not case-sensitive.
  • DATE: data of the date and time type in HDFS files, such as 2014-12-31 00:00:00.

The TIMESTAMP data type supported by Hive can be accurate to the nanosecond. Therefore, TIMESTAMP data stored in TextFile and ORCFile files is similar to 2015-08-21 22:40:47.397898389. After such data is converted to the DATE type in Data Integration, the nanoseconds are lost. To make sure that the nanoseconds are retained after conversion, you must set the type of the converted data to STRING.
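For example, to keep the nanoseconds of a TIMESTAMP column, which is assumed here to be the fifth column of the source file, declare it as a string instead of a date in the column configuration:
{
  "type": "string",
  "index": 4
  // Read the TIMESTAMP column as a string so that a value such as 2015-08-21 22:40:47.397898389 keeps its nanoseconds.
}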

Parameters

The following list describes each parameter, whether it is required, and its default value.
path The path of the file to read. To read multiple files, use a wildcard pattern such as /hadoop/data_201704*.
  • If you specify a single HDFS file, HDFS Reader uses only one thread to read the file.
  • If you specify multiple HDFS files, HDFS Reader uses multiple threads to read the files. The number of threads is limited by the maximum transmission rate, in Mbit/s.
    Note The actual number of threads is the smaller of the number of HDFS files to be read and the specified maximum transmission rate.
  • If a path contains a wildcard, HDFS Reader attempts to read all files that match the path. If the path ends with a slash (/), HDFS Reader reads all files in the specified directory. For example, if you specify /bazhen/ as the path, HDFS Reader reads all files in the bazhen directory. HDFS Reader supports only asterisks (*) and question marks (?) as file name wildcards. The syntax is similar to that of file name wildcards used on the Linux command line.
Take note of the following items:
  • Data Integration considers all the files to be read by a sync node as a single table. Make sure that all the files can adapt to the same schema and Data Integration has the permission to read all these files.
  • Table partitioning: When you create Hive tables, you can specify partitions. For example, if you specify partition(day="20150820",hour="09") when you create a Hive table, a directory named /20150820 and a subdirectory named /09 are created in the table directory of HDFS.
    Partitions form a directory structure. If you want to read all the data in a partition of a table, specify the path in the JSON file. For example, if you want to read all the data in the partition with the date of 20150820 in the table named mytable01, specify the following path:
    "path": "/user/hive/warehouse/mytable01/20150820/*"
Required: Yes. Default value: N/A.
defaultFS The NameNode endpoint of Hadoop HDFS. The shared resource group does not support advanced Hadoop parameters related to the high availability feature. Required: Yes. Default value: N/A.
fileType The type of the file. You can set this parameter only to text, orc, rc, seq, csv, or parquet. HDFS Reader automatically identifies the file format and uses the appropriate read policy. Before data synchronization, HDFS Reader checks whether all the files in the path to be synchronized match the format specified by the fileType parameter. If any file does not match the format, the sync node fails.
The following values are valid for the fileType parameter:
  • text: the TextFile format.
  • orc: the ORCFile format.
  • rc: the RCFile format.
  • seq: the SequenceFile format.
  • csv: the common HDFS file format, which stores data as a logical two-dimensional table.
  • parquet: the common Parquet file format.
TextFile and ORCFile are different formats. HDFS Reader parses files in the two formats in different ways. When data is converted from a composite data type of Hive to the STRING type of Data Integration, the conversion results are different for the TextFile and ORCFile formats. The composite data types include MAP, ARRAY, STRUCT, and UNION. The following examples demonstrate the results of the conversion from the MAP type to the STRING type:
  • After HDFS Reader parses and converts MAP-type ORCFile data to the STRING type of Data Integration, the result is {job=80, team=60, person=70}.
  • After HDFS Reader parses and converts MAP-type TextFile data to the STRING type of Data Integration, the result is {job:80, team:60, person:70}.

The conversion results show that the data remains unchanged but the formats differ slightly. Therefore, if the column to be synchronized uses a composite data type of Hive, we recommend that you use a uniform file format.

Recommended best practices:
  • To use a uniform file format, we recommend that you export TextFile tables as ORCFile tables on a Hive client.
  • If the file format is Parquet, the parquetSchema parameter is required, which specifies the schema of the Parquet table.

For the column parameter, you must specify the type field and either the index field or the value field.

Required: Yes. Default value: N/A.
column The columns to read. The type field specifies the source data type. The index field specifies the ID of the column in the source table, starting from 0. The value field specifies a constant; in this case, the data is not read from the source file, and the column is automatically generated based on the value field. By default, HDFS Reader reads all data as strings. In this case, specify this parameter as "column": ["*"].
You can also specify the index or value field in the column parameter by using the following method:
{
  "type": "long",
  "index": 0
  // The first INT-type column of the source file. The index starts from 0. The index field indicates columns to be read from the file.
},
{
  "type": "string",
  "value": "alibaba"
  // The value of the current column, which is a constant column, "alibaba". It is internally generated by HDFS Reader.
}
Note We recommend that you specify the index and type fields for each column to read, instead of using "column": ["*"].
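The following sketch shows a complete column configuration that combines source columns and a constant column. The column positions and the constant value are only examples.
"column": [
  {
    "type": "long",
    "index": 0
    // The first column of the source file, read as a long.
  },
  {
    "type": "string",
    "index": 1
    // The second column of the source file, read as a string.
  },
  {
    "type": "string",
    "value": "alibaba"
    // A constant column that is generated by HDFS Reader. No data is read from the source file.
  }
]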
Required: Yes. Default value: N/A.
fieldDelimiter The delimiter of the columns to read. If the source files are TextFile files, you must specify a column delimiter. If you do not specify a column delimiter, HDFS Reader uses a comma (,) as the delimiter by default. If the source files are ORCFile files, you do not need to specify a column delimiter. HDFS Reader uses the default delimiter of Hive, which is \u0001.
Note
  • If you want each row to be converted to a column in the destination table, use a string that does not exist in these rows as the delimiter, for example, the non-printable character \u0001.
  • Do not use \n as the delimiter.
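For example, to read TextFile data whose columns are separated by the non-printable character \u0001, which is the default delimiter of Hive, set the delimiter explicitly:
"fileType": "text",
"fieldDelimiter": "\u0001"// The delimiter must not occur in the data. Do not use \n.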
Required: No. Default value: comma (,).
encoding The encoding format of the file that is read. Required: No. Default value: utf-8.
nullFormat No standard strings can represent null in text files. Therefore, Data Integration provides the nullFormat parameter to define which string represents a null pointer.

For example, if you specify nullFormat="null", Data Integration considers the string null as a null pointer.

Note Pay attention to the difference between the string NULL and a null pointer.
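For example, Hive typically writes empty values to text files as the string \N. With the following setting, which assumes that convention, HDFS Reader treats that string as a null pointer:
"nullFormat": "\\N"// In JSON, \\N represents the two-character string \N.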
Required: No. Default value: N/A.
compress The compression format when the fileType parameter is set to csv. The following compression formats are supported: GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, HADOOP-SNAPPY, and FRAMING-SNAPPY.
Note
  • LZO and LZO_DEFLATE are two different compression formats. Do not mix them up when you configure this parameter.
  • Snappy does not have a uniform stream format. Data Integration supports only the most popular two compression formats: HADOOP-SNAPPY and FRAMING-SNAPPY. HADOOP-SNAPPY is the Snappy stream format in Hadoop, and FRAMING-SNAPPY is the Snappy stream format recommended by Google.
  • This parameter is not required for files of the ORCFile format.
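The following sketch shows how to read GZIP-compressed CSV files. The path is a placeholder, and the compression format name is written as listed above.
"fileType": "csv",
"compress": "GZIP",// The CSV files that match the path are compressed in the GZIP format.
"path": "/hadoop/data_csv/*"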
Required: No. Default value: N/A.
parquetSchema The description of the data schema in Parquet files. If the file format is Parquet, you must set the parquetSchema parameter in addition to the column parameter. Make sure that the value of the parquetSchema parameter complies with the JSON syntax. The schema is defined in the following format:
message messageTypeName {
required dataType columnName;
......;
}
The parquetSchema parameter contains the following fields:
  • messageTypeName: the name of the MessageType object.
  • required or optional: required indicates that the field cannot be empty, and optional indicates that the field can be empty. We recommend that you declare all fields as optional.
  • dataType: Parquet files support various field types such as BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY, and FIXED_LEN_BYTE_ARRAY. Set this parameter to BINARY if the field stores strings.
  • Each line, including the last one, must end with a semicolon (;).
Configuration example:
"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
Required: No. Default value: N/A.
csvReaderConfig The configurations for reading CSV files. The value of this parameter must be of the MAP type. A specific CSV reader is used to read data from CSV files, and this reader supports many configurations. If you do not specify this parameter, the default configurations are used.
The following example shows common configurations:
"csvReaderConfig":{
  "safetySwitch": false,
  "skipEmptyRecords": false,
  "useTextQualifier": false
}
The following configurations show all the fields and their default values. When you set the csvReaderConfig parameter of the map type, you must use the field names provided in the following configurations.
boolean caseSensitive = true;
char textQualifier = 34; // The text qualifier character. 34 is the ASCII code for double quotation marks (").
boolean trimWhitespace = true;
boolean useTextQualifier = true; // Specifies whether to use the text qualifier for CSV files.
char delimiter = 44; // The delimiter. 44 is the ASCII code for commas (,).
char recordDelimiter = 0;
char comment = 35; // The comment character. 35 is the ASCII code for number signs (#).
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true; // Specifies whether to limit the length of each column to 100,000 characters.
boolean skipEmptyRecords = true; // Specifies whether to skip empty rows.
boolean captureRawRecord = true;
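The following sketch shows where the csvReaderConfig parameter sits in the reader parameters, using the common configurations shown above. The path is a placeholder.
"parameter": {
  "path": "/hadoop/data_csv/*",// A placeholder path.
  "fileType": "csv",
  "csvReaderConfig": {
    "safetySwitch": false,// Do not limit the length of each column to 100,000 characters.
    "skipEmptyRecords": false,// Do not skip empty rows.
    "useTextQualifier": false// Do not use the text qualifier.
  }
}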
Required: No. Default value: N/A.
hadoopConfig The advanced parameter settings of Hadoop, such as those related to the high availability feature. The shared resource group does not support advanced Hadoop parameters related to the high availability feature.
"hadoopConfig":{
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "namenode1,namenode2",
"dfs.namenode.rpc-address.youkuDfs.namenode1": "",
"dfs.namenode.rpc-address.youkuDfs.namenode2": "",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
Required: No. Default value: N/A.
haveKerberos Specifies whether Kerberos authentication is required. If you set this parameter to true, you must also set the kerberosKeytabFilePath and kerberosPrincipal parameters. Required: No. Default value: false.
kerberosKeytabFilePath The absolute path of the keytab file for Kerberos authentication. This parameter is required if the haveKerberos parameter is set to true. Required: No. Default value: N/A.
kerberosPrincipal The Kerberos principal to which Kerberos can assign tickets. Example: ****/hadoopclient@**. ***. This parameter is required if the haveKerberos parameter is set to true.
Note The absolute path of the keytab file is required for Kerberos authentication. Therefore, you must configure Kerberos authentication on an exclusive resource group for Data Integration. Configuration example:
"haveKerberos":true,
"kerberosKeytabFilePath":"/opt/datax/**.keytab",
"kerberosPrincipal":"**/hadoopclient@**. **"
Required: No. Default value: N/A.

Codeless UI mode

The codeless user interface (UI) mode is not supported.

Code Editor mode

The following example shows how to configure a sync node to read data from HDFS by using the code editor. For more information, see Create a sync node by using the code editor.
Note Delete the comments from the following code before you run the code.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "hdfs",// The plug-in name.
            "parameter": {
                "path": "",// The path of the file to read.
                "datasource": "",// The connection.
                "column": [
                    {
                        "index": 0,// The serial number.
                        "type": "string"// The field type.
                    },
                    {
                        "index": 1,
                        "type": "long"
                    },
                    {
                        "index": 2,
                        "type": "double"
                    },
                    {
                        "index": 3,
                        "type": "boolean"
                    },
                    {
                        "format": "yyyy-MM-dd HH:mm:ss",// The time format.
                        "index": 4,
                        "type": "date"
                    }
                ],
                "fieldDelimiter": ","// The column delimiter.
                "encoding": "UTF-8",// The encoding format.
                "fileType": ""// The file format.
            },
            "name": "Reader",
            "category": "reader"
        },
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// The number of error records.
        },
        "speed": {
            "concurrent": 3,// The number of concurrent threads in the node.
            "throttle": false, // A value of false indicates that the bandwidth is not throttled. A value of true indicates that the bandwidth is throttled. The maximum transmission rate takes effect only if you set this parameter to true.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
The following example shows the HDFS Reader configuration with the parquetSchema parameter.
Note
  • The fileType parameter must be set to parquet.
  • If you want HDFS Reader to read specific columns from a Parquet file, you must specify the complete schema in the parquetSchema parameter and specify the columns to be synchronized by using the index field in the column parameter.
"reader":  {
    "name": "hdfsreader",
    "parameter": {
        "path": "/user/hive/warehouse/addata.db/dw_ads_rtb_monitor_minute/thedate=20170103/hour_id=22/*",
        "defaultFS": "h10s010.07100.149:8020",
        "column": [
            {
                "index": 0,
                "type": "string"
            },
            {
                "index": 1,
                "type": "long"
            },
            {
                "index": 2,
                "type": "double"
            }
        ],
        "fileType": "parquet",
        "encoding": "UTF-8",
        "parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
    }
}