すべてのプロダクト
Search
ドキュメントセンター

:Java SDK の例

最終更新日:Jul 09, 2025

MaxCompute は、Spark on EMR、StarRocks、Presto、PAI、Hologres などのサードパーティエンジンをサポートしており、Java SDK を使用して Storage API 経由で MaxCompute データに直接アクセスできます。このトピックでは、Java SDK を使用して MaxCompute にアクセスするためのコード例を紹介します。

概要

Java SDK で MaxCompute にアクセスするための主要なインターフェイスを以下に示します。

メインインターフェイス

説明

TableReadSessionBuilder

MaxCompute テーブル読み取りセッションを作成するために使用されます。

TableBatchReadSession

MaxCompute テーブルからデータを読み取るためのセッションを表します。

SplitReader

データセッションに含まれるデータセグメントを読み取るために使用されます。

Maven ユーザーの場合は、Maven リポジトリodps-sdk-table-api を検索して、さまざまなバージョンの Java SDK を入手してください。構成の詳細は次のとおりです。

<dependency>
	<groupId>com.aliyun.odps</groupId>
	<artifactId>odps-sdk-table-api</artifactId>
	<version>0.48.8-public</version>
</dependency>

MaxCompute はストレージ API を提供しています。詳細については、「odps-sdk-table-api」をご参照ください。

TableReadSessionBuilder

このインターフェイスは、MaxCompute テーブルの読み取りセッションを作成するために設計されています。主なインターフェイス定義は次のとおりです。詳細については、Java-sdk-doc をご参照ください。

インターフェイス定義

public class TableReadSessionBuilder {

    public TableReadSessionBuilder table(Table table);

    public TableReadSessionBuilder identifier(TableIdentifier identifier);

    public TableReadSessionBuilder requiredDataColumns(List<String> requiredDataColumns);

    public TableReadSessionBuilder requiredPartitionColumns(List<String> requiredPartitionColumns);

    public TableReadSessionBuilder requiredPartitions(List<PartitionSpec> requiredPartitions);

    public TableReadSessionBuilder requiredBucketIds(List<Integer> requiredBucketIds);

    public TableReadSessionBuilder withSplitOptions(SplitOptions splitOptions);

    public TableReadSessionBuilder withArrowOptions(ArrowOptions arrowOptions);

    public TableReadSessionBuilder withFilterPredicate(Predicate filterPredicate);

    public TableReadSessionBuilder withSettings(EnvironmentSettings settings);

    public TableReadSessionBuilder withSessionId(String sessionId);

    public TableBatchReadSession buildBatchReadSession();
}

使用上の注意

メソッド名

説明

table(Table table)

渡されたパラメーター Table を現在のセッションのターゲットテーブルとして定義します。

identifier(TableIdentifier identifier)

渡されたパラメーター TableIdentifier を現在のセッションのターゲットテーブルとして定義します。

requiredDataColumns(List<String> requiredDataColumns)

指定されたフィールドのデータを読み取り、返されるデータのフィールドの順序がパラメーター requiredDataColumns で指定された順序と一致することを保証します。これは、データフィールドのトリミングシナリオに適用できます。

説明

パラメーター requiredDataColumns が空の場合、すべてのパーティションデータが返されます。

requiredPartitionColumns(List<String> requiredPartitionColumns)

指定されたテーブルの指定されたパーティションの指定された列からデータを読み取ります。これは、パーティションプルーニングが実行されるシナリオに適用できます。

説明

パラメーター requiredPartitionColumns が空の場合、すべてのパーティションデータが返されます。

requiredPartitions(List<PartitionSpec> requiredPartitions)

指定されたテーブルの指定されたパーティションのデータを読み取ります。これは、パーティションのトリミングシナリオに適用できます。

説明

パラメーター requiredPartitions が空の場合、すべてのパーティションデータが返されます。

requiredBucketIds(List<Integer> requiredBucketIds)

指定されたバケットのデータを読み取ります。これは、クラスタ化されたテーブルに対してのみ有効であり、バケットのトリミングシナリオに適用できます。

説明

パラメーター requiredBucketIds が空の場合、すべてのバケットデータが返されます。

withSplitOptions(SplitOptions splitOptions)

テーブルデータを分割します。 SplitOptions オブジェクトは次のように定義されます。

public class SplitOptions {

    public static SplitOptions.Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

      public SplitOptions.Builder SplitByByteSize(long splitByteSize);
  
      public SplitOptions.Builder SplitByRowOffset();
  
      public SplitOptions.Builder withCrossPartition(boolean crossPartition);
  
      public SplitOptions.Builder withMaxFileNum(int splitMaxFileNum);
  
      public SplitOptions build();
    }
}
  • SplitByByteSize(long splitByteSize): 指定されたパラメーター splitByteSize に従ってデータを分割します。サーバーから返される単一データセグメントのサイズは、splitByteSize(単位:バイト)を超えません。

    説明
    • SplitByByteSize(long splitByteSize) を使用してデータ分割値をカスタマイズしない場合、システムはデフォルトで 256×1024×1024(256 MB)に従ってデータを分割します。

    • カスタムデータ分割値は、10×1024×1024(10 MB)未満にすることはできません。

  • SplitByRowOffset(): 行ごとにデータを分割し、クライアントが指定された行インデックスからデータを読み取れるようにします。

  • withCrossPartition(boolean crossPartition): 単一のデータセグメントに複数のデータパーティションを含めることができるかどうかを指定します。パラメーター crossPartition には次の値があります。

    • true(デフォルト):単一のデータセグメントに複数のデータパーティションを含めることができます。

    • false:含めることはできません。

  • withMaxFileNum(int splitMaxFileNum): テーブルファイルの数が多い場合、単一データセグメントに含まれる物理ファイルの最大数を指定して、より多くのデータセグメントを生成できます。

    説明

    デフォルトでは、単一データセグメントに含まれる物理ファイルの数に制限はありません。

  • build(): SplitOptions オブジェクトを作成します。

使用例

// 1. SplitSize でデータを分割し、SplitSize を 256 MB に設定します

SplitOptions splitOptionsByteSize = 
      SplitOptions.newBuilder().SplitByByteSize(256 * 1024L * 1024L).build()

// 2. RowOffset でデータを分割します

SplitOptions splitOptionsCount = 
      SplitOptions.newBuilder().SplitByRowOffset().build()

//3. 単一の分割に含まれるファイルの最大数を 1 に指定します

SplitOptions splitOptionsCount = 
      SplitOptions.newBuilder().SplitByRowOffset().withMaxFileNum(1).build()

withArrowOptions(ArrowOptions arrowOptions)

Arrow データオプションを指定します。 ArrowOptions オブジェクトは次のように定義されます。

public class ArrowOptions {
    
    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withTimestampUnit(TimestampUnit unit);

        public Builder withDatetimeUnit(TimestampUnit unit);

        public ArrowOptions build();
    }

    public enum TimestampUnit {
        SECOND,
        MILLI,
        MICRO,
        NANO;
    }
}
  • TimestampUnit: Timestamp および Datetime データ型の単位を指定します。値は次のとおりです。

    • SECOND: 秒(s)。

    • MILLI: ミリ秒(ms)。

    • MICRO: マイクロ秒(μs)。

    • NANO: ナノ秒(ns)。

  • withTimestampUnit(TimestampUnit unit): Timestamp データ型の単位を指定します。デフォルトの単位は NANO です。

  • withDatetimeUnit(TimestampUnit unit): Datetime データ型の単位を指定します。デフォルトの単位は MILLI です。

使用例

ArrowOptions options = ArrowOptions.newBuilder()
          .withDatetimeUnit(ArrowOptions.TimestampUnit.MILLI)
          .withTimestampUnit(ArrowOptions.TimestampUnit.NANO)
          .build()

withFilterPredicate(Predicate filterPredicate)

Predicate Pushdown オプションを指定します。 Predicate は次のように定義されます。

// 1. 二項演算

public class BinaryPredicate extends Predicate {

  public enum Operator {
    /**
     * 二項演算子
     */
    EQUALS("="),
    NOT_EQUALS("!="),
    GREATER_THAN(">"),
    LESS_THAN("<"),
    GREATER_THAN_OR_EQUAL(">="),
    LESS_THAN_OR_EQUAL("<=");
   
  }

  public BinaryPredicate(Operator operator, Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate equals(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate notEquals(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate greaterThan(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate lessThan(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate greaterThanOrEqual(Serializable leftOperand,
                                                   Serializable rightOperand);

  public static BinaryPredicate lessThanOrEqual(Serializable leftOperand,
                                                Serializable rightOperand);
}

// 2. 単項演算
public class UnaryPredicate extends Predicate {

  public enum Operator {
    /**
     * 単項演算子
     */
    IS_NULL("is null"),
    NOT_NULL("is not null");
  }

  public static UnaryPredicate isNull(Serializable operand);
  public static UnaryPredicate notNull(Serializable operand);
}

### 3. IN および NOT IN
public class InPredicate extends Predicate {

  public enum Operator {
    /**
     * 集合メンバーシップチェックの IN および NOT IN 演算子
     */
    IN("in"),
    NOT_IN("not in");
  }

  public InPredicate(Operator operator, Serializable operand, List<Serializable> set);

  public static InPredicate in(Serializable operand, List<Serializable> set);

  public static InPredicate notIn(Serializable operand, List<Serializable> set);
}

// 4. 列名
public class Attribute extends Predicate {

  public Attribute(Object value);
    
  public static Attribute of(Object value);
}

// 5. 定数
public class Constant extends Predicate {

  public Constant(Object value);

  public static Constant of(Object value);
}

// 6. 複合演算
public class CompoundPredicate extends Predicate {

  public enum Operator {
    /**
     * 複合述語演算子
     */
    AND("and"),
    OR("or"),
    NOT("not");
  }

  public CompoundPredicate(Operator logicalOperator, List<Predicate> predicates);

  public static CompoundPredicate and(Predicate... predicates);
  
  public static CompoundPredicate or(Predicate... predicates);

  public static CompoundPredicate not(Predicate predicates);

  public void addPredicate(Predicate predicate);
}

使用例

// 1. c1 > 20000 and c2 < 100000
BinaryPredicate c1 = new BinaryPredicate(BinaryPredicate.Operator.GREATER_THAN, Attribute.of("c1"), Constant.of(20000));
BinaryPredicate c2 = new BinaryPredicate(BinaryPredicate.Operator.LESS_THAN, Attribute.of("c2"), Constant.of(100000));
CompoundPredicate predicate =
        new CompoundPredicate(CompoundPredicate.Operator.AND, ImmutableList.of(c1, c2));

// 2. c1 is not null
Predicate predicate = new UnaryPredicate(UnaryPredicate.Operator.NOT_NULL,  Attribute.of("c1"));

  
// 3. c1 in (1, 10001)
Predicate predicate =
        new InPredicate(InPredicate.Operator.IN,  Attribute.of("c1"), ImmutableList.of(Constant.of(1), Constant.of(10001)));

withSettings(EnvironmentSettings settings)

ランタイム環境情報を指定します。 EnvironmentSettings インターフェイスは次のように定義されます。

public class EnvironmentSettings {

    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withDefaultProject(String projectName);

        public Builder withDefaultSchema(String schema);

        public Builder withServiceEndpoint(String endPoint);

        public Builder withTunnelEndpoint(String tunnelEndPoint);

        public Builder withQuotaName(String quotaName);

        public Builder withCredentials(Credentials credentials);

        public Builder withRestOptions(RestOptions restOptions);

        public EnvironmentSettings build();
    }
}
  • withDefaultProject(String projectName): 現在のプロジェクト名を設定します。

    説明

    パラメーター projectName は MaxCompute プロジェクト名です。 MaxCompute コンソール にログインし、左上隅でリージョンを切り替え、左側のナビゲーションウィンドウで [ワークエリア] > [プロジェクト] を選択して、特定の MaxCompute プロジェクト名を表示できます。

  • withDefaultSchema(String schema): 現在のデフォルトスキーマを設定します。

    説明

    パラメーター schema は MaxCompute スキーマ名です。スキーマの詳細については、「スキーマ関連の操作」をご参照ください。

  • withServiceEndpoint(String endPoint): 現在のサービス接続アドレス Endpoint を設定します。

    説明

    各リージョンのエンドポイント情報については、「エンドポイント」をご参照ください。

  • withTunnelEndpoint(String tunnelEndPoint): 現在のサービス接続アドレス TunnelEndpoint を設定します。

    説明

    各リージョンの TunnelEndpoint 情報については、「エンドポイント」をご参照ください。

  • withQuotaName(String quotaName): 現在使用中のクォータ名を指定します。

    MaxCompute は、専用の Data Transmission Service リソースグループ(サブスクリプション)リソースへのアクセスをサポートしています。クォータ名を取得する方法は次のとおりです。

    • 専用の Data Transmission Service リソースグループ:MaxCompute コンソール にログインし、左上隅でリージョンを切り替え、左側のナビゲーションウィンドウで [ワークエリア] > [クォータ管理] を選択して、使用可能なクォータリストを表示します。具体的な操作については、「MaxCompute コンソールで計算リソースのクォータを管理する」をご参照ください。

    • ストレージ API:MaxCompute コンソール にログインし、左側のナビゲーションウィンドウで [テナント管理] > [テナントプロパティ] を選択し、 を有効にします。[オープンストレージ (Storage API) スイッチ]

  • withCredentials(Credentials credentials): 現在の認証情報を指定します。 Credentials は次のように定義されます。

    public class Credentials {
    
        public static Builder newBuilder() {
            return new Builder();
        }
    
        public static class Builder {
    
            public Builder withAccount(Account account);
    
            public Builder withAppAccount(AppAccount appAccount);
    
            public Builder withAppStsAccount(AppStsAccount appStsAccount);
    
            public Credentials build();
        }
    
    }
    • withAccount(Account account): Odps Account オブジェクトを指定します。

    • withAppAccount(AppAccount appAccount): Odps appAccount オブジェクトを指定します。

    • withAppStsAccount(AppStsAccount appStsAccount): Odps appStsAccount オブジェクトを指定します。

    • withRestOptions(RestOptions restOptions): 現在のネットワークアクセス構成を指定します。 RestOptions は次のように定義されます。

      public class RestOptions implements Serializable {
      
          public static Builder newBuilder() {
              return new RestOptions.Builder();
          }
      
          public static class Builder {
              public Builder witUserAgent(String userAgent);
              public Builder withConnectTimeout(int connectTimeout);
              public Builder withReadTimeout(int readTimeout);
              public RestOptions build();
          }
      }
      • witUserAgent(String userAgent): 現在の userAgent 情報を指定します。

      • withConnectTimeout(int connectTimeout): 現在の基盤となるネットワーク接続タイムアウトを指定します。デフォルトは 10 秒です。

      • withReadTimeout(int readTimeout): 現在の基盤となるネットワーク接続タイムアウトを指定します。120 秒です。

withSessionId(String sessionId)

既存のセッションを再読み込みするための SessionID 情報を指定します。

buildBatchReadSession()

テーブル読み取りセッションを作成または取得します。入力パラメーター SessionID が指定されている場合、SessionID に基づいて作成されたセッションが返されます。入力パラメーターが指定されていない場合は、新しいテーブル読み取りセッションが作成されます。

説明

作成操作には大きなオーバーヘッドがあります。ファイルが多い場合、完了までに時間がかかります。

TableBatchReadSession

TableBatchReadSession インターフェイスは、MaxCompute テーブルから読み取るためのセッションを表します。主なインターフェイス定義は次のとおりです。

インターフェイス定義

public interface TableBatchReadSession {

    String getId();

    TableIdentifier getTableIdentifier();

    SessionStatus getStatus();

    DataSchema readSchema();
    
    InputSplitAssigner getInputSplitAssigner() throws IOException;

    SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;

    SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;    

}

使用上の注意

メソッド名

説明

String getId()

現在のセッション ID を取得します。セッション ID を読み取るためのデフォルトのタイムアウトは 24 時間です。

getTableIdentifier()

現在のセッションのテーブル名を取得します。

getStatus()

現在のセッションステータスを取得します。ステータス値は次のとおりです。

  • INIT: セッションの作成時に設定される初期値。

  • NORMAL: セッションが正常に作成されました。

  • CRITICAL: セッションの作成に失敗しました。

  • EXPIRED: セッションがタイムアウトしました。

readSchema()

現在のセッションのテーブル構造情報を取得します。 DataSchema は次のように定義されます。

public class DataSchema implements Serializable {
    
    List<Column> getColumns();

    List<String> getPartitionKeys();

    List<String> getColumnNames();

    List<TypeInfo> getColumnDataTypes();

    Optional<Column> getColumn(int columnIndex);

    Optional<Column> getColumn(String columnName);

}
  • getColumns(): 読み取るテーブルとパーティションの Column 情報を取得します。

  • getPartitionKeys(): 読み取るパーティションの Column 名を取得します。

  • getColumnNames(): 読み取るテーブルとパーティションの Column 名を取得します。

  • getColumnDataTypes(): 読み取るテーブルとパーティションの Column 情報を取得します。

  • getColumn(int columnIndex): インデックスに基づいて Column オブジェクトを取得します。インデックスが現在の Column 範囲外の場合は、空を返します。

  • getColumn(String columnName): パラメーター columnName に基づいて Column オブジェクトを取得します。現在のテーブルの Column に columnName が含まれていない場合は、空を返します。

getInputSplitAssigner()

現在のセッションの InputSplitAssigner を取得します。 InputSplitAssigner インターフェイスは、現在の読み取りセッションで InputSplit インスタンスを割り当てるためのメソッドを定義します。各 InputSplit は、単一の SplitReader で処理できるデータセグメントを表します。 InputSplitAssigner は次のように定義されます。

public interface InputSplitAssigner {

    int getSplitsCount();

    long getTotalRowCount();

    InputSplit getSplit(int index);

    InputSplit getSplitByRowOffset(long startIndex, long numRecord);
}
  • getSplitsCount(): セッションに含まれるデータセグメントの数を取得します。

    説明

    SplitOptions が SplitByByteSize の場合、このインターフェイスの戻り値は 0 以上です。

  • getTotalRowCount(): セッションに含まれるデータ行の数を取得します。

    説明

    SplitOptions が SplitByByteSize の場合、このインターフェイスの戻り値は 0 以上です。

  • getSplit(int index): 指定されたデータセグメントパラメーター Index に基づいて、対応する InputSplit を取得します。パラメーター index の値の範囲は [0,SplitsCount-1] です。

  • getSplitByRowOffset(long startIndex, long numRecord): 対応する InputSplit を取得します。パラメーターの説明は次のとおりです。

    • startIndex: InputSplit によって読み取られるデータ行の開始インデックスを指定します。値の範囲は [0,RecordCount-1] です。

    • numRecord: InputSplit によって読み取られるデータ行の数を指定します。

// 1. SplitOptions が SplitByByteSize の場合

TableBatchReadSession scan = ...;
InputSplitAssigner assigner = scan.getInputSplitAssigner();
int splitCount = assigner.getSplitsCount();
for (int k = 0; k < splitCount; k++) {
    InputSplit split = assigner.getSplit(k);
    ...
}

// 2. SplitOptions が SplitByRowOffset の場合
TableBatchReadSession scan = ...;
InputSplitAssigner assigner = scan.getInputSplitAssigner();
long rowCount = assigner.getTotalRowCount();
long recordsPerSplit = 10000;
for (long offset = 0; offset < numRecords; offset += recordsPerSplit) {
    recordsPerSplit = Math.min(recordsPerSplit, numRecords - offset);
    InputSplit split = assigner.getSplitByRowOffset(offset, recordsPerSplit);
    ...
}

createRecordReader(InputSplit split, ReaderOptions options)

SplitReader<ArrayRecord> オブジェクトを構築します。 ReaderOptions は次のように定義されます。

public class ReaderOptions {
    
    public static ReaderOptions.Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withMaxBatchRowCount(int maxBatchRowCount);

        public Builder withMaxBatchRawSize(long batchRawSize);

        public Builder withCompressionCodec(CompressionCodec codec);

        public Builder withBufferAllocator(BufferAllocator allocator);

        public Builder withReuseBatch(boolean reuseBatch);

        public Builder withSettings(EnvironmentSettings settings);

        public ReaderOptions build();
    }

}
  • withMaxBatchRowCount(int maxBatchRowCount): サーバーから返される各データバッチの最大行数を指定します。パラメーター maxBatchRowCount のデフォルトの最大値は 4096 です。

  • withMaxBatchRawSize(long batchRawSize): サーバーから返される各データバッチに含まれる生のバイトの最大数を指定します。

  • withCompressionCodec(CompressionCodec codec): データ圧縮タイプを指定します。 ZSTD および LZ4_FRAME 圧縮タイプのみがサポートされています。

    説明
    • 圧縮されていない大量の Arrow データを直接送信する場合、ネットワーク帯域幅の制限により、データ送信時間が大幅に増加する可能性があります。

    • 圧縮タイプが指定されていない場合、デフォルトではデータ圧縮は実行されません。

  • withBufferAllocator(BufferAllocator allocator): Arrow データを読み取るためのメモリアロケーターを指定します。

  • withReuseBatch(boolean reuseBatch): ArrowBatch メモリを再利用できるかどうかを指定します。パラメーター reuseBatch には次の値があります。

    • true(デフォルト):ArrowBatch メモリを再利用できます。

    • false:ArrowBatch メモリを再利用できません。

  • withSettings(EnvironmentSettings settings): ランタイム環境情報を指定します。

createArrowReader(InputSplit split, ReaderOptions options)

SplitReader<VectorSchemaRoot> オブジェクトを構築します。

SplitReader

SplitReader インターフェイスは、MaxCompute テーブルからデータを読み取るために使用されます。

インターフェイス定義

public interface SplitReader<T> {

    boolean hasNext() throws IOException;

    T get();

    Metrics currentMetricsValues();

    void close() throws IOException;
}

使用上の注意

メソッド名

説明

hasNext()

読み取るデータ項目がさらに存在するかどうかを確認します。次のデータ項目が存在する場合は true を返します。それ以外の場合は false を返します。

get()

現在のデータ項目を取得します。このメソッドを呼び出す前に、hasNext() メソッドを使用して、次の要素が存在することを確認する必要があります。

currentMetricsValues()

SplitReader に関連するメトリックを取得します。

close()

読み取りが完了したら、接続を閉じます。

使用例

  1. MaxCompute サービスに接続するように環境を設定します

    // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret
    // Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。これらの資格情報を使用して操作を実行することは、リスクの高い操作です。 RAM ユーザーを使用して API 操作を呼び出したり、日常的な O&M を実行することをお勧めします。 RAM ユーザーを作成するには、RAM コンソールにログインします
    // この例では、AccessKey ID と AccessKey secret は環境変数に格納されています。ビジネス要件に基づいて、構成ファイルに AccessKey ペアを保存することもできます
    // AccessKey ペアの漏洩を防ぐため、コードに AccessKey ID と AccessKey secret を直接指定しないことをお勧めします
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    //MaxCompute にアクセスするために使用されるクォータ名
    String quotaName = "<quotaName>";
    //MaxCompute プロジェクト名
    String project = "<project>";
    //MaxCompute サービスに接続するための Odps オブジェクトを作成します
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setDefaultProject(project);
    //MaxCompute サービスの接続アドレス。 Alibaba Cloud VPC ネットワークのみがサポートされています
    odps.setEndpoint(endpoint);
    Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()).withAppAccount(odps.getAppAccount()).build();
    EnvironmentSettings settings = EnvironmentSettings.newBuilder().withCredentials(credentials).withServiceEndpoint(odps.getEndpoint()).withQuotaName(quotaName).build();
    説明
    • 専用の Data Transmission Service リソースグループ(サブスクリプション)リソースのクォータ名を取得するには、次の手順に従います。

      • 専用の Data Transmission Service リソースグループ:MaxCompute コンソール にログインします。次に、左上隅でリージョンを切り替え、左側のナビゲーションウィンドウで [ワークエリア] > [クォータ] を選択して、使用可能なクォータのリストを表示します。詳細な手順については、「MaxCompute コンソールで計算リソースのクォータを管理する」をご参照ください。

      • ストレージ API:MaxCompute コンソール にログインし、左側のナビゲーションウィンドウで [テナント管理] > [テナントプロパティ] を選択し、[オープンストレージ (Storage API) スイッチ] を有効にします。

    • ジョブレベルのクォータ承認の場合、デフォルトでは、Alibaba Cloud アカウントとロールを含むすべてのアカウントに権限がありません。承認が必要です。承認の詳細については、「承認」をご参照ください。

  2. テーブル読み取り操作を実行します。

    1. MaxCompute データにアクセスするためのデータ読み取りセッションを作成します。

      //MaxCompute プロジェクトに対応するテーブル名
      String tableName = "<table.name>";
      //テーブルデータ読み取りセッションを作成します
      TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
      TableBatchReadSession scan = scanBuilder.identifier(TableIdentifier.of(project, tableName)).withSettings(settings)
              .withSplitOptions(SplitOptions.newBuilder()
                      .SplitByByteSize(256 * 1024L * 1024L)
                      .withCrossPartition(false).build())
              .requiredDataColumns(Arrays.asList("timestamp"))
              .requiredPartitionColumns(Arrays.asList("pt1"))
              .buildBatchReadSession();
      説明

      データ量が多い、ネットワーク遅延がある、または不安定なシナリオでは、データ読み取りセッションの作成に時間がかかり、セッション作成の非同期プロセスに自動的に切り替わる可能性があります。

    2. Arrow リーダーを使用して各セグメントの MaxCompute データをトラバースし、各セグメントのデータコンテンツを順番に読み取って出力します。

      //すべての入力セグメントデータをトラバースし、Arrow リーダーを使用して各セグメントのデータバッチを 1 つずつ読み取り、最後に各バッチデータのコンテンツを出力します
      InputSplitAssigner assigner = scan.getInputSplitAssigner();
      for (InputSplit split : assigner.getAllSplits()) {
          SplitReader<VectorSchemaRoot> reader =
                  scan.createArrowReader(split, ReaderOptions.newBuilder()
                          .withSettings(settings)
                          .withCompressionCodec(CompressionCodec.ZSTD)
                          .withReuseBatch(true)
                          .build());
      
          int rowCount = 0;
          List<VectorSchemaRoot> batchList = new ArrayList<>();
          while (reader.hasNext()) {
              VectorSchemaRoot data = reader.get();
              rowCount += data.getRowCount();
              System.out.println(data.contentToTSVString());
          }
          reader.close();
      }

参照

MaxCompute Storage API の概要については、「ストレージ API の概要」をご参照ください。