SelectObject API は、ログの分析によく使用されます。 また、ビッグデータプロダクトと組み合わせて使用することもできます。 このトピックでは、Python SDK と Java SDK を使用して SelectObject API を呼び出す方法について説明します。

概要

Object Storage Service (OSS) は、Apsara システムに基づく、安全で信頼性の高いクラウドストレージサービスです。 インターネット上に大量のデータを低コストで保存できます。 また、RESTful API、容量と処理能力の自動スケーリングもサポートしています。 OSS は、多数のメディアオブジェクトを保存できるだけでなく、多数のデータオブジェクトを保存するデータウェアハウスとしても機能します。 Hadoop 3.0 は OSS をサポートしています。 Amazon Elastic MapReduce (EMR) で Spark、Hive、Presto などのサービスを実行している場合、または MaxCompute、HybridDB、新たにリリースされた Data Lake Analytics などの Alibaba Cloud サービスを使用している場合、OSS のデータを直接読み取り、処理できます。

ただし、OSS が提供する GetObject API では、ビッグデータプラットフォームのみが OSS の全データをローカルにダウンロードして分析やフィルタリングを行うことができます。 その結果、多くのクエリシナリオでは、大量の帯域幅リソースとクライアントリソースが浪費されます。

この問題を解決するために、OSS では SelectObject API が提供されています。 これにより、ビッグデータプラットフォームではなく OSS が、条件と予測を使用してデータを事前にフィルタリングし、有用なデータのみをビッグデータプラットフォームに返すことができます。 この方法では、クライアントは少ない帯域幅リソースを使用して、少量のデータを処理し、CPU とメモリリソースの使用を最大化できます。 これにより、OSS ベースのデータウェアハウジングとデータ分析が非常に魅力的なオプションになります。

SelectObject API は、Java SDK と Python SDK でサポートされています。今後、他の言語の SDK もサポートされる予定です。 SelectObject API は、UTF-8 でエンコードされた CSV オブジェクトと JSON オブジェクトをサポートします。 CSV オブジェクト (TSV オブジェクトなどの CSV と同じようなオブジェクトを含む) は、RFC 4180 に準拠します。 CSV オブジェクトでは、行と列の区切り文字と引用文字をカスタマイズできます。 SelectObject API は、標準および低頻度アクセス (IA) ストレージクラスのオブジェクトをサポートします。 アーカイブストレージクラスのオブジェクトは、使用する前に復元する必要があります。 SelectObject API は、OSS が完全に管理するサーバー側暗号化方式 (SSE-OSS)、または Key Management Service (KMS) が管理する Customer Master Key (CMK) によるサーバー側暗号化方式 (SSE-KMS) で暗号化されたオブジェクトをサポートします。

SelectObject API は、DOCUMENT および LINES 形式の JSON オブジェクトをサポートします。 JSON DOCUMENT オブジェクトには、オブジェクトが 1 つ含まれます。 JSON LINES オブジェクトは、区切り文字で区切られたオブジェクトの行で構成されます。 ただし、JSON オブジェクト自体は、有効なオブジェクトでない場合があります。 SelectObject API は、\n や \r\n などの一般的な区切り文字をサポートしています。区切り文字を指定する必要はありません。

  • サポートされる SQL 構文
    • SQL 文:SELECT、FROM、WHERE
    • データ型:string、int64、double64、decimal128、timestamp、Boolean
    • 演算:論理条件 (AND、OR、NOT)、算術式 (+、-、×、/、%)、比較演算 (>、=、<、>=、<=、!=)、および文字列操作 (LIKE と||)
  • マルチパートクエリ

    SelectObject API は、GetObject API でサポートされているバイトベースのマルチパートダウンロードに類似したマルチパートクエリをサポートしています。 データは行またはスプリット単位で分割されます。 行単位のデータ分割は、一般的に使用されますが、スパースデータが分割されると負荷が不均衡になる可能性があります。 複数の行を含む各スプリットのサイズはほぼ同じなので、スプリット単位のデータ分割は、より効率的です。

  • データ型

    OSS では、CSV オブジェクトのデータはデフォルトで string 型です。 CAST 関数を使用して、データ型を変換できます。 たとえば、次の SQL 文は、1 列目と 2 列目のデータを integer 型に変換して比較します。

    Select * from OSSOBject where cast (_1 as int) > cast(_2 as int)

    また、SelectObject API を使用すると、WHERE 句のデータ型を暗黙的に変換できます。 たとえば、次の SQL 文は、1 列目と 2 列目のデータを integer 型に変換します。

    Select _1 from ossobject where _1 + _2 > 100

    SQL 文で CAST 関数を使用しない場合、JSON オブジェクトのデータ型は、オブジェクトのデータ型によって決まります。 標準の JSON オブジェクトは、null、Boolean、int64、double、string などのデータ型をサポートします。

Python SDK

import os
import oss2

def select_call_back(consumed_bytes, total_bytes = None):
    print('Consumed Bytes:' + str(consumed_bytes) + '\n')

# Initialize OSS information such as the AccessKey ID, AccessKey Secret, and endpoint.
# Obtain the information through environment variables or replace variables such as <yourAccessKeyId> with actual values.
#
# Use China (Hangzhou) as an example to set the endpoint to one of the following:
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com

access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<yourAccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<yourBucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<yourEndpoint>')

# Create an OSS bucket. All object-related methods must be called through the bucket.
bucket = oss2. Bucket(oss2. Auth(access_key_id, access_key_secret), endpoint, bucket_name)
key = 'python_select.csv'
content = 'Tom Hanks,USA,45\r\n'*1024
filename = 'python_select.csv'
# Upload a CSV object.
bucket.put_object(key, content)
# Set the parameters of the SelectObject API.
csv_meta_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n'}
select_csv_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n',
'LineRange': (500, 1000)}

csv_header = bucket.create_select_object_meta(key, csv_meta_params)
print(csv_header.rows)
print(csv_header.splits)
result = bucket.select_object(key、 "select * from ossobject where _3> 44"、select_call_back、select_csv_params)
select_content = result.read()
print(select_content)

result = bucket.select_object_to_file(key, filename,
"select * from ossobject where _3 > 44", select_call_back, select_csv_params)
bucket.delete_object(key)

###JSON DOCUMENT
key = 'python_select.json'
content = "{\"contacts\":[{\"key1\":1,\"key2\":\"hello world1\"},{\"key1\":2,\"key2\":\"hello world2\"}]}"
filename = 'python_select.json'
# Upload a JSON DOCUMENT object.
bucket.put_object(key, content)
select_json_params = {'Json_Type': 'DOCUMENT'}
result = bucket.select_object(key, "select s.key2 from ossobject.contacts[*] s where s.key1 = 1", None, select_json_params)
select_content = result.read()
print(select_content)

result = bucket.select_object_to_file(key, filename,
"select s.key2 from ossobject.contacts[*] s where s.key1 = 1", None, select_json_params)

bucket.delete_object(key)
###JSON LINES
key = 'python_select_lines.json'
content = "{\"key1\":1,\"key2\":\"hello world1\"}\n{\"key1\":2,\"key2\":\"hello world2\"}"
filename = 'python_select.json'
# Upload a JSON LINES object.
bucket.put_object(key, content)
select_json_params = {'Json_Type': 'LINES'}
json_header = bucket.create_select_object_meta(key,select_json_params)
print(json_header.rows)
print(json_header.splits)

result = bucket.select_object(key, "select s.key2 from ossobject s where s.key1 = 1", None, select_json_params)
select_content = result.read()
print(select_content)
result = bucket.select_object_to_file(key, filename,
"select s.key2 from ossobject s where s.key1 = 1", None, select_json_params)

bucket.delete_object(key)

Python の SelectObject API

  • select_object
    • 次の例は、select_object 操作のサンプルコードを示しています。
      def select_object(self, key, sql,
                         progress_callback=None,
                         select_params=None                   ):

      上記のサンプルコードでは、指定されたキーを持つオブジェクトに対して SQL 文を実行し、クエリ結果を返します。

      • SQL 文を sql パラメーターの値としてそのまま使用可能です。Base64 でエンコードする必要はありません。
      • progress_callback パラメーターはオプションです。 クエリの進行状況を報告するためのコールバック関数を指定します。
      • select_params は、この API の重要なパラメーターです。 select_object 操作のパラメーターとアクションを指定します。
      • headers パラメーターを使用して、リクエストに含まれるヘッダー情報を指定できます。 ヘッダー情報の機能は、GetObject API の機能と同じです。 たとえば、リクエストヘッダーにバイトフィールドを設定して、SQL 文が CSV オブジェクトでクエリ可能な範囲を指定できます。
    • 次の表に、select_params パラメーターでサポートされるパラメーターを示します。
      名前 説明
      Json_Type
      • このパラメーターを空のままにすると、デフォルトでオブジェクトは CSV オブジェクトになります。
      • このパラメーターに DOCUMENT が設定されている場合、オブジェクトは JSON オブジェクトです。
      • このパラメーターに LINES が設定されている場合、オブジェクトは JSON LINES オブジェクトです。
      CsvHeaderInfo CSV オブジェクトのヘッダー情報。 有効な値:None、Ignore、Use
      • None:このオブジェクトにヘッダー情報が含まれないことを示します。
      • Ignore:このオブジェクトにヘッダー情報が含まれていることを示します。ヘッダー情報は SQL 文では使用されません。
      • Use:このオブジェクトにヘッダー情報が含まれ、ヘッダー情報の列名が SQL 文で使用されることを示します。
      CommentCharacter CSV オブジェクトのコメント文字。 パラメーター値は 1 文字のみです。 デフォルト値:None。コメント文字がないことを示します。
      RecordDelimiter CSV オブジェクトの行区切り文字。 パラメーター値は、1 文字または 2 文字のみです。 デフォルト値:\n
      OutputRecordDelimiter select_object 操作の出力結果の行区切り文字。 デフォルト値:\n
      FieldDelimiter CSV オブジェクトの列区切り文字。 パラメーター値は 1 文字のみです。 デフォルト値:カンマ (,)。
      OutputFieldDelimiter select_object 操作の出力結果の列区切り文字。 デフォルト値:カンマ (,)。
      QuoteCharacter CSV オブジェクトの列の引用文字。 パラメーター値は 1 文字のみです。 デフォルト値:二重引用符 (") です。 引用符で囲まれた行区切り文字と列区切り文字は、通常の文字として処理されます。
      SplitRange マルチパートクエリのスプリット範囲。 パラメーター値は (start, end) 形式の限定された範囲で、start# から end# までのスプリットがクエリされることを示します。
      LineRange マルチパートクエリの行範囲。 パラメーター値は (start, end) 形式の限定された範囲で、start# から end# までの行がクエリされることを示します。
      CompressionType 圧縮タイプ。 デフォルト値:None。 有効な値:GZIP。
      KeepAllColumns このパラメーターが true に設定されている場合、CSV オブジェクトの SELECT 文で除外された列は、出力結果で空のままになります。 ただし、列の位置は保持されます。 デフォルト値:false。

      例:

      CSV オブジェクトに firstname 列、lastname 列、age 列が含まれています。 SQL 文は select firstname, age from ossobject です。 KeepAllColumns パラメーターが true に設定されている場合、出力結果は firstname,,age で、除外された lastname 列の位置を示すカンマ (,) が追加されます。 KeepAllColumns パラメーターが false に設定されている場合、出力結果は firstname,age です。 このパラメーターを使用すると、コードを変更する必要はありませんが、GetObject API の処理に使用されるコードを直接使用して、SelectObject API を処理できます。

      OutputRawData
      • このパラメーターが true に設定されている場合、select_object 操作は元のデータをそのまま返します。 データが長時間返されない場合、タイムアウトエラーが発生することがあります。
      • このパラメーターが false に設定されている場合、出力データはフレームにカプセル化されます。 デフォルト値:false。
      EnablePayloadCrc フレームごとに巡回冗長検査 (CRC) 値を計算するかどうかを示します。 デフォルト値:false。
      OutputHeader 出力結果の先頭行のヘッダー情報。 このパラメーターは、CSV オブジェクトにのみ適用されます。
      SkipPartialDataRecord
      • このパラメーターが true に設定されている場合、CSV オブジェクトの列にデータがない場合、または JSON オブジェクトにキーが存在しない場合、現在のレコードはスキップされます。
      • このパラメーターが false に設定されている場合、データのない列は出力結果で空のままになります。

      例:

      行に firstname 列、lastname 列、age 列が含まれています。 SQL 文は select _1, _4 from ossobject です。 SkipPartialDataRecord パラメーターが true に設定されている場合、この行はスキップされます。 SkipPartialDataRecord パラメーターが false に設定されている場合、firstname,\n という結果が返されます。

      MaxSkippedRecordsAllowed スキップされる行の最大数。 デフォルト値:0。行がスキップされた場合、エラーが返されることを示します。
      ParseJsonNumberAsString このパラメーターが true に設定されている場合、JSON オブジェクトの数値は文字列として解決されます。 このパラメーターが false に設定されている場合、JSON オブジェクトの数値は整数または浮動小数点数として解決されます。 デフォルト値:false。

      JSON オブジェクトの高精度浮動小数点数は、浮動小数点数として解決された場合、精度が低下します。 精度を維持するには、このパラメーターを true に設定し、CAST 関数を使用して、解決されたデータを 10 進数型に変換します。

    • select_object 操作で返される結果:SelectObjectResult オブジェクトが返されます。 read () 関数または __iter__ メソッドを使用して、すべての結果を取得できます。 出力結果に大量のデータが含まれる場合、read() 関数ですべての結果を取得するのは、最適な方法とはいえません。 この関数は、すべての結果が返されるまでシステムをブロックし、過度のメモリリソースを使用する場合があります。

      __iter__ メソッドを使用して (結果の各チャンクに対して) すべての結果を取得し、結果のチャンクごとに処理することを推奨します。 この方法は、使用するメモリリソースが少なく、OSS サーバーでチャンクが処理された直後にクライアントで処理できます。 クライアントは、すべての結果が返されるのを待つ必要はありません。

  • select_object_to_file
    def select_object_to_file(self, key, filename, sql,
                       progress_callback=None,
                       select_params=None
                       ):

    上記のサンプルコードを使用して、指定されたキーを持つオブジェクトに対して SQL 文を実行し、クエリ結果を別のオブジェクトに書き込みます。

    その他のパラメーターは、select_object 操作のパラメーターと同じです。

  • create_select_object_meta
    • 次のサンプルコードは、select_meta_params パラメーターの構文を示します。
      def create_select_object_meta(self, key, select_meta_params=None):

      上記のサンプルコードを使用して、指定されたキーを持つオブジェクトに Select Meta を作成したり、このオブジェクトから Select Meta を取得したりします。 Select Meta には、行の総数、列の総数 (CSV オブジェクトの場合)、およびオブジェクト内のスプリットの総数が含まれます。

      オブジェクトの Select Meta が作成済みの場合、OverwriteIfExists パラメーターの値が true に設定されていない限り、Select Meta は再作成されません。

      オブジェクトの Select Meta を作成するには、オブジェクトを完全にスキャンする必要があります。

    • 次の表に、select_meta_params パラメーターでサポートされるパラメーターを示します。
      名前 説明
      Json_Type このパラメーターを空のままにすると、デフォルトでオブジェクトは CSV オブジェクトになります。 このパラメーターが指定されている場合、パラメーター値は LINES (オブジェクトが JSON LINES オブジェクトであることを示す) でなければなりません。 この操作は、JSON DOCUMENT オブジェクトには適用されません。
      RecordDelimiter CSV オブジェクトの行区切り文字。
      FieldDelimiter CSV オブジェクトの列区切り文字。
      QuoteCharacter CSV オブジェクトの引用文字。 引用符で囲まれた行区切り文字と列区切り文字は、通常の文字として処理されます。
      CompressionType 圧縮タイプ。 このパラメーターが指定されている場合、パラメーター値は None でなければなりません。
      OverwriteIfExists 作成された Select Meta が元の Select Meta を上書きするかどうかを示します。 一般的なシナリオでは、このパラメーターを設定する必要はありません。
    • create_select_object_meta 操作で返される結果:行とスプリットの属性を含む GetSelectObjectMetaResult オブジェクトが返されます。 CSV オブジェクトの場合、結果の select_resp オブジェクトには columns 属性 (CSV オブジェクトの列数) が含まれます。

Java SDK

package samples;

import com.aliyun.oss.ClientBuilderConfiguration;
import com.aliyun.oss.model.*;
import com.aliyun.odps.Odps;
import com.aliyun.oss.OSSClientBuilder;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.aliyun.oss.common.auth.*;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.http.MethodType;
import com.aliyuncs.http.ProtocolType;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import com.aliyuncs.sts.model.v20150401. AssumeRoleRequest;
import com.aliyuncs.sts.model.v20150401. AssumeRoleResponse;

import java.text.SimpleDateFormat;
import java.util.Calendar;

/**
 * Examples of the create_select_object_meta and select_object operations.
 *
 */
class MulipartSelector implements Callable<Integer> {

    private OSS client;
    private String bucket;
    private String key;
    private int start;
    private int end;
    private String sql;

    public MulipartSelector(OSS client, String bucket, String key, int start, int end, String sql){
        this.client = client;
        this.bucket = bucket;
        this.key = key;
        this.start = start;
        this.end = end;
        this.sql = sql;
    }

    @Override
    public Integer call() throws Exception {
        SelectObjectRequest selectObjectRequest =
                new SelectObjectRequest(bucket, key)
                        .withInputSerialization(
                                new InputSerialization().withCsvInputFormat(
                                        new CSVFormat().withHeaderInfo(CSVFormat.Header.None).withRecordDelimiter("\n")
                                                .withFieldDelimiter("|")))
                        .withSplitRange(start, end)
                        .withOutputSerialization(new OutputSerialization().withCsvOutputFormat(new CSVFormat()).withCrcEnabled(true));

        selectObjectRequest.setExpression(sql);
        OSSObject ossObject = client.selectObject(selectObjectRequest);
        byte[] buffer = new byte[4096];
        int bytesRead;
        int totalSize = 0;
        try {
            while ((bytesRead = ossObject.getObjectContent().read(buffer)) ! = -1) {
                totalSize += bytesRead;
            }
            String result = new String(buffer, 0, totalSize - 1);
            return new Integer(Integer.parseInt(result));
        }
        catch (IOException e){
            System.out.println(e.toString());
            return new Integer(0);
        }
    }
}
class RoleCredentialProvider {
    public static final String REGION_CN_HANGZHOU = "cn-hangzhou";
    // Obtain the current Security Token Service (STS) API version.
    public static final String STS_API_VERSION = "2015-04-01";
    public static final String serviceAccessKeyId = "<AccessKey ID that can play the assumed role>";
    public static final String serviceAccessKeySecret = "<AccessKey Secret>";

    public static final long DurationSeconds = 15 * 60;

    private Credentials credential;
    private Calendar expireTime;

    private String roleArn;
    private DefaultAcsClient client;

    public RoleCredentialProvider(String roleArn) throws InvalidCredentialsException {
        this.roleArn = roleArn;
    }

    private AssumeRoleResponse assumeRole (String accessKeyId, String accessKeySecret, String roleArn, String roleSessionName, String policy, ProtocolType protocolType, long durationSeconds) throws ClientException {
        try {
            // Create an AcsClient instance for sending API requests.
            if (this.client == null) {
                IClientProfile profile = DefaultProfile.getProfile(REGION_CN_HANGZHOU, accessKeyId, accessKeySecret);
                this.client = new DefaultAcsClient(profile);
            }
            // Create an AssumeRoleRequest instance and set its properties.
            final AssumeRoleRequest request = new AssumeRoleRequest();
            request.setVersion(STS_API_VERSION);
            request.setMethod(MethodType.POST);
            request.setProtocol(protocolType);
            request.setRoleArn(roleArn);
            request.setRoleSessionName(roleSessionName);
            request.setPolicy(policy);
            request.setDurationSeconds(durationSeconds);
            // Send the request and obtain the response.
            final AssumeRoleResponse response = client.getAcsResponse(request);
            return response;
        } catch (ClientException e) {
            throw e;
        }
    }


    public CredentialsProvider GetCredentialProvider()
            throws IOException {
        // Request parameters for the AssumeRole API include RoleArn, RoleSessionName, Policy, and DurationSeconds.
        // You need to obtain the value of the RoleArn parameter in the Resource Access Management (RAM) console.
        // The RoleSessionName parameter indicates the name of the session for the temporary token. You can use this parameter to identify users in audit or identify users who you want to issue tokens to.
        // However, you need to pay attention to the length and rules of the RoleSessionName parameter. It can contain only letters, numbers, hyphens (-), and underscores (_), and cannot include spaces.
        // For more information about the rules, see the format requirements in the API reference.
        SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd");
        String roleSessionName = "AssumingRole" + timeFormat.format(Calendar.getInstance().getTime());
        // Read OSS data.
        String policy = "{\n" +
                "    \"Version\": \"1\", \n" +
                "    \"Statement\": [\n" +
                "        {\n" +
                "            \"Action\": \"oss:*\", \n" +
                "            \"Resource\": [\n" +
                "                \"acs:oss:*:*:*\"\n" +
                "            ], \n" +
                "            \"Effect\": \"Allow\"\n" +
                "        }\n" +
                "    ]\n" +
                "}";
        // You must set the protocol type to HTTPS.
        ProtocolType protocolType = ProtocolType.HTTPS;
        try {
            final AssumeRoleResponse response = assumeRole(serviceAccessKeyId, serviceAccessKeySecret,
                    roleArn, roleSessionName, policy, protocolType, DurationSeconds);
            String ossAccessId = response.getCredentials().getAccessKeyId();
            String ossAccessKey = response.getCredentials().getAccessKeySecret();
            String ossSts = response.getCredentials().getSecurityToken();

            return new DefaultCredentialProvider(ossAccessId, ossAccessKey, ossSts);

        } catch (ClientException e) {
            throw new InvalidCredentialsException("Unable tp get the temporary AK:" + e);
        }
    }

    public void setClient(DefaultAcsClient client) {
        this.client = client;
    }

    public void setCredentials(Credentials creds) {
        this.credential = creds;
    }

    public Credentials getCredentials() {
        if (credential ! = null && expireTime.after(Calendar.getInstance())) {
            return credential;
        }

        try {
            CredentialsProvider provider = GetCredentialProvider();
            credential = provider.getCredentials();
            expireTime = Calendar.getInstance();
            expireTime.add(Calendar.SECOND, (int) DurationSeconds - 60);
            return credential;
        } catch (IOException e) {
            throw new InvalidCredentialsException("Unable tp get the temporary AK:" + e);
        }
    }
}
public class SelectObjectSample {
    private static String endpoint = "<OSS endpoint>";
    private static String bucketName = "<Bucket>";
    private static String key = "<Object>";
    private static String roleArn = "<Service role's ARN>"; // You can obtain the Alibaba Cloud Resource Name (ARN) of a RAM role in the RAM console. The RAM role must have permissions to access OSS.
    private static String recordDelimiter = "\n";
    private static int threadCount = 10;

    public static void main(String[] args) throws Exception {
        ClientBuilderConfiguration config = new ClientBuilderConfiguration();
        RoleCredentialProvider provider = new RoleCredentialProvider(roleArn);
        Credentials credentials = provider.getCredentials();
        //OSS client = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret, config);
        System.out.println("Id " + credentials.getAccessKeyId());
        System.out.println("Key " + credentials.getSecretAccessKey());
        System.out.println("Token " + credentials.getSecurityToken());
        OSS client = new OSSClientBuilder().build(endpoint, credentials.getAccessKeyId(), credentials.getSecretAccessKey(), credentials.getSecurityToken(), config);
        int totalSplits = 1;
        try {
            SelectObjectMetadata selectObjectMetadata = client.createSelectObjectMetadata(
                    new CreateSelectObjectMetadataRequest(bucketName, key)
                            .withInputSerialization(
                                    new InputSerialization().withCsvInputFormat(
                                            new CSVFormat().withHeaderInfo(CSVFormat.Header.None).withRecordDelimiter(recordDelimiter))));

            totalSplits = selectObjectMetadata.getCsvObjectMetadata().getSplits();
            System.out.println(selectObjectMetadata.getCsvObjectMetadata().getTotalLines());
            System.out.println(totalSplits);

        }
        catch (Exception e)
        {
        e.printStackTrace();
     }

        String sql = "select count(*) from ossobject";

        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        long startTime = System.currentTimeMillis();
        List<Future<Integer>> list = new ArrayList<Future<Integer>>();
        int n = threadCount < totalSplits ? threadCount: totalSplits;
        for(int i = 0; i < n; i++) {
            int start = i * totalSplits/n;
            int end = i == n-1 ? totalSplits - 1 : (i+1)* totalSplits /n - 1;
            System.out.println("start:" + start + " end:" + end);
            Callable<Integer> task = new MulipartSelector(client, bucketName, key, start, end, sql);
            Future<Integer> future = executor.submit(task);
            list.add(future);
        }

        long totalLines = 0;
        for(Future<Integer> task : list){
            totalLines += task.get().longValue();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("total lines:" + totalLines);
        System.out.printf("Total time %dms\n" , (endTime - startTime));

    }
}

SQL 文の例

  • SQL 文の例 (CSV オブジェクトの場合)
    シナリオ SQL 文
    先頭の 10 行を返します。 select * from ossobject limit 10
    1 列目の整数が 3 列目の整数よりも大きい場合、1 列目と 3 列目の整数を返します。 select _1, _3 from ossobject where cast(_1 as int) > cast(_3 as int)
    1 列目のデータが X で始まるレコードの数を返します (like の後ろに指定する漢字は、UTF-8 でエンコードされている必要があります)。 select count(*) from ossobject where _1 like 'X%'
    2 列目のデータの時間が 2018-08-09 11:30:25 以降で、3 列目のデータが 200 より大きいレコードをすべて返します。 select * from ossobject where _2 > cast('2018-08-09 11:30:25' as timestamp) and _3 > 200
    2 列目の浮動小数点数の平均値、合計、最大値、最小値を返します。

    select AVG(cast(_2 as double)), SUM(cast(_2 as double)), MAX(cast(_2 as double)), MIN(cast(_2 as double))

    1 列目と 3 列目のデータで連結された文字列が Tom で始まり、Anderson で終わるすべてのレコードを返します。 select * from ossobject where (_1 || _3) like 'Tom%Anderson'
    1 列目のデータが 3 で割り切れるすべてのレコードを返します。 select * from ossobject where (_1 % 3) == 0
    1 列目のデータの範囲が 1995 から 2012 のレコードをすべて返します。 select * from ossobject where _1 between 1995 and 2012
    5 列目のデータが N、M、G、または L のレコードをすべて返します。 select * from ossobject where _5 in ('N', 'M', 'G', 'L')
    2 列目と 3 列目のデータの積が、100 と 5 列目のデータの合計よりも大きいレコードをすべて返します。 select * from ossobject where _2 * _3 > _5 + 100
  • SQL 文の例 (JSON オブジェクトの場合)

    次の JSON オブジェクトを例として使用します。

    {
      "contacts":[
    {
      "firstName": "John",
      "lastName": "Smith",
      "isAlive": true,
      "age": 27,
      "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021-3100"
      },
      "phoneNumbers": [
        {
          "type": "home",
          "number": "212 555-1234"
        },
        {
          "type": "office",
          "number": "646 555-4567"
        },
        {
          "type": "mobile",
          "number": "123 456-7890"
        }
      ],
      "children": [],
      "spouse": null
    }, ... # Other similar nodes are omitted.
    ]}

    次の表に、SQL 文の例を示します。

    シナリオ SQL 文
    age の値が 27 のレコードをすべて返します。

    select * from ossobject.contacts[*] s where s.age = 27

    すべての home の電話番号を返します。

    select s.number from ossobject.contacts[*].phoneNumbers[*] s where s.type = "home"

    spouse の値が null のレコードをすべて返します。

    select * from ossobject s where s.spouse is null

    children の値が空のままになっているレコードをすべて返します。 select * from ossobject s where s.children[0] is null
    他の方法で空の配列を指定できないため、上記の文を使用します。

ベストプラクティス

  • マルチパートクエリで大きなオブジェクトをクエリします。

    CSV オブジェクトの列に行区切り文字が含まれていない場合、バイト単位でオブジェクトをパートに分割できます。 この方法は、オブジェクトの Select Meta を作成する必要がないため、最も簡単です。 CSV オブジェクトの列に行区切り文字が含まれているか、JSON オブジェクトをクエリする場合、次の手順を実行します。

    1. create_select_object_meta 操作を呼び出して、オブジェクトのスプリットの総数を取得します。 オブジェクトの SelectObject API を呼び出す必要がある場合、クエリの前に非同期で API 呼び出して、スキャン時間を短縮します。
    2. クライアントのリソースに基づいて適切な同時実行性 n を選択し、スプリットの合計数を同時実行性 n で除算し、1 つのクエリに含まれるスプリット数を取得します。
    3. リクエスト本文で split-range=1-20 などのパラメーターを設定して、マルチパートクエリを実行します。
    4. 必要に応じて結果をマージします。
  • 標準のオブジェクトには SelectObject API を使用します。 SelectObject API を使用して、マルチパートオブジェクトや追加可能オブジェクトをクエリしないことを推奨します。 内部構造の違いにより、クエリのパフォーマンスが低下する場合があります。
  • JSON オブジェクトをクエリする場合、FROM 文で JSON パス範囲を絞り込みます。

    次の JSON オブジェクトを例として使用します。

    { contacts:[
            {"firstName":"John", "lastName":"Smith", "phoneNumbers":[{"type":"home", "number":"212-555-1234"}, {"type":"office", "number":"646-555-4567"}, {"type":"mobile", "number":"123 456-7890"}], "address":{"streetAddress": "21 2nd Street", "city":"New York", "state":NY, "postalCode":"10021-3100"}
             }
    ]}

    postalCode が 10021 で始まるレコードの streetAddress データをクエリするには、次の SQL 文を実行します。

    select s.address.streetAddress from ossobject.contacts[*] s where s.address.postalCode like ‘10021%’

    または、次の SQL 文を実行します。

    select s.streetAddress from ossobject.contacts[*].address s where s.postalCode like ‘10021%’

    2 番目の SQL 文の方が JSON パスの精度が高いため、クエリのパフォーマンスが向上します。

  • JSON オブジェクトの高精度浮動小数点数を処理します。

    JSON オブジェクトの高精度浮動小数点数を計算する必要がある場合、ParseJsonNumberAsString パラメーターを true に設定し、CAST 関数を使用して解決済みデータを 10 進数型に変換することを推奨します。 たとえば、属性 a の値は 123456789.123456789 です。 elect s.a from ossobject s where cast(s.a as decimal) > 123456789.12345 を実行すると、属性 a の精度を維持できます。