This topic describes the data types and parameters supported by Object Storage Service (OSS) Reader and how to configure it by using the codeless user interface (UI) and code editor.

OSS Reader can read data stored in OSS. OSS Reader connects to OSS through the official OSS Java SDK, reads data from OSS, converts the data to a format that is readable by Data Integration, and then sends the converted data to a writer.

OSS stores unstructured data only. Currently, OSS Reader supports the following features:
  • Reads TXT objects that store logical two-dimensional tables. By default, OSS Reader reads only TXT objects. For information about how to read ORC or Parquet objects, see the section at the end of this topic.
  • Reads data stored in formats similar to CSV with custom delimiters.
  • Reads data of various types as strings. Supports constants and column pruning.
  • Supports recursive reading and object name-based filtering.
  • Supports the following object compression formats: GZIP, BZIP2, and ZIP.
    Note You cannot compress multiple objects into one package.
  • Reads multiple objects concurrently.
Currently, OSS Reader does not support the following features:
  • Concurrent reading of a single uncompressed object.
  • Concurrent reading of a single compressed object.

OSS Reader supports the following OSS data types: BIGINT, DOUBLE, STRING, DATETIME, and BOOLEAN.

Data types

Category          Data Integration data type    OSS data type
Integer           LONG                          LONG
String            STRING                        STRING
Floating point    DOUBLE                        DOUBLE
Boolean           BOOLEAN                       BOOLEAN
Date and time     DATE                          DATE

Parameters

datasource: The name of the connection. It must be identical to the name of the added connection. You can add connections in the code editor.
  Required: yes. Default value: none.
object: The name of the OSS object to read. You can specify multiple object names. For example, if a bucket has a directory named yunshi and this directory contains an object named ll.txt, you can set this parameter to yunshi/ll.txt. For a combined example, see the snippet at the end of this section.
  • If you specify a single OSS object, OSS Reader uses only one thread to read the object. Concurrent reading of a single uncompressed object is not supported.
  • If you specify multiple OSS objects, OSS Reader uses multiple threads to read the objects. The actual number of threads is determined by the number of channels.
  • If a name contains a wildcard, OSS Reader attempts to read all objects that match the name. For example, if you set this parameter to abc[0-9], OSS Reader reads objects abc0 to abc9. We recommend that you do not use wildcards because they may cause an out of memory (OOM) error. For more information, see the OSS documentation.
  Note
  • Data Integration treats all the objects on a sync node as a single table. Make sure that all the objects on each sync node use the same schema.
  • Control the number of objects stored in a single directory. If a directory contains too many objects, an OOM error may occur. In this case, store the objects in different directories before you synchronize data.
  Required: yes. Default value: none.
column: The columns to read. The type parameter specifies the data type of the source data. The index parameter specifies the ID of the column in the source table, starting from 0. The value parameter specifies the column value if the column is a constant column.
By default, OSS Reader reads all data as strings, which is equivalent to the following configuration:
    "column": ["*"]
You can also specify the column parameter in the following way:
    "column": [
        {
            "type": "long",
            "index": 0 // The first INT-type column of the source object.
        },
        {
            "type": "string",
            "value": "alibaba" // The value of the current column, which is the constant "alibaba".
        }
    ]
  Note For the column parameter, you must specify the type parameter and one of the index and value parameters.
  Required: yes. Default value: all data is read as strings.
fieldDelimiter: The column delimiter.
  Note You must specify a column delimiter for OSS Reader. The default delimiter, both in the code editor and on the codeless UI, is a comma (,).
  Required: yes. Default value: comma (,).
compress: The compression format of the object. OSS Reader supports the GZIP, BZIP2, and ZIP formats.
  Required: no. Default value: empty, which indicates that objects are not compressed.
encoding: The encoding format of the object to read.
  Required: no. Default value: UTF-8.
nullFormat: The string that represents null. No standard string represents null in TXT objects. Therefore, Data Integration provides the nullFormat parameter to define which string represents null. For example, if you set nullFormat to "null", Data Integration treats the string null as a null field. To represent the literal string \N, escape it as \\N.
  Required: no. Default value: none.
skipHeader: Specifies whether to skip the header (if any) of a CSV-like object. This parameter is not supported for compressed objects.
  Required: no. Default value: false.
csvReaderConfig: The configurations for reading CSV objects. The parameter value must be of the MAP type. A specific CSV reader is used to read data from CSV objects, and it supports many configurations. For an example, see the snippet at the end of this section.
  Required: no. Default value: none.
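
The following snippet shows how several of these parameters fit together in the parameter section of a sync node. It is a minimal sketch: the connection name, the object paths, and the csvReaderConfig keys (safetySwitch, skipEmptyRecords, and useTextQualifier) are illustrative assumptions, not values from your environment.
    "parameter": {
        "datasource": "my_oss_connection", // A hypothetical connection name.
        "object": [
            "yunshi/ll.txt", // A single object.
            "logs/abc[0-9].txt" // A wildcard that matches abc0.txt to abc9.txt. Use with caution: wildcards may cause an OOM error.
        ],
        "column": ["*"], // Read all columns as strings.
        "fieldDelimiter": "\t", // Use a tab delimiter instead of the default comma.
        "nullFormat": "null", // Treat the string null as a null field.
        "skipHeader": "true", // Skip the header of CSV-like objects.
        "compress": "gzip", // The objects are compressed in the GZIP format.
        "csvReaderConfig": {
            "safetySwitch": false, // Assumed key: do not limit the length of a single field.
            "skipEmptyRecords": false, // Assumed key: do not skip empty records.
            "useTextQualifier": false // Assumed key: disable the text qualifier.
        }
    }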

Configure OSS Reader by using the codeless UI

  1. Configure the connections.
    Configure the source and destination connections for the sync node.
    Connection: The datasource parameter in the preceding parameter description. Select a connection type, and enter the name of a connection that has been configured in DataWorks.
    Object Name Prefix: The object parameter in the preceding parameter description.
    Note If an OSS object is named based on the date, for example, aaa/20171024abc.txt, you can set the object parameter to aaa/${bdp.system.bizdate}abc.txt.
    Field Delimiter: The fieldDelimiter parameter in the preceding parameter description. The default delimiter is a comma (,).
    Encoding: The encoding parameter in the preceding parameter description. The default encoding format is UTF-8.
    Null String: The nullFormat parameter in the preceding parameter description. Enter a string that represents null. If a field in the source data matches this string, the field is read as null.
    Compression Format: The compress parameter in the preceding parameter description. By default, objects are not compressed.
    Include Header: The skipHeader parameter in the preceding parameter description. The default value is No.
  2. Configure field mapping, that is, the column parameter in the preceding parameter description.
    Fields in the source table on the left have a one-to-one mapping with fields in the destination table on the right. You can click Add to add a field, or move the pointer over a field and click the Delete icon to delete the field.
    Map Fields with the Same Name: Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the mapped fields must match.
    Map Fields in the Same Line: Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the mapped fields must match.
    Delete All Mappings: Click Delete All Mappings to remove all established mappings.
  3. Configure channel control policies.
    Expected Maximum Concurrency: The maximum number of concurrent threads that the sync node uses to read data from the source data store and write data to the destination data store. You can configure the concurrency for the node on the codeless UI.
    Bandwidth Throttling: Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the source workload.
    Dirty Data Records Allowed: The maximum number of dirty data records allowed.
    Resource Group: The resource group used to run the sync node. If a large number of nodes, including this sync node, are deployed on the default resource group, the sync node may need to wait for resources. We recommend that you purchase an exclusive resource group for Data Integration or add a custom resource group. For more information, see DataWorks exclusive resources and Add a custom resource group.

Configure OSS Reader by using the code editor

In the following code, a node is configured to read data from OSS. For more information about the parameters, see the preceding parameter description.
{
    "type":"job",
    "version":"2.0",// The version number.
    "steps":[
        {
            "stepType":"oss",// The reader type.
            "parameter":{
                "nullFormat":"", // The string that represents null.
                "compress":"", // The compression format.
                "datasource":"", // The connection name.
                "column":[ // The columns to be synchronized.
                    {
                        "index":0, // The ID of the column in the source table.
                        "type":"string" // The data type.
                    },
                    {
                        "index":1,
                        "type":"long"
                    },
                    {
                        "index":2,
                        "type":"double"
                    },
                    {
                        "index":3,
                        "type":"boolean"
                    },
                    {
                        "format":"yyyy-MM-dd HH:mm:ss", // The format of the time.
                        "index":4,
                        "type":"date"
                    }
                ],
                "skipHeader":"", // Specifies whether to skip the header (if exists) of a CSV-like object.
                "encoding":"", // The encoding format.
                "fieldDelimiter":",", // The column delimiter.
                "fileFormat": "", // The format of the object saved by OSS Reader.
                "Object":[] // The name of the OSS object to read.
            },
            "name":"Reader",
            "category":"reader"
        },
        {// The following template is used to configure Stream Writer. For more information, see the corresponding topic.
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"" // The maximum number of dirty data records allowed.
        },
        "speed":{
            "throttle":false,// Specifies whether to enable bandwidth throttling. A value of false indicates that the bandwidth is not throttled. A value of true indicates that the bandwidth is throttled. The maximum transmission rate takes effect only if you set this parameter to true.
            "concurrent":1,// The maximum number of concurrent threads.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
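
If you enable bandwidth throttling, you must also specify a maximum transmission rate. The following fragment is a minimal sketch of the setting section with throttling enabled; the mbps key name and the specific values are illustrative assumptions, so check the parameter reference of your DataWorks version before you use them.
"setting":{
    "errorLimit":{
        "record":"10" // Allow at most 10 dirty data records.
    },
    "speed":{
        "throttle":true, // Enable bandwidth throttling.
        "mbps":1, // The maximum transmission rate. Takes effect only when throttle is set to true. (Assumed key name.)
        "concurrent":2 // The maximum number of concurrent threads.
    }
}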

Read ORC or Parquet files from OSS

Currently, OSS Reader uses HDFS Reader to read ORC and Parquet files from OSS. In addition to the original parameters, OSS Reader provides the extended parameters path and fileFormat. For more information about the extended parameters, see Configure HDFS Reader.

  • In the following code, OSS Reader is configured to read ORC files from OSS.
    
    {
        "stepType": "oss",
        "parameter": {
            "datasource": "",
            "fileFormat": "orc",
            "path": "/tests/case61/orc__691b6815_9260_4037_9899_aa8e61dc7e4b",
            "column": [
                {
                    "index": 0,
                    "type": "long"
                },
                {
                    "index": 1,
                    "type": "string"
                },
                {
                    "index": 2,
                    "type": "string"
                }
            ]
        }
    }
  • In the following code, OSS Reader is configured to read Parquet files from OSS.
    
    {
        "stepType": "oss",
        "parameter": {
            "datasource": "",
            "fileFormat": "parquet",
            "path": "/tests/case61/parquet",
            "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\n  repeated group list {\n    required binary element (UTF8);\n  }\n}\nrequired group params_struct {\n  required int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\n  repeated group list {\n    required group element {\n required int64 id;\n required binary name (UTF8);\n}\n  }\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\n  required int64 id;\n required binary name (UTF8);\n  }\n}\n}\nrequired group params_struct_complex {\n  required int64 id;\n required group detail {\n  required int64 id;\n required binary name (UTF8);\n  }\n  }\n}",
            "column": [
                {
                    "index": 0,
                    "type": "long"
                },
                {
                    "index": 1,
                    "type": "string"
                },
                {
                    "index": 2,
                    "type": "string"
                },
                {
                    "index": 3,
                    "type": "string"
                },
                {
                    "index": 4,
                    "type": "string"
                },
                {
                    "index": 5,
                    "type": "string"
                },
                {
                    "index": 6,
                    "type": "string"
                },
                {
                    "index": 7,
                    "type": "string"
                }
            ]
        }
    }
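
For readability, the parquetSchema string in the preceding example corresponds to the following Parquet message definition. Only line breaks and indentation are added; the content is unchanged.
    message test {
      required int64 int64_col;
      required binary str_col (UTF8);
      required group params (MAP) {
        repeated group key_value {
          required binary key (UTF8);
          required binary value (UTF8);
        }
      }
      required group params_arr (LIST) {
        repeated group list {
          required binary element (UTF8);
        }
      }
      required group params_struct {
        required int64 id;
        required binary name (UTF8);
      }
      required group params_arr_complex (LIST) {
        repeated group list {
          required group element {
            required int64 id;
            required binary name (UTF8);
          }
        }
      }
      required group params_complex (MAP) {
        repeated group key_value {
          required binary key (UTF8);
          required group value {
            required int64 id;
            required binary name (UTF8);
          }
        }
      }
      required group params_struct_complex {
        required int64 id;
        required group detail {
          required int64 id;
          required binary name (UTF8);
        }
      }
    }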