功能介紹

對象儲存(Object Storage Service,簡稱OSS) 是基於阿里雲飛天分布式系統的海量、安全和高可靠的雲端儲存體服務,是一種面向互連網的大規模、低成本、通用儲存,提供RESTful API,具備容量和處理的彈性擴充能力。OSS不僅非常適合儲存海量的媒體檔案,也適合作為資料倉儲儲存海量的資料檔案。目前Hadoop 3.0已經支援OSS,在EMR上運行Spark/Hive/Presto等服務以及阿里自研的MaxCompute、HybridDB以及新上線的Data Lake Analytics都支援從OSS直接處理數據。

然而,目前OSS提供的GetObject介面決定了大數據平台只能把OSS數據全部下載到本地然後進行分析過濾,在很多查詢場景下浪費了大量頻寬和客戶端資源。

SelectObject介面是對上述問題的解決方案。其核心思想是大數據平台將條件、Projection下推到OSS層,讓OSS做基本的過濾,從而只返回有用的數據。客戶端一方面可以減少網路頻寬,另一方面也減少了數據的處理量,從而節省了CPU和記憶體用來做其他更多的事情。這使得基於OSS的資料倉儲、資料分析成為一種更有吸引力的選擇。

SelectObject現在處於公測階段,提供了Java、Python 的SDK。目前支援RFC 4180標準的CSV(包括TSV等類CSV檔案,檔案的行資料行分隔符號以及Quote字元都可自訂),且檔案編碼為UTF-8。支援標準儲存類型和低頻訪問儲存類型的檔案。支援加密檔案(OSS完全託管、KMS加密-預設KMS主要金鑰)。

支援的SQL文法如下:

  • SQL 陳述式: Select From Where
  • 資料類型:String, Int(64bit), float(64bit), Timestamp, Boolean
  • 操作: 邏輯條件(AND,OR,NOT), 算術運算式(+-*/%), 比較操作(>,=, <, >=, <=, !=),String 操作 (LIKE, || )

和GetObject提供了基於Byte的分區下載類似,SelectObject也提供了分區查詢的機制,包括兩種分區方式:按行分區和按Split分區。按行分區是常用的分區方式,然而對於稀疏數據來說,按行分區可能會導致分區時負載不均衡。Split是OSS用於分區的一個概念,一個Split包含多行數據,每個Split的數據大小大致相等,相對按行來,按Spit是更加高效的分區方式。尤其是對於CSV數據來說,基於Byte的分區可能會將數據破壞,因此按Spit分區更加合適。

關於資料類型,OSS中的CSV數據預設都是String類型,用戶可以使用CAST函數實現數據轉換,比如下面的SQL查詢將_1和_2轉換為int後進行比較。

Select * from OSSOBject where cast (_1 as int) > cast(_2 as int)

同時,對於SelectObject支援在Where條件中進行隱式的轉換,比如下面的語句中第一列和第二列將被轉換成int:

Select _1 from ossobject where _1 + _2 > 100

RESTful API使用說明

對目標CSV檔案執行SQL語句,返回執行結果。同時該命令會自動儲存CSV檔案的metadata資訊,比如總的行數和列數等。

正確執行時,該API返回206。如果SQL語句不正確,或者和CSV檔案不匹配,則會返回400錯誤。

請求文法
POST /object?x-oss-process=csv/select HTTP/1.1 
HOST: BucketName.oss-cn-hangzhou.aliyuncs.com 
Date: time GMT
Content-Length: ContentLength
Content-MD5: MD5Value 
Authorization: Signature

<?xml  version="1.0"  encoding="UTF-8"?>
<SelectRequest>
	<Expression>base64 encode(Select * from OSSObject where ...)</Expression>
	<InputSerialization>
		<CompressionType>None</CompressionType>
		<CSV>
			<FileHeaderInfo>NONE|IGNORE|USE</FileHeaderInfo>
			<RecordDelimiter>base64 encode</RecordDelimiter>
			<FieldDelimiter>base64 encode</FieldDelimiter>
			<QuoteCharacter>base64 encode</QuoteCharacter>
			<CommentCharacter>base64 encode</CommentCharacter>
			<Range>line-range=start-end|split-range=start-end</Range>
		</CSV>
  </InputSerialization>
  <OutputSerialization>
        <CSV>
			<RecordDelimiter>base64 encode</RecordDelimiter>
			<FieldDelimiter>base64 encode</FieldDelimiter>
			<KeepAllColumns>false|true</KeepAllColumns>
		</CSV>
	<OutputRawData>false|true</OutputRawData>
  </OutputSerialization>
</SelectRequest>
名稱 類型 描述
SelectRequest 容器

保存Select請求的容器

子節點:Expression, InputSerialization, OutputSerialization

父節點:None

Expression 字元串

以Base 64 編碼的SQL語句

子節點:None

父節點:SelectRequest

InputSerialization 容器

輸入序列化參數(可選)

子節點:CompressionType, CSV

父節點:SelectRequest

OutputSerialization 容器

輸出序列化參數(可選)

子節點:CSV,OutputRawData

父節點:SelectRequest

CSV(InputSerialization) 容器

輸入CSV的格式參數(可選)

子節點:FileHeaderInfo,RecordDelimiter,FieldDelimiter,QuoteCharacter,CommentCharacter, Range

父節點:InputSerialization

CSV(OutputSerialization) 容器

輸出CSV的格式參數(可選)

子節點: RecordDelimiter, FieldDelimiter,KeepAllColumns

父節點:OutputSerialization

OutputRawData bool,預設false

指定輸出數據為純數據(不是下面提到的基於Frame格式)(可選)

子節點:None

父節點:OutputSerialization

CompressionType 枚舉

指定檔案壓縮類型。目前不支援任何壓縮,故只能為None

子節點: None

父節點:InputSerialization

FileHeaderInfo 枚舉

指定CSV檔案頭資訊(可選)

取值:
  • Use:該CSV檔案有頭資訊,可以用CSV列名作為Select裡的列名。
  • Ignore:該CSV檔案有頭資訊,但不可用CSV列名作為Select裡的列名。
  • None:該檔案沒有頭資訊,為預設值。

子節點:None

父節點:CSV (輸入)

RecordDelimiter 字元串

指定CSV分行符號,以Base64編碼。預設值為\n(可選)。未編碼前的值最多為兩個字元,以字元的ANSI值表示,比如在Java裡用\n表示換行。

子節點:None

父節點:CSV (輸入、輸出)

FieldDelimiter 字元串

指定CSV資料行分隔符號,以Base64編碼。預設值為(可選)

未編碼前的值必須為一個字元,以字元的ANSI值表示,比如Java裡用表示逗號。

子節點:None

父節點:CSV (輸入,輸出)

QuoteCharacter 字元串

指定CSV的引號字元,以Base64編碼。預設值為\”(可選)。在CSV中引號內的分行符號,資料行分隔符號將被視作一般字元。為編碼前的值必須為一個字元,以字元的ANSI值表示,比如Java裡用\”表示引號。

子節點:None

父節點:CSV (輸入)

CommentCharacter 字元串 指定CSV的注釋符,以Base464編碼。預設值為#(可選)
Range 字元串
指定查詢檔案的範圍(可選)。支援兩種格式:
  • 按行查詢:line-range=start-end
  • 按Split查詢:split-range=start-end

其中start和end均為inclusive。其格式和range get中的range參數一致。

子節點:None

父節點:CSV (輸入)

KeepAllColumns bool

指定返回結果中包含CSV所有列的位置(可選,預設值為false)。但僅僅在select 語句裡出現的列會有值,不出現的列則為空,返回結果中每一行的數據按照CSV列的順序從低到高排列。比如下面語句:

select _5, _1 from ossobject.

如果KeepAllColumn = true,假設一共有6列數據,則返回的數據如下:

Value of 1st column,,,,Value of 5th column,\n

子節點:None

父節點:CSV(輸出)

返回結果

請求結果以一個個Frame形式返回。每個Frame的格式如下,其中checksum均為CRC32:

Frame-Type | Payload Length | Header Checksum | Payload | Payload Checksum

<---4 bytes--><---4 bytes----------><-------4 bytes-------><variable><----4bytes---------->

一共有三種不同的Frame Type, 列舉如下:

名稱 Frame-Type值 Payload格式 描述
Data Frame

version | 8388609

<--1 byte><--3 bytes>

scanned size  |  data

<-8 bytes----------><---variable->

其中scanned size為目前已掃描過的數據大小,data為查詢返回的數據。

Data Frame用以返回查詢數據,並同時可以彙報當前的進展。
Continuous Frame

version  | 8388612

<--1 byte><--3 bytes->

scanned size

<----8 bytes-->

Continuous Frame用以彙報當前進展以及維持http串連。如果該查詢在5s內未返回數據則會返回一個Continuous Frame。
End Frame

version  | 8388613

Offset  | total scanned bytes | http status code | error message

<--8bytes-><--8bytes--------------><----4 bytes--------><-variable------>

其中offset為掃描後最終的位置位移,total scanned bytes為最終掃描過的數據大小。http status code為最終的處理結果,error message為錯誤資訊。

這裡返回status code的原因在於SelectObject為串流,因而在發送Response Header的時候僅僅處理了第一個Block。如果第一個Block數據和SQL是匹配的,則在Response Header中的Status為206,但如果下面的數據非法,我們已無法更改Header中的Status,只能在End Frame裡包含最終的Status及其出錯資訊。因此客戶端應該視其為最終狀態。
樣例請求
POST /oss-select/bigcsv_normal.csv?x-oss-process=csv%2Fselect HTTP/1.1
Date: Fri, 25 May 2018 22:11:39 GMT
Content-Type:
Authorization: OSS LTAIJPXxMLocA0fD:FC/9JRbBGRw4o2QqdaL246Pxuvk=
User-Agent: aliyun-sdk-dotnet/2.8.0.0(windows 16.7/16.7.0.0/x86;4.0.30319.42000)
Content-Length: 748
Expect: 100-continue
Connection: keep-alive
Host: host name

<?xml version="1.0"?>
<SelectRequest>
	<Expression>c2VsZWN0IGNvdW50KCopIGZyb20gb3Nzb2JqZWN0IHdoZXJlIF80ID4gNDU=
	</Expression>
	<InputSerialization>
		<Compression>None</Compression>
		<CSV>
			<FileHeaderInfo>Ignore</FileHeaderInfo>
			<RecordDelimiter>Cg==</RecordDelimiter>
			<FieldDelimiter>LA==</FieldDelimiter>
			<QuoteCharacter>Ig==</QuoteCharacter>
			<Comments>Iw==</Comments>
		</CSV>
	</InputSerialization>
	<OutputSerialization>
		<CSV>
			<RecordDelimiter>Cg==</RecordDelimiter>
			<FieldDelimiter>LA==</FieldDelimiter>
			<QuoteCharacter>Ig==</QuoteCharacter>
			<KeepAllColumns>false</KeepAllColumns>
	</CSV>
	<OutputRawData>false</OutputRawData>
 </OutputSerialization>
</SelectRequest>
SQL語句Regex

SELECT select-list from OSSObject where_opt limit_opt

其中SELECT, OSSOBJECT以及 WHERE為關鍵字不得更改。

select_list: column name

| column index (比如_1, _2)

| function(column index | column name)

| select_list AS alias

支援的function為AVG,SUM,MAX,MIN,COUNT, CAST(類型轉換函式)。其中COUNT後只能用*。

Where_opt:
| WHERE expr
expr:
| literal value
| column name
| column index
| expr op expr
| expr OR expr
| expr AND expr
| expr IS NULL
| expr IS NOT NULL
| expr IN (value1, value2,….)
| expr NOT in (value1, value2,…)
| expr between value1 and value2
| NOT (expr)
| expr op expr
| (expr)
| cast (column index or column name or literal as INT|DOUBLE|DATETIME)

op:包括 > < >= <= != =, LIKE,+-*/%以及字元串串連||。

cast: 對於同一個column,只能cast成一種類型。

limit_opt:

| limit 整數

彙總和Limit的混用

Select avg(cast(_1 as int)) from ossobject limit 100

對於上面的語句,其含義是指在前100行中計算第一列的AVG值。這個行為和MY SQL不同,原因是在 SelectObject中彙總永遠只返回一行數據,因而對彙總來說限制其輸出規模是多餘的。因此SelectObject裡limit 將先於彙總函式執行。

SQL語句限制
  • 目前僅僅支援UTF-8編碼的文字檔。GZIP壓縮過的文本將在以後版本中支援,目前只能處理未壓縮檔。
  • 僅支援單檔案查詢,不支援join, order by, group by, having
  • Where語句裡不能包含彙總條件(e.g. where max(cast(age as int)) > 100這個是不允許的)。
  • 支援的最大的列數是1000,SQL中最大的列名稱為1024。
  • 在LIKE語句中,支援最多5個%萬用字元。*和%是等價的,表示0或多個任一字元。
  • 在IN語句中,最多支援1024個常量項。
  • Select後的Projection可以是列名,列索引(_1, _2等),或者是彙總函式,或者是CAST函數;不支援其他運算式。 比如select _1 + _2 from ossobject是不允許的。
  • 支援的最大行及最大列長度是都是256K。

CREATE SELECT OBJECT META

Create Select Object Meta API作用獲得目標CSV檔案的總的行數,總的列個數,以及Splits個數。如果該資訊不存在,則會掃描整個檔案分析並記錄下CSV檔案的上述資訊。如果該API執行正確,返回200。否則如果目標CSV檔案為非法、或者指定的分隔符號和目標CSV不匹配,則返回400。

請求元素
名稱 類型 描述
CsvMetaRequest 容器

保存建立Select Meta請求的容器。

子節點:Expression, InputSerialization, OutputSerialization

父節點:None

InputSerialization 容器

輸入序列化參數(可選)

子節點:CompressionType, CSV

父節點:CsvMetaRequest

OverwriteIfExists bool

重新計算SelectMeta,覆蓋已有數據。(可選,預設是false,即如果Select Meta已存在則直接返回)

子節點: None

父節點:CsvMetaRequest

CompressionType 枚舉

指定檔案壓縮類型。目前不支援任何壓縮,故只能為None

子節點: None

父節點:InputSerialization

RecordDelimiter 字元串

指定CSV分行符號,以Base64編碼。預設值為’\n’(可選)。未編碼前的值最多為兩個字元,以字元的ANSI值表示,比如在Java裡用‘\n’表示換行。

子節點:None

父節點:CSV

FieldDelimiter 字元串

指定CSV資料行分隔符號,以Base64編碼。預設值為(可選)

未編碼前的值必須為一個字元,以字元的ANSI值表示,比如Java裡用表示逗號。

子節點:None

父節點:CSV (輸入,輸出)

QuoteCharacter 字元串

指定CSV的引號字元,以Base64編碼。預設值為\”(可選)。在CSV中引號內的分行符號,資料行分隔符號將被視作一般字元。為編碼前的值必須為一個字元,以字元的ANSI值表示,比如Java裡用\”表示引號。

子節點:None

父節點:CSV (輸入)

CSV 容器

指定CSV輸入格式

子節點:RecordDelimiter,FieldDelimiter,QuoteCharacter

父節點:InputSerialization

Response Body:空

Response Header:
  • x-oss-select-csv-lines: 總行數
  • x-oss-select-csv-columns: 總列數
  • x-oss-select-csv-splits: 總Splits數
  • content-length:檔案內容length
Note
x-oss-select-csv-columns是指第一行的列數,假設用戶第一行的數據是正確的。
樣例請求
POST /oss-select/bigcsv_normal.csv?x-oss-process=csv%2Fmeta HTTP/1.1
Date: Fri, 25 May 2018 23:06:41 GMT
Content-Type:
Authorization: OSS LTAIJPXxMLocA0fD:2WF2l6zozf+hzTj9OSXPDklQCvE=
User-Agent: aliyun-sdk-dotnet/2.8.0.0(windows 16.7/16.7.0.0/x86;4.0.30319.42000)
Content-Length: 309
Expect: 100-continue
Connection: keep-alive
Host: Host

<?xml version="1.0"?>
<CsvMetaRequest>
	<InputSerialization>
		<CSV>
			<RecordDelimiter>Cg==</RecordDelimiter>
			<FieldDelimiter>LA==</FieldDelimiter>
			<QuoteCharacter>Ig==</QuoteCharacter>
		</CSV>
	</InputSerialization>
	<OverwriteIfExisting>false</OverwriteIfExisting>
</CsvMetaRequest>
返迴響應
HTTP/1.1 200 OK
Server: AliyunOSS
Date: Fri, 25 May 2018 23:06:42 GMT
Content-Type: application/vnd.ms-excel
Content-Length: 0
Connection: close
x-oss-request-id: 5B089702461FB4C07B000C75
x-oss-location: oss-cn-hangzhou-a
x-oss-access-id: LTAIJPXxMLocA0fD
x-oss-sign-type: NormalSign
x-oss-object-name: bigcsv_normal.csv
Accept-Ranges: bytes
ETag: "3E1372A912B4BC86E8A51234AEC0CA0C-400"
Last-Modified: Wed, 09 May 2018 00:22:32 GMT
x-oss-object-type: Multipart
x-oss-bucket-storage-type: standard
x-oss-hash-crc64ecma: 741622077104416154
x-oss-storage-class: Standard
**x-oss-select-csv-rows: 54000049**
**x-oss-select-csv-columns: 4**
**x-oss-select-csv-splits: 960**

Python SDK 樣例

import os
import oss2

def  select_call_back(consumed_bytes, total_bytes =  None):
	print('Consumed Bytes:'  +  str(consumed_bytes) +  '\n')
	
# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等資訊。
# 通過環境變數獲取,或者把諸如“<yourAccessKeyId>”替換成真實的AccessKeyId等。
#
# 以杭州區域為例,Endpoint可以是:
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com

access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<yourAccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<yourBucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<yourEndpoint>')

# 建立儲存空間實例,所有檔案相關的方法都需要通過儲存空間實例來調用。
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
key =  'python_select.csv'
content =  'Tom Hanks,USA,45\r\n'*1024
filename =  'python_select.csv'
# 上傳檔案
bucket.put_object(key, content)
csv_meta_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n'}
select_csv_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n',
'LineRange': (500, 1000)}

csv_header = bucket.create_select_object_meta(key, csv_meta_params)
print(csv_header.csv_rows)
print(csv_header.csv_splits)
result = bucket.select_object(key, "select * from ossobject where _3 > 44 limit 100000", select_call_back, select_csv_params)
content_got =  b''
for chunk in result:
	content_got += chunk
print(content_got)

result = bucket.select_object_to_file(key, filename,
"select * from ossobject where _3 > 44 limit 100000", select_call_back, select_csv_params)

bucket.delete_object(key)

Java SDK 樣例

package samples;

import com.aliyun.oss.event.ProgressEvent;
import com.aliyun.oss.event.ProgressListener;
import com.aliyun.oss.model.*;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;

/**
 * Examples of create select object metadata and select object.
 *
 */
public class SelectObjectSample {
    private static String endpoint = "<endpoint, http://oss-cn-hangzhou.aliyuncs.com>";
    private static String accessKeyId = "<accessKeyId>";
    private static String accessKeySecret = "<accessKeySecret>";
    private static String bucketName = "<bucketName>";
    private static String key = "<objectKey>";

    public static void main(String[] args) throws Exception {
        OSS client = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
        String content = "name,school,company,age\r\n" +
                "Lora Francis,School A,Staples Inc,27\r\n" +
                "Eleanor Little,School B,\"Conectiv, Inc\",43\r\n" +
                "Rosie Hughes,School C,Western Gas Resources Inc,44\r\n" +
                "Lawrence Ross,School D,MetLife Inc.,24";

        client.putObject(bucketName, key, new ByteArrayInputStream(content.getBytes()));

        SelectObjectMetadata selectObjectMetadata = client.createSelectObjectMetadata(
                new CreateSelectObjectMetadataRequest(bucketName, key)
                        .withInputSerialization(
                                new InputSerialization().withCsvInputFormat(
                                        new CSVFormat().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n"))));
        System.out.println(selectObjectMetadata.getCsvObjectMetadata().getTotalLines());
        System.out.println(selectObjectMetadata.getCsvObjectMetadata().getSplits());

        SelectObjectRequest selectObjectRequest =
                new SelectObjectRequest(bucketName, key)
                        .withInputSerialization(
                                new InputSerialization().withCsvInputFormat(
                                        new CSVFormat().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n")))
                        .withOutputSerialization(new OutputSerialization().withCsvOutputFormat(new CSVFormat()));
        selectObjectRequest.setExpression("select * from ossobject where _4 > 40");
        OSSObject ossObject = client.selectObject(selectObjectRequest);
        // read object content from ossObject
        BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream("result.data"));
        byte[] buffer = new byte[1024];
        int bytesRead;
        while ((bytesRead = ossObject.getObjectContent().read(buffer)) != -1) {
            outputStream.write(buffer, 0, bytesRead);
        }
        outputStream.close();
    }
}

最佳實踐

當一個檔案很大時,要有效實現分區查詢,推薦的流程如下:
  1. 調用Create Select Object Meta API獲得該檔案的總的Split數。理想情況下如果該檔案需要用SelectObject,則該API最好在查詢前進行非同步呼叫,這樣可以節省掃描時間。
  2. 根據客戶端資源情況選擇合適的並發度n,用總的Split數除以並發度n得到每個分區查詢應該包含的Split個數。
  3. 在請求Body中用諸如split-range=1-20的形式進行分區查詢。
  4. 如果需要最後可以合并結果。

SelectObject和Normal類型檔案配合性能更佳。Multipart 以及Appendable類型的檔案由於其內部結構差異導致性能較差。