本文將為您介紹如何在MaxCompute上輕鬆訪問OSS的資料。
STS模式授予許可權
- 當MaxCompute和OSS的owner是同一個帳號時,可以直接登入阿里雲帳號後,點擊此處完成一鍵授權。
- 自訂授權。
- 首先需要在RAM中授予MaxCompute訪問OSS的許可權。登入RAM控制台(若MaxCompute和OSS不是同一個帳號,此處需由OSS帳號登入進行授權),通過控制台中的角色管理建立角色 ,角色名稱如AliyunODPSDefaultRole或AliyunODPSRoleForOtherUser。
- 修改角色策略內容設定,如下所示。
--當MaxCompute和OSS的Owner是同一個帳號 { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "odps.aliyuncs.com" ] } } ], "Version": "1" } --當MaxCompute和OSS的Owner不是同一個帳號 { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "MaxCompute的Owner雲帳號id@odps.aliyuncs.com" ] } } ], "Version": "1" }
- 授予角色訪問OSS必要的許可權AliyunODPSRolePolicy,如下所示。
{ "Version": "1", "Statement": [ { "Action": [ "oss:ListBuckets", "oss:GetObject", "oss:ListObjects", "oss:PutObject", "oss:DeleteObject", "oss:AbortMultipartUpload", "oss:ListParts" ], "Resource": "*", "Effect": "Allow" } ] } --可自訂其他許可權
- 將許可權AliyunODPSRolePolicy授權給該角色。
內建extractor訪問OSS資料
訪問外部資料源時,需要使用者自訂不同的Extractor,同時您也可以使用MaxCompute內建的Extractor,來讀取按照約定格式儲存的OSS資料。只需要建立一個外部表格,便可把這張表作為源表進行查詢。
假設有一份CSV資料存在OSS上,endpoint為oss-cn-shanghai-internal.aliyuncs.com
,bucket為oss-odps-test
,資料檔案的存放路徑為/demo/vehicle.csv。
建立外部表格
CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_csv_external
(
vehicleId int,
recordId int,
patientId int,
calls int,
locationLatitute double,
locationLongtitue double,
recordTime string,
direction string
)
STORED BY 'com.aliyun.odps.CsvStorageHandler' -- (1)
WITH SERDEPROPERTIES (
'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole'
) -- (2)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/'; -- (3)(4)
com.aliyun.odps.CsvStorageHandler
是內建的處理CSV格式檔案的StorageHandler
,它定義了如何讀寫CSV檔案。您只需指明這個名字,相關邏輯已經由系統實現。odps.properties.rolearn
中的資訊是RAM中AliyunODPSDefaultRole
的Arn
資訊。您可以通過RAM控制台中的角色詳情擷取。- LOCATION必須指定一個OSS目錄,預設系統會讀取這個目錄下所有的檔案。
- 建議您使用OSS提供的內網網域名稱,否則將產生OSS流量費用。
- 建議您存放OSS資料的地區對應您開通MaxCompute的地區。由於MaxCompute只有在部分地區部署,我們不承諾跨地區的資料連通性。
- OSS的串連格式為
oss://oss-cn-shanghai-internal.aliyuncs.com/Bucket名稱/目錄名稱/
。目錄後不要加檔案名稱,如下的集中用法都是錯誤的:http://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支援http串連 https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支援https串連 oss://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo -- 串連地址錯誤 oss://oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv -- 不必指定檔案名稱
- 外部表格只是在系統中記錄了與OSS目錄的關聯,當Drop這張表時,對應的
LOCATION
資料不會被刪除。
desc extended <table_name>;
在返回的資訊裡,除了跟內部表一樣的基礎資訊外,Extended Info包含外部表格StorageHandler 、Location等資訊。
查詢外部表格
/demo/vehicle.csv
資料如下:
1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S
1,7,53,1,46.81006,-92.08174,9/14/2014 0:00,N
1,8,63,1,46.81006,-92.08174,9/14/2014 0:00,SW
1,9,4,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,10,31,1,46.81006,-92.08174,9/14/2014 0:00,N
select recordId, patientId, direction from ambulance_data_csv_external where patientId > 25;
+------------+------------+-----------+
| recordId | patientId | direction |
+------------+------------+-----------+
| 1 | 51 | S |
| 3 | 48 | NE |
| 4 | 30 | W |
| 5 | 47 | S |
| 7 | 53 | N |
| 8 | 63 | SW |
| 10 | 31 | N |
+------------+------------+-----------+
自訂Extractor訪問OSS
當OSS中的資料格式比較複雜,內建的Extractor無法滿足需求時,需要自訂Extractor來讀取OSS檔案中的資料。
|
分隔。比如
/demo/SampleData/CustomTxt/AmbulanceData/vehicle.csv
資料如下:
1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S
1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W
1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S
1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S
1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N
1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW
1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N
- 定義Extractor
寫一個通用的Extractor,將分隔字元作為參數傳進來,可以處理所有類似格式的text檔案。如下所示:
/** * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.) **/ public class TextExtractor extends Extractor { private InputStreamSet inputs; private String columnDelimiter; private DataAttributes attributes; private BufferedReader currentReader; private boolean firstRead = true; public TextExtractor() { // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes) this.columnDelimiter = ","; } // no particular usage for execution context in this example @Override public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) { this.inputs = inputs; // inputs 是一個 InputStreamSet,每次調用 next() 返回一個 InputStream,這個 InputStream 可以讀取一個 OSS 檔案的所有內容。 this.attributes = attributes; // check if "delimiter" attribute is supplied via SQL query String columnDelimiter = this.attributes.getValueByKey("delimiter"); //delimiter 通過 DDL 語句傳參。 if ( columnDelimiter != null) { this.columnDelimiter = columnDelimiter; } // note: more properties can be inited from attributes if needed } @Override public Record extract() throws IOException {//extactor() 調用返回一條 Record,代表外部表格中的一條記錄。 String line = readNextLine(); if (line == null) { return null; // 返回 NULL 來表示這個表中已經沒有記錄可讀。 } return textLineToRecord(line); // textLineToRecord 將一行資料按照 delimiter 分割為多個列。 } @Override public void close(){ // no-op } }
textLineToRecord將資料分割的完整實現請參見此處。
定義StorageHandlerStorageHandler作為External Table自訂邏輯的統一入口。package com.aliyun.odps.udf.example.text; public class TextStorageHandler extends OdpsStorageHandler { @Override public Class<? extends Extractor> getExtractorClass() { return TextExtractor.class; } @Override public Class<? extends Outputer> getOutputerClass() { return TextOutputer.class; } }
編譯打包將自訂代碼編譯打包,並上傳到MaxCompute。add jar odps-udf-example.jar;
- 建立External表
與使用內建Extractor相似,首先需要建立一張外部表格,不同的是在指定外部表格訪問資料的時候,需要使用自訂的StorageHandler。
建立外部表格語句如下:CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_txt_external ( vehicleId int, recordId int, patientId int, calls int, locationLatitute double, locationLongtitue double, recordTime string, direction string ) STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler' --STORED BY 指定自訂 StorageHandler 的類名。 with SERDEPROPERTIES ( 'delimiter'='\\|', --SERDEPROPERITES 可以指定參數,這些參數會通過 DataAttributes 傳遞到 Extractor 代碼中。 'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole' ) LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/' USING 'odps-udf-example.jar'; --同時需要指定類定義所在的jar包。
- 查詢外部表格
執行如下SQL語句:
select recordId, patientId, direction from ambulance_data_txt_external where patientId > 25;
自訂Extractor訪問非文字檔資料
在前面我們看到了通過內建與自訂的Extractor可以輕鬆處理儲存在OSS上的CSV等文本資料。接下來以語音資料(wav格式檔案)為例,為您介紹如何通過自訂的Extractor訪問並處理OSS上的非文字檔。
這裡從最終執行的SQL開始,介紹以MaxCompute SQL為入口,處理存放在OSS上的語音檔案的使用方法。
CREATE EXTERNAL TABLE IF NOT EXISTS speech_sentence_snr_external
(
sentence_snr double,
id string
)
STORED BY 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'
WITH SERDEPROPERTIES (
'mlfFileName'='sm_random_5_utterance.text.label' ,
'speechSampleRateInKHz' = '16'
)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/'
USING 'odps-udf-example.jar,sm_random_5_utterance.text.label';
- 一個語音檔案中的語句信噪比(SNR):sentence_snr。
- 對應語音檔案的名字:id。
建立外部表格後,通過標準的Select語句進行查詢,則會觸發Extractor運行計算。此處便可感受到,在讀取處理OSS資料時,除了可以對文字檔做簡單的還原序列化處理,還可以通過自訂Extractor實現更複雜的資料處理抽取邏輯。比如:在此樣本中,通過自訂的com.aliyun.odps.udf.example.speech.SpeechStorageHandler
中封裝的Extractor,實現了對語音檔案計算平均有效語句信噪比的功能,並將抽取出來的結構化資料直接進行SQL運算(WHERE sentence_snr > 10),最終返回所有信噪比大於10的語音檔案以及對應的信噪比值。
在OSS地址oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/
上,儲存了原始的多個WAV格式的語音檔案,MaxCompute架構將讀取該地址上的所有檔案,並在必要的時候進行檔案層級的分區,自動將檔案分配給多個計算節點處理。每個計算節點上的Extractor則負責處理通過InputStreamSet分配給該節點的檔案集。具體的處理邏輯則與使用者單機程式相仿,您不需關心分布計算中的種種細節,按照類單機方式實現其使用者演算法即可。
定製化的SpeechSentenceSnrExtractor
主體邏輯,說明如下。
setup
介面中讀取參數,進行初始化,並且匯入語音處理模型(通過resource引入)。
public SpeechSentenceSnrExtractor(){
this.utteranceLabels = new HashMap<String, UtteranceLabel>();
}
@Override
public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){
this.inputs = inputs;
this.attributes = attributes;
this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);
this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);
try {
// read the speech model file from resource and load the model into memory
BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName);
loadMlfLabelsFromResource(inputStream);
inputStream.close();
} catch (IOException e) {
throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage());
}
}
Extractor()介面中,實現了對語音檔案的具體讀取和處理邏輯,對讀取的資料根據語音模型進行信噪比的計算,並且將結果填充成[snr, id]格式的Record。
上述樣本對實現進行了簡化,同時也沒有包括涉及語音處理的演算法邏輯,具體實現請參見MaxCompute SDK在開源社區中提供的範例代碼。
@Override
public Record extract() throws IOException {
SourceInputStream inputStream = inputs.next();
if (inputStream == null){
return null;
}
// process one wav file to extract one output record [snr, id]
String fileName = inputStream.getFileName();
fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
logger.info("Processing wav file " + fileName);
String id = fileName.substring(0, fileName.lastIndexOf('.'));
// read speech file into memory buffer
long fileSize = inputStream.getFileSize();
byte[] buffer = new byte[(int)fileSize];
int readSize = inputStream.readToEnd(buffer);
inputStream.close();
// compute the avg sentence snr
double snr = computeSnr(id, buffer, readSize);
// construct output record [snr, id]
Column[] outputColumns = this.attributes.getRecordColumns();
ArrayRecord record = new ArrayRecord(outputColumns);
record.setDouble(0, snr);
record.setString(1, id);
return record;
}
private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
throws IOException {
// skipped here
}
// compute the snr of the speech sentence, assuming the input buffer contains the entire content of a wav file
private double computeSnr(String id, byte[] buffer, int validBufferLen){
// computing the snr value for the wav file (supplied as byte buffer array), skipped here
}
select sentence_snr, id
from speech_sentence_snr_external
where sentence_snr > 10.0;
--------------------------------------------------------------
| sentence_snr | id |
--------------------------------------------------------------
| 34.4703 | J310209090013_H02_K03_042 |
--------------------------------------------------------------
| 31.3905 | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 |
--------------------------------------------------------------
| 35.4774 | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0 |
--------------------------------------------------------------
| 16.0462 | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0 |
--------------------------------------------------------------
| 14.5568 | tsh_148_3013_5_13_47_3d5008d792408f81_0 |
--------------------------------------------------------------
綜上所述,通過自訂Extractor,便可在SQL語句上分布式地處理多個OSS上的語音資料檔案。同樣的方法,也可以方便地利用MaxCompute的大規模計算能力,完成對映像,視頻等各種類型非結構化資料的處理。
資料的分區
- 直接的方法:您對資料存放地址做好規劃,考慮使用多個EXTERNAL TABLE來描述不同部分的資料,讓每個EXTERNALTABLE的LOCATION指向資料的一個子集。
- 資料分區方法:EXTERNAL TABLE與內部表一樣,支援分區表的功能,可以通過這個功能來對資料做系統化的管理。
- 分區資料在OSS上的標準組織方式和路徑格式
與MaxCompute內部表不同,對於存放在外部儲存上(如OSS)上面的資料,MaxComput沒有資料的管理權,因此如果需要使用分區表功能,在OSS上資料檔案的存放路徑必須符合一定的格式,路徑格式如下:
partitionKey1=value1\partitionKey2=value2\...
情境樣本將每天產生的LOG檔案存放在OSS上,並需要通過MaxCompute進行資料處理,資料處理時需按照粒度為“天”來訪問一部分資料。假設這些LOG檔案為CSV格式且可以用內建extractor訪問(複雜自訂格式用法也類似),那麼 外部分區表定義資料如下:CREATE EXTERNAL TABLE log_table_external ( click STRING, ip STRING, url STRING, ) PARTITIONED BY ( year STRING, month STRING, day STRING ) STORED BY 'com.aliyun.odps.CsvStorageHandler' WITH SERDEPROPERTIES ( 'odps.properties.rolearn'='acs:ram::xxxxx:role/aliyunodpsdefaultrole' ) LOCATION 'oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/log_data/';
如上建表語句,和前面的例子區別在於定義EXTERNAL TABLE時,通過PARTITIONED BY的文法指定該外部表格為分區表,該例子是一個三層分區分區表,分區的key分別是year,month和day。
為了讓分區生效,在OSS上儲存資料時需要遵循location的路徑格式。如有效路徑儲存layout:osscmd ls oss://oss-odps-test/log_data/ 2017-01-14 08:03:35 128MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=01/logfile 2017-01-14 08:04:12 127MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=01/logfile.1 2017-01-14 08:05:02 118MB Standard oss://oss-odps-test/log_data/year=2016/month=06/day=02/logfile 2017-01-14 08:06:45 123MB Standard oss://oss-odps-test/log_data/year=2016/month=07/day=10/logfile 2017-01-14 08:07:11 115MB Standard oss://oss-odps-test/log_data/year=2016/month=08/day=08/logfile ...
说明 因為資料是離線準備的,即通過osscmd或者其他OSS工具上傳到OSS儲存服務,所以資料路徑格式也在上傳時決定。通過ALTER TABLE ADD PARTITIONDDL語句,即可把這些分區資訊引入MaxCompute。
對應的DDL語句:ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '01') ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '02') ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '07', day = '10') ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '08', day = '08') ...
说明 以上這些操作與標準的MaxCompute內部表操作一樣,分區的詳情請參見分區。在資料準備好並且PARTITION資訊引入MaxCompute之後,即可通過SQL語句對OSS外表資料的分區進行操作。此時分析資料時,可以指定指需分析某天的資料,如只想分析2016年6月1號當天,有多少不同的IP出現在LOG裡面,可以通過如下語句實現。SELECT count(distinct(ip)) FROM log_table_external WHERE year = '2016' AND month = '06' AND day = '01';
該語句對log_table_external這個外表對應的目錄,將只訪問
log_data/year=2016/month=06/day=01
子目錄下的檔案(logfile和logfile.1),不會對整個log_data/目錄作全量資料掃描,避免大量無用的I/O操作。同樣如果只希望對2016年下半年的資料做分析,則執行如下語句。SELECT count(distinct(ip)) FROM log_table_external WHERE year = '2016' AND month > '06';
只訪問OSS上面儲存的下半年的LOG資料。
- 分區資料在OSS上的自訂路徑
如果事先存在OSS上的曆史資料,但是又不是根據
partitionKey1=value1\partitionKey2=value2\...
路徑格式來組織存放,也需要通過MaxCompute的分區方式來進行訪問計算時,MaxCompute也提供了通過自訂路徑來引入partition的方法。假設OSS資料路徑只有簡單的分區值(而無分區key資訊),也就是資料的layout為:osscmd ls oss://oss-odps-test/log_data_customized/ 2017-01-14 08:03:35 128MB Standard oss://oss-odps-test/log_data_customized/2016/06/01/logfile 2017-01-14 08:04:12 127MB Standard oss://oss-odps-test/log_data_customized/2016/06/01/logfile.1 2017-01-14 08:05:02 118MB Standard oss://oss-odps-test/log_data_customized/2016/06/02/logfile 2017-01-14 08:06:45 123MB Standard oss://oss-odps-test/log_data_customized/2016/07/10/logfile 2017-01-14 08:07:11 115MB Standard oss://oss-odps-test/log_data_customized/2016/08/08/logfile ...
外部表格建表DDL可參看前面的樣本,同樣在建表語句裡指定好分區key。
ALTER TABLE log_table_external ADD PARTITION (year = '2016', month = '06', day = '01')
LOCATION 'oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/log_data_customized/2016/06/01/';
在ADD PARTITION的時候增加了LOCATION資訊,從而實現自訂分區資料路徑後,即使資料存放不符合推薦的partitionKey1=value1\partitionKey2=value2\...
格式,也能正確的實現對子目錄資料的分區訪問了。