If the data format in Object Storage Service (OSS) is complex and the built-in extractor cannot meet your business requirements, you can use a custom extractor to read unstructured data, such as text and non-text data, from OSS. This topic describes how to define a custom extractor and use it to create an OSS external table that reads data from OSS.

Prerequisites

  • MaxCompute is granted the permissions to access OSS data.

    For more information about authorization, see STS authorization.

  • An OSS bucket, OSS directories, and OSS data files are prepared.

    For more information about operations on OSS buckets, see Create buckets.

    For more information about how to create an OSS directory, see Create directories.

    For more information about how to upload data files, see Simple upload.

Limits

  • A custom extractor that is defined by using the method described in this topic cannot access data of the DATETIME type in OSS text files. For more information about how to define a custom extractor to access data of the DATETIME type in OSS text files, see Access data of the DATETIME type in OSS text files by using a MaxCompute custom extractor.
  • The schemas of OSS external tables must be the same as those of OSS data files.
  • You can perform operations on OSS external tables only by using MaxCompute SQL.

Usage notes

When you use OSS external tables, take note of the following items:

  • If the OSS external table that you created is a partitioned table, the OSS directory where partition data is stored must comply with the format requirements. For more information about the format requirements, see Partition paths of an OSS external table.
  • An OSS external table records only the mapping between the table and an OSS directory. If you delete an OSS external table, the data in the mapped OSS directory is not deleted.
  • If a data file stored in OSS is an object of the Archive storage class, you must first restore the file. For more information about the restoration operation, see Restore objects.

Syntax used to create an external table

create external table [if not exists] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[partitioned by (<col_name> <data_type>, ...)]
-- Specify the storage handler class that wraps the custom extractor.
stored by '<StorageHandler>'
-- Specify the parameters that are related to the OSS external table. 
with serdeproperties (
 'delimiter'='<delimiter>',  
 'odps.properties.rolearn'='<ram_arn>'
-- Set odps.text.option.gzip.input.enabled to true if OSS files are in the GZIP format. 
 [,'odps.text.option.gzip.input.enabled'='true']
-- Specify the properties that are related to the OSS external table based on your business requirements. 
 [,'<property_name>'='<property_value>'[,'<property_name>'='<property_value>'...]]
) 
location '<oss_location>'
using '<jar_name>';
  • if not exists: optional. If a table with the same name already exists and you do not specify if not exists, an error is returned. If you specify if not exists, a success message is returned regardless of whether a table with the same name exists, even if the existing table has a different schema. In that case, the table is not created and the metadata of the existing table is not changed.
  • mc_oss_extable_name: required. The name of the OSS external table that you want to create.
  • col_name: required. The name of the column in the OSS external table.
  • data_type: required. The data type of the column in the OSS external table.
  • partitioned by (<col_name> <data_type>, ...): optional. The partition information of the OSS external table when the table is a partitioned table.
    • col_name: required. The name of the partition key column.
    • data_type: required. The data type of the partition key column.
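  • StorageHandler: required. The class name of the custom storage handler, for example, com.aliyun.odps.udf.example.text.TextStorageHandler.
  • delimiter: the column delimiter that is passed to the custom extractor. The sample extractor in this topic reads this value by calling getValueByKey("delimiter") on DataAttributes and uses a comma (,) if the value is not specified.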
  • 'odps.properties.rolearn'='<ram_arn>': required. The Alibaba Cloud Resource Name (ARN) of the AliyunODPSDefaultRole role in Resource Access Management (RAM). You can obtain the ARN on the details page of the role in the RAM console.
  • location: required. The OSS directory where data files are stored. The OSS directory is in the format of oss://<oss_endpoint>/<Bucket name>/<Directory name>/. MaxCompute automatically reads data from all files in the OSS directory that you specified.
    • oss_endpoint: the OSS endpoint. We recommend that you use an internal endpoint of OSS to avoid extra fees for Internet traffic. For more information about the internal endpoints of OSS, see Regions and endpoints. We also recommend that you store the data in OSS in the same region as the MaxCompute project. MaxCompute is available only in some regions, and cross-region data connectivity issues may occur if OSS and MaxCompute are in different regions.
    • Bucket name: the name of the OSS bucket. For more information about how to view the bucket name, see List buckets.
    • Directory name: the name of the OSS directory. Do not include file names in the directory name. Examples of incorrect usage:
      http://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/                  -- HTTP connections are not supported.
      https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/                 -- HTTPS connections are not supported.
      oss://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo                    -- The endpoint is incorrect. Use the OSS endpoint, not the bucket domain name.
      oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv        -- File names must not be included in the directory name.
  • 'odps.text.option.gzip.input.enabled'='true': required when OSS data files are compressed in the GZIP format.
  • '<property_name>'='<property_value>': optional. property_name specifies the name of a property and property_value specifies its value. For more information about the supported properties, see Access OSS data by using a built-in extractor.
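  • using: specifies the JAR package that contains the classes of the custom extractor and storage handler. The JAR package must first be added to the MaxCompute project as a resource. To use additional resource files, list them after the JAR package, separated by commas.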
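The location rules above can be checked before you run the DDL statement. The following Java class is a minimal, hypothetical sketch: OssLocationCheck and isValidOssLocation are illustrative names that are not part of MaxCompute or OSS, and the rule that a regional endpoint has exactly three dot-separated labels (for example, oss-cn-shanghai-internal.aliyuncs.com) is an assumption drawn from the examples in this topic.

```java
/**
 * Hypothetical pre-flight check for the location clause of an OSS external table.
 * It encodes only the rules stated in this topic and is not part of MaxCompute or OSS.
 */
public class OssLocationCheck {

    private static final String SCHEME = "oss://";

    public static boolean isValidOssLocation(String location) {
        // Only oss:// is accepted; http:// and https:// locations are rejected.
        if (location == null || !location.startsWith(SCHEME)) {
            return false;
        }
        // The location must be a directory, so it must end with "/".
        if (!location.endsWith("/")) {
            return false;
        }
        // split() drops the trailing empty string left by the final "/".
        String[] parts = location.substring(SCHEME.length()).split("/");
        // Expect at least the endpoint, the bucket name, and one directory name.
        if (parts.length < 3) {
            return false;
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                return false; // reject "//" inside the path
            }
        }
        // Assumption: a regional endpoint such as oss-cn-shanghai-internal.aliyuncs.com
        // has exactly three labels. A bucket domain name such as
        // oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com has four.
        String endpoint = parts[0];
        String[] labels = endpoint.split("\\.");
        return labels.length == 3
                && labels[0].startsWith("oss-")
                && endpoint.endsWith(".aliyuncs.com");
    }

    public static void main(String[] args) {
        String[] candidates = {
            "oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/",
            "https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/",
            "oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv"
        };
        for (String candidate : candidates) {
            System.out.println(isValidOssLocation(candidate) + "  " + candidate);
        }
    }
}
```

All four incorrect examples fail this check: the first two use http:// or https://, and the last two do not end with a slash. The third one is also rejected because its host is the bucket domain name.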

Use a custom extractor to access text data in OSS

The OSS text file vehicle.csv contains the following data. The columns in the file are separated by vertical bars (|). The file is stored in the Demo/SampleData/CustomTxt/AmbulanceData/ directory of the oss-odps-test bucket.
1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S
1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W
1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S
1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S
1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N
1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW
1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N
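Each row of vehicle.csv maps to one record of the external table that is created later in this section. The following standalone Java sketch shows that per-row conversion in isolation, without the MaxCompute SDK: it splits a row on the vertical bar and converts the fields to the column types used in the DDL statement. VehicleRowParser and parse are hypothetical names. The sketch assumes that the delimiter is applied as a Java regular expression, as with String.split, in which | is a metacharacter and must be escaped.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of the per-row work of a text extractor: split a
 * vehicle.csv row on "|" and convert the fields to the table's column types.
 */
public class VehicleRowParser {

    // "|" is a regex metacharacter, so String.split needs the escaped form \|,
    // which is written "\\|" in Java source.
    private static final String DELIMITER_REGEX = "\\|";

    public static Object[] parse(String line) {
        String[] parts = line.split(DELIMITER_REGEX);
        if (parts.length != 8) {
            throw new IllegalArgumentException("Expected 8 fields but got " + parts.length);
        }
        return new Object[] {
            Integer.parseInt(parts[0]),   // vehicleId int
            Integer.parseInt(parts[1]),   // recordId int
            Integer.parseInt(parts[2]),   // patientId int
            Integer.parseInt(parts[3]),   // calls int
            Double.parseDouble(parts[4]), // locationLatitute double
            Double.parseDouble(parts[5]), // locationLongtitue double
            parts[6],                     // recordTime string (kept as text, not DATETIME)
            parts[7]                      // direction string
        };
    }

    public static void main(String[] args) {
        Object[] row = parse("1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S");
        System.out.println(Arrays.toString(row));
    }
}
```

Without the escape, split("|") splits the row between every pair of characters instead of at each vertical bar.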

Procedure:

  1. Use IntelliJ IDEA to define an extractor class.
    Define a general-purpose extractor that takes the column delimiter as a parameter, so that it can process any text file that has a similar format. Sample code of the extractor:
    package com.aliyun.odps.udf.example.text;

    import com.aliyun.odps.data.Record;
    import com.aliyun.odps.io.InputStreamSet;
    import com.aliyun.odps.udf.DataAttributes;
    import com.aliyun.odps.udf.ExecutionContext;
    import com.aliyun.odps.udf.Extractor;

    import java.io.BufferedReader;
    import java.io.IOException;

    /**
     * Text extractor that extracts schematized records from formatted plain text (CSV, TSV, and similar formats).
     **/
    public class TextExtractor extends Extractor {
      private InputStreamSet inputs;
      private String columnDelimiter;
      private DataAttributes attributes;
      private BufferedReader currentReader; // used by readNextLine()
      private boolean firstRead = true;     // used by readNextLine()

      public TextExtractor() {
        // Default to ","; this is overwritten if a delimiter is provided through DataAttributes.
        this.columnDelimiter = ",";
      }

      // The execution context is not used in this example.
      @Override
      public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
        // Each call to inputs.next() returns an InputStream that reads all the data of one OSS file.
        this.inputs = inputs;
        this.attributes = attributes;
        // Check whether the "delimiter" property is specified in the DDL statement (with serdeproperties).
        String columnDelimiter = this.attributes.getValueByKey("delimiter");
        if (columnDelimiter != null) {
          this.columnDelimiter = columnDelimiter;
        }
        // More properties can be initialized from attributes if needed.
      }

      @Override
      public Record extract() throws IOException {
        // Returns the next record that is extracted from the OSS data.
        String line = readNextLine();
        if (line == null) {
          return null; // Returning null indicates that all records have been read.
        }
        return textLineToRecord(line); // Splits the line into columns by using the delimiter.
      }

      @Override
      public void close() {
        // no-op
      }

      // readNextLine() and textLineToRecord(String) are omitted here. See TextExtractor.java.
    }
    Note: The readNextLine and textLineToRecord methods are not shown in the sample code. For their implementation, including how textLineToRecord splits a row into multiple columns, see TextExtractor.java.
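  2. Use IntelliJ IDEA to define a storage handler class.
    A storage handler is the entry point for the custom logic of an external table. It specifies the extractor class that reads data and the outputer class that writes data. TextOutputer is the corresponding outputer class in the same example package and is not described in this topic. Sample code: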
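    package com.aliyun.odps.udf.example.text;

    import com.aliyun.odps.udf.Extractor;
    import com.aliyun.odps.udf.OdpsStorageHandler;
    import com.aliyun.odps.udf.Outputer;

    public class TextStorageHandler extends OdpsStorageHandler {
      // Returns the extractor class that is used to read data from the external table.
      @Override
      public Class<? extends Extractor> getExtractorClass() {
        return TextExtractor.class;
      }

      // Returns the outputer class that is used to write data to the external table.
      @Override
      public Class<? extends Outputer> getOutputerClass() {
        return TextOutputer.class;
      }
    }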
  3. Compile the code for the preceding classes of the extractor and storage handler by using IntelliJ IDEA, package the code into a JAR file, and then add the file as a MaxCompute resource on the MaxCompute client.
    Sample command used to add the JAR file:
    add jar odps-udf-example.jar;
  4. Create an external table on the MaxCompute client.
    In the following sample command, delimiter specifies the column delimiter of the OSS data. The value is passed to the extractor through DataAttributes.
    create external table if not exists ambulance_data_txt_external
    (
    vehicleId int,
    recordId int,
    patientId int,
    calls int,
    locationLatitute double,
    locationLongtitue double,
    recordTime string,
    direction string
    )
    stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler'
    with serdeproperties (
    'delimiter'='\\|',
    'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole'
    )
    location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/'
    using 'odps-udf-example.jar';
  5. On the MaxCompute client, execute the following SQL statement to query data from the external table:
    select recordId, patientId, direction from ambulance_data_txt_external where patientId > 25;
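The sample extractor omits readNextLine(), which must read lines from one OSS file after another until every file in the InputStreamSet has been consumed. The following Java sketch shows one way to do this without the MaxCompute SDK. It is not the code in TextExtractor.java: MultiStreamLineReader and readAllLines are hypothetical names, an Iterator<InputStream> stands in for InputStreamSet, and UTF-8 text is assumed.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical sketch of a readNextLine() helper for a text extractor.
 * An Iterator<InputStream> stands in for InputStreamSet; each element is one file.
 */
public class MultiStreamLineReader {
    private final Iterator<InputStream> inputs;
    private BufferedReader currentReader; // reader for the file being read, or null

    public MultiStreamLineReader(Iterator<InputStream> inputs) {
        this.inputs = inputs;
    }

    /** Returns the next line across all files, or null after the last line of the last file. */
    public String readNextLine() throws IOException {
        while (true) {
            if (currentReader == null) {
                if (!inputs.hasNext()) {
                    return null; // every file is consumed; extract() would return null now
                }
                // Assumption: the files contain UTF-8 text.
                currentReader = new BufferedReader(
                        new InputStreamReader(inputs.next(), StandardCharsets.UTF_8));
            }
            String line = currentReader.readLine();
            if (line != null) {
                return line;
            }
            // End of the current file: close it and move on to the next one.
            currentReader.close();
            currentReader = null;
        }
    }

    /** Convenience helper: treats each string as the content of one file and returns all lines. */
    public static List<String> readAllLines(String... fileContents) {
        List<InputStream> streams = new ArrayList<>();
        for (String content : fileContents) {
            streams.add(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)));
        }
        MultiStreamLineReader reader = new MultiStreamLineReader(streams.iterator());
        List<String> lines = new ArrayList<>();
        try {
            String line;
            while ((line = reader.readNextLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Two in-memory files are read in order, as if they were two OSS files.
        System.out.println(readAllLines("1|1|51\n1|2|13\n", "1|3|48\n"));
    }
}
```

Only one file reader is open at a time, and an empty file is skipped without producing a record.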

Use a custom extractor to access non-text data in OSS

This section describes how to use a custom extractor to access and process WAV files that contain audio data.

Procedure:

  1. Define the classes of the custom extractor and storage handler, compile the code for these classes, and then package the code into a JAR file. For more information about the logic implementation, see SpeechSentenceSnrExtractor.
  2. On the MaxCompute client, execute the following SQL statement to create an external table:
    create external table if not exists speech_sentence_snr_external
    (
    sentence_snr double,
    id string
    )
    stored by 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'
    with serdeproperties (
        'mlfFileName'='sm_random_5_utterance.text.label',
        'speechSampleRateInKHz'='16'
    )
    location 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/'
    using 'odps-udf-example.jar,sm_random_5_utterance.text.label';
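    • com.aliyun.odps.udf.example.speech.SpeechStorageHandler: the storage handler that encapsulates the custom extractor. The extractor calculates the average signal-to-noise ratio (SNR) of the valid sentences in each WAV file and returns the results as structured records, which you can then process by using SQL statements.
    • mlfFileName and speechSampleRateInKHz: custom properties that are passed to the extractor. mlfFileName specifies the label file sm_random_5_utterance.text.label, which is also listed in the using clause together with the JAR package. speechSampleRateInKHz specifies the sample rate of the audio, in kHz.
    • location: the directory in which the WAV files are stored. In this example, oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/ contains multiple WAV files. MaxCompute reads all the files in this directory, splits the work at the file level, and allocates the files to multiple compute nodes for processing.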
  3. On the MaxCompute client, execute the following SQL statement to query the results:
    select sentence_snr, id from speech_sentence_snr_external where sentence_snr > 10.0; 
    The following result is returned:
    --------------------------------------------------------------
    | sentence_snr |                     id                      |
    --------------------------------------------------------------
    |   34.4703    |          J310209090013_H02_K03_042          |
    --------------------------------------------------------------
    |   31.3905    | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 |
    --------------------------------------------------------------
    |   35.4774    | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0  |
    --------------------------------------------------------------
    |   16.0462    | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0  |
    --------------------------------------------------------------
    |   14.5568    |   tsh_148_3013_5_13_47_3d5008d792408f81_0   |
    --------------------------------------------------------------
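SNR is commonly expressed in decibels as 10 * log10(P_signal / P_noise), where P is the mean of the squared sample values. As a rough illustration of that quantity, the following Java sketch computes it and averages it over several sentences. This is not the logic of SpeechSentenceSnrExtractor, and this topic does not state which SNR definition that extractor uses. The sketch assumes 16-bit little-endian PCM samples, a common WAV encoding, and takes sentence and noise segments as ready-made sample arrays instead of locating them with the label file. SnrSketch and its methods are hypothetical names.

```java
/**
 * Simplified SNR sketch. This is NOT the logic of SpeechSentenceSnrExtractor:
 * sentence and noise segments are passed in directly instead of being located
 * with the label (MLF) file, and 16-bit little-endian PCM samples are assumed.
 */
public class SnrSketch {

    /** Decodes 16-bit little-endian PCM bytes into signed samples. */
    public static short[] decodePcm16Le(byte[] data) {
        short[] samples = new short[data.length / 2];
        for (int i = 0; i < samples.length; i++) {
            int low = data[2 * i] & 0xFF; // low byte, unsigned
            int high = data[2 * i + 1];   // high byte, keeps the sign
            samples[i] = (short) ((high << 8) | low);
        }
        return samples;
    }

    /** Mean power of a segment: the average of the squared sample values. */
    public static double meanPower(short[] samples) {
        if (samples.length == 0) {
            throw new IllegalArgumentException("empty segment");
        }
        double sum = 0.0;
        for (short s : samples) {
            sum += (double) s * s;
        }
        return sum / samples.length;
    }

    /** SNR in decibels: 10 * log10(P_signal / P_noise). */
    public static double snrDb(short[] signal, short[] noise) {
        return 10.0 * Math.log10(meanPower(signal) / meanPower(noise));
    }

    /** Average SNR over the sentences of one file, each compared with the same noise segment. */
    public static double averageSnrDb(short[][] sentences, short[] noise) {
        if (sentences.length == 0) {
            throw new IllegalArgumentException("no sentences");
        }
        double total = 0.0;
        for (short[] sentence : sentences) {
            total += snrDb(sentence, noise);
        }
        return total / sentences.length;
    }

    public static void main(String[] args) {
        short[] noise = {100, -100, 100, -100};
        short[][] sentences = {
            {1000, -1000, 1000, -1000},    // 10x the noise amplitude: 20 dB
            {10000, -10000, 10000, -10000} // 100x the noise amplitude: 40 dB
        };
        System.out.println(averageSnrDb(sentences, noise));
    }
}
```

Because power grows with the square of amplitude, a sentence whose amplitude is 10 times the noise amplitude has an SNR of 20 dB in this definition.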

A custom extractor allows you to use SQL statements to process multiple audio files in OSS in a distributed manner. You can process other unstructured data, such as images and videos, in the same way by using the distributed computing capabilities of MaxCompute.