本文將為您介紹如何在MaxCompute上輕鬆訪問OSS的資料。

STS模式授予許可權

MaxCompute需要直接存取OSS的資料,前提需要將OSS的資料相關許可權賦給MaxCompute的訪問帳號,您可通過以下兩種方式授予許可權。
  • 當MaxCompute和OSS的owner是同一個帳號時,可以直接登入阿里雲帳號後,點擊此處完成一鍵授權
  • 自訂授權。
    1. 首先需要在RAM中授予MaxCompute訪問OSS的許可權。登入RAM控制台(若MaxCompute和OSS不是同一個帳號,此處需由OSS帳號登入進行授權),通過控制台中的角色管理建立角色 ,角色名稱如AliyunODPSDefaultRole或AliyunODPSRoleForOtherUser。
    2. 修改角色策略內容設定,如下所示。
      --當MaxCompute和OSS的Owner是同一個帳號
      {
      "Statement": [
      {
       "Action": "sts:AssumeRole",
       "Effect": "Allow",
       "Principal": {
         "Service": [
           "odps.aliyuncs.com"
         ]
       }
      }
      ],
      "Version": "1"
      }
      --當MaxCompute和OSS的Owner不是同一個帳號
      {
      "Statement": [
      {
       "Action": "sts:AssumeRole",
       "Effect": "Allow",
       "Principal": {
         "Service": [
           "MaxCompute的Owner雲帳號id@odps.aliyuncs.com"
         ]
       }
      }
      ],
      "Version": "1"
      }
    3. 授予角色訪問OSS必要的許可權AliyunODPSRolePolicy,如下所示。
      {
      "Version": "1",
      "Statement": [
      {
       "Action": [
         "oss:ListBuckets",
         "oss:GetObject",
         "oss:ListObjects",
         "oss:PutObject",
         "oss:DeleteObject",
         "oss:AbortMultipartUpload",
         "oss:ListParts"
       ],
       "Resource": "*",
       "Effect": "Allow"
      }
      ]
      }
      --可自訂其他許可權
    4. 將許可權AliyunODPSRolePolicy授權給該角色。

內建extractor訪問OSS資料

訪問外部資料源時,需要使用者自訂不同的Extractor,同時您也可以使用MaxCompute內建的Extractor,來讀取按照約定格式儲存的OSS資料。只需要建立一個外部表格,便可把這張表作為源表進行查詢。

假設有一份CSV資料存在OSS上,endpoint為oss-cn-shanghai-internal.aliyuncs.com,bucket為oss-odps-test,資料檔案的存放路徑為/demo/vehicle.csv

建立外部表格

建立外部表格,語句如下:
CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_csv_external
(
vehicleId int,
recordId int,
patientId int,
calls int,
locationLatitute double,
locationLongtitue double,
recordTime string,
direction string
)
STORED BY 'com.aliyun.odps.CsvStorageHandler' -- (1)
WITH SERDEPROPERTIES (
 'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole'
) -- (2)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/'; -- (3)(4)
上述語句,說明如下:
  • com.aliyun.odps.CsvStorageHandler是內建的處理CSV格式檔案的StorageHandler,它定義了如何讀寫CSV檔案。您只需指明這個名字,相關邏輯已經由系統實現。
  • odps.properties.rolearn中的資訊是RAM中AliyunODPSDefaultRoleArn資訊。您可以通過RAM控制台中的角色詳情擷取。
  • LOCATION必須指定一個OSS目錄,預設系統會讀取這個目錄下所有的檔案。
    • 建議您使用OSS提供的內網網域名稱,否則將產生OSS流量費用。
    • 建議您存放OSS資料的地區對應您開通MaxCompute的地區。由於MaxCompute只有在部分地區部署,我們不承諾跨地區的資料連通性。
    • OSS的串連格式為oss://oss-cn-shanghai-internal.aliyuncs.com/Bucket名稱/目錄名稱/。目錄後不要加檔案名稱,如下的集中用法都是錯誤的:
      http://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/  -- 不支援http串連
      https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支援https串連
      oss://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo -- 串連地址錯誤
      oss://oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv  -- 不必指定檔案名稱
  • 外部表格只是在系統中記錄了與OSS目錄的關聯,當Drop這張表時,對應的LOCATION資料不會被刪除。
如果想查看建立好的外部表格結構資訊,可以執行語句:
desc extended <table_name>;

在返回的資訊裡,除了跟內部表一樣的基礎資訊外,Extended Info包含外部表格StorageHandler 、Location等資訊。

查詢外部表格

外部表格建立成功後,便可如同普通表一樣使用這個外部表格。假設 /demo/vehicle.csv資料如下:
1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S
1,7,53,1,46.81006,-92.08174,9/14/2014 0:00,N
1,8,63,1,46.81006,-92.08174,9/14/2014 0:00,SW
1,9,4,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,10,31,1,46.81006,-92.08174,9/14/2014 0:00,N
執行如下SQL語句:
select recordId, patientId, direction from ambulance_data_csv_external where patientId > 25;
说明 目前外部表格只能通過MaxCompute SQL操作,MaxCompute MapReduce目前無法操作外部表格。
這條語句會提交一個作業,調用內建csv extractor,從OSS讀取資料進行處理。輸出結果如下:
+------------+------------+-----------+
| recordId   | patientId  | direction |
+------------+------------+-----------+
| 1          | 51         | S         |
| 3          | 48         | NE        |
| 4          | 30         | W         |
| 5          | 47         | S         |
| 7          | 53         | N         |
| 8          | 63         | SW        |
| 10         | 31         | N         |
+------------+------------+-----------+

自訂Extractor訪問OSS

當OSS中的資料格式比較複雜,內建的Extractor無法滿足需求時,需要自訂Extractor來讀取OSS檔案中的資料。

例如有一個txt資料檔案,並不是CSV格式,記錄之間的列通過 |分隔。比如 /demo/SampleData/CustomTxt/AmbulanceData/vehicle.csv資料如下:
1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S
1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W
1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S
1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S
1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N
1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW
1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N
  • 定義Extractor
    寫一個通用的Extractor,將分隔字元作為參數傳進來,可以處理所有類似格式的text檔案。如下所示:
    /**
     * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.)
     **/
    public class TextExtractor extends Extractor {
      private InputStreamSet inputs;
      private String columnDelimiter;
      private DataAttributes attributes;
      private BufferedReader currentReader;
      private boolean firstRead = true;
      public TextExtractor() {
        // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
        this.columnDelimiter = ",";
      }
      // no particular usage for execution context in this example
      @Override
      public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
        this.inputs = inputs; //  inputs 是一個 InputStreamSet,每次調用 next() 返回一個 InputStream,這個 InputStream 可以讀取一個 OSS 檔案的所有內容。
        this.attributes = attributes;
        // check if "delimiter" attribute is supplied via SQL query
        String columnDelimiter = this.attributes.getValueByKey("delimiter"); //delimiter 通過 DDL 語句傳參。
        if ( columnDelimiter != null)
        {
          this.columnDelimiter = columnDelimiter;
        }
        // note: more properties can be inited from attributes if needed
      }
      @Override
      public Record extract() throws IOException {//extactor() 調用返回一條 Record,代表外部表格中的一條記錄。
        String line = readNextLine();
        if (line == null) {
          return null; // 返回 NULL 來表示這個表中已經沒有記錄可讀。
        }
        return textLineToRecord(line); // textLineToRecord 將一行資料按照 delimiter 分割為多個列。
      }
      @Override
      public void close(){
        // no-op
      }
    }

    textLineToRecord將資料分割的完整實現請參見此處

    定義StorageHandler
    StorageHandler作為External Table自訂邏輯的統一入口。
    package com.aliyun.odps.udf.example.text;
    public class TextStorageHandler extends OdpsStorageHandler {
      @Override
      public Class<? extends Extractor> getExtractorClass() {
        return TextExtractor.class;
      }
      @Override
      public Class<? extends Outputer> getOutputerClass() {
        return TextOutputer.class;
      }
    }
    編譯打包
    將自訂代碼編譯打包,並上傳到MaxCompute。
    add jar odps-udf-example.jar;
  • 建立External表

    與使用內建Extractor相似,首先需要建立一張外部表格,不同的是在指定外部表格訪問資料的時候,需要使用自訂的StorageHandler。

    建立外部表格語句如下:
    CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_txt_external
    (
    vehicleId int,
    recordId int,
    patientId int,
    calls int,
    locationLatitute double,
    locationLongtitue double,
    recordTime string,
    direction string
    )
    STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler' --STORED BY 指定自訂 StorageHandler 的類名。
      with SERDEPROPERTIES (
    'delimiter'='\\|',  --SERDEPROPERITES 可以指定參數,這些參數會通過 DataAttributes 傳遞到 Extractor 代碼中。
    'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole'
    )
    LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/'
    USING 'odps-udf-example.jar'; --同時需要指定類定義所在的jar包。
  • 查詢外部表格
    執行如下SQL語句:
    select recordId, patientId, direction from ambulance_data_txt_external where patientId > 25;

自訂Extractor訪問非文字檔資料

在前面我們看到了通過內建與自訂的Extractor可以輕鬆處理儲存在OSS上的CSV等文本資料。接下來以語音資料(wav格式檔案)為例,為您介紹如何通過自訂的Extractor訪問並處理OSS上的非文字檔。

這裡從最終執行的SQL開始,介紹以MaxCompute SQL為入口,處理存放在OSS上的語音檔案的使用方法。

建立外部表格SQL如下:
CREATE EXTERNAL TABLE IF NOT EXISTS speech_sentence_snr_external
(
sentence_snr double,
id string
)
STORED BY 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'
WITH SERDEPROPERTIES (
    'mlfFileName'='sm_random_5_utterance.text.label' ,
    'speechSampleRateInKHz' = '16'
)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/'
USING 'odps-udf-example.jar,sm_random_5_utterance.text.label';
如上所示,同樣需要建立外部表格,然後通過外部表格的Schema定義了希望通過外部表格從語音檔案中抽取出來的資訊:
  • 一個語音檔案中的語句信噪比(SNR):sentence_snr。
  • 對應語音檔案的名字:id。

建立外部表格後,通過標準的Select語句進行查詢,則會觸發Extractor運行計算。此處便可感受到,在讀取處理OSS資料時,除了可以對文字檔做簡單的還原序列化處理,還可以通過自訂Extractor實現更複雜的資料處理抽取邏輯。比如:在此樣本中,通過自訂的com.aliyun.odps.udf.example.speech.SpeechStorageHandler 中封裝的Extractor,實現了對語音檔案計算平均有效語句信噪比的功能,並將抽取出來的結構化資料直接進行SQL運算(WHERE sentence_snr > 10),最終返回所有信噪比大於10的語音檔案以及對應的信噪比值。

在OSS地址oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/上,儲存了原始的多個WAV格式的語音檔案,MaxCompute架構將讀取該地址上的所有檔案,並在必要的時候進行檔案層級的分區,自動將檔案分配給多個計算節點處理。每個計算節點上的Extractor則負責處理通過InputStreamSet分配給該節點的檔案集。具體的處理邏輯則與使用者單機程式相仿,您不需關心分布計算中的種種細節,按照類單機方式實現其使用者演算法即可。

定製化的SpeechSentenceSnrExtractor主體邏輯,說明如下。

首先在 setup介面中讀取參數,進行初始化,並且匯入語音處理模型(通過resource引入)。
public SpeechSentenceSnrExtractor(){
    this.utteranceLabels = new HashMap<String, UtteranceLabel>();
  }
  @Override
  public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){
    this.inputs = inputs;
    this.attributes = attributes;
    this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
    String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);
    this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);
    try {
      // read the speech model file from resource and load the model into memory
      BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName);
      loadMlfLabelsFromResource(inputStream);
      inputStream.close();
    } catch (IOException e) {
      throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage());
    }
  }

Extractor()介面中,實現了對語音檔案的具體讀取和處理邏輯,對讀取的資料根據語音模型進行信噪比的計算,並且將結果填充成[snr, id]格式的Record。

上述樣本對實現進行了簡化,同時也沒有包括涉及語音處理的演算法邏輯,具體實現請參見MaxCompute SDK在開源社區中提供的範例代碼

@Override
  public Record extract() throws IOException {
    SourceInputStream inputStream = inputs.next();
    if (inputStream == null){
      return null;
    }
    // process one wav file to extract one output record [snr, id]
    String fileName = inputStream.getFileName();
    fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
    logger.info("Processing wav file " + fileName);
    String id = fileName.substring(0, fileName.lastIndexOf('.'));
    // read speech file into memory buffer
    long fileSize = inputStream.getFileSize();
    byte[] buffer = new byte[(int)fileSize];
    int readSize = inputStream.readToEnd(buffer);
    inputStream.close();
    // compute the avg sentence snr
    double snr = computeSnr(id, buffer, readSize);
    // construct output record [snr, id]
    Column[] outputColumns = this.attributes.getRecordColumns();
    ArrayRecord record = new ArrayRecord(outputColumns);
    record.setDouble(0, snr);
    record.setString(1, id);
    return record;
  }
  private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
          throws IOException {
    //  skipped here
  }
  // compute the snr of the speech sentence, assuming the input buffer contains the entire content of a wav file
  private double computeSnr(String id, byte[] buffer, int validBufferLen){
    // computing the snr value for the wav file (supplied as byte buffer array), skipped here
  }
執行查詢,如下所示:
select sentence_snr, id
    from speech_sentence_snr_external
where sentence_snr > 10.0;
獲得計算結果,如下所示:
--------------------------------------------------------------
| sentence_snr |                     id                      |
--------------------------------------------------------------
|   34.4703    |          J310209090013_H02_K03_042          |
--------------------------------------------------------------
|   31.3905    | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 |
--------------------------------------------------------------
|   35.4774    | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0  |
--------------------------------------------------------------
|   16.0462    | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0  |
--------------------------------------------------------------
|   14.5568    |   tsh_148_3013_5_13_47_3d5008d792408f81_0   |
--------------------------------------------------------------

綜上所述,通過自訂Extractor,便可在SQL語句上分布式地處理多個OSS上的語音資料檔案。同樣的方法,也可以方便地利用MaxCompute的大規模計算能力,完成對映像,視頻等各種類型非結構化資料的處理。

資料的分區

在前面的例子中,一個外部表格關聯的資料通過LOCATION上指定的OSS目錄來實現,而在處理的時候,MaxCompute是讀取目錄下的所有資料, 包括子目錄中的所有檔案。在資料量比較大,尤其是對於隨著時間不斷積累的資料目錄,對全目錄掃描可能帶來不必要的I/O以及資料處理時間。 解決這個問題通常有兩種做法:
  • 直接的方法:您對資料存放地址做好規劃,考慮使用多個EXTERNAL TABLE來描述不同部分的資料,讓每個EXTERNALTABLE的LOCATION指向資料的一個子集。
  • 資料分區方法:EXTERNAL TABLE與內部表一樣,支援分區表的功能,可以通過這個功能來對資料做系統化的管理。
本章節主要介紹EXTERNAL TABLE的資料分割函數。
  • 分區資料在OSS上的標準組織方式和路徑格式
    與MaxCompute內部表不同,對於存放在外部儲存上(如OSS)上面的資料,MaxComput沒有資料的管理權,因此如果需要使用分區表功能,在OSS上資料檔案的存放路徑必須符合一定的格式,路徑格式如下:
    partitionKey1=value1\partitionKey2=value2\...
    情境樣本
    將每天產生的LOG檔案存放在OSS上,並需要通過MaxCompute進行資料處理,資料處理時需按照粒度為“天”來訪問一部分資料。假設這些LOG檔案為CSV格式且可以用內建extractor訪問(複雜自訂格式用法也類似),那麼 外部分區表定義資料如下:
    CREATE EXTERNAL TABLE log_table_external (
        click STRING,
        ip STRING,
        url STRING,
      )
      PARTITIONED BY (
        year STRING,
        month STRING,
        day STRING
      )
      STORED BY 'com.aliyun.odps.CsvStorageHandler'
      WITH SERDEPROPERTIES (
     'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole'
    ) 
      LOCATION 'oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/log_data/';

    如上建表語句,和前面的例子區別在於定義EXTERNAL TABLE時,通過PARTITIONED BY的文法指定該外部表格為分區表,該例子是一個三層分區分區表,分區的key分別是year,month和day。

    為了讓分區生效,在OSS上儲存資料時需要遵循location的路徑格式。如有效路徑儲存layout:
    osscmd ls oss://oss-odps-test/log_data/
    2017-01-14 08:03:35 128MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=01/logfile
    2017-01-14 08:04:12 127MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=01/logfile.1
    2017-01-14 08:05:02 118MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=02/logfile
    2017-01-14 08:06:45 123MB Standard oss://oss-odps-test/log_data/year=2016/month=07/day=10/logfile
    2017-01-14 08:07:11 115MB Standard oss://oss-odps-test/log_data/year=2016/month=08/day=08/logfile
    ...
    说明 因為資料是離線準備的,即通過osscmd或者其他OSS工具上傳到OSS儲存服務,所以資料路徑格式也在上傳時決定。

    通過ALTER TABLE ADD PARTITIONDDL語句,即可把這些分區資訊引入MaxCompute。

    對應的DDL語句:
    ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '01')
    ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '02')
    ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '07', day = '10')
    ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '08', day = '08')
    ...
    说明 以上這些操作與標準的MaxCompute內部表操作一樣,分區的詳情請參見分區。在資料準備好並且PARTITION資訊引入MaxCompute之後,即可通過SQL語句對OSS外表資料的分區進行操作。
    此時分析資料時,可以指定指需分析某天的資料,如只想分析2016年6月1號當天,有多少不同的IP出現在LOG裡面,可以通過如下語句實現。
    SELECT count(distinct(ip)) FROM log_table_external WHERE year = '2016' AND month = '06' AND day = '01';

    該語句對log_table_external這個外表對應的目錄,將只訪問log_data/year=2016/month=06/day=01子目錄下的檔案(logfile和logfile.1),不會對整個log_data/目錄作全量資料掃描,避免大量無用的I/O操作。

    同樣如果只希望對2016年下半年的資料做分析,則執行如下語句。
    SELECT count(distinct(ip)) FROM log_table_external 
    WHERE year = '2016' AND month > '06';

    只訪問OSS上面儲存的下半年的LOG資料。

  • 分區資料在OSS上的自訂路徑

    如果事先存在OSS上的曆史資料,但是又不是根據partitionKey1=value1\partitionKey2=value2\...路徑格式來組織存放,也需要通過MaxCompute的分區方式來進行訪問計算時,MaxCompute也提供了通過自訂路徑來引入partition的方法。

    假設OSS資料路徑只有簡單的分區值(而無分區key資訊),也就是資料的layout為:
    osscmd ls oss://oss-odps-test/log_data_customized/
    2017-01-14 08:03:35 128MB Standard oss://oss-odps-test/log_data_customized/2016/06/01/logfile
    2017-01-14 08:04:12 127MB Standard oss://oss-odps-test/log_data_customized/2016/06/01/logfile.1
    2017-01-14 08:05:02 118MB Standard oss://oss-odps-test/log_data_customized/2016/06/02/logfile
    2017-01-14 08:06:45 123MB Standard oss://oss-odps-test/log_data_customized/2016/07/10/logfile
    2017-01-14 08:07:11 115MB Standard oss://oss-odps-test/log_data_customized/2016/08/08/logfile
    ...

外部表格建表DDL可參看前面的樣本,同樣在建表語句裡指定好分區key。

不同的子目錄指定到不同的分區,可通過類似如下自訂分區路徑的DDL語句實現。
ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '01')
LOCATION 'oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/log_data_customized/2016/06/01/';

在ADD PARTITION的時候增加了LOCATION資訊,從而實現自訂分區資料路徑後,即使資料存放不符合推薦的partitionKey1=value1\partitionKey2=value2\...格式,也能正確的實現對子目錄資料的分區訪問了。