HDFS Reader can read data from files stored in Hadoop Distributed File System (HDFS). After HDFS Reader obtains data, it converts the data from the original data types to the data types supported by Data Integration and sends the converted data to a writer.

Notice You can use only an exclusive resource group for Data Integration to run a data synchronization node that uses HDFS Reader. For more information about exclusive resource groups for Data Integration, see Create and use an exclusive resource group for Data Integration.

Background information

HDFS Reader reads data from files in HDFS and converts the data from the original data types to the data types supported by Data Integration.

By default, if HDFS is used as the storage of Hive tables, the Hive tables are stored in HDFS as text files that are not compressed. HDFS Reader reads data in a similar way as OSS Reader.

The Optimized Row Columnar (ORC) file format is an optimized RC file format and allows you to store Hive data in an efficient manner. HDFS Reader uses the OrcSerde class provided by Hive to parse and read data in ORC files.

Take note of the following items when you use HDFS Reader:
  • Complex network connections are required between the shared resource group and HDFS. Therefore, we recommend that you use an exclusive resource group for Data Integration to run your synchronization node. Make sure that your exclusive resource group for Data Integration can access the NameNode and DataNode nodes of HDFS.
  • By default, HDFS uses a network whitelist to ensure data security. In this case, we recommend that you use exclusive resource groups for Data Integration to run synchronization nodes that use HDFS Reader.
  • If you use the code editor to configure a synchronization node that uses HDFS Reader, the network connectivity test for the HDFS data source that you use is not required. If the system reports an error for the connectivity test, you can ignore the error.
  • You must use an administrator account to start your synchronization node. Make sure that your administrator account has the permissions to read data from and write data to related HDFS files.

Features

HDFS Reader supports the following features:
  • Supports the text, ORC, RC, Sequence, CSV, and Parquet file formats. Data stored in the files in these formats must be organized as logical two-dimensional tables.
  • Reads data of various types as strings. Supports constants and column pruning.
  • Supports recursive reading and regular expressions that contain asterisks (*) and question marks (?).
  • Supports ORC files that are compressed in the Snappy or ZLIB format.
  • Supports SequenceFile files that are compressed in the LZO format.
  • Uses multiple threads to read files.
  • Supports CSV files that are compressed in the GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, or Snappy format.
  • Supports Hive 1.1.1 and Hadoop 2.7.1 (used together with JDK 1.6). During testing, HDFS Reader can also run normally with Hive 1.2.0 and Hadoop 2.5.0 or Hadoop 2.6.0.
Notice HDFS Reader cannot use parallel threads to read a single file due to the internal sharding method.

Data types

Hive maintains the metadata of files and stores the metadata in its own metadata database (metastore), such as a MySQL database. HDFS Reader cannot access or query the metadata in the Hive metastore. Therefore, you must specify the data type conversions that you want to perform.

The following table describes the mappings between data types in RC, Parquet, ORC, text, and Sequence files in Hive and the data types supported by Data Integration.
Category | Data Integration data type | Hive data type
Integer | long | TINYINT, SMALLINT, INT, and BIGINT
Floating point | double | FLOAT and DOUBLE
String | string | STRING, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, and BINARY
Date and time | date | DATE and TIMESTAMP
Boolean | boolean | BOOLEAN
Note
  • LONG: data of the integer type in HDFS files, such as 123456789.
  • DOUBLE: data of the floating point type in HDFS files, such as 3.1415.
  • BOOLEAN: data of the Boolean type in HDFS files, such as true or false. Data is not case-sensitive.
  • DATE: data of the date and time type in HDFS files, such as 2014-12-31 00:00:00.

The TIMESTAMP data type supported by Hive can be accurate to the nanosecond. Therefore, data of the TIMESTAMP type stored in text and ORC files is similar to 2015-08-21 22:40:47.397898389. After the data is converted to the DATE type in Data Integration, the nanosecond part is lost. If you need to retain the nanosecond part, set the type of the converted data to STRING.
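For example, the following column configuration is a minimal sketch that reads a Hive TIMESTAMP column as a string to retain the nanosecond part. The column index 4 is only an assumed position of the TIMESTAMP column in the source file:
"column": [
  {
    "index": 4,// The assumed position of the TIMESTAMP column in the source file.
    "type": "string"// Read the value as a string so that the nanosecond part is retained.
  }
]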

Parameters

The following descriptions list each parameter, whether it is required, and its default value.
path The path of the file from which you want to read data. If you want to read data from multiple files, you can specify a regular expression, such as /hadoop/data_201704*. If the file names contain time information that follows a consistent pattern, you can use scheduling parameters together with a regular expression (see the example after the following list). The values of the scheduling parameters are replaced based on the data timestamp of the node. For more information about scheduling parameters, see Configure scheduling parameters.
  • If you specify a single file, HDFS Reader uses only one thread to read data from the file.
  • If you specify multiple files, HDFS Reader uses multiple threads to read data from the files. The number of threads is determined by the concurrent parameter.
    Note The number of threads that are actually started is the smaller of the number of HDFS files that you want to read and the number of parallel threads that you configure.
  • If a path contains a wildcard, HDFS Reader attempts to read data from all files that match the path. For example, if you specify the path as /bazhen/, HDFS Reader reads all files in the bazhen directory. HDFS Reader supports only asterisks (*) and question marks (?) as wildcards. The syntax is similar to the syntax of file name wildcards used in the Linux command line.
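The following path setting is a minimal sketch that combines a scheduling parameter with a wildcard. It assumes that a scheduling parameter named bizdate is defined for the node and is replaced with the data timestamp of the node at run time:
"path": "/hadoop/data_${bizdate}*"// Assumes that the scheduling parameter bizdate is defined for the node.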
Take note of the following items when you specify the path parameter:
  • Data Integration considers all the files to read in a synchronization node as a single table. Make sure that all the files can adapt to the same schema and Data Integration has the permissions to read all these files.
  • Table partitioning: When you create Hive tables, you can specify partitions. For example, if you specify partition(day="20150820", hour="09") when you create a Hive table, a directory named /20150820 and a subdirectory named /09 are created in the table directory in HDFS.
    Partitions form a directory structure. If you want to read all the data in a partition of a table, specify the path in the path parameter. For example, if you want to read all the data in the 20150820 partition in the table named mytable01, specify the path in the following way:
    "path": "/user/hive/warehouse/mytable01/20150820/*"
Required: Yes. No default value.
defaultFS The endpoint of the NameNode in HDFS. The shared resource group does not support the advanced Hadoop parameters that are related to high availability. Required: Yes. No default value.
fileType The format of the file from which you want to read data. HDFS Reader automatically identifies the file format and uses the related read policies. Before HDFS Reader reads data in a synchronization node, it checks whether all the files in the specified path match the format specified by the fileType parameter. If the format of a file does not match the format specified by the fileType parameter, the synchronization node fails.
Valid values of the fileType parameter:
  • TEXT: the text format.
  • ORC: the ORC format.
  • RC: the RC format.
  • SEQ: the Sequence format.
  • CSV: the CSV format, which is a common HDFS file format. The data in a CSV file is organized as a logical two-dimensional table.
  • PARQUET: the Parquet format.
HDFS Reader parses files in text and ORC formats in different ways. If data is converted from a Hive complex data type to the STRING type supported by Data Integration, the conversion results are different for the text and ORC formats. Complex data types include MAP, ARRAY, STRUCT, and UNION. The following examples demonstrate the results of the conversion from the MAP type to the STRING type:
  • After HDFS Reader parses and converts MAP-type data in an ORC file to the STRING type, the result is {job=80, team=60, person=70}.
  • After HDFS Reader parses and converts MAP-type data in a text file to the STRING type, the result is {job:80, team:60, person:70}.

The conversion results show that the data remains unchanged but the formats differ slightly. Therefore, if a column that you want to synchronize uses a Hive complex data type, we recommend that you use a uniform file format.

Recommended best practices:
  • To use a uniform file format, we recommend that you convert text files to ORC files on your Hive client.
  • If the file format is Parquet, you must specify the parquetSchema parameter, which specifies the schema of data in Parquet files.
Required: Yes. No default value.
column The names of the columns from which you want to read data. The type field specifies a data type. The index field specifies the ID of a column, starting from 0. The value field specifies a constant. If you specify the value field, HDFS Reader uses the value of the field as the value of a constant column instead of reading data from the source file. By default, you can read all data as strings. To do so, set this parameter to "column": ["*"].
For each column, you must configure the type field and either the index or the value field. Example:
{
  "type": "long",
  "index": 0
  // The first INT-type column of the source file. The index starts from 0. The index field indicates the IDs of the columns from which you want to read data in the file. 
},
{
  "type": "string",
  "value": "alibaba"
  // The value of the current column, which is a constant column alibaba. It is internally generated by HDFS Reader. 
}
Note
  • The index starts from 0, which indicates that HDFS Reader reads data from the first column of the source file.
  • We recommend that you specify the index and type fields for each column from which you want to read data, instead of setting the column parameter to ["*"].
Required: Yes. No default value.
fieldDelimiter The delimiter of the columns from which you want to read data. If the source files are text files, you must specify a column delimiter. If you do not specify a column delimiter, HDFS Reader uses commas (,) as column delimiters by default. If the source files are ORC files, you do not need to specify a column delimiter. HDFS Reader uses the default delimiter of Hive, which is \u0001.
Note
  • If you want each row of the source file to be read as a single column, use a character that does not appear in the rows as the delimiter, such as \u0001.
  • Do not use \n as the delimiter.
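For example, the following setting is a minimal sketch that uses the non-printable character \u0001 as the column delimiter:
"fieldDelimiter": "\u0001"// A character that is unlikely to appear in the source data.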
Required: No. Default value: comma (,).
encoding The encoding format of the file from which you want to read data. Required: No. Default value: utf-8.
nullFormat The string that represents a null value. No standard string represents a null value in text files. You can use this parameter to define the string that represents a null value.

For example, if you set this parameter to null, Data Integration treats the string null as a null value.

Note The string NULL is different from a null value. Pay attention to the difference between them.
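The following setting is a minimal sketch that treats \N, which Hive uses by default to represent null values in text files, as a null value. The double backslash is required to escape the backslash in JSON:
"nullFormat": "\\N"// Treat the string \N as a null value.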
Required: No. No default value.
compress The compression format when the fileType parameter is set to CSV. The following compression formats are supported: GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, Hadoop-Snappy, and Framing-Snappy.
Note
  • LZO and LZO_DEFLATE are two different compression formats. Do not mix them up when you configure this parameter.
  • Snappy does not have a uniform stream format. Data Integration supports only the two most popular compression formats: Hadoop-Snappy and Framing-Snappy. Hadoop-Snappy is the Snappy stream format in Hadoop, and Framing-Snappy is the Snappy stream format recommended by Google.
  • This parameter is not required if you set the fileType parameter to ORC.
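For example, the following settings are a minimal sketch for reading GZIP-compressed CSV files. The exact casing of the compress value, shown here as gzip, is an assumption and may vary by Data Integration version:
"fileType": "csv",
"compress": "gzip"// Assumed value. Use the compression format of your source files.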
Required: No. No default value.
parquetSchema
The description of the schema of data in Parquet files. If you set the fileType parameter to PARQUET, you must set the parquetSchema parameter. Write the value in the following format, and make sure that the overall configuration remains valid JSON:
message MessageTypeName {
required dataType columnName;
......................;
}
The parquetSchema parameter contains the following fields:
  • MessageTypeName: the name of the MessageType object.
  • required or optional: required indicates that the field cannot be empty, and optional indicates that the field can be empty. We recommend that you use optional for all fields.
  • dataType: Parquet files support various field types such as BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY, and FIXED_LEN_BYTE_ARRAY. Set this parameter to BINARY if the field stores strings.
  • Each line, including the last one, must end with a semicolon (;).
Configuration example:
"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
Required: No. No default value.
csvReaderConfig The configurations required to read CSV files. The value of this parameter must be of the MAP type. HDFS Reader uses a CSV file reader to read data from CSV files, and the CSV file reader supports multiple configurations. If you do not configure this parameter, the default configurations are used.
The following example shows common configurations:
"csvReaderConfig":{
  "safetySwitch": false,
  "skipEmptyRecords": false,
  "useTextQualifier": false
}
The following configurations show all the fields and their default values. When you configure the csvReaderConfig parameter of the MAP type, you must use the field names provided in the following configurations:
boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true;// Specifies whether to use escape characters for CSV files. 
char delimiter = 44;// The delimiter.
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true;// Specifies whether to limit the length of each column to 100,000 characters. 
boolean skipEmptyRecords = true;// Specifies whether to skip empty rows. 
boolean captureRawRecord = true;
Required: No. No default value.
hadoopConfig The settings of advanced Hadoop parameters, such as the parameters related to high availability. The shared resource group does not support advanced Hadoop parameters related to high availability.
"hadoopConfig":{
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "namenode1,namenode2",
"dfs.namenode.rpc-address.youkuDfs.namenode1": "",
"dfs.namenode.rpc-address.youkuDfs.namenode2": "",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"dfs.data.transfer.protection": "integrity",
"dfs.datanode.use.datanode.hostname" :"true",
"dfs.client.use.datanode.hostname":"true"
}
Note
"hadoopConfig":{ "dfs.data.transfer.protection": "integrity", "dfs.datanode.use.datanode.hostname" :"true", "dfs.client.use.datanode.hostname":"true" }
The preceding settings are used to configure Kerberos authentication in HDFS Reader. If you configure Kerberos authentication in the HDFS data source, you do not need to configure it in HDFS Reader. For more information about how to add an HDFS data source, see Add an HDFS data source.
Required: No. No default value.
haveKerberos Specifies whether Kerberos authentication is required. If you set this parameter to true, you must also configure the kerberosKeytabFilePath and kerberosPrincipal parameters. Required: No. Default value: false.
kerberosKeytabFilePath The absolute path of the keytab file for Kerberos authentication. This parameter is required if the haveKerberos parameter is set to true. Required: No. No default value.
kerberosPrincipal The Kerberos principal to which Kerberos can assign tickets, such as ****/hadoopclient@**.***. This parameter is required if the haveKerberos parameter is set to true.
Note The absolute path of the keytab file is required for Kerberos authentication. Therefore, you must configure Kerberos authentication for exclusive resource groups for Data Integration. The following code provides a configuration example:
"haveKerberos":true,
"kerberosKeytabFilePath":"/opt/datax/**.keytab",
"kerberosPrincipal":"**/hadoopclient@**.**"
Required: No. No default value.

Configure HDFS Reader by using the codeless UI

Create a synchronization node and configure the node. For more information, see Configure a synchronization node by using the codeless UI.

Perform the following steps on the configuration tab of the synchronization node:
  1. Configure data sources.
    Configure Source and Target for the synchronization node.
    Parameter Description
    Connection The name of the data source from which you want to read data. This parameter is equivalent to the datasource parameter that is described in the preceding section.
    File path The path of the file from which you want to read data. This parameter is equivalent to the path parameter that is described in the preceding section.
    File type This parameter is equivalent to the fileType parameter that is described in the preceding section. This parameter specifies the format of the file from which you want to read data. Valid values: text, orc, rc, seq, csv, and parquet.
    FieldDelimiter The column delimiter. This parameter is equivalent to the fieldDelimiter parameter that is described in the preceding section. By default, a comma (,) is used as a column delimiter.
    Encoding The encoding format. This parameter is equivalent to the encoding parameter that is described in the preceding section. Default value: UTF-8.
    Kerberos authentication Specifies whether to enable Kerberos authentication. Default value: No. If you set this parameter to Yes, the KeyTab file path and Principal Name parameters are required. For more information, see Configure Kerberos authentication.
    Ignore(when file does not exist) Specifies whether to ignore the file or folder that you configure if the file or folder does not exist. If you set this parameter to Yes, HDFS Reader does not read data from the file, and no error is reported in the log. If you set this parameter to No, the data synchronization node fails to run. Default value: No.
    NullFormat This parameter is equivalent to the nullFormat parameter that is described in the preceding section. This parameter specifies the string that represents a null value.
    HadoopConfig The settings of advanced Hadoop parameters, such as the parameters related to high availability. The shared resource group does not support advanced Hadoop parameters related to high availability.
  2. Configure field mappings. This operation is equivalent to setting the column parameter that is described in the preceding section. The system maps the field in a row of the source to the field in the same row of the destination. You can click the corresponding icon to manually edit the fields in the source. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored.
    Note The index starts from 0, which indicates that HDFS Reader reads data from the first column of the source file.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.
    Distributed Execution The distributed execution mode splits your node into pieces and distributes them to multiple Elastic Compute Service (ECS) instances for parallel execution. This speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, you must evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.

Configure HDFS Reader by using the code editor

In the following code, a synchronization node is configured to read data from HDFS. For more information about how to configure a synchronization node by using the code editor, see Create a synchronization node by using the code editor.
Note You must delete the comments from the following code before you run the code.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "hdfs",// The reader type.
            "parameter": {
                "path": "",// The path of the file from which you want to read data.
                "datasource": "",// The name of the data source.
                "hadoopConfig":{
                "dfs.data.transfer.protection": "integrity",
               "dfs.datanode.use.datanode.hostname" :"true",
                "dfs.client.use.datanode.hostname":"true"
                 },
                "column": [
                    {
                        "index": 0,// The index of the column in the source file. The index starts from 0, which indicates that HDFS Reader reads data from the first column of the source file. 
                        "type": "string"// The field type.
                    },
                    {
                        "index": 1,
                        "type": "long"
                    },
                    {
                        "index": 2,
                        "type": "double"
                    },
                    {
                        "index": 3,
                        "type": "boolean"
                    },
                    {
                        "format": "yyyy-MM-dd HH:mm:ss",// The time format.
                        "index": 4,
                        "type": "date"
                    }
                ],
                "fieldDelimiter": ","// The column delimiter.
                "encoding": "UTF-8",// The encoding format.
                "fileType": ""// The file format.
            },
            "name": "Reader",
            "category": "reader"
        },
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// The maximum number of dirty data records allowed.
        },
        "speed": {
            "concurrent": 3,// The maximum number of parallel threads.
            "throttle": true // Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "mbps":"12"// The maximum transmission rate.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
The following example shows the HDFS Reader configuration with the parquetSchema parameter.
Note
  • The fileType parameter must be set to PARQUET.
  • If you want HDFS Reader to read specific columns from a Parquet file, you must specify the complete schema in the parquetSchema parameter and specify the columns that you want to read by using the index field in the column parameter.
"reader":  {
    "name": "hdfsreader",
    "parameter": {
        "path": "/user/hive/warehouse/addata.db/dw_ads_rtb_monitor_minute/thedate=20170103/hour_id=22/*",
        "defaultFS": "h10s010.07100.149:8020",
        "column": [
            {
                "index": 0,
                "type": "string"
            },
            {
                "index": 1,
                "type": "long"
            },
            {
                "index": 2,
                "type": "double"
            }
        ],
        "fileType": "parquet",
        "encoding": "UTF-8",
        "parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
    }
}