SelectObject API を使用して、オブジェクトに対して SQL ステートメントを実行し、その結果を取得できます。
背景情報
Hadoop 3.0 では、E-MapReduce (EMR) 上で Spark や Hive、Presto などのサービスを実行し、Object Storage Service (OSS) に保存されたデータを処理できます。MaxCompute や Data Lake Analytics (DLA) などの Alibaba Cloud サービスも、OSS から直接データを処理することをサポートしています。
従来、ビッグデータプラットフォームは、フィルタリングおよび分析を行う前に、GetObject API を使用して OSS からオブジェクト全体をダウンロードしていました。そのため、実際に必要なデータがオブジェクトのごく一部であっても、オブジェクト全体を転送して処理する必要がありました。
SelectObject API は、プロジェクションやフィルターなどのクエリロジックを OSS レイヤーにプッシュダウンすることでこの課題を解決します。これにより、必要なデータのサブセットのみを取得でき、データ転送量を削減し、コストを下げ、クライアント側の CPU およびメモリ負荷を軽減できます。この効率性により、OSS 上でのデータ分析がより魅力的になります。
課金
SelectObject API を使用してデータをクエリする場合、ソースオブジェクトからスキャンされた実際のデータ量に基づいて課金されます。詳細については、「データ処理料金」をご参照ください。
サポートされるオブジェクトフォーマット
SelectObject API は、特定のオブジェクトフォーマット (以下のサンプルコードで使用している CSV や JSON など) と、特定の SQL 構文のみをサポートしています。
郵便番号が「10021」で始まるすべての住所を検索するには、 select s.address.streetAddress from ossobject.contacts[*] s where s.address.postalCode like '10021%' または select s.streetAddress from ossobject.contacts[*].address s where s.postalCode like '10021%' のいずれかの SQL ステートメントを記述できます。
後者のステートメント select s.streetAddress from ossobject.contacts[*].address s where s.postalCode like '10021%' は、JSON パスで対象の要素 (contacts[*].address) をより具体的に指定しているため、パフォーマンスが優れています。
JSON オブジェクト内の高精度浮動小数点数の処理
JSON オブジェクト内の高精度浮動小数点数で計算を行う必要がある場合は、ParseJsonNumberAsString オプションを true に設定し、その後値を decimal 型にキャストすることを推奨します。たとえば、属性 a の値が 123456789.123456789 の場合、select s.a from ossobject s where cast(s.a as decimal) > 123456789.12345 というステートメントを使用して、元のデータの精度を維持できます。
select count(*), max(cast(_3 as int)), min(cast(_3 as int)) from ossobject
実行結果を表示します。
また、ダウンロードをクリックして、選択したコンテンツをローカルに保存することもできます。
OSS SDK
Java、Python、Go 向けの OSS SDK を使用してオブジェクトをクエリできます。
import com.aliyun.oss.ClientBuilderConfiguration;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.*;
import com.aliyun.oss.common.comm.SignVersion;
import com.aliyun.oss.model.*;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
/**
 * Examples of creating select object metadata and querying objects with SelectObject.
 *
 * <p>Each sample uploads a small test object, runs a SELECT statement against it, prints the
 * matching records, and then deletes the test object again (also when the query fails).
 */
public class SelectObjectSample {
    // Specify the endpoint of the region in which the bucket is located. For example, if the bucket is located in the China (Hangzhou) region, set the endpoint to https://oss-cn-hangzhou.aliyuncs.com.
    private static final String endpoint = "https://oss-cn-hangzhou.aliyuncs.com";
    // Specify the name of the bucket. Example: examplebucket.
    private static final String bucketName = "examplebucket";

    public static void main(String[] args) throws Exception {
        // Obtain access credentials from environment variables. Before you run the sample code, make sure that the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables are configured.
        EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
        // Specify the region in which the bucket is located. For example, if the bucket is located in the China (Hangzhou) region, set the region to cn-hangzhou.
        String region = "cn-hangzhou";
        // Create an OSSClient instance that signs requests with signature version 4.
        ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration();
        clientBuilderConfiguration.setSignatureVersion(SignVersion.V4);
        OSS ossClient = OSSClientBuilder.create()
                .endpoint(endpoint)
                .credentialsProvider(credentialsProvider)
                .clientConfiguration(clientBuilderConfiguration)
                .region(region)
                .build();
        try {
            // Specify the full path of the object that you want to query. Do not include the bucket name in the full path.
            // Specify the full path of the CSV object.
            selectCsvSample("test.csv", ossClient);
            // Specify the full path of the JSON object.
            selectJsonSample("test.json", ossClient);
        } finally {
            // Release the client's resources even if one of the samples throws.
            ossClient.shutdown();
        }
    }

    /**
     * Uploads a CSV object, prints its select metadata, and prints every record whose 4th column
     * is greater than 40. The uploaded object is deleted afterwards.
     *
     * @param key       full path of the object inside the bucket
     * @param ossClient client used for all requests
     */
    private static void selectCsvSample(String key, OSS ossClient) throws Exception {
        // Specify the content of the object that you want to upload.
        String content = "name,school,company,age\r\n" +
                "Lora Francis,School A,Staples Inc,27\r\n" +
                "Eleanor Little,School B,\"Conectiv, Inc\",43\r\n" +
                "Rosie Hughes,School C,Western Gas Resources Inc,44\r\n" +
                "Lawrence Ross,School D,MetLife Inc.,24";
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        ossClient.putObject(bucketName, key, new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)));
        try {
            SelectObjectMetadata selectObjectMetadata = ossClient.createSelectObjectMetadata(
                    new CreateSelectObjectMetadataRequest(bucketName, key)
                            .withInputSerialization(
                                    new InputSerialization().withCsvInputFormat(csvInputFormat())));
            System.out.println(selectObjectMetadata.getCsvObjectMetadata().getTotalLines());
            System.out.println(selectObjectMetadata.getCsvObjectMetadata().getSplits());

            SelectObjectRequest selectObjectRequest =
                    new SelectObjectRequest(bucketName, key)
                            .withInputSerialization(
                                    new InputSerialization().withCsvInputFormat(csvInputFormat()))
                            .withOutputSerialization(new OutputSerialization().withCsvOutputFormat(new CSVFormat()));
            // Use a SELECT statement to query all records whose values are greater than 40 in the 4th column.
            selectObjectRequest.setExpression("select * from ossobject where _4 > 40");
            printRecords(ossClient.selectObject(selectObjectRequest));
        } finally {
            // Remove the test object even if the metadata request or the query fails.
            ossClient.deleteObject(bucketName, key);
        }
    }

    /**
     * Uploads a JSON LINES object and prints every record whose {@code age} is greater than 40.
     * The uploaded object is deleted afterwards.
     *
     * @param key       full path of the object inside the bucket
     * @param ossClient client used for all requests
     */
    private static void selectJsonSample(String key, OSS ossClient) throws Exception {
        // Specify the content of the object that you want to upload.
        final String content = "{\n" +
                "\t\"name\": \"Lora Francis\",\n" +
                "\t\"age\": 27,\n" +
                "\t\"company\": \"Staples Inc\"\n" +
                "}\n" +
                "{\n" +
                "\t\"name\": \"Eleanor Little\",\n" +
                "\t\"age\": 43,\n" +
                "\t\"company\": \"Conectiv, Inc\"\n" +
                "}\n" +
                "{\n" +
                "\t\"name\": \"Rosie Hughes\",\n" +
                "\t\"age\": 44,\n" +
                "\t\"company\": \"Western Gas Resources Inc\"\n" +
                "}\n" +
                "{\n" +
                "\t\"name\": \"Lawrence Ross\",\n" +
                "\t\"age\": 24,\n" +
                "\t\"company\": \"MetLife Inc.\"\n" +
                "}";
        ossClient.putObject(bucketName, key, new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)));
        try {
            SelectObjectRequest selectObjectRequest =
                    new SelectObjectRequest(bucketName, key)
                            .withInputSerialization(new InputSerialization()
                                    .withCompressionType(CompressionType.NONE)
                                    .withJsonInputFormat(new JsonFormat().withJsonType(JsonType.LINES)))
                            .withOutputSerialization(new OutputSerialization()
                                    .withCrcEnabled(true)
                                    .withJsonOutputFormat(new JsonFormat()))
                            // Use the SELECT statement to query data in the object.
                            .withExpression("select * from ossobject as s where s.age > 40");
            printRecords(ossClient.selectObject(selectObjectRequest));
        } finally {
            // Remove the test object even if the query fails.
            ossClient.deleteObject(bucketName, key);
        }
    }

    /**
     * Builds the CSV input format shared by the metadata and query requests of the CSV sample:
     * the first line is treated as a header and records are separated by {@code \r\n}.
     */
    private static CSVFormat csvInputFormat() {
        return new CSVFormat().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n");
    }

    /**
     * Prints the query result line by line and closes the result stream afterwards.
     *
     * @param ossObject result returned by {@code selectObject}
     * @throws IOException if reading the result stream fails
     */
    private static void printRecords(OSSObject ossObject) throws IOException {
        // try-with-resources closes the reader (and the underlying object stream) on every path.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(ossObject.getObjectContent(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
def select_call_back(consumed_bytes, total_bytes=None):
    """Progress callback passed to the SelectObject calls below.

    Prints how many bytes have been consumed so far, followed by an extra
    blank line. ``total_bytes`` is accepted but not used.
    """
    message = 'Consumed Bytes:{}\n'.format(consumed_bytes)
    print(message)
# Obtain access credentials from environment variables. Before you run the sample code, make sure that the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables are configured.
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
# Specify the endpoint of the region in which the bucket is located. For example, if the bucket is located in the China (Hangzhou) region, set the endpoint to https://oss-cn-hangzhou.aliyuncs.com.
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
# Specify the ID of the region that maps to the endpoint. Example: cn-hangzhou. This parameter is required if you use the signature algorithm V4.
region = "cn-hangzhou"
# Specify the name of your bucket.
bucket = oss2.Bucket(auth, endpoint, "yourBucketName", region=region)

###CSV
key = 'python_select.csv'
# 1024 identical rows; the third column (_3) holds 45.
content = 'Tom Hanks,USA,45\r\n' * 1024
# Local file that receives the query result.
filename = 'python_select.csv'
# Upload a CSV file.
bucket.put_object(key, content)
try:
    # Configure the parameters for the SelectObject operation.
    csv_meta_params = {'RecordDelimiter': '\r\n'}
    # LineRange restricts the query to a range of lines of the object.
    select_csv_params = {'CsvHeaderInfo': 'None',
                         'RecordDelimiter': '\r\n',
                         'LineRange': (500, 1000)}
    csv_header = bucket.create_select_object_meta(key, csv_meta_params)
    print(csv_header.rows)
    print(csv_header.splits)
    result = bucket.select_object(key, "select * from ossobject where _3 > 44", select_call_back, select_csv_params)
    select_content = result.read()
    print(select_content)
    bucket.select_object_to_file(key, filename,
                                 "select * from ossobject where _3 > 44", select_call_back, select_csv_params)
finally:
    # Remove the test object even if one of the queries fails.
    bucket.delete_object(key)

###JSON DOCUMENT
key = 'python_select.json'
content = "{\"contacts\":[{\"key1\":1,\"key2\":\"hello world1\"},{\"key1\":2,\"key2\":\"hello world2\"}]}"
filename = 'python_select.json'
# Upload a JSON DOCUMENT object.
bucket.put_object(key, content)
try:
    select_json_params = {'Json_Type': 'DOCUMENT'}
    result = bucket.select_object(key, "select s.key2 from ossobject.contacts[*] s where s.key1 = 1", None, select_json_params)
    select_content = result.read()
    print(select_content)
    bucket.select_object_to_file(key, filename,
                                 "select s.key2 from ossobject.contacts[*] s where s.key1 = 1", None, select_json_params)
finally:
    bucket.delete_object(key)

###JSON LINES
key = 'python_select_lines.json'
content = "{\"key1\":1,\"key2\":\"hello world1\"}\n{\"key1\":2,\"key2\":\"hello world2\"}"
# Use a separate local file so the JSON DOCUMENT result above is not overwritten.
filename = 'python_select_lines.json'
# Upload a JSON LINES object.
bucket.put_object(key, content)
try:
    select_json_params = {'Json_Type': 'LINES'}
    json_header = bucket.create_select_object_meta(key, select_json_params)
    print(json_header.rows)
    print(json_header.splits)
    result = bucket.select_object(key, "select s.key2 from ossobject s where s.key1 = 1", None, select_json_params)
    select_content = result.read()
    print(select_content)
    bucket.select_object_to_file(key, filename,
                                 "select s.key2 from ossobject s where s.key1 = 1", None, select_json_params)
finally:
    bucket.delete_object(key)
package main
import (
"context"
"flag"
"io"
"log"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)
// Define global variables.
var (
region string // The storage region.
bucketName string // The bucket name.
objectName string // The object name.
)
// The init function is used to initialize command line parameters.
func init() {
flag.StringVar(®ion, "region", "", "The region in which the bucket is located.")
flag.StringVar(&bucketName, "bucket", "", "The name of the bucket.")
flag.StringVar(&objectName, "object", "", "The name of the object.")
}
// main validates the command-line parameters, runs a SelectObject query that
// returns the first 10 rows of a CSV object (treating its first line as a
// header), and logs the query result.
func main() {
	// Parse command line parameters.
	flag.Parse()

	// Check whether the bucket name is empty.
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, bucket name required")
	}

	// Check whether the region is empty.
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// Check whether the object name is empty.
	if len(objectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, object name required")
	}

	// Load the default configurations and set the credential provider and region.
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	// Create an OSS client.
	client := oss.NewClient(cfg)

	// Create a request to select an object.
	request := &oss.SelectObjectRequest{
		Bucket: oss.Ptr(bucketName), // The bucket name.
		Key:    oss.Ptr(objectName), // The object name.
		SelectRequest: &oss.SelectRequest{
			Expression: oss.Ptr("select * from ossobject limit 10"), // Define an SQL expression to query the first 10 rows of data in the object.
			InputSerializationSelect: oss.InputSerializationSelect{
				CsvBodyInput: &oss.CSVSelectInput{
					FileHeaderInfo: oss.Ptr("Use"),
				},
			},
			OutputSerializationSelect: oss.OutputSerializationSelect{
				OutputHeader: oss.Ptr(true),
			},
		},
	}

	// Execute the request to select the object.
	result, err := client.SelectObject(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to select object %v", err)
	}
	// Close the response body so the underlying HTTP connection is released.
	defer result.Body.Close()

	content, err := io.ReadAll(result.Body)
	if err != nil {
		log.Fatalf("failed to read object %v", err)
	}

	// Print the result of selecting the object.
	log.Printf("select object result:%#v\n", string(content))
}