DataWorks Data Integration supports reading from and writing to Hadoop Distributed File System (HDFS) through the HDFS Reader and HDFS Writer plugins.
Apsara File Storage for HDFS is not supported.
Supported file formats
| Plugin | Supported formats |
|---|---|
| HDFS Reader | TextFile, ORCFile, RCFile, SequenceFile, CSV, Parquet |
| HDFS Writer | TextFile, ORCFile, Parquet |
Resource group requirements
HDFS uses a network whitelist for data security, and the default resource group does not provide reliable network access to HDFS NameNode and DataNode endpoints. Use a serverless resource group or an exclusive resource group for Data Integration for HDFS synchronization tasks.
| Plugin | Supported resource groups |
|---|---|
| HDFS Reader | Serverless resource group (recommended), exclusive resource group for Data Integration |
| HDFS Writer | Exclusive resource group for Data Integration only |
Limitations
HDFS Reader
- Multi-threaded concurrent reading of a single file is not supported due to the internal chunking algorithm. When multiple files are specified, HDFS Reader reads them concurrently. The actual thread count is the smaller of the number of files and the `concurrent` setting.
- HDFS Reader cannot access Hive's metastore. Specify data types explicitly during type conversion.
- TIMESTAMP data stored in TextFile and ORCFile is precise to the nanosecond (for example, `2015-08-21 22:40:47.397898389`). Converting to the `date` type drops the nanosecond part. To preserve it, map the column to the `string` type instead.
- When configuring an HDFS synchronization task in the code editor, the task does not require a successful network connectivity test for the HDFS data source. Ignore any resulting errors.
- Data Integration runs under the `admin` account. The operating system's `admin` account must have read and write permissions on the relevant HDFS files. If it does not, switch to the code editor and add `"hdfsUsername": "user_with_permissions"` to the script.
HDFS Writer
- Only TextFile, ORCFile, and Parquet formats are supported. Writing to RCFile, SequenceFile, or CSV is not supported.
- Writing to a subset of columns is not supported. Because HDFS is a file system without schemas, all columns must be specified.
- The following Hive data types are not supported: DECIMAL, BINARY, ARRAY, MAP, STRUCT, and UNION.
- For Hive partitioned tables, only single-partition writes are supported.
- For TextFile, the field delimiter used for writing must match the delimiter used when the Hive table was created, so that the data can be associated with Hive table fields.
Plugin version compatibility
Both HDFS Reader and HDFS Writer are built on Hive 1.1.1 and Hadoop 2.7.1 (Apache, adapted for JDK 1.6 for Reader and JDK 1.7 for Writer). The plugins have been tested with Hadoop 2.5.0, Hadoop 2.6.0, and Hive 1.2.0.
How HDFS Writer works
HDFS Writer uses a write-and-rename strategy to avoid file conflicts and prevent other processes from reading partially written files:
1. Creates a temporary folder in HDFS based on the specified path, using the `path_random` naming rule.
2. Writes all files to the temporary folder.
3. After all files are written, moves them from the temporary folder to the destination path.
4. Deletes the temporary folder.
If a network interruption or connection error occurs during step 2 or 3, manually delete the temporary folder and any files that were written.
The admin account must have read and write permissions on the relevant HDFS files.
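The rename strategy above can be sketched in Python, using the local filesystem as a stand-in for HDFS. The helper name and the exact temporary-folder naming are illustrative, not the plugin's actual implementation:

```python
import os
import shutil
import uuid

def write_and_rename(dest_dir: str, files: dict[str, bytes]) -> None:
    """Sketch of HDFS Writer's write-and-rename strategy (illustrative)."""
    # 1. Create a temporary folder derived from the destination path.
    tmp_dir = f"{dest_dir}__{uuid.uuid4().hex}"
    os.makedirs(tmp_dir)
    try:
        # 2. Write every file into the temporary folder first, so readers
        # never see partially written files in the destination.
        for name, data in files.items():
            with open(os.path.join(tmp_dir, name), "wb") as f:
                f.write(data)
        # 3. Move the finished files into the destination path.
        os.makedirs(dest_dir, exist_ok=True)
        for name in files:
            shutil.move(os.path.join(tmp_dir, name), os.path.join(dest_dir, name))
    finally:
        # 4. Delete the temporary folder. If the job dies during step 2 or 3,
        # leftovers must be cleaned up manually, as noted above.
        shutil.rmtree(tmp_dir, ignore_errors=True)
```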
Supported field types
HDFS Reader type mapping
By default, HDFS Reader converts Hive data types to Data Integration internal types as follows:
| Type category | Data Integration type | Hive data types |
|---|---|---|
| Integer | long | TINYINT, SMALLINT, INT, BIGINT |
| Floating-point | double | FLOAT, DOUBLE |
| String | string | STRING, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, BINARY |
| Date/Time | date | DATE, TIMESTAMP |
| Boolean | boolean | BOOLEAN |
Notes on specific types:
- `long`: Integer values in an HDFS file, such as `123456789`.
- `double`: Floating-point values in an HDFS file, such as `3.1415`.
- `boolean`: Boolean values (`true` or `false`). Case-insensitive.
- `date`: Time values in an HDFS file, such as `2014-12-31 00:00:00`.
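As a rough illustration of the conversions above, a text field could be mapped to the internal types like this. The helper function is hypothetical, not part of the plugin:

```python
from datetime import datetime

def convert_field(raw: str, di_type: str, fmt: str = "%Y-%m-%d %H:%M:%S"):
    """Map a raw text field to a Data Integration internal type (sketch)."""
    if di_type == "long":
        return int(raw)                  # e.g. "123456789"
    if di_type == "double":
        return float(raw)                # e.g. "3.1415"
    if di_type == "boolean":
        return raw.lower() == "true"     # case-insensitive true/false
    if di_type == "date":
        return datetime.strptime(raw, fmt)  # e.g. "2014-12-31 00:00:00"
    return raw                           # string: passed through unchanged
```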
HDFS Writer type mapping
HDFS Writer supports the following Hive data types. The column configuration must match the corresponding column types in the Hive table.
| Type category | Supported Hive data types |
|---|---|
| Integer | TINYINT, SMALLINT, INT, BIGINT |
| Floating-point | FLOAT, DOUBLE |
| String | CHAR, VARCHAR, STRING |
| Boolean | BOOLEAN |
| Date/Time | DATE, TIMESTAMP |
Configure a synchronization task
To configure a single-table offline synchronization task, see:
For all parameters and a script example for the code editor, see Appendix: Script demo and parameter descriptions.
Appendix: Script demo and parameter descriptions
Reader script demo
The following script shows a basic HDFS Reader configuration. All examples use the datasource parameter to reference the HDFS data source configured in DataWorks.
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "hdfs",
"parameter": {
"datasource": "",
"path": "",
"fileType": "",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "double"
},
{
"index": 3,
"type": "boolean"
},
{
"index": 4,
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
],
"fieldDelimiter": ",",
"encoding": "UTF-8",
"hadoopConfig": {
"dfs.data.transfer.protection": "integrity",
"dfs.datanode.use.datanode.hostname": "true",
"dfs.client.use.datanode.hostname": "true"
}
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 3,
"throttle": true,
"mbps": "12"
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
The following example shows how to configure HDFS Reader to read a Parquet file using parquetSchema. Set fileType to parquet and specify the complete schema. Use the index in the column parameter to select and map the required columns.
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/addata.db/dw_ads_rtb_monitor_minute/thedate=20170103/hour_id=22/*",
"defaultFS": "h10s010.07100.149:8020",
"fileType": "parquet",
"encoding": "UTF-8",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "double"
}
],
"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int32 adx_pid; optional int64 req; optional int64 res; optional int64 suc; optional int64 imp; optional double revenue; }"
}
}
Reader parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| path | The path of the files to read. See Specifying the read path for details on static paths, wildcards, and partition paths. | Yes | None |
| defaultFS | The address of the HDFS NameNode, for example, hdfs://127.0.0.1:9000. The public resource group does not support Hadoop high availability (HA) configuration. | Yes | None |
| fileType | The file format: TEXT, ORC, RC, SEQ, CSV, or parquet. Before reading, HDFS Reader verifies that all files in the path match this format. If they do not, the task fails. See File format parsing notes. | Yes | None |
| column | The list of columns to read. type specifies the data type. index specifies the column position (0-based). value specifies a constant: the field is generated from this value rather than read from the source file. To read all columns as strings, use "column": ["*"]. | Yes | None |
| fieldDelimiter | The field delimiter for TextFile data. Not required for ORCFile (Hive's default delimiter is \u0001). The delimiter cannot be \n. | No | , |
| encoding | The file encoding. | No | UTF-8 |
| nullFormat | The string to interpret as a null value. For example, setting "nullFormat": "null" treats the string null as a null field (not the same as an actual null value). | No | None |
| compress | The compression format for CSV files. Supported values: gzip, bz2, zip, lzo, lzo_deflate, hadoop-snappy, framing-snappy. Not required for ORC files. LZO has two formats (lzo and lzo_deflate), so specify the correct one. | No | None |
| parquetSchema | Required when fileType is parquet. Describes the structure of the Parquet file. See Parquet schema format. | No | None |
| csvReaderConfig | Advanced configuration for reading CSV files (Map type). If not set, default values are used. See CSV reader configuration. | No | None |
| hadoopConfig | Advanced Hadoop parameters, such as HA configuration. See Hadoop HA configuration. | No | None |
| haveKerberos | Whether Kerberos authentication is enabled. If true, kerberosKeytabFilePath and kerberosPrincipal are required. | No | false |
| kerberosKeytabFilePath | The absolute path of the Kerberos keytab file. Required if haveKerberos is true. | No | None |
| kerberosPrincipal | The Kerberos principal name, such as **/hadoopclient@.***. Required if haveKerberos is true. | No | None |
Specifying the read path
The path parameter supports three approaches:
- Option 1: Static path. Reads a single file or all files in a directory. A single file uses one thread. Example: `/user/hive/warehouse/mytable01/data.csv`.
- Option 2: Wildcard path. Reads multiple files matching a pattern. HDFS Reader supports `*` (matches any characters) and `?` (matches a single character). Example: `/hadoop/data_201704*`. The actual thread count is the smaller of the number of matching files and the `concurrent` setting.
- Option 3: Partition path. Reads data from a Hive partition directory. When a Hive table is created with partitions (for example, `partition(day="20150820", hour="09")`), the partition appears as a directory structure in HDFS. To read all data for a given day, set the path as follows: `"path": "/user/hive/warehouse/mytable01/20150820/*"`.
Data Integration treats all files in a synchronization task as a single table. All files must conform to the same schema, and the admin account must have read permissions on them. If filenames follow a time-based pattern, use scheduling parameters to replace the path dynamically based on business time.
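The wildcard matching and the thread-count rule above can be sketched in a few lines, using Python's `fnmatch` as a stand-in for the reader's path matcher (both helpers are illustrative):

```python
import fnmatch

def match_path(pattern: str, files: list[str]) -> list[str]:
    """Wildcard matching sketch: * matches any characters, ? exactly one."""
    return [f for f in files if fnmatch.fnmatchcase(f, pattern)]

def effective_threads(matching_files: list[str], concurrent: int) -> int:
    """The actual thread count is the smaller of #files and `concurrent`."""
    return min(len(matching_files), concurrent)
```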
File format parsing notes
TextFile and ORCFile parse complex Hive types differently. For the map type, an ORCFile produces {job=80, team=60} while a TextFile produces {job:80, team:60}. The data is the same, but the format differs. If your data includes complex Hive types, use a consistent file format across the path. To unify the format, export TextFile tables to ORCFile in the Hive client.
Parquet schema format
message MessageTypeName {
RequiredStatus DataType ColumnName;
...;
}
- MessageTypeName: A name for the message type.
- RequiredStatus: Use `required` for non-null columns and `optional` for nullable columns. Set all columns to `optional`.
- DataType: Supported types are `BOOLEAN`, `INT32`, `INT64`, `INT96`, `FLOAT`, `DOUBLE`, `BINARY` (use for string types), and `FIXED_LEN_BYTE_ARRAY`.
- End each column definition with a semicolon, including the last one.
Example:
"parquetSchema": "message m { optional int32 minute_id; optional int32 dsp_id; optional int64 req; optional double revenue; }"
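For illustration, a `parquetSchema` string following this format could be assembled programmatically from column definitions. The builder below is hypothetical, not part of the plugin:

```python
def build_parquet_schema(name: str, columns: list[tuple[str, str, str]]) -> str:
    """Assemble a parquetSchema string from (required_status, data_type,
    column_name) triples, ending every definition with a semicolon."""
    body = " ".join(f"{status} {dtype} {col};" for status, dtype, col in columns)
    return f"message {name} {{ {body} }}"
```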
CSV reader configuration
"csvReaderConfig": {
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}
All available fields and their defaults:
boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true; // Whether to use a CSV escape character
char delimiter = 44; // Separator
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true; // Whether to limit a single column to 100,000 characters
boolean skipEmptyRecords = true; // Whether to skip empty rows
boolean captureRawRecord = true;
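The `char` fields above are ASCII codes: 34 is the double quote, 44 the comma, 35 the hash. As a rough sketch of how the delimiter, text qualifier, and `skipEmptyRecords` settings behave, using Python's `csv` module as a stand-in (the helper is illustrative):

```python
import csv
import io

def read_csv(text: str, skip_empty_records: bool = True) -> list[list[str]]:
    """Parse CSV text with the defaults above: delimiter ',' (ASCII 44)
    and text qualifier '"' (ASCII 34), optionally dropping empty rows."""
    reader = csv.reader(io.StringIO(text), delimiter=chr(44), quotechar=chr(34))
    rows = list(reader)
    if skip_empty_records:
        rows = [r for r in rows if r]  # skipEmptyRecords: drop blank lines
    return rows
```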
Hadoop HA configuration
"hadoopConfig": {
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "namenode1,namenode2",
"dfs.namenode.rpc-address.testDfs.namenode1": "",
"dfs.namenode.rpc-address.testDfs.namenode2": "",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"dfs.data.transfer.protection": "integrity",
"dfs.datanode.use.datanode.hostname": "true",
"dfs.client.use.datanode.hostname": "true"
}
The `dfs.data.transfer.protection`, `dfs.datanode.use.datanode.hostname`, and `dfs.client.use.datanode.hostname` parameters enable Kerberos authentication in the HDFS Reader plugin. If Kerberos authentication is already configured on the HDFS data source, these parameters are not required in the plugin configuration. See Configure an HDFS data source.
Kerberos configuration example
"haveKerberos": true,
"kerberosKeytabFilePath": "/opt/datax/**.keytab",
"kerberosPrincipal": "**/hadoopclient@**.**"
Because Kerberos requires the absolute path of the keytab file, the keytab file must be present at that path on the resource group that runs the task.
Writer script demo
The following script shows a basic HDFS Writer configuration.
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "hdfs",
"parameter": {
"datasource": "",
"path": "",
"fileName": "",
"fileType": "text",
"column": [
{
"name": "col1",
"type": "string"
},
{
"name": "col2",
"type": "int"
},
{
"name": "col3",
"type": "double"
},
{
"name": "col4",
"type": "boolean"
},
{
"name": "col5",
"type": "date"
}
],
"writeMode": "",
"fieldDelimiter": ",",
"encoding": "UTF-8",
"compress": ""
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"concurrent": 3,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| defaultFS | The NameNode address for HDFS, for example, hdfs://127.0.0.1:9000. | Yes | None |
| fileType | The output file format: text (TextFile), orc (ORCFile), or parquet (Parquet). | Yes | None |
| path | The destination path in HDFS. HDFS Writer writes multiple files to this directory based on the concurrent setting. To write to a Hive table, set this to the Hive table's storage path on HDFS. For example, for a table named hello in the test database with a Hive data warehouse path of /user/hive/warehouse/, the path is /user/hive/warehouse/test.db/hello. | Yes | None |
| fileName | The base filename for output files. A random suffix is appended to create the actual filename for each thread. | Yes | None |
| column | The list of fields to write. Specify all field names (name) and types (type); writing to a subset of columns is not supported. | Yes (not required for parquet) | None |
| writeMode | The behavior when files with the same fileName prefix already exist in the destination path. See Write modes. | Yes | None |
| fieldDelimiter | The field delimiter for output files. Must match the delimiter used when creating the Hive table; otherwise, data cannot be queried in Hive. Only single-character delimiters are supported. | Yes (not required for parquet) | None |
| compress | The compression type for output files. For text files, gzip and bzip2 are supported. Leave blank for no compression. | No | None |
| encoding | The encoding format for output files. | No | UTF-8 |
| parquetSchema | Required when fileType is parquet. Describes the structure of the Parquet file. Uses the same format as the Reader's parquetSchema. | No | None |
| hadoopConfig | Advanced Hadoop parameters, such as HA configuration. Uses the same format as the Reader's hadoopConfig. Also supports OSS-backed HDFS when dataxParquetMode is fields. | No | None |
| dataxParquetMode | The mode for synchronizing Parquet files. fields supports complex types (ARRAY, MAP, STRUCT) and HDFS over OSS. | No | columns |
| haveKerberos | Whether Kerberos authentication is enabled. If true, kerberosKeytabFilePath and kerberosPrincipal are required. | No | false |
| kerberosKeytabFilePath | The absolute path of the Kerberos keytab file. Required if haveKerberos is true. | No | None |
| kerberosPrincipal | The Kerberos principal name. Required if haveKerberos is true. | No | None |
Write modes
HDFS Writer uses a write-and-rename strategy: it writes to a temporary folder first, then moves the files to the destination path. The writeMode parameter controls how existing files with the same fileName prefix are handled before writing begins.
| Mode | Behavior |
|---|---|
| append | No cleanup before writing. HDFS Writer appends files directly without checking for conflicts. |
| nonConflict | Fails the task if any file with the fileName prefix already exists in the destination directory. |
| truncate | Deletes all files with the fileName prefix in the destination directory before writing. For example, if "fileName": "abc", all files starting with abc are deleted first. |
Parquet format does not support `append` mode. Use `nonConflict` for Parquet files.
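The three modes amount to different pre-write cleanup logic, sketched below with the local filesystem standing in for HDFS (the function name is illustrative):

```python
import os

def prepare_destination(dest_dir: str, file_name: str, write_mode: str) -> None:
    """Pre-write cleanup for files sharing the fileName prefix (sketch)."""
    existing = [f for f in os.listdir(dest_dir) if f.startswith(file_name)]
    if write_mode == "append":
        return  # no cleanup, no conflict check
    if write_mode == "nonConflict":
        if existing:  # fail the task on any conflicting file
            raise RuntimeError(f"conflicting files exist: {existing}")
    elif write_mode == "truncate":
        for f in existing:  # delete every file with the prefix first
            os.remove(os.path.join(dest_dir, f))
```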
Writing to OSS-backed HDFS
When dataxParquetMode is fields, HDFS Writer supports OSS as the underlying storage. Add the following OSS parameters to hadoopConfig:
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "oss://test-bucket",
"fileType": "parquet",
"path": "/datasets/oss_demo/kpt",
"fileName": "test",
"writeMode": "truncate",
"encoding": "UTF-8",
"hadoopConfig": {
"fs.oss.accessKeyId": "<your-access-key-id>",
"fs.oss.accessKeySecret": "<your-access-key-secret>",
"fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com"
},
"parquetSchema": "message test {\n required int64 id;\n optional binary name (UTF8);\n optional int64 gmt_create;\n required group map_col (MAP) {\n repeated group key_value {\n required binary key (UTF8);\n required binary value (UTF8);\n }\n }\n required group array_col (LIST) {\n repeated group list {\n required binary element (UTF8);\n }\n }\n required group struct_col {\n required int64 id;\n required binary name (UTF8);\n }\n}",
"dataxParquetMode": "fields"
}
}
Replace the following placeholders with actual values:
| Placeholder | Description |
|---|---|
| <your-access-key-id> | The AccessKey ID for accessing OSS |
| <your-access-key-secret> | The AccessKey secret for accessing OSS |