This topic describes the data types and parameters that are supported by Object Storage Service (OSS) Reader and how to configure OSS Reader by using the codeless user interface (UI) and code editor.

OSS Reader reads data stored in OSS. It uses Alibaba Cloud OSS SDK for Java to read data from OSS, converts the data to a format that Data Integration can process, and sends the converted data to a writer. OSS Reader supports the following OSS data types: BIGINT, DOUBLE, STRING, DATETIME, and BOOLEAN.

OSS stores only unstructured data. OSS Reader provides the following features:
  • Reads data from TXT objects. The data in the objects must be logical two-dimensional tables.
  • Reads data from CSV-like objects with custom delimiters.
  • Reads data of various types as strings and supports constants and column pruning.
  • Supports recursive data read and object name-based filtering.
  • Supports object compression. The following compression formats are supported: GZIP, BZIP2, and ZIP.
    Note You cannot compress multiple objects into one package.
  • Uses parallel threads to read data from multiple objects.
OSS Reader does not support the following features:
  • Uses parallel threads to read data from a single object.
  • Reads data from an object that exceeds 100 GB in size.
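The following snippet is a minimal sketch of a reader parameter block that uses several of the supported features listed above: it reads two GZIP-compressed TXT objects from the same directory and parses them with a custom delimiter. The data source name and object names are placeholders.
"parameter": {
    "datasource": "oss_datasource_example",// A placeholder data source name. 
    "object": [// Multiple objects are read by parallel threads. 
        "yunshi/log_0.txt.gz",
        "yunshi/log_1.txt.gz"
    ],
    "compress": "gzip",// Each object is compressed individually. 
    "fieldDelimiter": "|",// A custom column delimiter. 
    "encoding": "utf-8",
    "column": [
        {"type": "string", "index": 0},
        {"type": "string", "index": 1}
    ]
}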

Data types

Category | Data Integration data type | OSS data type
Integer | LONG | LONG
String | STRING | STRING
Floating point | DOUBLE | DOUBLE
Boolean | BOOLEAN | BOOLEAN
Date and time | DATE | DATE

Parameters

Parameter Description Required Default value
datasource The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. Yes No default value
Object The name of the OSS object from which you want to read data. You can specify multiple object names. For example, a bucket has a directory named yunshi, and this directory contains an object named ll.txt. In this case, you can set this parameter to yunshi/ll.txt.
  • If you specify a single OSS object name, OSS Reader uses only a single thread to read data. The feature of using parallel threads to read data from a single uncompressed object will be available in the future.
  • If you specify multiple OSS object names, OSS Reader uses parallel threads to read data. You can configure the number of parallel threads based on your business requirements.
  • If you specify a name that contains a wildcard, OSS Reader reads data from all objects that match the name. For example, if you set this parameter to abc[0-9], OSS Reader reads data from objects abc0 to abc9. We recommend that you do not use wildcards because an out of memory (OOM) error may occur. For more information, see What is OSS?
Note
  • Data Integration considers all objects in a synchronization node as a single table. Make sure that all objects in each synchronization node use the same schema.
  • Control the number of objects stored in a directory. If a directory contains excessive objects, an OOM error may occur. In this case, store the objects in different directories before you synchronize data.
Yes No default value
column The names of the columns from which you want to read data. The type parameter specifies the source data type. The index parameter specifies the index of the column in the source object, starting from 0. The value parameter specifies the column value if the column is a constant column.
By default, OSS Reader reads all data as strings. You can specify the column parameter in the following format:
"column": ["*"]
You can also specify the column parameter in the following format:
"column":
    {
       "type": "long",
       "index": 0    // The first INT-type column in the object from which you want to read data. 
    },
    {
       "type": "string",
       "value": "alibaba"  // The value of the current column. In this code, the value is the constant alibaba. 
    }
Note For the column parameter, you must specify the type parameter and specify either the index or value parameter.
Yes No default value
fieldDelimiter The column delimiter that is used in the OSS object from which you want to read data.
Note The default column delimiter is a comma (,). If you do not specify a column delimiter, the comma is used.

If the delimiter is a non-printable character, enter its Unicode-encoded value, such as \u001b or \u007c.

Yes ,
compress The format in which objects are compressed. By default, this parameter is left empty, which means that objects are not compressed. OSS Reader supports the following compression formats: GZIP, BZIP2, and ZIP. No No default value
encoding The encoding format of the object from which you want to read data. No utf-8
nullFormat The string that represents a null value. TXT files provide no standard string that represents a null value, so you can use this parameter to define one. For example, if you specify nullFormat="null", OSS Reader considers the string null as a null value. To escape an empty string, use \N=\\N. No No default value
skipHeader Specifies whether to skip the headers in a CSV-like object if the object has headers. The skipHeader parameter is unavailable for compressed objects. No false
csvReaderConfig The configurations that are used to read CSV objects. The value of this parameter must be of the MAP type. OSS Reader uses a CSV reader to read data from CSV objects, and the CSV reader supports many configuration items. If you do not specify this parameter, the default settings are used. A sample configuration is provided after this table. No No default value
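The following snippet shows a sample csvReaderConfig value. The keys in this sketch, such as safetySwitch, skipEmptyRecords, and useTextQualifier, are common options of the underlying CSV reader; the exact set of supported options may vary with your Data Integration version.
"csvReaderConfig": {
    "safetySwitch": false,// Do not limit the length of a single field. 
    "skipEmptyRecords": false,// Do not skip empty rows. 
    "useTextQualifier": false// Do not use a text qualifier (quote character). 
}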

Configure OSS Reader by using the codeless UI

  1. Configure data sources.
    Configure Source and Target for the synchronization node.
    Parameter Description
    Connection The name of the data source from which you want to read data. This parameter is equivalent to the datasource parameter that is described in the preceding section.
    Object Name (Path Included) The name of the object from which you want to read data. This parameter is equivalent to the Object parameter that is described in the preceding section.
    Note If an OSS object is named based on the date, such as aaa/20171024abc.txt, you can set this parameter to aaa/${bdp.system.bizdate}abc.txt.
    Field Delimiter The column delimiter. This parameter is equivalent to the fieldDelimiter parameter that is described in the preceding section. By default, a comma (,) is used as a column delimiter.
    Encoding The encoding format. This parameter is equivalent to the encoding parameter that is described in the preceding section. Default value: UTF-8.
    Null String The string that represents a null value. This parameter is equivalent to the nullFormat parameter that is described in the preceding section. If the source contains this string, it is replaced with null.
    Compression Format The format in which objects are compressed. This parameter is equivalent to the compress parameter that is described in the preceding section. By default, objects are not compressed.
    Include Header Specifies whether to skip the headers in the object. This parameter is equivalent to the skipHeader parameter that is described in the preceding section. Default value: No.
  2. Configure field mappings. This operation is equivalent to setting the column parameter that is described in the preceding section.
    Fields in the source on the left have a one-to-one mapping with fields in the destination on the right. You can click Add to add a field. To remove an added field, move the pointer over the field and click the Remove icon.
    Operation Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove the mappings that are established.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.
    Distributed Execution The distributed execution mode allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution, which speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.
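    In the code editor, these channel control options correspond to the setting block of the synchronization node, as shown in the sample configuration in the following section. The following snippet is a minimal sketch of that mapping; the values are examples only:
    "setting": {
        "errorLimit": {
            "record": "0"// Dirty Data Records Allowed. 
        },
        "speed": {
            "throttle": true,// Bandwidth Throttling: enabled. 
            "concurrent": 2,// Expected Maximum Concurrency. 
            "mbps": "12"// The maximum transmission rate. This parameter takes effect only when throttle is set to true. 
        }
    }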

Configure OSS Reader by using the code editor

For more information about how to configure a synchronization node by using the code editor, see Create a sync node by using the code editor.

In the following code, a synchronization node is configured to read data from OSS. For more information about parameters, see the preceding parameter description.
{
    "type":"job",
    "version":"2.0",// The version number. 
    "steps":[
        {
            "stepType":"oss",// The reader type. 
            "parameter":{
                "nullFormat":"",// The string that represents a null pointer. 
                "compress":"",// The format in which objects are compressed. 
                "datasource":"",// The name of the data source. 
                "column":[// The names of the columns from which you want to read data. 
                    {
                        "index":0,// The ID of a column in the source object. 
                        "type":"string"// The source data type. 
                    },
                    {
                        "index":1,
                        "type":"long"
                    },
                    {
                        "index":2,
                        "type":"double"
                    },
                    {
                        "index":3,
                        "type":"boolean"
                    },
                    {
                        "format":"yyyy-MM-dd HH:mm:ss", // The time format. 
                        "index":4,
                        "type":"date"
                    }
                ],
                "skipHeader":"",// Specifies whether to skip the headers in a CSV-like object if the object has headers. 
                "encoding":"",// The encoding format. 
                "fieldDelimiter":",",// The column delimiter. 
                "fileFormat": "",// The format of the object. 
                "object":[]// The name of the object from which you want to read data. 
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":""// The maximum number of dirty data records allowed. 
        },
        "speed":{
            "throttle":true,// Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":1 // The maximum number of parallel threads. 
            "mbps":"12",// The maximum transmission rate.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Read data from ORC or Parquet objects in OSS

OSS Reader reads data from ORC and Parquet objects in the same way in which HDFS Reader reads data. In addition to the original parameters, OSS Reader provides extended parameters such as path and fileFormat.

  • The following sample code provides an example on how to configure OSS Reader to read data from ORC objects in OSS:
    {
          "stepType": "oss",
          "parameter": {
            "datasource": "",
            "fileFormat": "orc",
            "path": "/tests/case61/orc__691b6815_9260_4037_9899_****",
            "column": [
              {
                "index": 0,
                "type": "long"
              },
              {
                "index": "1",
                "type": "string"
              },
              {
                "index": "2",
                "type": "string"
              }
            ]
          }
        }
  • The following sample code provides an example on how to configure OSS Reader to read data from Parquet objects in OSS:
    
    {
          "stepType": "oss",
          "parameter": {
            "datasource": "",
            "fileFormat": "parquet",
            "path": "/tests/case61/parquet",
            "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\n  repeated group list {\n    required binary element (UTF8);\n  }\n}\nrequired group params_struct {\n  required int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\n  repeated group list {\n    required group element {\n required int64 id;\n required binary name (UTF8);\n}\n  }\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\n  required int64 id;\n required binary name (UTF8);\n  }\n}\n}\nrequired group params_struct_complex {\n  required int64 id;\n required group detail {\n  required int64 id;\n required binary name (UTF8);\n  }\n  }\n}",
            "column": [
              {
                "index": 0,
                "type": "long"
              },
              {
                "index": "1",
                "type": "string"
              },
              {
                "index": "2",
                "type": "string"
              },
              {
                "index": "3",
                "type": "string"
              },
              {
                "index": "4",
                "type": "string"
              },
              {
                "index": "5",
                "type": "string"
              },
              {
                "index": "6",
                "type": "string"
              },
              {
                "index": "7",
                "type": "string"
              }
            ]
          }
        }
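    For readability, the escaped parquetSchema string in the preceding example corresponds to the following Parquet message definition. This is the same content with the \n escape sequences expanded and the indentation normalized:
    message test {
      required int64 int64_col;
      required binary str_col (UTF8);
      required group params (MAP) {
        repeated group key_value {
          required binary key (UTF8);
          required binary value (UTF8);
        }
      }
      required group params_arr (LIST) {
        repeated group list {
          required binary element (UTF8);
        }
      }
      required group params_struct {
        required int64 id;
        required binary name (UTF8);
      }
      required group params_arr_complex (LIST) {
        repeated group list {
          required group element {
            required int64 id;
            required binary name (UTF8);
          }
        }
      }
      required group params_complex (MAP) {
        repeated group key_value {
          required binary key (UTF8);
          required group value {
            required int64 id;
            required binary name (UTF8);
          }
        }
      }
      required group params_struct_complex {
        required int64 id;
        required group detail {
          required int64 id;
          required binary name (UTF8);
        }
      }
    }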