功能介紹
對象儲存(Object Storage Service,簡稱OSS) 是基於阿里雲飛天分布式系統的海量、安全和高可靠的雲端儲存體服務,是一種面向互連網的大規模、低成本、通用儲存,提供RESTful API,具備容量和處理的彈性擴充能力。OSS不僅非常適合儲存海量的媒體檔案,也適合作為資料倉儲儲存海量的資料檔案。目前Hadoop 3.0已經支援OSS,在EMR上運行Spark/Hive/Presto等服務以及阿里自研的MaxCompute、HybridDB以及新上線的Data Lake Analytics都支援從OSS直接處理數據。
然而,目前OSS提供的GetObject介面決定了大數據平台只能把OSS數據全部下載到本地然後進行分析過濾,在很多查詢場景下浪費了大量頻寬和客戶端資源。
SelectObject介面是對上述問題的解決方案。其核心思想是大數據平台將條件、Projection下推到OSS層,讓OSS做基本的過濾,從而只返回有用的數據。客戶端一方面可以減少網路頻寬,另一方面也減少了數據的處理量,從而節省了CPU和記憶體用來做其他更多的事情。這使得基於OSS的資料倉儲、資料分析成為一種更有吸引力的選擇。
SelectObject現在處於公測階段,提供了Java、Python 的SDK。目前支援RFC 4180標準的CSV(包括TSV等類CSV檔案,檔案的行資料行分隔符號以及Quote字元都可自訂),且檔案編碼為UTF-8。支援標準儲存類型和低頻訪問儲存類型的檔案。支援加密檔案(OSS完全託管、KMS加密-預設KMS主要金鑰)。
支援的SQL文法如下:
- SQL 陳述式: Select From Where
- 資料類型:String, Int(64bit), float(64bit), Timestamp, Boolean
- 操作: 邏輯條件(AND,OR,NOT), 算術運算式(+-*/%), 比較操作(>,=, <, >=, <=, !=),String 操作 (LIKE, || )
和GetObject提供了基於Byte的分區下載類似,SelectObject也提供了分區查詢的機制,包括兩種分區方式:按行分區和按Split分區。按行分區是常用的分區方式,然而對於稀疏數據來說,按行分區可能會導致分區時負載不均衡。Split是OSS用於分區的一個概念,一個Split包含多行數據,每個Split的數據大小大致相等,相對按行來,按Spit是更加高效的分區方式。尤其是對於CSV數據來說,基於Byte的分區可能會將數據破壞,因此按Spit分區更加合適。
關於資料類型,OSS中的CSV數據預設都是String類型,用戶可以使用CAST函數實現數據轉換,比如下面的SQL查詢將_1和_2轉換為int後進行比較。
Select * from OSSOBject where cast (_1 as int) > cast(_2 as int)
同時,對於SelectObject支援在Where條件中進行隱式的轉換,比如下面的語句中第一列和第二列將被轉換成int:
Select _1 from ossobject where _1 + _2 > 100
RESTful API使用說明
對目標CSV檔案執行SQL語句,返回執行結果。同時該命令會自動儲存CSV檔案的metadata資訊,比如總的行數和列數等。
正確執行時,該API返回206。如果SQL語句不正確,或者和CSV檔案不匹配,則會返回400錯誤。
POST /object?x-oss-process=csv/select HTTP/1.1
HOST: BucketName.oss-cn-hangzhou.aliyuncs.com
Date: time GMT
Content-Length: ContentLength
Content-MD5: MD5Value
Authorization: Signature
<?xml version="1.0" encoding="UTF-8"?>
<SelectRequest>
<Expression>base64 encode(Select * from OSSObject where ...)</Expression>
<InputSerialization>
<CompressionType>None</CompressionType>
<CSV>
<FileHeaderInfo>NONE|IGNORE|USE</FileHeaderInfo>
<RecordDelimiter>base64 encode</RecordDelimiter>
<FieldDelimiter>base64 encode</FieldDelimiter>
<QuoteCharacter>base64 encode</QuoteCharacter>
<CommentCharacter>base64 encode</CommentCharacter>
<Range>line-range=start-end|split-range=start-end</Range>
</CSV>
</InputSerialization>
<OutputSerialization>
<CSV>
<RecordDelimiter>base64 encode</RecordDelimiter>
<FieldDelimiter>base64 encode</FieldDelimiter>
<KeepAllColumns>false|true</KeepAllColumns>
</CSV>
<OutputRawData>false|true</OutputRawData>
</OutputSerialization>
</SelectRequest>
名稱 | 類型 | 描述 |
---|---|---|
SelectRequest | 容器 | 保存Select請求的容器 子節點:Expression, InputSerialization, OutputSerialization 父節點:None |
Expression | 字元串 | 以Base 64 編碼的SQL語句 子節點:None 父節點:SelectRequest |
InputSerialization | 容器 | 輸入序列化參數(可選) 子節點:CompressionType, CSV 父節點:SelectRequest |
OutputSerialization | 容器 | 輸出序列化參數(可選) 子節點:CSV,OutputRawData 父節點:SelectRequest |
CSV(InputSerialization) | 容器 | 輸入CSV的格式參數(可選) 子節點:FileHeaderInfo,RecordDelimiter,FieldDelimiter,QuoteCharacter,CommentCharacter, Range 父節點:InputSerialization |
CSV(OutputSerialization) | 容器 | 輸出CSV的格式參數(可選) 子節點: RecordDelimiter, FieldDelimiter,KeepAllColumns 父節點:OutputSerialization |
OutputRawData | bool,預設false | 指定輸出數據為純數據(不是下面提到的基於Frame格式)(可選) 子節點:None 父節點:OutputSerialization |
CompressionType | 枚舉 | 指定檔案壓縮類型。目前不支援任何壓縮,故只能為None 子節點: None 父節點:InputSerialization |
FileHeaderInfo | 枚舉 | 指定CSV檔案頭資訊(可選)
取值:
子節點:None 父節點:CSV (輸入) |
RecordDelimiter | 字元串 | 指定CSV分行符號,以Base64編碼。預設值為\n(可選)。未編碼前的值最多為兩個字元,以字元的ANSI值表示,比如在Java裡用\n表示換行。 子節點:None 父節點:CSV (輸入、輸出) |
FieldDelimiter | 字元串 | 指定CSV資料行分隔符號,以Base64編碼。預設值為,(可選) 未編碼前的值必須為一個字元,以字元的ANSI值表示,比如Java裡用,表示逗號。 子節點:None 父節點:CSV (輸入,輸出) |
QuoteCharacter | 字元串 | 指定CSV的引號字元,以Base64編碼。預設值為\”(可選)。在CSV中引號內的分行符號,資料行分隔符號將被視作一般字元。為編碼前的值必須為一個字元,以字元的ANSI值表示,比如Java裡用\”表示引號。 子節點:None 父節點:CSV (輸入) |
CommentCharacter | 字元串 | 指定CSV的注釋符,以Base464編碼。預設值為#(可選) |
Range | 字元串 |
指定查詢檔案的範圍(可選)。支援兩種格式:
其中start和end均為inclusive。其格式和range get中的range參數一致。 子節點:None 父節點:CSV (輸入) |
KeepAllColumns | bool | 指定返回結果中包含CSV所有列的位置(可選,預設值為false)。但僅僅在select 語句裡出現的列會有值,不出現的列則為空,返回結果中每一行的數據按照CSV列的順序從低到高排列。比如下面語句:
如果KeepAllColumn = true,假設一共有6列數據,則返回的數據如下: Value of 1st column,,,,Value of 5th column,\n 子節點:None 父節點:CSV(輸出) |
請求結果以一個個Frame形式返回。每個Frame的格式如下,其中checksum均為CRC32:
Frame-Type | Payload Length | Header Checksum | Payload | Payload Checksum
<---4 bytes--><---4 bytes----------><-------4 bytes-------><variable><----4bytes---------->
一共有三種不同的Frame Type, 列舉如下:
名稱 | Frame-Type值 | Payload格式 | 描述 |
---|---|---|---|
Data Frame | version | 8388609 <--1 byte><--3 bytes> |
scanned size | data <-8 bytes----------><---variable-> 其中scanned size為目前已掃描過的數據大小,data為查詢返回的數據。 |
Data Frame用以返回查詢數據,並同時可以彙報當前的進展。 |
Continuous Frame | version | 8388612 <--1 byte><--3 bytes-> |
scanned size <----8 bytes--> |
Continuous Frame用以彙報當前進展以及維持http串連。如果該查詢在5s內未返回數據則會返回一個Continuous Frame。 |
End Frame | version | 8388613 |
Offset | total scanned bytes | http status code | error message <--8bytes-><--8bytes--------------><----4 bytes--------><-variable------> 其中offset為掃描後最終的位置位移,total scanned bytes為最終掃描過的數據大小。http status code為最終的處理結果,error message為錯誤資訊。 |
這裡返回status code的原因在於SelectObject為串流,因而在發送Response Header的時候僅僅處理了第一個Block。如果第一個Block數據和SQL是匹配的,則在Response Header中的Status為206,但如果下面的數據非法,我們已無法更改Header中的Status,只能在End Frame裡包含最終的Status及其出錯資訊。因此客戶端應該視其為最終狀態。 |
POST /oss-select/bigcsv_normal.csv?x-oss-process=csv%2Fselect HTTP/1.1
Date: Fri, 25 May 2018 22:11:39 GMT
Content-Type:
Authorization: OSS LTAIJPXxMLocA0fD:FC/9JRbBGRw4o2QqdaL246Pxuvk=
User-Agent: aliyun-sdk-dotnet/2.8.0.0(windows 16.7/16.7.0.0/x86;4.0.30319.42000)
Content-Length: 748
Expect: 100-continue
Connection: keep-alive
Host: host name
<?xml version="1.0"?>
<SelectRequest>
<Expression>c2VsZWN0IGNvdW50KCopIGZyb20gb3Nzb2JqZWN0IHdoZXJlIF80ID4gNDU=
</Expression>
<InputSerialization>
<Compression>None</Compression>
<CSV>
<FileHeaderInfo>Ignore</FileHeaderInfo>
<RecordDelimiter>Cg==</RecordDelimiter>
<FieldDelimiter>LA==</FieldDelimiter>
<QuoteCharacter>Ig==</QuoteCharacter>
<Comments>Iw==</Comments>
</CSV>
</InputSerialization>
<OutputSerialization>
<CSV>
<RecordDelimiter>Cg==</RecordDelimiter>
<FieldDelimiter>LA==</FieldDelimiter>
<QuoteCharacter>Ig==</QuoteCharacter>
<KeepAllColumns>false</KeepAllColumns>
</CSV>
<OutputRawData>false</OutputRawData>
</OutputSerialization>
</SelectRequest>
SELECT select-list from OSSObject where_opt limit_opt
其中SELECT, OSSOBJECT以及 WHERE為關鍵字不得更改。
select_list: column name
| column index (比如_1, _2)
| function(column index | column name)
| select_list AS alias
支援的function為AVG,SUM,MAX,MIN,COUNT, CAST(類型轉換函式)。其中COUNT後只能用*。
Where_opt:
| WHERE expr
expr:
| literal value
| column name
| column index
| expr op expr
| expr OR expr
| expr AND expr
| expr IS NULL
| expr IS NOT NULL
| expr IN (value1, value2,….)
| expr NOT in (value1, value2,…)
| expr between value1 and value2
| NOT (expr)
| expr op expr
| (expr)
| cast (column index or column name or literal as INT|DOUBLE|DATETIME)
op:包括 > < >= <= != =, LIKE,+-*/%以及字元串串連||。
cast: 對於同一個column,只能cast成一種類型。
limit_opt:
| limit 整數
彙總和Limit的混用
Select avg(cast(_1 as int)) from ossobject limit 100
對於上面的語句,其含義是指在前100行中計算第一列的AVG值。這個行為和MY SQL不同,原因是在 SelectObject中彙總永遠只返回一行數據,因而對彙總來說限制其輸出規模是多餘的。因此SelectObject裡limit 將先於彙總函式執行。
- 目前僅僅支援UTF-8編碼的文字檔。GZIP壓縮過的文本將在以後版本中支援,目前只能處理未壓縮檔。
- 僅支援單檔案查詢,不支援join, order by, group by, having
- Where語句裡不能包含彙總條件(e.g. where max(cast(age as int)) > 100這個是不允許的)。
- 支援的最大的列數是1000,SQL中最大的列名稱為1024。
- 在LIKE語句中,支援最多5個%萬用字元。*和%是等價的,表示0或多個任一字元。
- 在IN語句中,最多支援1024個常量項。
- Select後的Projection可以是列名,列索引(_1, _2等),或者是彙總函式,或者是CAST函數;不支援其他運算式。 比如select _1 + _2 from ossobject是不允許的。
- 支援的最大行及最大列長度是都是256K。
CREATE SELECT OBJECT META
Create Select Object Meta API作用獲得目標CSV檔案的總的行數,總的列個數,以及Splits個數。如果該資訊不存在,則會掃描整個檔案分析並記錄下CSV檔案的上述資訊。如果該API執行正確,返回200。否則如果目標CSV檔案為非法、或者指定的分隔符號和目標CSV不匹配,則返回400。
名稱 | 類型 | 描述 |
---|---|---|
CsvMetaRequest | 容器 | 保存建立Select Meta請求的容器。 子節點:Expression, InputSerialization, OutputSerialization 父節點:None |
InputSerialization | 容器 | 輸入序列化參數(可選) 子節點:CompressionType, CSV 父節點:CsvMetaRequest |
OverwriteIfExists | bool | 重新計算SelectMeta,覆蓋已有數據。(可選,預設是false,即如果Select Meta已存在則直接返回) 子節點: None 父節點:CsvMetaRequest |
CompressionType | 枚舉 | 指定檔案壓縮類型。目前不支援任何壓縮,故只能為None 子節點: None 父節點:InputSerialization |
RecordDelimiter | 字元串 | 指定CSV分行符號,以Base64編碼。預設值為’\n’(可選)。未編碼前的值最多為兩個字元,以字元的ANSI值表示,比如在Java裡用‘\n’表示換行。 子節點:None 父節點:CSV |
FieldDelimiter | 字元串 | 指定CSV資料行分隔符號,以Base64編碼。預設值為,(可選) 未編碼前的值必須為一個字元,以字元的ANSI值表示,比如Java裡用,表示逗號。 子節點:None 父節點:CSV (輸入,輸出) |
QuoteCharacter | 字元串 | 指定CSV的引號字元,以Base64編碼。預設值為\”(可選)。在CSV中引號內的分行符號,資料行分隔符號將被視作一般字元。為編碼前的值必須為一個字元,以字元的ANSI值表示,比如Java裡用\”表示引號。 子節點:None 父節點:CSV (輸入) |
CSV | 容器 | 指定CSV輸入格式 子節點:RecordDelimiter,FieldDelimiter,QuoteCharacter 父節點:InputSerialization |
Response Body:空
- x-oss-select-csv-lines: 總行數
- x-oss-select-csv-columns: 總列數
- x-oss-select-csv-splits: 總Splits數
- content-length:檔案內容length
Note | |
x-oss-select-csv-columns是指第一行的列數,假設用戶第一行的數據是正確的。 |
POST /oss-select/bigcsv_normal.csv?x-oss-process=csv%2Fmeta HTTP/1.1
Date: Fri, 25 May 2018 23:06:41 GMT
Content-Type:
Authorization: OSS LTAIJPXxMLocA0fD:2WF2l6zozf+hzTj9OSXPDklQCvE=
User-Agent: aliyun-sdk-dotnet/2.8.0.0(windows 16.7/16.7.0.0/x86;4.0.30319.42000)
Content-Length: 309
Expect: 100-continue
Connection: keep-alive
Host: Host
<?xml version="1.0"?>
<CsvMetaRequest>
<InputSerialization>
<CSV>
<RecordDelimiter>Cg==</RecordDelimiter>
<FieldDelimiter>LA==</FieldDelimiter>
<QuoteCharacter>Ig==</QuoteCharacter>
</CSV>
</InputSerialization>
<OverwriteIfExisting>false</OverwriteIfExisting>
</CsvMetaRequest>
HTTP/1.1 200 OK
Server: AliyunOSS
Date: Fri, 25 May 2018 23:06:42 GMT
Content-Type: application/vnd.ms-excel
Content-Length: 0
Connection: close
x-oss-request-id: 5B089702461FB4C07B000C75
x-oss-location: oss-cn-hangzhou-a
x-oss-access-id: LTAIJPXxMLocA0fD
x-oss-sign-type: NormalSign
x-oss-object-name: bigcsv_normal.csv
Accept-Ranges: bytes
ETag: "3E1372A912B4BC86E8A51234AEC0CA0C-400"
Last-Modified: Wed, 09 May 2018 00:22:32 GMT
x-oss-object-type: Multipart
x-oss-bucket-storage-type: standard
x-oss-hash-crc64ecma: 741622077104416154
x-oss-storage-class: Standard
**x-oss-select-csv-rows: 54000049**
**x-oss-select-csv-columns: 4**
**x-oss-select-csv-splits: 960**
Python SDK 樣例
import os
import oss2
def select_call_back(consumed_bytes, total_bytes = None):
print('Consumed Bytes:' + str(consumed_bytes) + '\n')
# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等資訊。
# 通過環境變數獲取,或者把諸如“<yourAccessKeyId>”替換成真實的AccessKeyId等。
#
# 以杭州區域為例,Endpoint可以是:
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<yourAccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<yourBucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<yourEndpoint>')
# 建立儲存空間實例,所有檔案相關的方法都需要通過儲存空間實例來調用。
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
key = 'python_select.csv'
content = 'Tom Hanks,USA,45\r\n'*1024
filename = 'python_select.csv'
# 上傳檔案
bucket.put_object(key, content)
csv_meta_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n'}
select_csv_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n',
'LineRange': (500, 1000)}
csv_header = bucket.create_select_object_meta(key, csv_meta_params)
print(csv_header.csv_rows)
print(csv_header.csv_splits)
result = bucket.select_object(key, "select * from ossobject where _3 > 44 limit 100000", select_call_back, select_csv_params)
content_got = b''
for chunk in result:
content_got += chunk
print(content_got)
result = bucket.select_object_to_file(key, filename,
"select * from ossobject where _3 > 44 limit 100000", select_call_back, select_csv_params)
bucket.delete_object(key)
Java SDK 樣例
package samples;
import com.aliyun.oss.event.ProgressEvent;
import com.aliyun.oss.event.ProgressListener;
import com.aliyun.oss.model.*;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
/**
* Examples of create select object metadata and select object.
*
*/
public class SelectObjectSample {
private static String endpoint = "<endpoint, http://oss-cn-hangzhou.aliyuncs.com>";
private static String accessKeyId = "<accessKeyId>";
private static String accessKeySecret = "<accessKeySecret>";
private static String bucketName = "<bucketName>";
private static String key = "<objectKey>";
public static void main(String[] args) throws Exception {
OSS client = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
String content = "name,school,company,age\r\n" +
"Lora Francis,School A,Staples Inc,27\r\n" +
"Eleanor Little,School B,\"Conectiv, Inc\",43\r\n" +
"Rosie Hughes,School C,Western Gas Resources Inc,44\r\n" +
"Lawrence Ross,School D,MetLife Inc.,24";
client.putObject(bucketName, key, new ByteArrayInputStream(content.getBytes()));
SelectObjectMetadata selectObjectMetadata = client.createSelectObjectMetadata(
new CreateSelectObjectMetadataRequest(bucketName, key)
.withInputSerialization(
new InputSerialization().withCsvInputFormat(
new CSVFormat().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n"))));
System.out.println(selectObjectMetadata.getCsvObjectMetadata().getTotalLines());
System.out.println(selectObjectMetadata.getCsvObjectMetadata().getSplits());
SelectObjectRequest selectObjectRequest =
new SelectObjectRequest(bucketName, key)
.withInputSerialization(
new InputSerialization().withCsvInputFormat(
new CSVFormat().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n")))
.withOutputSerialization(new OutputSerialization().withCsvOutputFormat(new CSVFormat()));
selectObjectRequest.setExpression("select * from ossobject where _4 > 40");
OSSObject ossObject = client.selectObject(selectObjectRequest);
// read object content from ossObject
BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream("result.data"));
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = ossObject.getObjectContent().read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
outputStream.close();
}
}
最佳實踐
- 調用Create Select Object Meta API獲得該檔案的總的Split數。理想情況下如果該檔案需要用SelectObject,則該API最好在查詢前進行非同步呼叫,這樣可以節省掃描時間。
- 根據客戶端資源情況選擇合適的並發度n,用總的Split數除以並發度n得到每個分區查詢應該包含的Split個數。
- 在請求Body中用諸如split-range=1-20的形式進行分區查詢。
- 如果需要最後可以合并結果。
SelectObject和Normal類型檔案配合性能更佳。Multipart 以及Appendable類型的檔案由於其內部結構差異導致性能較差。