MaxCompute provides an unstructured data processing framework. This framework allows you to export unstructured data from MaxCompute to Object Storage Service (OSS) by using INSERT statements. This topic describes how to export unstructured data from MaxCompute to OSS.

Prerequisites

Before you export unstructured data to OSS, make sure that OSS is activated and that MaxCompute is authorized to access OSS. For more information, see Access OSS data by using the built-in extractor.

Background information

You can use a built-in extractor or a custom extractor of MaxCompute to export unstructured data from an internal table or an external table of MaxCompute to OSS. This process is similar to the process of accessing OSS data.
  • Export unstructured data to OSS by using a built-in extractor

    When you use a built-in extractor of MaxCompute, you must execute a table creation statement to create an OSS external table. Then, execute an INSERT statement to write data to the external table. The data is stored as TSV or CSV files in OSS.

  • Export unstructured data to OSS by using a custom storage handler

    The unstructured data processing framework also provides a general-purpose SDK that you can use to customize extractors and outputers and export files in custom data formats.

    The process of exporting unstructured data to OSS by using a custom storage handler is the same as that by using a built-in extractor. You must create an OSS external table and perform an INSERT operation on the external table to export unstructured data to OSS. However, if you export unstructured data to OSS by using a custom storage handler, you must specify a custom extractor in the STORED BY clause in the statement that is used to create an external table.
    Note The unstructured data processing framework of MaxCompute uses a StorageHandler class to describe the processing that is required for various data storage formats. A StorageHandler acts as a wrapper class in which you specify a custom extractor and a custom outputer. An extractor is used to read, parse, and extract data, whereas an outputer is used to process and write data. A custom storage handler extends the OdpsStorageHandler class and implements the getExtractorClass and getOutputerClass methods.

For more information about how to create an OSS external table, see Access OSS data by using the built-in extractor.

Limits

When you perform an INSERT operation on an external table to write data to files in OSS, the size of each file in OSS cannot exceed 5 GB.

Export unstructured data to OSS by using a built-in extractor

In this example, the following information is used:
  • OSS internal endpoint: oss-cn-hangzhou-internal.aliyuncs.com, which indicates that OSS is deployed in the China (Hangzhou) region.
  • Directory of data stored in a non-partitioned table: oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo1/output/.
  • Directory of data stored in a partitioned table: oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo2/output/.

Procedure:

  1. Log on to the MaxCompute client and create an OSS external table.

    The following code provides an example:

    • Create a non-partitioned table
      create external table if not exists mc_oss_csv_external4
      (
      vehicleId int,
      recordId int,
      patientId int,
      calls int,
      locationLatitute double,
      locationLongtitue double,
      recordTime string,
      direction string
      )
      stored by 'com.aliyun.odps.CsvStorageHandler' 
      with serdeproperties (
       'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'
      ) 
      location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo1/output/';                       
    • Create a partitioned table
      create external table if not exists mc_oss_csv_external5
      (
      vehicleId int,
      recordId int,
      patientId int,
      calls int,
      locationLatitute double,
      locationLongtitue double,
      recordTime string
      )
      partitioned by (
      direction string
      )
      stored by 'com.aliyun.odps.CsvStorageHandler' 
      with serdeproperties (
       'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'
      ) 
      location 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo2/output/';                      
    Note If the odps.properties.rolearn property is not specified in WITH SERDEPROPERTIES and a plaintext AccessKey pair is used for authorization, the value of location is in the following format:
    location 'oss://<accessKeyId>:<accessKeySecret>@<oss_endpoint>/<Bucket name>/<Directory name>/'
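
    The following sketch shows what the table creation statement might look like if you use a plaintext AccessKey pair instead of a RAM role. The table name is illustrative, and the AccessKey placeholders must be replaced with your own values:

    create external table if not exists mc_oss_csv_external_ak
    (
    vehicleId int,
    recordId int,
    patientId int,
    calls int,
    locationLatitute double,
    locationLongtitue double,
    recordTime string,
    direction string
    )
    stored by 'com.aliyun.odps.CsvStorageHandler'
    location 'oss://<accessKeyId>:<accessKeySecret>@oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/Demo1/output/';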
  2. Execute the INSERT OVERWRITE or INSERT INTO statement on the external table on the MaxCompute client to export the unstructured data to OSS.

    For more information about the INSERT operation, see Insert or update data into a table or static partitions (INSERT INTO and INSERT OVERWRITE) or Insert data into dynamic partitions (DYNAMIC PARTITION).

    The following code provides an example:
    • Export unstructured data from a non-partitioned table to OSS
      insert into table mc_oss_csv_external4 select * from mc_oss_csv_external1;

      For more information about the data in the mc_oss_csv_external1 table, see Access OSS data by using the built-in extractor.

      After the data is exported, you can view the generated files in the OSS directory. An .odps folder is generated in the output folder. The .odps folder contains .csv and .meta files.

      Export result
    • Export unstructured data from a partitioned table to OSS
      insert into table mc_oss_csv_external5 partition (direction) select * from mc_oss_csv_external2;

      For more information about the data in the mc_oss_csv_external2 table, see Access OSS data by using the built-in extractor.

      After the data is exported, you can view the generated files in the OSS directory. Subdirectories that are mapped to partitions are generated in the output folder based on the partition values that are specified in the INSERT statement. Each subdirectory contains an .odps folder. Example: output/direction=N/.odps/20210330*********/M1_0_0-0_TableSink1-0-.csv.

      Export result
    Note
    • The .meta file in the .odps folder is an extra metadata file written by MaxCompute. This file records the valid data in the current folder. If the INSERT operation succeeds, all data in the current folder is valid. MaxCompute parses the .meta file only when an operation fails. If an INSERT OVERWRITE operation fails or is terminated, you can run the operation again.
    • If you use a built-in extractor of MaxCompute to process TSV or CSV files, the number of files that are generated in the OSS directory matches the degree of parallelism of the SQL job. For example, if the insert overwrite ... select ... from ...; operation allocates 1,000 mappers on the source table specified by from_tablename, 1,000 TSV or CSV files are generated.
    • You can use the flexible semantics and configurations of MaxCompute to limit the number of files that are generated. If the outputer runs in a mapper, change the value of odps.stage.mapper.split.size to adjust the number of concurrent mappers. If the outputer runs in a reducer, change the value of odps.stage.reducer.num. If the outputer runs in a joiner, change the value of odps.stage.joiner.num. An example is shown after this note.
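
The following sketch shows how you might reduce the number of generated files for a single session. The value of 512 is illustrative and is specified in MB; a larger split size results in fewer concurrent mappers and therefore fewer output files:

    set odps.stage.mapper.split.size=512;
    insert overwrite table mc_oss_csv_external4 select * from mc_oss_csv_external1;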

Export unstructured data to OSS by using a custom storage handler

This section describes how MaxCompute writes data to OSS by using a custom extractor based on Access text data in OSS by using a custom extractor. In this example, vertical bars (|) are used as column delimiters and \n is used as the line delimiter.
Note After you create a MaxCompute Java module in MaxCompute Studio, you can view sample code files in the examples folder. To view complete code files, click Source code.

Procedure:

  1. Define an Outputer class by using IntelliJ IDEA.
    An Outputer class must be implemented based on the output logic.
    package com.aliyun.odps.examples.unstructured.text;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.io.OutputStreamSet;
    import com.aliyun.odps.io.SinkOutputStream;
    import com.aliyun.odps.udf.DataAttributes;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.Outputer;
    import java.io.IOException;
    public class TextOutputer extends Outputer {
        private SinkOutputStream outputStream;
        private DataAttributes attributes;
        private String delimiter;
        public TextOutputer (){
            // default delimiter, this can be overwritten if a delimiter is provided through the attributes.
            this.delimiter = "|";
        }
        @Override
        public void output(Record record) throws IOException {
            this.outputStream.write(recordToString(record).getBytes());
        }
        // no particular usage of execution context in this example
        @Override
        public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) throws IOException {
            this.outputStream = outputStreamSet.next();
            this.attributes = attributes;
            this.delimiter = this.attributes.getValueByKey("delimiter");
            if (this.delimiter == null)
            {
                // keep the default delimiter set in the constructor
                this.delimiter = "|";
            }
            System.out.println("TextOutputer using delimiter [" + this.delimiter + "].");
        }
        @Override
        public void close() {
            // no-op
        }
        private String recordToString(Record record){
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < record.getColumnCount(); i++)
            {
                if (null == record.get(i)){
                    sb.append("NULL");
                }
                else{
                    sb.append(record.get(i).toString());
                }
                if (i != record.getColumnCount() - 1){
                    sb.append(this.delimiter);
                }
            }
            sb.append("\n");
            return sb.toString();
        }
    }

    The setup(), output(), and close() methods of an Outputer class correspond to the setup(), extract(), and close() methods of an Extractor class. The setup() and close() methods are called only once during the lifecycle of an Outputer. Perform initialization tasks in the setup() method, and save the three parameters that are passed to setup() as member variables of the Outputer class so that they can be used in the output() or close() method. The close() method marks the end of the output logic.

    Data processing occurs in the output(Record) method. MaxCompute calls the output(Record) method once for each input record that the current outputer processes. MaxCompute assumes that a record has already been consumed when the related output(Record) call returns, and can reuse the memory that stores the record for other purposes. Therefore, if you need the information in a record across multiple output() calls, call the record.clone() method to save a copy of the current record.
    Note When you implement an Outputer class for an external table that uses a custom extractor, the record that is passed to the Outputer.output(Record record) method is generated by the preceding operation in the execution plan. As a result, the column names of the record are not fixed. For example, the column that is generated by some_function(column_a) has a temporary column name.

    Therefore, we recommend that you use record.get(<index>) instead of record.get(<column name>) to obtain the content of a column, as the following sketch shows.
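
    The following minimal sketch illustrates both points. It is a hypothetical fragment that is not part of the sample project: it extends the TextOutputer class that is defined in this step, retains the previous record by cloning it, and delegates writing to TextOutputer, which accesses columns by index.

    package com.aliyun.odps.examples.unstructured.text;
    import com.aliyun.odps.data.Record;
    import java.io.IOException;
    // Hypothetical sketch: retain a record across output() calls.
    public class BufferingOutputerSketch extends TextOutputer {
        private Record lastRecord;
        @Override
        public void output(Record record) throws IOException {
            if (lastRecord != null) {
                // write the previously buffered record
                super.output(lastRecord);
            }
            // MaxCompute may reuse the record's memory after output() returns,
            // so clone() is required before a reference is kept across calls.
            lastRecord = record.clone();
        }
        @Override
        public void close() {
            try {
                if (lastRecord != null) {
                    // flush the final buffered record before closing
                    super.output(lastRecord);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            super.close();
        }
    }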

  2. Define an Extractor class by using IntelliJ IDEA.
    An extractor is used to read, parse, and process data. If MaxCompute does not need to read data from the external table, you do not need to define an extractor.
    package com.aliyun.odps.examples.unstructured.text;
    import com.aliyun.odps.Column;
    import com.aliyun.odps.data.ArrayRecord;
    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.io.InputStreamSet;
    import com.aliyun.odps.udf.DataAttributes;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.Extractor;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    /**
     * Text extractor that extracts schematized records from formatted plain text (csv, tsv, etc.)
    **/
    public class TextExtractor extends Extractor {
        private InputStreamSet inputs;
        private String columnDelimiter;
        private DataAttributes attributes;
        private BufferedReader currentReader;
        private boolean firstRead = true;
        public TextExtractor() {
            // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
            this.columnDelimiter = ",";
        }
        // no particular usage for execution context in this example
        @Override
        public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
            this.inputs = inputs;
            this.attributes = attributes;
            // check if "delimiter" attribute is supplied via SQL query
            String columnDelimiter = this.attributes.getValueByKey("delimiter");
            if ( columnDelimiter != null)
            {
                this.columnDelimiter = columnDelimiter;
            }
            System.out.println("TextExtractor using delimiter [" + this.columnDelimiter + "].");
            // note: more properties can be inited from attributes if needed
        }
        @Override
        public Record extract() throws IOException {
            String line = readNextLine();
            if (line == null) {
                return null;
            }
            return textLineToRecord(line);
        }
        @Override
        public void close(){
            // no-op
        }
        private Record textLineToRecord(String line) throws IllegalArgumentException
        {
            Column[] outputColumns = this.attributes.getRecordColumns();
            ArrayRecord record = new ArrayRecord(outputColumns);
            if (this.attributes.getRecordColumns().length != 0){
                // string copies are needed, not the most efficient one, but suffice as an example here
                String[] parts = line.split(columnDelimiter);
                int[] outputIndexes = this.attributes.getNeededIndexes();
                if (outputIndexes == null){
                    throw new IllegalArgumentException("No outputIndexes supplied.");
                }
                if (outputIndexes.length != outputColumns.length){
                    throw new IllegalArgumentException("Mismatched output schema: Expecting "
                            + outputColumns.length + " columns but get " + parts.length);
                }
                int index = 0;
                for(int i = 0; i < parts.length; i++){
                    // only parse data in columns indexed by output indexes
                    if (index < outputIndexes.length && i == outputIndexes[index]){
                        switch (outputColumns[index].getType()) {
                            case STRING:
                                record.setString(index, parts[i]);
                                break;
                            case BIGINT:
                                record.setBigint(index, Long.parseLong(parts[i]));
                                break;
                            case BOOLEAN:
                                record.setBoolean(index, Boolean.parseBoolean(parts[i]));
                                break;
                            case DOUBLE:
                                record.setDouble(index, Double.parseDouble(parts[i]));
                                break;
                            case DATETIME:
                            case DECIMAL:
                            case ARRAY:
                            case MAP:
                            default:
                                throw new IllegalArgumentException("Type " + outputColumns[index].getType() + " not supported for now.");
                        }
                        index++;
                    }
                }
            }
            return record;
        }
        /**
         * Read next line from underlying input streams.
         * @return The next line as String object. If all of the contents of input
         * streams has been read, return null.
         */
        private String readNextLine() throws IOException {
            if (firstRead) {
                firstRead = false;
                // the first read, initialize things
                currentReader = moveToNextStream();
                if (currentReader == null) {
                    // empty input stream set
                    return null;
                }
            }
            while (currentReader != null) {
                String line = currentReader.readLine();
                if (line != null) {
                    return line;
                }
                currentReader = moveToNextStream();
            }
            return null;
        }
        private BufferedReader moveToNextStream() throws IOException {
            InputStream stream = inputs.next();
            if (stream == null) {
                return null;
            } else {
                return new BufferedReader(new InputStreamReader(stream));
            }
        }
    }
  3. Define a StorageHandler class by using IntelliJ IDEA.
    package com.aliyun.odps.examples.unstructured.text;
    import com.aliyun.odps.udf.Extractor;
    import com.aliyun.odps.udf.OdpsStorageHandler;
    import com.aliyun.odps.udf.Outputer;
    public class TextStorageHandler extends OdpsStorageHandler {
        @Override
        public Class<? extends Extractor> getExtractorClass() {
            return TextExtractor.class;
        }
        @Override
        public Class<? extends Outputer> getOutputerClass() {
            return TextOutputer.class;
        }
    }

    If MaxCompute does not need to read data from a table, you do not need to specify an extractor.

  4. Compile the code for the preceding custom classes by using IntelliJ IDEA, package the code into a JAR file, and then add the JAR file as a MaxCompute resource on the MaxCompute client.
    Sample command for adding MaxCompute resources:
    add jar odps-TextStorageHandler.jar;
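    If the JAR file already exists as a resource, you can use the -f option to overwrite it and then list the resources of the project to verify the upload. This is a sketch of standard MaxCompute client commands:
    add jar odps-TextStorageHandler.jar -f;
    list resources;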
  5. Create an external table on the MaxCompute client.
    You must create an OSS external table. The process of exporting unstructured data to OSS by using a custom extractor is similar to the process that uses a built-in extractor. The difference is that you must specify the custom storage handler in the STORED BY clause and reference the JAR file that contains your code in the USING clause.
    • Create a non-partitioned table
      create external table if not exists output_data_txt_external1
      (
      vehicleId int,
      recordId int,
      patientId int,
      calls int,
      locationLatitute double,
      locationLongtitue double,
      recordTime string,
      direction string
      )
      stored by 'com.aliyun.odps.examples.unstructured.text.TextStorageHandler'  
      with serdeproperties(
          'delimiter'='|'
          [,'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'])
      location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/output/'
      using 'odps-TextStorageHandler.jar';                     
    • Create a partitioned table
      create external table if not exists output_data_txt_external2
      (
      vehicleId int,
      recordId int,
      patientId int,
      calls int,
      locationLatitute double,
      locationLongtitue double,
      recordTime string
      )
      partitioned by (
      direction string
      )
      stored by 'com.aliyun.odps.examples.unstructured.text.TextStorageHandler' 
      with serdeproperties(
          'delimiter'='|'
          [,'odps.properties.rolearn'='acs:ram::xxxxxx:role/aliyunodpsdefaultrole'])
      location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/output/'
      using 'odps-TextStorageHandler.jar';                      
  6. Execute the INSERT OVERWRITE or INSERT INTO statement on the external table on the MaxCompute client to export unstructured data to OSS.

    For more information about the INSERT operation, see Insert or update data into a table or static partitions (INSERT INTO and INSERT OVERWRITE) or Insert data into dynamic partitions (DYNAMIC PARTITION).

    The following code provides an example:
    • Export unstructured data from a non-partitioned table to OSS
      insert into table output_data_txt_external1 select * from mc_oss_csv_external1;

      For more information about the data in the mc_oss_csv_external1 table, see Access OSS data by using the built-in extractor.

      After the data is exported, you can view the generated files in the OSS directory. An .odps folder is generated in the output folder. The .odps folder contains .csv and .meta files.

    • Export unstructured data from a partitioned table to OSS
      insert into table output_data_txt_external2 partition (direction) select * from mc_oss_csv_external2;

      For more information about the data in the mc_oss_csv_external2 table, see Access OSS data by using the built-in extractor.

      After the data is exported, you can view the generated files in the OSS directory. Subdirectories that are mapped to partitions are generated in the output folder based on the partition values that are specified in the INSERT statement. Each subdirectory contains an .odps folder. Example: output/direction=N/.odps/20210330*********/M1_0_0-0_TableSink1-0-.csv.

Export unstructured data to OSS by using the multipart upload feature

MaxCompute supports the multipart upload feature of OSS when you execute INSERT statements to write data to OSS external tables. To enable or disable this feature, specify the odps.sql.unstructured.oss.commit.mode parameter at the session level or the project level. This feature is disabled by default. For more information about the multipart upload feature, see Multipart upload.

Notice The multipart upload feature supports only external tables in the stored as xxx format. For more information about the stored as xxx format, see Open source data formats supported by OSS external tables. This feature cannot be used in scenarios where a custom storage handler or extractor is used or where a user-defined function (UDF) is used in SQL queries.

Before you use this feature, make sure that jobConf2 is enabled for your project so that a job execution plan can be generated. To enable jobConf2, set the odps.sql.jobconf.odps2 parameter to True.

Note The default value of odps.sql.jobconf.odps2 is True. If the parameter is not set to True, run the set odps.sql.jobconf.odps2=true; command to enable jobConf2 for the session.
Valid values of the odps.sql.unstructured.oss.commit.mode parameter:
  • False: Data that is written from MaxCompute to an OSS external table is stored in the .odps folder under the directory specified by LOCATION. The .odps folder includes a .meta file that MaxCompute uses to ensure data consistency. Only MaxCompute can correctly process the data in the .odps folder. If another data processing engine parses the data in this folder, an error occurs.
  • True: MaxCompute uses the multipart upload feature to ensure data consistency in two-phase commit mode. In this case, the .odps folder and the .meta file are not generated, and other data processing engines can correctly parse the data that is written from MaxCompute to OSS.

The odps.sql.unstructured.oss.commit.mode parameter is designed to meet data openness requirements, because data that is written to OSS by using SQL statements often must be consumed by other data processing engines. If you set the parameter to False, the .odps folder is retained in the directory where the current OSS external table is stored, and only MaxCompute can parse the data. If you set the parameter to True, the .odps folder is not generated in that directory, and other data processing engines can correctly parse the data that is written from MaxCompute to OSS, as the following sketch shows.
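
The following sketch shows how you might enable the feature for a session. The table names are hypothetical, and the external table is assumed to be created by using the stored as syntax, for example, stored as orc:

    set odps.sql.jobconf.odps2=true;
    set odps.sql.unstructured.oss.commit.mode=true;
    insert overwrite table mc_oss_orc_external select * from mc_oss_src;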