本文为您介绍OSS Writer支持的数据类型、写入方式、字段映射和数据源等参数及配置示例。

背景信息

OSS Writer插件为您提供向OSS写入类CSV格式的一个或多个表文件的功能,写入的文件个数和您的任务并发及同步的文件数有关。
说明 开始配置OSS Writer插件前,请首先配置好数据源,详情请参见配置OSS数据源

写入OSS中的是一张逻辑意义上的二维表,例如CSV格式的文本信息。如果您想对OSS产品有更深入的了解,请参见OSS产品概述

OSS Writer实现了从数据同步协议转为OSS中的文本文件功能,OSS本身是无结构化数据存储,目前OSS Writer支持的功能如下:
  • 支持且仅支持写入文本文件,并要求文本文件中的Schema为一张二维表。
  • 支持类CSV格式文件,自定义分隔符。
  • 支持多线程写入,每个线程写入不同的子文件。
  • 文件支持滚动,当文件大于某个size值时,支持文件切换。当文件大于某个行数值时,支持文件切换。
OSS Writer暂时不能实现以下功能:
  • 单个文件不能支持并发写入。
  • OSS本身不提供数据类型,OSS Writer均以STRING类型写入OSS对象。

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。
object OSS Writer写入的文件名,OSS使用文件名模拟目录的实现。OSS对于Object的名称有以下限制:
  • 使用"object": "datax",写入的Object以datax开头,后缀添加随机字符串。
  • 使用"object": "cdo/datax",写入的Object以/cdo/datax开头,后缀随机添加字符串,OSS模拟目录的分隔符为(/)。

如果您不需要后缀随机UUID,建议您配置"writeSingleObject" : "true",详情请参见writeSingleObject说明。

writeMode OSS Writer写入前,数据的处理:
  • truncate:写入前清理Object名称前缀匹配的所有Object。例如"object":"abc",将清理所有abc开头的Object。
  • append:写入前不进行任何处理,数据集成OSS Writer直接使用Object名称写入,并使用随机UUID的后缀名来保证文件名不冲突。例如您指定的Object名为数据集成,实际写入为DI_****_****_****
  • nonConflict:如果指定路径出现前缀匹配的Object,直接报错。例如"object":"abc",如果存在abc123的Object,将直接报错。
writeSingleObject OSS写数据时,是否写单个文件:
  • true:表示写单个文件。
  • false:表示写多个文件。
false
fileFormat 文件写出的格式,包括csvtext
  • csv仅支持严格的csv格式。如果待写数据包括列分隔符,则会根据csv的转义语法转义,转义符号为双引号(")。
  • text格式指使用列分隔符简单分割待写数据,对于待写数据包括列分隔符情况下不进行转义。
text
fieldDelimiter 读取的字段分隔符。 ,
encoding 写出文件的编码配置。 utf-8
nullFormat 文本文件中无法使用标准字符串定义null(空指针),数据同步系统提供nullFormat定义可以表示为null的字符串。例如,您配置nullFormat="null",如果源头数据是null,数据同步系统会视作null字段。
header(高级配置,向导模式不支持) OSS写出时的表头,例如,['id', 'name', 'age']
maxFileSize(高级配置,向导模式不支持) OSS写出时单个Object文件的最大值,默认为10,000*10MB,类似于在打印log4j日志时,控制日志文件的大小。OSS分块上传时,每个分块大小为10MB(也是日志轮转文件最小粒度,即小于10MB的maxFileSize会被作为10MB),每个OSS InitiateMultipartUploadRequest支持的分块最大数量为10,000。

轮转发生时,Object名字规则是在原有Object前缀加UUID随机数的基础上,拼接_1,_2,_3等后缀。

100,000MB
suffix(高级配置,向导模式不支持) 数据同步写出时,生成的文件名后缀。例如,配置suffix.csv,则最终写出的文件名为fileName****.csv

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常填写您配置的数据源名称。
    Object前缀 即上述参数说明中的Object,填写OSS文件夹的路径,其中不要填写bucket的名称。
    文本类型 包括csvtext
    列分隔符 即上述参数说明中的fieldDelimiter,默认值为(,)。
    编码格式 即上述参数说明中的encoding,默认值为utf-8
    null值 即上述参数说明中的nullFormat,将要表示为空的字段填入文本框,如果源端存在则将对应的部分转换为空。
    时间格式 日期类型的数据序列化到Object时的格式,例如"dateFormat": "yyyy-MM-dd"
    前缀冲突 有同样的文件时,可以选择替换、保留或报错。
  2. 字段映射,即上述参数说明中的column。左侧的源头表字段和右侧的目标表字段为一一对应的关系。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
  3. 通道控制。通道控制
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。

脚本开发介绍

脚本配置示例如下所示,使用脚本模式开发的详情请参见通过脚本模式配置任务
{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"oss",//插件名。
            "parameter":{
                "nullFormat":"",//数据同步系统提供nullFormat,定义哪些字符串可以表示为null。
                "dateFormat":"",//日期格式。
                "datasource":"",//数据源。
                "writeMode":"",//写入模式。
                "writeSingleObject":"false", //表示是否将同步数据写入单个oss文件。
                "encoding":"",//编码格式。
                "fieldDelimiter":","//字段分隔符。
                "fileFormat":"",//文本类型。
                "object":""//Object前缀。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":false,//false代表不限流,下面的限流的速度不生效;true代表限流。
            "concurrent":1,//作业并发数。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

ORC或Parquet文件写入OSS

目前通过复用HDFS Writer的方式完成OSS写ORC或Parquet格式的文件。在OSS Writer已有参数的基础上,增加了PathFileFormat等扩展配置参数,参数含义请参见HDFS Writer

ORC或Parquet文件写入OSS的示例如下:
  • 以ORC文件格式写入OSS。
    {
          "stepType": "oss",
          "parameter": {
            "datasource": "",
            "fileFormat": "orc",
            "path": "/tests/case61",
            "fileName": "orc",
            "writeMode": "append",
            "column": [
              {
                "name": "col1",
                "type": "BIGINT"
              },
              {
                "name": "col2",
                "type": "DOUBLE"
              },
              {
                "name": "col3",
                "type": "STRING"
              }
            ],
            "writeMode": "append",
            "fieldDelimiter": "\t",
            "compress": "NONE",
            "encoding": "UTF-8"
          }
        }
  • 以Parquet文件格式写入OSS,示例如下。
    {
          "stepType": "oss",
          "parameter": {
            "datasource": "",
            "fileFormat": "parquet",
            "path": "/tests/case61",
            "fileName": "test",
            "writeMode": "append",
            "fieldDelimiter": "\t",
            "compress": "SNAPPY",
            "encoding": "UTF-8",
            "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\n  repeated group list {\n    required binary element (UTF8);\n  }\n}\nrequired group params_struct {\n  required int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\n  repeated group list {\n    required group element {\n required int64 id;\n required binary name (UTF8);\n}\n  }\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\n  required int64 id;\n required binary name (UTF8);\n  }\n}\n}\nrequired group params_struct_complex {\n  required int64 id;\n required group detail {\n  required int64 id;\n required binary name (UTF8);\n  }\n  }\n}",
            "dataxParquetMode": "fields"
          }
        }