The OSS data source provides a bidirectional channel to read data from and write data to OSS. This topic describes the data synchronization capabilities of the OSS data source in DataWorks.
Supported field types and limits
Offline read
OSS Reader reads data from OSS and converts the data to conform to the Data Integration protocol. OSS is a service for storing unstructured data. OSS Reader supports the following features for data integration.
Support | Unsupported |
|
|
When you prepare data in OSS, if the data is in a CSV file, the file must be in standard CSV format. For example, if a column contains a double quotation mark ("), you must replace it with two double quotation marks (""). Otherwise, the file may be split incorrectly. If the file contains multiple delimiters, use the text type.
OSS is an unstructured data source that stores file-type data. Therefore, before you run a sync task, confirm that the field structure meets your expectations. Similarly, if the data structure in the unstructured data source changes, you must update the field structure in the task configuration. Otherwise, the synchronized data may be corrupted.
Offline write
OSS Writer converts data based on the data synchronization protocol and writes the data to text files in OSS. OSS is a service for storing unstructured data. OSS Writer currently supports the following features.
Support | Unsupported |
|
|
Type classification | Data Integration column configuration type |
Integer | LONG |
String | STRING |
Floating-point | DOUBLE |
Boolean | BOOLEAN |
Date and time | DATE |
Real-time write
Supports real-time writes.
Supports real-time writes from a single table to data lakes such as Hudi (0.12.x), Paimon, and Iceberg.
Create a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view parameter descriptions in the DataWorks console to understand the meanings of the parameters when you add a data source.
When you create an OSS data source for another account, you must grant the necessary permissions to that account. For more information, see Implement cross-account access to OSS based on a bucket policy.
If you use a RAM role to authorize the OSS data source, see Configure a data source using a RAM role for authorization.
When you create an OSS data source in a different region, use a public endpoint for the connection. For more information, see Overview of endpoints and network connectivity.
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.
Configuration guide for single-table offline sync tasks
For more information about the procedure, see Configure a task in the codeless UI and Configure a task in the code editor.
For a list of all parameters and a script demo for the code editor, see Appendix: Script demos and parameter descriptions.
Configuration guide for single-table real-time sync tasks
For more information about the procedure, see Configure a real-time sync task in Data Integration and Configure a real-time sync task in DataStudio.
Configuration guide for full-database synchronization
For more information about the procedure, see Full-database offline sync task and Full-database real-time sync task.
FAQ
Is there a limit on the number of files that can be read from OSS?
How do I handle dirty data when reading a CSV file with multiple delimiters?
Appendix: Script demos and parameter descriptions
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a task in the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader script demo: General example
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"oss",// The plugin name.
"parameter":{
"nullFormat":"",// Defines the string that can be interpreted as null.
"compress":"",// The text compression type.
"datasource":"",// The data source.
"column":[// The fields.
{
"index":0,// The column index.
"type":"string"// The data type.
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", // The time format.
"index":4,
"type":"date"
}
],
"skipHeader":"",// If a CSV-like file has a header row, skip it.
"encoding":"",// The encoding format.
"fieldDelimiter":",",// The column delimiter.
"fileFormat": "",// The text type.
"object":[]// The object prefix.
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""// The number of error records.
},
"speed":{
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect, which means that the data rate is not limited. If throttle is set to true, the data rate is limited.
"concurrent":1, // The number of concurrent jobs.
"mbps":"12"// The maximum data rate. 1 mbps = 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Reader script demo: Read ORC or Parquet files from OSS
You can read files in ORC or Parquet format from OSS using the HDFS Reader. In addition to the existing parameters of the OSS Reader, this method adds extended configuration parameters, such as Path (for ORC) and FileFormat (for ORC and Parquet).
The following example shows how to read an ORC file from OSS.
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }The following example shows how to read a Parquet file from OSS.
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader script parameters
Parameter | Description | Required | Default value |
datasource | The name of the data source. In the code editor, you can add a data source. The value of this parameter must be the same as the name of the added data source. | Yes | None |
Object | Specifies one or more objects to synchronize from OSS. You can configure this parameter in three ways: explicit path, wildcard path, and dynamic parameter path. 1. Configuration methods
Important
2. Concurrent read mechanism and performance The configuration method directly affects the concurrent performance of data extraction:
| Yes | None |
parquetSchema | This parameter is configured when reading a Parquet file from OSS. It takes effect only when fileFormat is set to parquet. It specifies the type description of the Parquet storage. After you fill in the parquetSchema, ensure that the overall configuration conforms to JSON syntax. The format of the parquetSchema configuration is as follows:
The following is a configuration example. | No | None |
column | The list of fields to read. `type` specifies the type of the source data. `index` specifies which column of the text the current column comes from (starting from 0). `value` specifies that the current type is a constant. The data is not read from the source file, but the corresponding column is automatically generated based on the `value`. By default, you can read all data as the String type with the following configuration. You can specify the column field information with the following configuration. Note For the column information you specify, `type` is required, and you must choose either `index` or `value`. | Yes | All data is read as the STRING type. |
fileFormat | The text type. The file type of the source OSS file. For example, csv or text. Both formats support custom delimiters. | Yes | csv |
fieldDelimiter | The column delimiter for reading. Note When OSS Reader reads data, you need to specify a column delimiter. If you do not specify one, the default is a comma (,). The UI configuration also defaults to a comma (,). If the delimiter is not visible, enter the Unicode encoding. For example, \u001b or \u007c. | Yes | , |
lineDelimiter | The row delimiter for reading. Note This parameter is valid only when fileFormat is set to text. | No | None |
compress | The text compression type. By default, this is not filled in, which means no compression. Supported compression types are gzip, bzip2, and zip. | No | No compression |
encoding | The encoding configuration for reading files. | No | utf-8 |
nullFormat | In text files, null (a null pointer) cannot be defined with a standard string. Data synchronization provides nullFormat to define which strings can represent null. For example:
| No | None |
skipHeader | CSV-like files may have a header row that needs to be skipped. By default, it is not skipped. skipHeader is not supported in compressed file mode. | No | false |
csvReaderConfig | Parameter configuration for reading CSV files. This is a Map type. CSV files are read using CsvReader, which has many configurations. If not configured, default values are used. | No | None |
Writer script demo: General example
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",// The plugin name.
"parameter":{
"nullFormat":"",// The data synchronization system provides nullFormat to define which strings can represent null.
"dateFormat":"",// The date format.
"datasource":"",// The data source.
"writeMode":"",// The write mode.
"writeSingleObject":"false", // Specifies whether to write synchronized data to a single OSS file.
"encoding":"",// The encoding format.
"fieldDelimiter":",",// The column delimiter.
"fileFormat":"",// The text type.
"object":""// The object prefix.
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The number of error records.
},
"speed":{
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect, which means that the data rate is not limited. If throttle is set to true, the data rate is limited.
"concurrent":1, // The number of concurrent jobs.
"mbps":"12"// The maximum data rate. 1 mbps = 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Writer script demo: Write ORC or Parquet files to OSS
You can write ORC or Parquet files to OSS by reusing the HDFS Writer. In addition to the existing OSS Writer parameters, extended configuration parameters such as Path and FileFormat are added. For more information about these parameters, see HDFS Writer.
The following are examples of writing ORC or Parquet files to OSS:
The following are examples only. Modify the parameters based on your specific column names and types. Do not copy and use them directly.
Write to OSS in ORC file format
To write an ORC file, you must use the code editor. In the code editor, set the fileFormat parameter to
orcand the path parameter to the destination file path. The format for the column parameter is{"name":"your column name","type": "your column type"}.The following ORC types are currently supported for writing:
Field type
Offline write to OSS (ORC format)
TINYINT
Support
SMALLINT
Supported
INT
Supported
BIGINT
Support
FLOAT
Supported
DOUBLE
Supported
TIMESTAMP
Supported
DATE
Supported
VARCHAR
Help and support
STRING
Support
CHAR
Supported
BOOLEAN
Support
DECIMAL
Support
BINARY
Supported
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }Write to OSS in Parquet file format
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Writer script parameters
Parameter | Description | Required | Default value |
datasource | The name of the data source. In the code editor, you can add a data source. The value of this parameter must be the same as the name of the added data source. | Yes | None |
object | The name of the file written by OSS Writer. OSS uses file names to simulate directories. OSS has the following restrictions on object names:
If you do not need a random UUID suffix, we recommend that you configure | Yes | None |
ossBlockSize | The OSS block size. The default block size is 16 MB. When the file is written in parquet or ORC format, you can add this parameter at the same level as the object parameter. Because OSS multipart upload supports a maximum of 10,000 blocks, the default single file size is limited to 160 GB. If the number of blocks exceeds the limit, you can increase the block size to support larger file uploads. | No | 16 |
writeMode | The data processing method before OSS Writer writes data:
| Yes | None |
writeSingleObject | Specifies whether to write data to a single file when writing to OSS:
Note
| No | false |
fileFormat | The format of the output file. The following formats are supported:
| No | text |
compress | The compression format of the data file written to OSS. This must be configured in a script task. Important CSV and TEXT file types do not support compression. Parquet/ORC files only support SNAPPY compression. | No | None |
fieldDelimiter | The column delimiter for writing. | No | , |
encoding | The encoding configuration of the output file. | No | utf-8 |
parquetSchema | This is a required parameter for writing to OSS in Parquet file format. It describes the structure of the object file. Therefore, this parameter is effective only when fileFormat is set to parquet. The format is as follows. The configuration items are described as follows:
Note Each row setting must end with a semicolon. The last row must also have a semicolon. The following is an example. | No | None |
nullFormat | In text files, null (a null pointer) cannot be defined with a standard string. The data synchronization system provides nullFormat to define which strings can represent null. For example, if you configure | No | None |
header | The header of the output file when writing to OSS. For example, | No | None |
maxFileSize (Advanced configuration, not supported in codeless UI) | The maximum size of a single object file when writing to OSS. The default is 10,000 × 10 MB. This is similar to controlling the size of a log file when printing log4j logs. When uploading in parts to OSS, each part is 10 MB (which is also the minimum granularity for log rotation files, meaning a maxFileSize less than 10 MB will be treated as 10 MB). Each OSS InitiateMultipartUploadRequest supports a maximum of 10,000 parts. When rotation occurs, the object name rule is to append suffixes like _1, _2, _3 to the original object prefix with a random UUID. Note
| No | 100,000 |
suffix (Advanced configuration, not supported in codeless UI) | The suffix of the file name generated when data synchronization writes data. For example, if you configure suffix as .csv, the final written file name will be fileName****.csv. | No | None |
Appendix: Conversion policy for Parquet data types
If you do not configure the parquetSchema parameter, DataWorks converts the data types based on the source field types. The following table describes the conversion policy.
Converted data type | Parquet type | Parquet logical type |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | Not applicable |
BINARY / VARBINARY | BINARY | Not applicable |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | Not applicable |
BIGINT | INT64 | Not applicable |
FLOAT | FLOAT | Not applicable |
DOUBLE | DOUBLE | Not applicable |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | Not applicable |