The OSS data source provides a bidirectional channel for reading data from and writing data to OSS. This topic describes how DataWorks synchronizes data with an OSS data source.
Supported field types and limits
Offline read
OSS Reader reads data from OSS, a service for storing unstructured data, and converts the data to the Data Integration protocol. OSS Reader supports the following features.
Supported | Unsupported |
|
|
When you prepare data in OSS, if the data is in a CSV file, the file must be in the standard CSV format. For example, if a column contains a double quotation mark ("), you must replace it with two double quotation marks (""). Otherwise, the file may be split incorrectly. If a file contains multiple delimiters, use the text file type.
OSS is an unstructured data source. Before you synchronize data, confirm that the field structure is as expected. Similarly, if the data structure in the source changes, you must update the field structure in the task configuration. Otherwise, data might be garbled during synchronization.
Offline write
OSS Writer converts data from the data synchronization protocol to text files and writes them to OSS, a service for storing unstructured data. OSS Writer supports the following features.
Supported | Unsupported |
|
|
Type category | Data Integration column configuration type |
Integer | LONG |
String | STRING |
Floating-point | DOUBLE |
Boolean | BOOLEAN |
Date and time | DATE |
Real-time write
Real-time writes are supported.
Real-time writes from a single table to data lakes, such as Hudi (0.12.x), Paimon, and Iceberg, are supported.
Create a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.
When you create an OSS data source for cross-account access, you must grant the required permissions to the relevant account. For more information, see Use a bucket policy to implement cross-account access to OSS.
To use a RAM role to configure the OSS data source, see Configure a data source using a RAM role for authorization.
When you create an OSS data source for a cross-region connection, you must use a public endpoint. For more information, see Overview of endpoints and network connectivity.
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.
Configure an offline synchronization task for a single table
For more information, see Configure a task in the codeless UI and Configure a task in the code editor.
For a complete list of parameters and a script demo for the code editor, see Appendix: Script demo and parameter description.
Configure a real-time synchronization task for a single table
For more information, see Configure a real-time synchronization task in Data Integration and Configure a real-time synchronization task in DataStudio.
Configure a full database synchronization task
For more information, see Full database offline synchronization tasks and Full database real-time synchronization tasks.
FAQ
Is there a limit on the number of OSS files that can be read?
How do I handle dirty data when I read a CSV file that has multiple delimiters?
Appendix: Script demo and parameter description
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configuration in the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader script demo: General example
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"oss",// The plug-in name.
"parameter":{
"nullFormat":"",// Defines the string that can be interpreted as null.
"compress":"",// The text compression type.
"datasource":"",// The data source.
"column":[// The fields.
{
"index":0,// The column index.
"type":"string"// The data type.
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", // The time format.
"index":4,
"type":"date"
}
],
"skipHeader":"",// Specifies whether to skip the header in a CSV-like file.
"encoding":"",// The encoding format.
"fieldDelimiter":",",// The column delimiter.
"fileFormat": "",// The text type.
"object":[]// The object prefix.
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""// The number of dirty data records allowed.
},
"speed":{
"throttle":true,// If you set throttle to false, the mbps parameter does not take effect and no rate limit is imposed. If you set throttle to true, a rate limit is imposed.
"concurrent":1, // The number of concurrent jobs.
"mbps":"12"// The rate limit. 1 Mbps is equal to 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Reader script demo: Read ORC or Parquet files from OSS
To read files in ORC or Parquet format from OSS, DataWorks reuses HDFS Reader. In addition to the existing OSS Reader parameters, other configuration parameters, such as Path (ORC) and FileFormat (ORC, Parquet), are supported.
The following example shows how to read an ORC file from OSS.
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }The following example shows how to read a Parquet file from OSS.
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader script parameters
Parameter | Description | Required | Default value |
datasource | The name of the data source. You can add a data source in the code editor. The value of this parameter must be the same as the name of the added data source. | Yes | None |
Object | Specifies one or more objects to synchronize from OSS. You can specify an object using an explicit path, a wildcard character, or a dynamic parameter. 1. Configuration methods
Important
2. Concurrent read mechanism and performance The configuration method directly affects the concurrent performance of data extraction:
| Yes | None |
parquetSchema | This parameter is configured when you read a Parquet file from OSS. It takes effect only when fileFormat is set to parquet. This parameter specifies the data types stored in the Parquet file. You must ensure that the overall configuration conforms to the JSON syntax after you specify this parameter. The following list describes the format of the parquetSchema parameter:
The following example shows how to configure this parameter. | No | None |
column | The list of fields to read. `type` specifies the data type of the source data. `index` specifies the column from which to read data. The value of `index` starts from 0. `value` specifies that the current column is a constant. Data is not read from the source file but is automatically generated based on the value of this parameter. By default, you can read all data as the STRING type. The configuration is as follows. You can specify the column field information. The configuration is as follows. Note When you specify column information, you must specify the `type` parameter. You must specify either the `index` or `value` parameter. | Yes | All data is read as the STRING type. |
fileFormat | The format of the source files in OSS. Valid values: csv and text. Both formats support custom delimiters. | Yes | csv |
fieldDelimiter | The delimiter that separates columns in the source files. Note When OSS Reader reads data, you must specify a column delimiter. If you do not specify one, the comma (,) is used by default. The comma (,) is also the default value in the UI. If the delimiter is not a visible character, enter its Unicode representation. For example, \u001b or \u007c. | Yes | , |
lineDelimiter | The delimiter that separates rows in the source files. Note This parameter is valid only when fileFormat is set to text. | No | None |
compress | The compression format of the text files. The default value is empty, which indicates that the files are not compressed. Valid values: gzip, bzip2, and zip. | No | Not compressed |
encoding | The encoding format of the source files. | No | utf-8 |
nullFormat | In a text file, you cannot use a standard string to define a null pointer. Data Integration provides the nullFormat parameter to define which strings can be interpreted as null. Examples:
| No | None |
skipHeader | Specifies whether to skip the header in a CSV-like file. The default value is false. The skipHeader parameter is not supported for compressed files. | No | false |
csvReaderConfig | The parameters for reading a CSV file. This parameter is of the Map type. CsvReader is used to read CSV files. You can configure multiple parameters. If you do not configure this parameter, the default values are used. | No | None |
Writer script demo: General example
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",// The plug-in name.
"parameter":{
"nullFormat":"",// Data Integration provides the nullFormat parameter to define which strings can be interpreted as null.
"dateFormat":"",// The date format.
"datasource":"",// The data source.
"writeMode":"",// The write mode.
"writeSingleObject":"false", // Specifies whether to write the synchronized data to a single OSS file.
"encoding":"",// The encoding format.
"fieldDelimiter":",",// The column delimiter.
"fileFormat":"",// The text type.
"object":""// The object prefix.
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The number of dirty data records allowed.
},
"speed":{
"throttle":true,// If you set throttle to false, the mbps parameter does not take effect and no rate limit is imposed. If you set throttle to true, a rate limit is imposed.
"concurrent":1, // The number of concurrent jobs.
"mbps":"12"// The rate limit. 1 Mbps is equal to 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Writer script demo: Configure a script to write ORC or Parquet files to OSS
DataWorks reuses HDFS Writer to write ORC or Parquet files to OSS. In addition to the existing OSS Writer parameters, additional parameters, such as Path and FileFormat, are supported. For more information about the parameters, see HDFS Writer.
The following examples show how to write ORC or Parquet files to OSS:
The following code is only an example. You must modify the parameters based on your column names and types. Do not copy the code directly.
Write data to OSS in ORC format
You can write ORC files only in the code editor. Set fileFormat to
orc, set path to the destination file path, and configure column in the{"name":"your column name","type": "your column type"}format.The following ORC data types are supported for write operations:
Field type
Offline write to OSS (ORC format)
TINYINT
Supported
SMALLINT
Supported
INT
Supported
BIGINT
Supported
FLOAT
Supported
DOUBLE
Support
TIMESTAMP
Supported
DATE
Supported
VARCHAR
Supported
STRING
Supported
CHAR
Supported
BOOLEAN
Supported
DECIMAL
Supported
BINARY
Supported
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }Write data to OSS in Parquet format
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Writer script parameters
Parameter | Description | Required | Default value |
datasource | The name of the data source. You can add a data source in the code editor. The value of this parameter must be the same as the name of the added data source. | Yes | None |
object | The name of the file to be written by OSS Writer. OSS uses file names to simulate folders. OSS has the following limits on object names:
If you do not want a random UUID as a suffix, set | Yes | None |
ossBlockSize | The size of an OSS part. The default size is 16 MB. This parameter is supported only when the file is written in parquet or ORC format. You can add this parameter at the same level as the object parameter. Because OSS multipart upload supports a maximum of 10,000 parts, the default size of a single file is limited to 160 GB. If the number of parts exceeds the limit, you can increase the part size to support larger file uploads. | No | 16 |
writeMode | The data processing method that OSS Writer uses before it writes data:
| Yes | None |
writeSingleObject | Specifies whether to write data to a single file in OSS:
Note
| No | false |
fileFormat | The format of the file to be written. The following formats are supported:
| No | text |
compress | The compression format of the data file to be written to OSS. This parameter must be configured in the code editor. Note Compression is not supported for csv and text file types. Parquet and ORC files support compression formats such as gzip and snappy. | No | None |
fieldDelimiter | The delimiter that separates columns in the destination file. | No | , |
encoding | The encoding format of the destination file. | No | utf-8 |
parquetSchema | This parameter is required when you write data to OSS in Parquet format. It is used to describe the structure of the destination file. This parameter takes effect only when fileFormat is set to parquet. The format is as follows. The following list describes the configuration items:
Note Each row setting must end with a semicolon, including the last row. Consider the following example. | No | None |
nullFormat | You cannot use a standard string to define null (null pointer) in a text file. The data synchronization system provides the nullFormat parameter to define a string that represents a null value. For example, if you configure | No | None |
header | The header of the file written to OSS. Example: | No | None |
maxFileSize (Advanced configuration, not supported in the codeless UI) | The maximum size of a single object file written to OSS. The default value is 10,000 × 10 MB. This is similar to controlling the size of a log file when you print log4j logs. When OSS performs multipart uploads, the size of each part is 10 MB. This is also the minimum granularity for log file rotation. A maxFileSize value less than 10 MB is treated as 10 MB. Each OSS InitiateMultipartUploadRequest supports a maximum of 10,000 parts. When rotation occurs, the object name is formed by adding a UUID and a suffix such as _1, _2, or _3 to the original object prefix. Note
| No | 100,000 |
suffix (Advanced configuration, not supported in the codeless UI) | The suffix of the file name generated when data is written. For example, if you set suffix to .csv, the final file name is fileName****.csv. | No | None |
Appendix: Conversion policy for Parquet data types
If you do not configure the parquetSchema parameter, DataWorks converts the data types of source fields based on a predefined policy. The following table describes this conversion policy.
Converted data type | Parquet type | Parquet logical type |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | Not applicable |
BINARY / VARBINARY | BINARY | Not applicable |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | Not applicable |
BIGINT | INT64 | Not applicable |
FLOAT | FLOAT | Not applicable |
DOUBLE | DOUBLE | Not applicable |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | Not applicable |