HDFS Reader can read data from files stored in Hadoop Distributed File System (HDFS). After HDFS Reader obtains data, it converts the data from the original data types to the data types supported by Data Integration and sends the converted data to a writer.
Background information
HDFS Reader reads data from files in HDFS and converts the data from the original data types to the data types supported by Data Integration.
By default, if HDFS is used as the storage of Hive tables, the Hive tables are stored in HDFS as uncompressed text files. HDFS Reader reads data from these files in a way similar to how OSS Reader reads data.
The Optimized Row Columnar (ORC) file format is an optimized RC file format and allows you to store Hive data in an efficient manner. HDFS Reader uses the OrcSerde class provided by Hive to parse and read data in ORC files.
- Complex network connections are required between the shared resource group and HDFS. Therefore, we recommend that you use an exclusive resource group for Data Integration to run your synchronization node. Make sure that your exclusive resource group for Data Integration can access the NameNode and DataNode nodes of HDFS.
- By default, HDFS uses a network whitelist to ensure data security. In this case, we recommend that you use exclusive resource groups for Data Integration to run synchronization nodes that use HDFS Reader.
- If you use the code editor to configure a synchronization node that uses HDFS Reader, the network connectivity test for the HDFS data source that you use is not required. If the system reports an error for the connectivity test, you can ignore the error.
- You must use an administrator account to start your synchronization node. Make sure that your administrator account has the permissions to read data from and write data to related HDFS files.
Features
- Supports the text, ORC, RC, Sequence, CSV, and Parquet file formats. Data stored in the files in these formats must be organized as logical two-dimensional tables.
- Reads data of various types as strings. Supports constants and column pruning.
- Supports recursive reading and regular expressions that contain asterisks (*) and question marks (?).
- Reads ORC files that are compressed in the Snappy or ZLIB format.
- Reads SequenceFile files that are compressed in the LZO format.
- Uses multiple threads to read files.
- Reads CSV files that are compressed in the GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, or Snappy format.
- Supports Hive 1.1.1 and Hadoop 2.7.1, which work with JDK 1.6. In testing, HDFS Reader also runs normally with Hive 1.2.0 and Hadoop 2.5.0 or Hadoop 2.6.0.
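The wildcard support listed above can be pictured with a small Python sketch. It only approximates the matching with the standard `fnmatch` module and is not how HDFS Reader is implemented. The file listing and the `match_paths` helper are hypothetical. Note that `fnmatch` lets an asterisk match across `/`, which may differ from HDFS path matching.

```python
from fnmatch import fnmatchcase


def match_paths(pattern, paths):
    """Return the paths that match a wildcard pattern containing * and ?."""
    return [p for p in paths if fnmatchcase(p, pattern)]


# Hypothetical file listing, used only for illustration.
files = [
    "/hadoop/data_20170401",
    "/hadoop/data_20170402",
    "/hadoop/data_20170501",
    "/hadoop/log_20170401",
]

# An asterisk matches any run of characters.
print(match_paths("/hadoop/data_201704*", files))

# A question mark matches exactly one character.
print(match_paths("/hadoop/data_2017040?", files))
```

Both calls select the two April files and skip the May file and the log file.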
Data types
Hive maintains the metadata of files and stores the metadata in its own metadatabase, such as a MySQL database. HDFS Reader cannot access or query the metadata in the metadatabase of Hive. Therefore, you must specify the data types that you want to convert.
Category | Data Integration data type | Hive data type |
---|---|---|
Integer | long | TINYINT, SMALLINT, INT, and BIGINT |
Floating point | double | FLOAT and DOUBLE |
String | string | STRING, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, and BINARY |
Date and time | date | DATE and TIMESTAMP |
Boolean | boolean | BOOLEAN |
- LONG: data of the integer type in HDFS files, such as 123456789.
- DOUBLE: data of the floating point type in HDFS files, such as 3.1415.
- BOOLEAN: data of the Boolean type in HDFS files, such as true or false. Data is not case-sensitive.
- DATE: data of the date and time type in HDFS files, such as 2014-12-31 00:00:00.
The TIMESTAMP data type supported by Hive can be accurate to the nanosecond. Data of the TIMESTAMP type stored in text and ORC files therefore looks similar to 2015-08-21 22:40:47.397898389. If the data is converted to the DATE type in Data Integration, the nanosecond part is lost. To retain the nanosecond part after conversion, set the type of the converted data to STRING.
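The effect can be illustrated with a short Python sketch. It only imitates the loss of precision and does not reproduce the conversion logic of Data Integration. The `to_date_seconds` helper is hypothetical, and the sketch assumes a date format with second precision (yyyy-MM-dd HH:mm:ss).

```python
from datetime import datetime

# Sample TIMESTAMP value taken from the text above.
raw = "2015-08-21 22:40:47.397898389"


def to_date_seconds(value, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse only the first 19 characters, which hold the whole-second part."""
    return datetime.strptime(value[:19], fmt)


as_date = to_date_seconds(raw)   # fractional seconds are dropped
as_string = raw                  # the string keeps every digit

print(as_date.isoformat(sep=" "))
print(as_string)
```

The date value no longer carries the fractional part, whereas the string value keeps all nine digits.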
Parameters
Parameter | Description | Required | Default value |
---|---|---|---|
path | The path of the file from which you want to read data. If you want to read data from
multiple files, you can specify a regular expression, such as /hadoop/data_201704* . If the file names contain time information and the time information is presented
in a regular manner, you can use scheduling parameters together with a regular expression.
The values of the scheduling parameters are replaced based on the data timestamp of
the node. For more information about scheduling parameters, see Configure scheduling parameters.
Take note of the following items when you specify the path parameter:
|
Yes | No default value |
defaultFS | The endpoint of the NameNode node in HDFS. The shared resource group does not support advanced Hadoop parameters related to high availability. | Yes | No default value |
fileType | The format of the file from which you want to read data. HDFS Reader automatically
identifies the file format and uses the related read policies. Before HDFS Reader
reads data in a synchronization node, it checks whether all the files in the specified
path match the format specified by the fileType parameter. If the format of a file
does not match the format specified by the fileType parameter, the synchronization
node fails.
Valid values of the fileType parameter: text, orc, rc, seq, csv, and parquet.
HDFS Reader parses files in text and ORC formats in different ways. If data is converted
from a Hive complex data type to the STRING type supported by Data Integration, the
conversion results are different for the text and ORC formats. Complex data types
include MAP, ARRAY, STRUCT, and UNION. The following examples demonstrate the results
of the conversion from the MAP type to the STRING type:
The conversion results show that the data remains unchanged but the formats differ slightly. Therefore, if a column that you want to synchronize uses a Hive complex data type, we recommend that you use a uniform file format. Recommended best practices:
|
Yes | No default value |
column | The names of the columns from which you want to read data. The type field specifies
a data type. The index field specifies the ID of a column, starting from 0. The value
field specifies a constant. If you specify the value field, HDFS Reader reads the
value of this field. By default, HDFS Reader reads all data as strings. In this case,
set this parameter to "column": ["*"] .
For each entry in the column parameter, you must configure the type field and either the index field or the value field.
|
Yes | No default value |
fieldDelimiter | The delimiter of the columns from which you want to read data. If the source files are text files, specify a column delimiter. If you do not specify one, HDFS Reader uses a comma (,) as the column delimiter by default. If the source files are ORC files, you do not need to specify a column delimiter because HDFS Reader uses the default delimiter of Hive, which is \u0001.
|
No | , |
encoding | The encoding format of the file from which you want to read data. | No | utf-8 |
nullFormat | The string that represents a null pointer. Text files have no standard string that represents a null pointer, so you can use this parameter to define which string is treated as a null pointer. If you set this parameter to a specific string, HDFS Reader treats source data that is equal to that string as a null pointer.
Note The string NULL is different from a null pointer. Pay attention to the difference between them.
|
No | No default value |
compress | The compression format when the fileType parameter is set to CSV. The following compression
formats are supported: GZIP, BZ2, ZIP, LZO, LZO_DEFLATE, Hadoop-Snappy, and Framing-Snappy.
|
No | No default value |
parquetSchema |
The description of the schema of data in Parquet files. If you set the fileType parameter
to Parquet, you must set the parquetSchema parameter. Make sure that the value of
the parquetSchema parameter complies with the JSON syntax.
The parquetSchema parameter contains the following fields:
Configuration example:
|
No | No default value |
csvReaderConfig | The configurations required to read CSV files. The parameter value must match the
MAP type. You can use a CSV file reader to read data from CSV files. The CSV file
reader supports multiple configurations. If you do not configure this parameter, the
default configurations are used.
The following example shows common configurations:
The following configurations show all the fields and their default values. When you
configure the csvReaderConfig parameter of the MAP type, you must use the field names
provided in the following configurations:
|
No | No default value |
hadoopConfig | The settings of advanced Hadoop parameters, such as the parameters related to high
availability. The shared resource group does not support advanced Hadoop parameters
related to high availability.
Note
The preceding settings are used to configure Kerberos authentication in HDFS Reader.
If you configure Kerberos authentication in the HDFS data source, you do not need
to configure it in HDFS Reader. For more information about how to add an HDFS data
source, see Add an HDFS data source.
|
No | No default value |
haveKerberos | Specifies whether Kerberos authentication is required. Default value: false. If you set this parameter to true, you must also configure the kerberosKeytabFilePath and kerberosPrincipal parameters. | No | false |
kerberosKeytabFilePath | The absolute path of the keytab file for Kerberos authentication. This parameter is required if the haveKerberos parameter is set to true. | No | No default value |
kerberosPrincipal | The Kerberos principal to which Kerberos can assign tickets, such as ****/hadoopclient@**.***.
This parameter is required if the haveKerberos parameter is set to true.
Note The absolute path of the keytab file is required for Kerberos authentication. Therefore,
you must configure Kerberos authentication for exclusive resource groups for Data
Integration. The following code provides a configuration example:
|
No | No default value |
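The dependencies between some of the parameters above can be summarized in a small Python sketch. The `check_reader_parameter` helper and the sample values are hypothetical and are not part of Data Integration. The sketch reads "one of the index and value parameters" as exactly one of the two, which is an assumption.

```python
def check_reader_parameter(param):
    """Return a list of problems found in an HDFS Reader parameter dict."""
    problems = []

    # Each column entry needs a type and either an index or a constant value.
    columns = param.get("column", [])
    if columns != ["*"]:
        for i, col in enumerate(columns):
            if "type" not in col:
                problems.append(f"column[{i}]: type is required")
            if ("index" in col) == ("value" in col):
                problems.append(f"column[{i}]: set exactly one of index and value")

    # Kerberos needs both the keytab path and the principal.
    if param.get("haveKerberos", False):
        for key in ("kerberosKeytabFilePath", "kerberosPrincipal"):
            if not param.get(key):
                problems.append(f"{key} is required when haveKerberos is true")

    # Parquet files need a schema description.
    if str(param.get("fileType", "")).lower() == "parquet" and not param.get("parquetSchema"):
        problems.append("parquetSchema is required when fileType is parquet")

    return problems


# Hypothetical configuration with two mistakes: a column without a type,
# and Kerberos enabled without a keytab path or principal.
example = {
    "path": "/hadoop/data_201704*",
    "fileType": "text",
    "column": [
        {"index": 0, "type": "long"},
        {"type": "string", "value": "constant"},
        {"index": 1},
    ],
    "haveKerberos": True,
}

for problem in check_reader_parameter(example):
    print(problem)
```

A check of this kind can be run before a node is submitted. It covers only the rules stated in the table and does not replace the validation that the synchronization node performs.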
Configure HDFS Reader by using the codeless UI
Create a synchronization node and configure the node. For more information, see Configure a synchronization node by using the codeless UI.
- Configure data sources.
Configure Source and Target for the synchronization node.
Parameter | Description |
---|---|
Connection | The name of the data source from which you want to read data. This parameter is equivalent to the datasource parameter that is described in the preceding section. |
File path | The path of the file from which you want to read data. This parameter is equivalent to the path parameter that is described in the preceding section. |
File type | This parameter is equivalent to the fileType parameter that is described in the preceding section. This parameter specifies the format of the file from which you want to read data. Valid values: text, orc, rc, seq, csv, and parquet. |
FieldDelimiter | The column delimiter. This parameter is equivalent to the fieldDelimiter parameter that is described in the preceding section. By default, a comma (,) is used as a column delimiter. |
Encoding | The encoding format. This parameter is equivalent to the encoding parameter that is described in the preceding section. Default value: UTF-8. |
Kerberos authentication | Specifies whether to enable Kerberos authentication. Default value: No. If you set this parameter to Yes, the KeyTab file path and Principal Name parameters are required. For more information, see Configure Kerberos authentication. |
Ignore(when file does not exist) | Specifies whether to ignore the file or folder that you configure if the file or folder does not exist. If you set this parameter to Yes, HDFS Reader does not read data from the file, and no error is reported in the log. If you set this parameter to No, the data synchronization node fails to run. Default value: No. |
NullFormat | This parameter is equivalent to the nullFormat parameter that is described in the preceding section. This parameter specifies the string that represents a null pointer. |
HadoopConfig | The settings of advanced Hadoop parameters, such as the parameters related to high availability. The shared resource group does not support advanced Hadoop parameters related to high availability. |
- Configure field mappings.
This operation is equivalent to setting the column parameter that is described in the preceding section. The system maps the field in a row of the source to the field in the same row of the destination. You can click the icon to manually edit the fields in the source. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored.
Note The index starts from 0, which indicates that HDFS Reader reads data from the first column of the source file.
- Configure channel control policies.
Parameter | Description |
---|---|
Expected Maximum Concurrency | The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI. |
Bandwidth Throttling | Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source. |
Dirty Data Records Allowed | The maximum number of dirty data records allowed. |
Distributed Execution | The distributed execution mode that allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution. This speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, you must evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration. |
Configure HDFS Reader by using the code editor
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "hdfs",// The reader type.
            "parameter": {
                "path": "",// The path of the file from which you want to read data.
                "datasource": "",// The name of the data source.
                "hadoopConfig": {
                    "dfs.data.transfer.protection": "integrity",
                    "dfs.datanode.use.datanode.hostname": "true",
                    "dfs.client.use.datanode.hostname": "true"
                },
                "column": [
                    {
                        "index": 0,// The index of the column in the source file. The index starts from 0, which indicates that HDFS Reader reads data from the first column of the source file.
                        "type": "string"// The field type.
                    },
                    {
                        "index": 1,
                        "type": "long"
                    },
                    {
                        "index": 2,
                        "type": "double"
                    },
                    {
                        "index": 3,
                        "type": "boolean"
                    },
                    {
                        "format": "yyyy-MM-dd HH:mm:ss",// The time format.
                        "index": 4,
                        "type": "date"
                    }
                ],
                "fieldDelimiter": ",",// The column delimiter.
                "encoding": "UTF-8",// The encoding format.
                "fileType": ""// The file format.
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// The maximum number of dirty data records allowed.
        },
        "speed": {
            "concurrent": 3,// The maximum number of parallel threads.
            "throttle": true,// Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
            "mbps": "12"// The maximum transmission rate.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
- The fileType parameter must be set to PARQUET.
- If you want HDFS Reader to read specific columns from a Parquet file, you must specify the complete schema in the parquetSchema parameter and specify the columns that you want to read by using the index field in the column parameter.
"reader": {
    "name": "hdfsreader",
    "parameter": {
        "path": "/user/hive/warehouse/addata.db/dw_ads_rtb_monitor_minute/thedate=20170103/hour_id=22/*",
        "defaultFS": "h10s010.07100.149:8020",
        "column": [
            {
                "index": 0,
                "type": "string"
            },
            {
                "index": 1,
                "type": "long"
            },
            {
                "index": 2,
                "type": "double"
            }
        ],
        "fileType": "parquet",
        "encoding": "UTF-8",
        "parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
    }
}
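To see which schema fields the index values in the preceding example select, the following Python sketch may help. It assumes that each index refers to the position of a field in the complete parquetSchema value, counted from 0. The `schema_fields` and `selected_fields` helpers are hypothetical, and the regular expression handles only flat schemas with primitive fields such as the one in the example.

```python
import re

# Matches flat field declarations such as "optional int32 minute_id;".
FIELD = re.compile(r"\b(required|optional|repeated)\s+(\w+)\s+(\w+)\s*;")


def schema_fields(parquet_schema):
    """Return (name, primitive_type) pairs in schema order."""
    return [(name, ptype) for _rep, ptype, name in FIELD.findall(parquet_schema)]


def selected_fields(parquet_schema, columns):
    """Return the schema field names that the column index values point to."""
    fields = schema_fields(parquet_schema)
    return [fields[col["index"]][0] for col in columns if "index" in col]


# Values copied from the preceding configuration example.
schema = (
    "message m { optional int32 minute_id; optional int32 dsp_id; "
    "optional int32 adx_pid; optional int64 req; optional int64 res; "
    "optional int64 suc; optional int64 imp; optional double revenue; }"
)
columns = [
    {"index": 0, "type": "string"},
    {"index": 1, "type": "long"},
    {"index": 2, "type": "double"},
]

print(selected_fields(schema, columns))
```

Under this assumption, the three column entries select the minute_id, dsp_id, and adx_pid fields, and the remaining five fields in the schema are pruned.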