全部產品
Search
文件中心

MaxCompute:開放儲存SDK樣本-Java SDK

更新時間:Mar 08, 2025

MaxCompute支援第三方引擎(如Spark on EMR、StarRocks、Presto、PAI和Hologres)通過SDK調用Storage API直接存取MaxCompute資料,本文為您介紹使用Java SDK訪問MaxCompute的程式碼範例。

概述

使用Java SDK訪問MaxCompute的主要介面如下。

主要介面

描述

TableReadSessionBuilder

用於建立一個MaxCompute讀表會話。

TableBatchReadSession

表示一個從MaxCompute表中讀取資料的會話。

SplitReader

用於讀取資料會話包含的一個資料分區。

如果您使用Maven,可以從Maven庫中搜尋odps-sdk-table-api擷取不同版本的Java SDK,相關配置資訊如下。

<dependency>
	<groupId>com.aliyun.odps</groupId>
	<artifactId>odps-sdk-table-api</artifactId>
	<version>0.48.8-public</version>
</dependency>

MaxCompute提供了開放儲存相關介面,詳情請參見odps-sdk-table-api

TableReadSessionBuilder

TableReadSessionBuilder介面用於建立一個MaxCompute讀表會話,其中主要介面定義如下。更多詳情,請參見Java-sdk-doc

介面定義

public class TableReadSessionBuilder {

    public TableReadSessionBuilder table(Table table);

    public TableReadSessionBuilder identifier(TableIdentifier identifier);

    public TableReadSessionBuilder requiredDataColumns(List<String> requiredDataColumns);

    public TableReadSessionBuilder requiredPartitionColumns(List<String> requiredPartitionColumns);

    public TableReadSessionBuilder requiredPartitions(List<PartitionSpec> requiredPartitions);

    public TableReadSessionBuilder requiredBucketIds(List<Integer> requiredBucketIds);

    public TableReadSessionBuilder withSplitOptions(SplitOptions splitOptions);

    public TableReadSessionBuilder withArrowOptions(ArrowOptions arrowOptions);

    public TableReadSessionBuilder withFilterPredicate(Predicate filterPredicate);

    public TableReadSessionBuilder withSettings(EnvironmentSettings settings);

    public TableReadSessionBuilder withSessionId(String sessionId);

    public TableBatchReadSession buildBatchReadSession();
}

介面說明

方法名稱

說明

table(Table table)

將傳入的參數Table,定義為當前會話中的目標表。

identifier(TableIdentifier identifier)

將傳入的參數TableIdentifier ,定義為當前會話中的目標表。

requiredDataColumns(List<String> requiredDataColumns)

讀取指定欄位的資料,並確保返回的資料中的欄位順序與參數requiredDataColumns指定的欄位順序一致,適用於資料欄位裁剪的情境。

說明

如果參數requiredDataColumns為空白,則返回所有分區資料。

requiredPartitionColumns(List<String> requiredPartitionColumns)

讀取指定表下的指定分區中指定列的資料。適用於進行分區裁剪的情境。

說明

如果參數requiredPartitionColumns為空白,則會返回所有分區資料。

requiredPartitions(List<PartitionSpec> requiredPartitions)

讀取指定表下指定分區的資料,適用於進行分區裁剪的情境。

說明

如果參數requiredPartitions為空白,則會返回所有分區資料。

requiredBucketIds(List<Integer> requiredBucketIds)

讀取指定的Bucket資料,僅對聚簇表生效,適用於進行Bucket裁剪情境。

說明

如果參數requiredBucketIds為空白,則會返回所有Bucket資料。

withSplitOptions(SplitOptions splitOptions)

切分表資料,其中SplitOptions對象參數定義如下:

public class SplitOptions {

    public static SplitOptions.Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

      public SplitOptions.Builder SplitByByteSize(long splitByteSize);
  
      public SplitOptions.Builder SplitByRowOffset();
  
      public SplitOptions.Builder withCrossPartition(boolean crossPartition);
  
      public SplitOptions.Builder withMaxFileNum(int splitMaxFileNum);
  
      public SplitOptions build();
    }
}
  • SplitByByteSize(long splitByteSize):按照指定的參數splitByteSize切分資料,服務端返回的單個資料分區大小不超過splitByteSize(單位為:byte)。

    說明
    • 若您未使用SplitByByteSize(long splitByteSize)自訂切分資料值,系統將預設按照256*1024*1024(256 MB)進行切分資料。

    • 您自訂切分資料值時不能小於10*1024*1024(10 MB)。

  • SplitByRowOffset():按照行進行切分資料,允許用戶端從指定的行索引讀取資料。

  • withCrossPartition(boolean crossPartition):是否允許單個資料分區包含多個資料分區。crossPartition參數取值如下:

    • true(預設值):允許單個資料分區包含多個資料分區。

    • false:不允許。

  • withMaxFileNum(int splitMaxFileNum):在表檔案數量較多的情況下,可以通過指定單個資料分區中最大包含的物理檔案數量來產生更多的資料分區。

    說明

    預設不限制單個資料分區中包含的物理檔案數量。

  • build():建立SplitOptions對象。

使用樣本

// 1. 按照SplitSize切分資料,設定SplitSize為256MB

SplitOptions splitOptionsByteSize = 
      SplitOptions.newBuilder().SplitByByteSize(256 * 1024L * 1024L).build()

// 2. 按照RowOffset切分資料

SplitOptions splitOptionsCount = 
      SplitOptions.newBuilder().SplitByRowOffset().build()

//3. 指定單個Split包含的最大檔案數為1

SplitOptions splitOptionsCount = 
      SplitOptions.newBuilder().SplitByRowOffset().withMaxFileNum(1).build()

withArrowOptions(ArrowOptions arrowOptions)

指定Arrow資料選項,ArrowOptions定義如下:

public class ArrowOptions {
    
    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withTimestampUnit(TimestampUnit unit);

        public Builder withDatetimeUnit(TimestampUnit unit);

        public ArrowOptions build();
    }

    public enum TimestampUnit {
        SECOND,
        MILLI,
        MICRO,
        NANO;
    }
}
  • TimestampUnit:用於指定Timestamp和Datetime資料類型的單位,取值如下:

    • SECOND:秒(s)。

    • MILLI:毫秒(ms)。

    • MICRO:微秒(μs)。

    • NANO:納秒(ns)。

  • withTimestampUnit(TimestampUnit unit):指定Timestamp資料類型的單位,預設單位為NANO。

  • withDatetimeUnit(TimestampUnit unit):指定Datetime資料類型的單位,預設單位為MILLI。

使用樣本

ArrowOptions options = ArrowOptions.newBuilder()
          .withDatetimeUnit(ArrowOptions.TimestampUnit.MILLI)
          .withTimestampUnit(ArrowOptions.TimestampUnit.NANO)
          .build()

withFilterPredicate(Predicate filterPredicate)

指定謂詞下推(Predicate Pushdown)選項,其中Predicate定義如下:

// 1. 二元運算

public class BinaryPredicate extends Predicate {

  public enum Operator {
    /**
     * 二元運算操作符
     */
    EQUALS("="),
    NOT_EQUALS("!="),
    GREATER_THAN(">"),
    LESS_THAN("<"),
    GREATER_THAN_OR_EQUAL(">="),
    LESS_THAN_OR_EQUAL("<=");
   
  }

  public BinaryPredicate(Operator operator, Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate equals(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate notEquals(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate greaterThan(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate lessThan(Serializable leftOperand, Serializable rightOperand);

  public static BinaryPredicate greaterThanOrEqual(Serializable leftOperand,
                                                   Serializable rightOperand);

  public static BinaryPredicate lessThanOrEqual(Serializable leftOperand,
                                                Serializable rightOperand);
}

// 2. 一元運算
public class UnaryPredicate extends Predicate {

  public enum Operator {
    /**
     * 一元運算操作符
     */
    IS_NULL("is null"),
    NOT_NULL("is not null");
  }

  public static UnaryPredicate isNull(Serializable operand);
  public static UnaryPredicate notNull(Serializable operand);
}

### 3. IN and NOT IN
public class InPredicate extends Predicate {

  public enum Operator {
    /**
     * IN and NOT IN operators for set membership check
     */
    IN("in"),
    NOT_IN("not in");
  }

  public InPredicate(Operator operator, Serializable operand, List<Serializable> set);

  public static InPredicate in(Serializable operand, List<Serializable> set);

  public static InPredicate notIn(Serializable operand, List<Serializable> set);
}

// 4. 列名
public class Attribute extends Predicate {

  public Attribute(Object value);
    
  public static Attribute of(Object value);
}

// 5. 常量
public class Constant extends Predicate {

  public Constant(Object value);

  public static Constant of(Object value);
}

// 6. 組合運算
public class CompoundPredicate extends Predicate {

  public enum Operator {
    /**
     * 複合謂詞運算子
     */
    AND("and"),
    OR("or"),
    NOT("not");
  }

  public CompoundPredicate(Operator logicalOperator, List<Predicate> predicates);

  public static CompoundPredicate and(Predicate... predicates);
  
  public static CompoundPredicate or(Predicate... predicates);

  public static CompoundPredicate not(Predicate predicates);

  public void addPredicate(Predicate predicate);
}

使用樣本

// 1. c1 > 20000 and c2 < 100000
BinaryPredicate c1 = new BinaryPredicate(BinaryPredicate.Operator.GREATER_THAN, Attribute.of("c1"), Constant.of(20000));
BinaryPredicate c2 = new BinaryPredicate(BinaryPredicate.Operator.LESS_THAN, Attribute.of("c2"), Constant.of(100000));
CompoundPredicate predicate =
        new CompoundPredicate(CompoundPredicate.Operator.AND, ImmutableList.of(c1, c2));

// 2. c1 is not null
Predicate predicate = new UnaryPredicate(UnaryPredicate.Operator.NOT_NULL,  Attribute.of("c1"));

  
// 3. c1 in (1, 10001)
Predicate predicate =
        new InPredicate(InPredicate.Operator.IN,  Attribute.of("c1"), ImmutableList.of(Constant.of(1), Constant.of(10001)));

withSettings(EnvironmentSettings settings)

指定運行環境資訊,EnvironmentSettings 介面定義如下:

public class EnvironmentSettings {

    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withDefaultProject(String projectName);

        public Builder withDefaultSchema(String schema);

        public Builder withServiceEndpoint(String endPoint);

        public Builder withTunnelEndpoint(String tunnelEndPoint);

        public Builder withQuotaName(String quotaName);

        public Builder withCredentials(Credentials credentials);

        public Builder withRestOptions(RestOptions restOptions);

        public EnvironmentSettings build();
    }
}
  • withDefaultProject(String projectName):設定當前的專案名稱。

    說明

    projectName參數為MaxCompute專案名稱,您可以登入MaxCompute控制台,左上方切換地區後,在左側導覽列選擇工作區>專案管理,查看具體的MaxCompute專案名稱。

  • withDefaultSchema(String schema):設定為當前預設的schema。

    說明

    schema參數為MaxCompute Schema名稱,關於Schema詳情,請參見Schema操作

  • withServiceEndpoint(String endPoint):設定當前服務鏈結接地址Endpoint。

    說明

    各地區的Endpoint資訊,請參見Endpoint

  • withTunnelEndpoint(String tunnelEndPoint):設定當前服務鏈結接地址TunnelEndpoint。

    說明

    各地區的TunnelEndpoint資訊,請參見Endpoint

  • withQuotaName(String quotaName):指定當前使用的Quota名稱。

    訪問MaxCompute支援獨享Data Transmission Service資源群組(訂用帳戶)資源,擷取Quota名稱的方式分別如下:

    • 獨享Data Transmission Service資源群組:登入MaxCompute控制台,左上方切換地區後,在左側導覽列選擇工作區>配額(Quota)管理,查看可使用的Quota列表。具體操作,請參見計算資源-Quota管理

    • 開放儲存:登入MaxCompute控制台,在左側導覽列選擇租戶管理>租戶屬性,開啟開放儲存。

  • withCredentials(Credentials credentials):指定當前認證資訊,其中Credentials定義如下:

    public class Credentials {
    
        public static Builder newBuilder() {
            return new Builder();
        }
    
        public static class Builder {
    
            public Builder withAccount(Account account);
    
            public Builder withAppAccount(AppAccount appAccount);
    
            public Builder withAppStsAccount(AppStsAccount appStsAccount);
    
            public Credentials build();
        }
    
    }
    • withAccount(Account account):指定Odps Account對象。

    • withAppAccount(AppAccount appAccount):指定Odps appAccount對象。

    • withAppStsAccount(AppStsAccount appStsAccount):指定Odps appStsAccount對象。

    • withRestOptions(RestOptions restOptions):指定當前訪問網路的配置,RestOptions定義如下:

      public class RestOptions implements Serializable {
      
          public static Builder newBuilder() {
              return new RestOptions.Builder();
          }
      
          public static class Builder {
              public Builder witUserAgent(String userAgent);
              public Builder withConnectTimeout(int connectTimeout);
              public Builder withReadTimeout(int readTimeout);
              public RestOptions build();
          }
      }
      • witUserAgent(String userAgent):指定當前userAgent資訊。

      • withConnectTimeout(int connectTimeout):指定當前底層網路建立逾時時間,預設為10秒(s)。

      • withReadTimeout(int readTimeout):指定當前底層網路連接逾時時間,120秒(s)。

withSessionId(String sessionId)

指定SessionID資訊,用於重新載入已建立的會話。

buildBatchReadSession()

建立或擷取讀表會話。若提供入參SessionID,則根據SessionID返回已建立的Session;若未提供入參,將建立一個新的讀表會話。

說明

建立操作開銷較大,當檔案數很多時,耗時會比較長。

TableBatchReadSession

TableBatchReadSession介面表示一個從MaxCompute表中讀取資料的會話,主要介面定義如下。

介面定義

public interface TableBatchReadSession {

    String getId();

    TableIdentifier getTableIdentifier();

    SessionStatus getStatus();

    DataSchema readSchema();
    
    InputSplitAssigner getInputSplitAssigner() throws IOException;

    SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;

    SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;    

}

介面說明

方法名稱

說明

String getId()

擷取當前會話ID,讀取會話ID的預設逾時時間長度為:24小時(h)。

getTableIdentifier()

擷取當前會話下的表名稱。

getStatus()

擷取當前工作階段狀態,狀態值如下:

  • INIT:建立一個會話時設定的初始值。

  • NORMAL:建立會話成功。

  • CRITICAL:建立會話失敗。

  • EXPIRED:會話逾時。

readSchema()

擷取當前會話的表結構資訊,DataSchema定義如下:

public class DataSchema implements Serializable {
    
    List<Column> getColumns();

    List<String> getPartitionKeys();

    List<String> getColumnNames();

    List<TypeInfo> getColumnDataTypes();

    Optional<Column> getColumn(int columnIndex);

    Optional<Column> getColumn(String columnName);

}
  • getColumns():擷取要讀取的表和分區的Column資訊。

  • getPartitionKeys():擷取要讀取的分區Column名稱。

  • getColumnNames():擷取要讀取的表和分區的Column名稱。

  • getColumnDataTypes():擷取要讀取的表和分區的Column資訊。

  • getColumn(int columnIndex):根據索引擷取Column對象,若索引超出當前Column的範圍,則返回為空白。

  • getColumn(String columnName):根據參數columnName擷取Column對象,若當前表的Column不包含columnName,則返回為空白。

getInputSplitAssigner()

擷取當前會話的InputSplitAssigner。InputSplitAssigner介面定義了在當前讀取會話中分配InputSplit執行個體的方法。每InputSplit代表一個資料分區,可由單個SplitReader處理。InputSplitAssigner定義如下:

public interface InputSplitAssigner {

    int getSplitsCount();

    long getTotalRowCount();

    InputSplit getSplit(int index);

    InputSplit getSplitByRowOffset(long startIndex, long numRecord);
}
  • getSplitsCount():擷取會話包含的資料分區數量。

    說明

    當SplitOptions為SplitByByteSize時,該介面傳回值大於等於0。

  • getTotalRowCount():擷取會話包含的資料行數。

    說明

    當SplitOptions為SplitByByteSize時,該介面傳回值大於等於0

  • getSplit(int index):根據指定的資料分區參數Index擷取對應的InputSplit,參數index取值範圍為:[0,SplitsCount-1]

  • getSplitByRowOffset(long startIndex, long numRecord):擷取對應的InputSplit。參數說明如下:

    • startIndex:指定InputSplit讀取的資料行起始索引,取值範圍為[0,RecordCount-1]

    • numRecord:指定InputSplit讀取的資料行數。

// 1. 若SplitOptions為SplitByByteSize

TableBatchReadSession scan = ...;
InputSplitAssigner assigner = scan.getInputSplitAssigner();
int splitCount = assigner.getSplitsCount();
for (int k = 0; k < splitCount; k++) {
    InputSplit split = assigner.getSplit(k);
    ...
}

// 2. 若SplitOptions為SplitByRowOffset
TableBatchReadSession scan = ...;
InputSplitAssigner assigner = scan.getInputSplitAssigner();
long rowCount = assigner.getTotalRowCount();
long recordsPerSplit = 10000;
for (long offset = 0; offset < numRecords; offset += recordsPerSplit) {
    recordsPerSplit = Math.min(recordsPerSplit, numRecords - offset);
    InputSplit split = assigner.getSplitByRowOffset(offset, recordsPerSplit);
    ...
}

createRecordReader(InputSplit split, ReaderOptions options)

構建SplitReader<ArrayRecord> 對象。其中ReaderOptions定義如下:

public class ReaderOptions {
    
    public static ReaderOptions.Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {

        public Builder withMaxBatchRowCount(int maxBatchRowCount);

        public Builder withMaxBatchRawSize(long batchRawSize);

        public Builder withCompressionCodec(CompressionCodec codec);

        public Builder withBufferAllocator(BufferAllocator allocator);

        public Builder withReuseBatch(boolean reuseBatch);

        public Builder withSettings(EnvironmentSettings settings);

        public ReaderOptions build();
    }

}
  • withMaxBatchRowCount(int maxBatchRowCount):指定服務端返回的每批次資料中最大行數。參數maxBatchRowCount預設最大值為4096。

  • withMaxBatchRawSize(long batchRawSize):指定服務端返回的每個批次資料包含的最大原始位元組數。

  • withCompressionCodec(CompressionCodec codec):指定資料壓縮類型,目前只支援ZSTD和LZ4_FRAME壓縮類型。

    說明
    • 在直接傳輸大量未加壓縮的Arrow資料時,由於網路頻寬的限制,可能會顯著增加資料轉送時間。

    • 若未指定壓縮類型,預設不進行資料壓縮。

  • withBufferAllocator(BufferAllocator allocator):指定讀取Arrow資料的記憶體 Clerk。

  • withReuseBatch(boolean reuseBatch):指定ArrowBatch記憶體是否可複用。reuseBatch參數取值如下:

    • true(預設值):ArrowBatch記憶體可複用。

    • false:ArrowBatch記憶體不可複用。

  • withSettings(EnvironmentSettings settings):指定運行環境資訊。

createArrowReader(InputSplit split, ReaderOptions options)

構建SplitReader<VectorSchemaRoot>對象。

SplitReader

介紹SplitReader介面,此介面用於讀取表資料。

介面定義

public interface SplitReader<T> {

    boolean hasNext() throws IOException;

    T get();

    Metrics currentMetricsValues();

    void close() throws IOException;
}

介面說明

方法名稱

說明

hasNext()

確認是否還有更多資料項目可讀。如果還有下一個資料項目可以讀取,則返回true;否則,返回false。

get()

擷取當前的資料項目。調用此方法前應確保通過hasNext()方法確認有下一個元素。

currentMetricsValues()

擷取SplitReader相關的指標。

close()

讀取結束後,關閉串連。

使用樣本

  1. 配置串連MaxCompute服務的環境

    // 阿里雲帳號或RAM使用者的AccessKey ID和AccessKey Secret
    // 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運,請登入RAM控制台建立RAM使用者
    // 此處以把AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。您也可以根據業務需要,儲存到設定檔裡
    // 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    //訪問MaxCompute使用的Quota名稱
    String quotaName = "<quotaName>";
    //MaxCompute專案名稱
    String project = "<project>";
    //建立Odps對象來串連MaxCompute服務
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setDefaultProject(project);
    //MaxCompute服務的串連地址,當前僅支援使用阿里雲VPC網路
    odps.setEndpoint(endpoint);
    Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()).withAppAccount(odps.getAppAccount()).build();
    EnvironmentSettings settings = EnvironmentSettings.newBuilder().withCredentials(credentials).withServiceEndpoint(odps.getEndpoint()).withQuotaName(quotaName).build();
    說明
    • 擷取獨享Data Transmission Service資源群組(訂用帳戶)資源的Quota名稱的方式分別如下:

      • 獨享Data Transmission Service資源群組:登入MaxCompute控制台,左上方切換地區後,在左側導覽列選擇工作區>配額(Quota)管理,查看可使用的Quota列表。具體操作,請參見計算資源-Quota管理

      • 開放儲存:登入MaxCompute控制台,在左側導覽列選擇租戶管理>租戶屬性,開啟開放儲存。

    • 當前針對作業層級指定Quota的鑒權,預設所有帳號(包括阿里雲帳號)和角色都沒有許可權,需要進行授權操作。授權操作詳情,請參見授權

  2. 讀表操作。

    1. 建立資料讀取會話,讀取MaxCompute資料。

      //MaxCompute專案對應的表名稱
      String tableName = "<table.name>";
      //建立表資料讀取會話
      TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
      TableBatchReadSession scan = scanBuilder.identifier(TableIdentifier.of(project, tableName)).withSettings(settings)
              .withSplitOptions(SplitOptions.newBuilder()
                      .SplitByByteSize(256 * 1024L * 1024L)
                      .withCrossPartition(false).build())
              .requiredDataColumns(Arrays.asList("timestamp"))
              .requiredPartitionColumns(Arrays.asList("pt1"))
              .buildBatchReadSession();
      說明

      在資料量較大、網路延遲或不穩定的情況下,可能會導致建立資料讀取會話時間過長,從而自動切換到非同步流程建立資料讀取會話。

    2. 遍曆每個切片中的MaxCompute資料,並使用Arrow讀取器逐個讀取每個切片中的資料並輸出資料內容。

      //遍曆所有輸入切片資料,並使用Arrow讀取器逐個讀取每個切片中的資料批次,最後輸出每批資料的內容
      InputSplitAssigner assigner = scan.getInputSplitAssigner();
      for (InputSplit split : assigner.getAllSplits()) {
          SplitReader<VectorSchemaRoot> reader =
                  scan.createArrowReader(split, ReaderOptions.newBuilder()
                          .withSettings(settings)
                          .withCompressionCodec(CompressionCodec.ZSTD)
                          .withReuseBatch(true)
                          .build());
      
          int rowCount = 0;
          List<VectorSchemaRoot> batchList = new ArrayList<>();
          while (reader.hasNext()) {
              VectorSchemaRoot data = reader.get();
              rowCount += data.getRowCount();
              System.out.println(data.contentToTSVString());
          }
          reader.close();
      }

相關文檔

關於MaxCompute開放儲存介紹,請參見開放儲存概述