This topic describes the data types and parameters that OSS Reader supports and how to configure it by using the codeless user interface (UI) and code editor.

OSS Reader can read data that is stored in Object Storage Service (OSS). OSS Reader connects to OSS by using the official OSS SDK for Java, reads data from OSS, converts the data to a format that is readable by Data Integration, and then sends the converted data to a writer. OSS Reader supports the following OSS data types: BIGINT, DOUBLE, STRING, DATETIME, and BOOLEAN.

OSS stores unstructured data only. OSS Reader supports the following features:
  • Reads TXT objects that store logical two-dimensional tables. OSS Reader can read only TXT objects.
  • Reads data that is stored in formats similar to CSV with custom delimiters.
  • Reads data of various types as strings and supports constants and column pruning.
  • Supports recursive reading and object name-based filtering.
  • Supports the following object compression formats: GZIP, BZIP2, and ZIP.
    Note You cannot compress multiple objects into one package.
  • Reads multiple objects concurrently.
OSS Reader does not support the following features:
  • Uses concurrent threads to read an uncompressed object.
  • Uses concurrent threads to read a compressed object.
  • Reads an object that exceeds 100 GB in size.

Data types

| Category | Data Integration data type | OSS data type |
| -------- | -------------------------- | ------------- |
| Integer | LONG | LONG |
| String | STRING | STRING |
| Floating point | DOUBLE | DOUBLE |
| Boolean | BOOLEAN | BOOL |
| Date and time | DATE | DATE |
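These mappings determine the values that you can use in the type field of the column parameter described in the next section. As a hypothetical illustration, a TXT object whose rows look like 1,alibaba,1.20,true,2017-10-24 00:00:00 (the row content is invented for this example) could be declared as follows:

```json
"column": [
    { "index": 0, "type": "long" },    // Integer column, read as LONG.
    { "index": 1, "type": "string" },  // String column.
    { "index": 2, "type": "double" },  // Floating-point column.
    { "index": 3, "type": "boolean" }, // Boolean column.
    { "index": 4, "type": "date", "format": "yyyy-MM-dd HH:mm:ss" } // Date and time column.
]
```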

Parameters

Parameter Description Required Default value
datasource The connection name. It must be the same as the name of the created connection. You can create connections in the code editor. Yes N/A
object The name of the OSS object to read. You can specify multiple object names. For example, if a bucket has a directory that is named yunshi and this directory contains an object that is named ll.txt, you can set this parameter to yunshi/ll.txt.
  • If you specify a single OSS object, OSS Reader uses only one thread to read the object. Concurrent multi-thread reading of a single uncompressed object is coming soon.
  • If you specify multiple OSS objects, OSS Reader uses multiple threads to read these objects. The actual number of threads is determined by the number of channels.
  • When a name contains a wildcard, OSS Reader attempts to read all objects that match the name. For example, if you set the value to abc[0-9], OSS Reader reads objects abc0 to abc9. We recommend that you do not use wildcards because they may cause an out of memory (OOM) error. For more information, see What is OSS?
Note
  • Data Integration considers all the objects on a sync node as a single table. Make sure that all the objects on each sync node can adapt to the same schema.
  • Control the number of objects that are stored in a single directory. If a directory contains a large number of objects, an OOM error may be returned. In this case, store the objects in different directories and then synchronize data.
Yes N/A
column The columns to read. The type parameter specifies the data type of the source column. The index parameter specifies the index of the column in the source table, starting from 0. The value parameter specifies the column value if the column is a constant column.
By default, OSS Reader reads all data as strings. You can specify the column parameter in the following way:
```json
"column": ["*"]
```
You can also specify the column parameter in the following way:
```json
"column": [
    {
        "type": "long",
        "index": 0 // Read the first column (index 0) of the source object as LONG.
    },
    {
        "type": "string",
        "value": "alibaba" // Generate a constant column whose value is the string "alibaba".
    }
]
```
Note For the column parameter, you must specify the type parameter and exactly one of the index and value parameters.
Yes By default, OSS Reader reads all data as strings.
fieldDelimiter The column delimiter.
Note You must specify a column delimiter for OSS Reader. The default delimiter is a comma (,), both in the code editor and on the codeless UI.

If the delimiter is a non-printable character, enter its Unicode escape, for example, \u001b or \u007c.

Yes ,
compress The compression format of the object. By default, this parameter is left empty, that is, objects are not compressed. OSS Reader supports the following object compression formats: GZIP, BZIP2, and ZIP. No By default, objects are not compressed.
encoding The encoding format of the object to read. No utf-8
nullFormat The string that represents null. TXT objects provide no standard string that represents null. Therefore, Data Integration provides the nullFormat parameter to define which string represents a null value. For example, if you specify nullFormat="null", Data Integration treats the string null as a null value. To express the literal string \N, escape it as \\N. No N/A
skipHeader Specifies whether to skip the header (if it exists) of a CSV-like object. The skipHeader parameter is not supported for compressed objects. No false
csvReaderConfig The configurations for reading CSV objects. The value must be a map of key-value pairs. A specific CSV reader that supports many configurations is used to read data from CSV objects. No N/A
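The csvReaderConfig map is passed through to the underlying CSV reader, so the exact keys depend on that reader. The sketch below assumes the commonly documented safetySwitch, skipEmptyRecords, and useTextQualifier switches; verify the key names against your DataWorks version before use:

```json
"csvReaderConfig": {
    "safetySwitch": false,      // Assumed key: do not limit the length of a single column value.
    "skipEmptyRecords": false,  // Assumed key: keep empty rows instead of skipping them.
    "useTextQualifier": false   // Assumed key: disable the text qualifier (quote character).
}
```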

Configure OSS Reader by using the codeless UI

  1. Configure the connections.
    Configure the connections to the source and destination data stores for the sync node.
    Parameter Description
    Connection The datasource parameter in the preceding parameter description. Select a connection type and select the name of a connection that you have configured in DataWorks.
    Object Name Prefix The object parameter in the preceding parameter description.
    Note If an OSS object is named based on the date, for example, aaa/20171024abc.txt, you can set the object parameter to aaa/${bdp.system.bizdate}abc.txt.
    Field Delimiter The fieldDelimiter parameter in the preceding parameter description. The default delimiter is comma (,).
    Encoding The encoding parameter in the preceding parameter description. The default encoding format is UTF-8.
    Null String The nullFormat parameter in the preceding parameter description. Enter a string that represents null. If the source data store contains the string, the string is replaced with null.
    Compression Format The compress parameter in the preceding parameter description. By default, objects are not compressed.
    Include Header The skipHeader parameter in the preceding parameter description. The default value is No.
  2. Configure field mapping, that is, the column parameter in the preceding parameter description.
    Fields in the source table on the left have a one-to-one mapping with fields in the destination table on the right. You can click Add to add a field. To delete a field, move the pointer over the field and click the Delete icon.
    GUI element Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish a mapping between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish a mapping between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove mappings that have been established.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of concurrent threads that the sync node uses to read data from or write data to data stores. You can configure the concurrency for the node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and set a maximum transmission rate to avoid heavy read workload of the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to a proper value.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.

Configure OSS Reader by using the code editor

You can configure OSS Reader by using the code editor. For more information, see Create a sync node by using the code editor.

The following example shows how to configure a sync node to read data from OSS in the code editor. For more information about the parameters, see the preceding parameter description.
```json
{
    "type":"job",
    "version":"2.0", // The version number.
    "steps":[
        {
            "stepType":"oss", // The reader type.
            "parameter":{
                "nullFormat":"", // The string that represents null.
                "compress":"", // The compression format.
                "datasource":"", // The connection name.
                "column":[ // The columns to be synchronized from the source table.
                    {
                        "index":0, // The index of the column in the source table, starting from 0.
                        "type":"string" // The data type.
                    },
                    {
                        "index":1,
                        "type":"long"
                    },
                    {
                        "index":2,
                        "type":"double"
                    },
                    {
                        "index":3,
                        "type":"boolean"
                    },
                    {
                        "format":"yyyy-MM-dd HH:mm:ss", // The format of the time.
                        "index":4,
                        "type":"date"
                    }
                ],
                "skipHeader":"", // Specifies whether to skip the header (if it exists) of a CSV-like object.
                "encoding":"", // The encoding format.
                "fieldDelimiter":",", // The column delimiter.
                "fileFormat": "", // The format of the object to read.
                "object":[] // The names of the OSS objects to read.
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"" // The maximum number of dirty data records allowed.
        },
        "speed":{
            "throttle":false, // Specifies whether to enable bandwidth throttling. The maximum transmission rate takes effect only if you set this parameter to true.
            "concurrent":1 // The maximum number of concurrent threads.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
```
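As a concrete variation of the template above, the reader parameters below read two GZIP-compressed objects from a directory named yunshi. The connection and object names are placeholders invented for this sketch:

```json
"parameter": {
    "datasource": "my_oss_connection",  // Placeholder connection name.
    "object": [
        "yunshi/ll_01.txt.gz",          // Placeholder object names. All listed objects
        "yunshi/ll_02.txt.gz"           // must share the same schema.
    ],
    "compress": "gzip",                 // Each object is an individually compressed GZIP file.
    "fieldDelimiter": ",",
    "encoding": "utf-8",
    "nullFormat": "null",               // The string null is read as a null value.
    "column": ["*"]                     // Read all columns as strings.
}
```

Because two objects are specified, OSS Reader can read them with up to two concurrent threads, subject to the configured number of channels.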

Read ORC or Parquet files from OSS

OSS Reader uses HDFS Reader to read ORC or Parquet files from OSS. In addition to the original parameters, OSS Reader provides the extended parameters path and fileFormat.

  • The following example shows how to configure OSS Reader to read ORC files from OSS.
    ```json
    {
        "stepType": "oss",
        "parameter": {
            "datasource": "",
            "fileFormat": "orc",
            "path": "/tests/case61/orc__691b6815_9260_4037_9899_****",
            "column": [
                {
                    "index": 0,
                    "type": "long"
                },
                {
                    "index": 1,
                    "type": "string"
                },
                {
                    "index": 2,
                    "type": "string"
                }
            ]
        }
    }
    ```
  • The following example shows how to configure OSS Reader to read Parquet files from OSS.
    
    ```json
    {
        "stepType": "oss",
        "parameter": {
            "datasource": "",
            "fileFormat": "parquet",
            "path": "/tests/case61/parquet",
            "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\n  repeated group list {\n    required binary element (UTF8);\n  }\n}\nrequired group params_struct {\n  required int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\n  repeated group list {\n    required group element {\n required int64 id;\n required binary name (UTF8);\n}\n  }\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\n  required int64 id;\n required binary name (UTF8);\n  }\n}\n}\nrequired group params_struct_complex {\n  required int64 id;\n required group detail {\n  required int64 id;\n required binary name (UTF8);\n  }\n  }\n}",
            "column": [
                {
                    "index": 0,
                    "type": "long"
                },
                {
                    "index": 1,
                    "type": "string"
                },
                {
                    "index": 2,
                    "type": "string"
                },
                {
                    "index": 3,
                    "type": "string"
                },
                {
                    "index": 4,
                    "type": "string"
                },
                {
                    "index": 5,
                    "type": "string"
                },
                {
                    "index": 6,
                    "type": "string"
                },
                {
                    "index": 7,
                    "type": "string"
                }
            ]
        }
    }
    ```
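The parquetSchema value above is a single JSON string that embeds \n line breaks. Expanded for readability, it declares the following Parquet message (the content is identical to the string above):

```
message test {
  required int64 int64_col;
  required binary str_col (UTF8);
  required group params (MAP) {
    repeated group key_value {
      required binary key (UTF8);
      required binary value (UTF8);
    }
  }
  required group params_arr (LIST) {
    repeated group list {
      required binary element (UTF8);
    }
  }
  required group params_struct {
    required int64 id;
    required binary name (UTF8);
  }
  required group params_arr_complex (LIST) {
    repeated group list {
      required group element {
        required int64 id;
        required binary name (UTF8);
      }
    }
  }
  required group params_complex (MAP) {
    repeated group key_value {
      required binary key (UTF8);
      required group value {
        required int64 id;
        required binary name (UTF8);
      }
    }
  }
  required group params_struct_complex {
    required int64 id;
    required group detail {
      required int64 id;
      required binary name (UTF8);
    }
  }
}
```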