本文为您介绍HDFS Writer支持的数据类型、字段映射和数据源等参数及配置示例。

HDFS Writer提供向HDFS文件系统指定路径中写入TextFile文件、 ORCFile文件以及ParquetFile格式文件,文件内容可以与Hive中的表关联。开始配置HDFS Writer插件前,请首先配置好数据源,详情请参见配置Hive数据源
说明 HDFS Writer仅支持使用新增和使用独享数据集成资源组,不支持使用公共资源组

使用限制

  • 目前HDFS Writer仅支持TextFile、ORCFile和ParquetFile三种格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表。
  • 由于HDFS是文件系统,不存在schema的概念,因此不支持对部分列写入。
  • 目前不支持DECIMAL、BINARY、ARRAYS、MAPS、STRUCTS和UNION等Hive数据类型。
  • 对于Hive分区表目前仅支持一次写入单个分区。
  • 对于TextFile,需要保证写入HDFS文件的分隔符与在Hive上创建表时的分隔符一致,从而实现写入HDFS数据与Hive表字段关联。
  • 目前插件中的Hive版本为1.1.1,Hadoop版本为2.7.1(Apache为适配JDK1.7)。在Hadoop2.5.0、Hadoop2.6.0和Hive1.2.0测试环境中写入正常。

实现过程

HDFS Writer的实现过程如下所示:
  1. 根据您指定的path,创建一个HDFS文件系统上不存在的临时目录。

    创建规则:path_随机

  2. 将读取的文件写入这个临时目录。
  3. 全部写入后,将临时目录下的文件移动到您指定的目录(在创建文件时保证文件名不重复)。
  4. 删除临时目录。如果在此过程中,发生网络中断等情况造成无法与HDFS建立连接,需要您手动删除已经写入的文件和临时目录。
说明 数据同步需要使用Admin账号,并且有访问相应文件的读写权限。

数据类型转换

目前HDFS Writer支持大部分Hive类型,请注意检查您的数据类型。

HDFS Writer针对Hive数据类型的转换列表,如下所示。

说明 column的配置需要和Hive表对应的列类型保持一致。
类型分类数据库数据类型
整数类TINYINT、SMALLINT、INT和BIGINT
浮点类FLOAT和DOUBLE
字符串类CHAR、VARCHAR和STRING
布尔类BOOLEAN
日期时间类DATE和TIMESTAMP

参数说明

参数描述是否必选默认值
defaultFSHadoop HDFS文件系统namenode节点地址,例如hdfs://127.0.0.1:9000。公共资源组不支持Hadoop高级参数HA的配置,请新增和使用自定义数据集成资源组
fileType文件的类型,目前仅支持您配置为textorcparquet
  • text:表示Hive中的存储表,TextFile文件格式。
  • orc:表示Hive中的压缩表,ORCFile文件格式。
  • parquet:表示普通Parquet File文件格式。
path存储到Hadoop HDFS文件系统的路径信息,HDFS Writer会根据并发配置在path目录下写入多个文件。

为了与Hive表关联,请填写Hive表在HDFS上的存储路径。例如Hive上设置的数据仓库的存储路径为/user/hive/warehouse/,已建立数据库test表hello,则对应的存储路径为/user/hive/warehouse/test.db/hello

fileNameHDFS Writer写入时的文件名,实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。
column写入数据的字段,不支持对部分列写入。

为了与Hive中的表关联,需要指定表中所有字段名和字段类型,其中name指定字段名,type指定字段类型。

您可以指定column字段信息,配置如下。
"column": 
[
    {
        "name": "userName",
        "type": "string"
    },
    {
        "name": "age",
        "type": "long"
    }
]
是(如果filetype为parquet,此项无需填写)
writeModeHDFS Writer写入前数据清理处理模式:
  • append:写入前不做任何处理,数据集成HDFS Writer直接使用filename写入,并保证文件名不冲突。
  • nonConflict:如果目录下有fileName前缀的文件,直接报错。
  • truncate:写入前清理fileName名称前缀匹配的所有文件。例如,"fileName": "abc",将清理对应目录所有abc开头的文件。
说明 Parquet格式文件不支持Append,所以只能是noConflict
fieldDelimiterHDFS Writer写入时的字段分隔符,需要您保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据。
说明 仅支持单字符分隔符,如果输入多字符将导致运行时报错。
是(如果filetype为parquet,此项无需填写)
compressHDFS文件压缩类型,默认不填写,则表示没有压缩。

其中text类型文件支持gzip和bzip2压缩类型。

encoding写文件的编码配置。无压缩
parquetSchema写Parquet格式文件时的必填项,用来描述目标文件的结构,所以此项当且仅当fileTypeparquet时生效,格式如下。
message MessageType名 {
是否必填, 数据类型, 列名;
......................;
}
配置项说明如下:
  • MessageType名:填写名称。
  • 是否必填:required表示非空,optional表示可为空。推荐全填optional。
  • 数据类型:Parquet文件支持BOOLEAN、INT32、INT64、INT96、FLOAT、DOUBLE、BINARY(如果是字符串类型,请填BINARY)和FIXED_LEN_BYTE_ARRAY等类型。
说明 每行列设置必须以分号结尾,最后一行也要写上分号。
示例如下。
message m {
optional int64 id;
optional int64 date_id;
optional binary datetimestring;
optional int32 dspId;
optional int32 advertiserId;
optional int32 status;
optional int64 bidding_req_num;
optional int64 imp;
optional int64 click_num;
}
hadoopConfighadoopConfig中可以配置与Hadoop相关的一些高级参数,例如HA的配置。公共资源组不支持Hadoop高级参数HA的配置,请新增和使用自定义数据集成资源组
"hadoopConfig":{
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "namenode1,namenode2",
"dfs.namenode.rpc-address.youkuDfs.namenode1": "",
"dfs.namenode.rpc-address.youkuDfs.namenode2": "",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
dataxParquetMode针对Parquet文件进行同步的模式。使用fields支持array、map和struct等复杂类型。可选值包括fields和columns。
配置dataxParquetMode为fields时,支持hdfs over oss,即HDFS的存储为OSS,OSS的数据存储格式为parquet。此时您可以在hadoopConfig中增加OSS相关的参数,详情如下:
  • fs.oss.accessKeyId:访问OSS的AccessKeyID。
  • fs.oss.accessKeySecret:访问OSS的AccessKeySecret。
  • fs.oss.endpoint:访问OSS的endpoint。
访问OSS的示例如下所示。
```json
    "writer": {
    "name": "hdfswriter",
    "parameter": {
        "defaultFS": "oss://test-bucket",
        "fileType": "parquet",
        "path": "/datasets/oss_demo/kpt",
        "fileName": "test",
        "writeMode": "truncate",
        "encoding": "UTF-8",
        "hadoopConfig": {
            "fs.oss.accessKeyId": "the-access-id",
            "fs.oss.accessKeySecret": "the-access-key",
            "fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com"
            },
            "parquetSchema": "message test {\n    required int64 id;\n    optional binary name (UTF8);\n    optional int64 gmt_create;\n    required group map_col (MAP) {\n        repeated group key_value {\n            required binary key (UTF8);\n            required binary value (UTF8);\n        }\n    }\n    required group array_col (LIST) {\n        repeated group list {\n            required binary element (UTF8);\n        }\n    }\n    required group struct_col {\n        required int64 id;\n        required binary name (UTF8);\n    }    \n}",
            "dataxParquetMode": "fields"
            }
        }
    ```
columns
haveKerberos是否有Kerberos认证,默认为false。如果您配置为true,则配置项kerberosKeytabFilePathkerberosPrincipal为必填。false
kerberosKeytabFilePathKerberos认证keytab文件的绝对路径。如果haveKerberostrue,则必选。
kerberosPrincipalKerberos认证Principal名,如****/hadoopclient@**.*** 。如果haveKerberostrue,则必选。
由于Kerberos需要配置keytab认证文件的绝对路径,您需要在自定义资源组上使用此功能。配置示例如下。
"haveKerberos":true,
"kerberosKeytabFilePath":"/opt/datax/**.keytab",
"kerberosPrincipal":"**/hadoopclient@**.**"

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据源
    参数描述
    数据源即上述参数说明中的datasource,通常输入您配置的数据源名称。
    文件路径即上述参数说明中的path
    文本类型即上述参数说明中的fileType。写入的文件类型,目前支持您配置为TEXTORCparquet
    写入文件名即上述参数说明中的fileName。HDFS Writer写入时的文件名。
    写入模式即上述参数说明中的writeMode。HDFS Writer写入前数据清理处理模式:
    • append:写入前不做任何处理,数据集成HDFS Writer直接使用filename写入,并保证文件名不冲突。
    • nonConflict:如果目录下有fileName前缀的文件,直接报错。
    说明 Parquet格式文件不支持Append,所以只能是noConflict。
    字段分隔符即上述参数说明中的fieldDelimiter。HDFS Writer写入时的字段分隔符,需要您保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据。
    说明 仅支持单字符分隔符,如果输入多字符将导致运行时报错。
    文件编码即上述脚本模式参数说明中的encoding,默认值为utf-8
    Kerberos认证是否有Kerberos认证,默认为,当配置为的时候,配置项Keytab文件路径Principal名为必填。详情请参见附录:配置Kerberos认证
    HadoopConfighadoopConfig中可以配置与Hadoop相关的一些高级参数,例如HA的配置。公共资源组不支持Hadoop高级参数HA的配置。
  2. 字段映射,即上述参数说明中的column。默认使用同行映射。您可以单击图标图标手动编辑目标表字段 ,一行表示一个字段,首尾空行会被采用,其它空行会被忽略。请确保读、写两端字段数量匹配。字段映射
  3. 通道控制。通道配置
    参数描述
    任务期望最大并发数数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务

脚本配置示例如下,详情请参见上述参数说明。
{
    "type": "job",
    "version": "2.0",//版本号。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "hdfs",//插件名。
            "parameter": {
                "path": "",//存储到Hadoop HDFS文件系统的路径信息。
                "fileName": "",//HDFS Writer写入时的文件名。
                "compress": "",//HDFS文件压缩类型。
                "datasource": "",//数据源。
                "column": [
                    {
                        "name": "col1",//字段名。
                        "type": "string"//字段类型。
                    },
                    {
                        "name": "col2",
                        "type": "int"
                    },
                    {
                        "name": "col3",
                        "type": "double"
                    },
                    {
                        "name": "col4",
                        "type": "boolean"
                    },
                    {
                        "name": "col5",
                        "type": "date"
                    }
                ],
                "writeMode": "",//写入模式。
                "fieldDelimiter": ",",//列分隔符。
                "encoding": "",//编码格式。
                "fileType": "text"//文本类型。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""//错误记录数。
        },
        "speed": {
            "concurrent": 3,//作业并发数。
            "throttle": false //false代表不限流,下面的限流的速度不生效;true代表限流。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}