MaxCompute支援第三方引擎(如Spark on EMR、StarRocks、Presto、PAI和Hologres)通過SDK調用Storage API直接存取MaxCompute資料,本文為您介紹使用Java SDK訪問MaxCompute的程式碼範例。
概述
使用Java SDK訪問MaxCompute的主要介面如下。
主要介面 | 描述 |
用於建立一個MaxCompute讀表會話。 | |
表示一個從MaxCompute表中讀取資料的會話。 | |
用於讀取資料會話包含的一個資料分區。 |
如果您使用Maven,可以從Maven庫中搜尋odps-sdk-table-api擷取不同版本的Java SDK,相關配置資訊如下。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-table-api</artifactId>
<version>0.48.8-public</version>
</dependency>MaxCompute提供了開放儲存相關介面,詳情請參見odps-sdk-table-api。
TableReadSessionBuilder
TableReadSessionBuilder介面用於建立一個MaxCompute讀表會話,其中主要介面定義如下。更多詳情,請參見Java-sdk-doc。
介面定義
public class TableReadSessionBuilder {
public TableReadSessionBuilder table(Table table);
public TableReadSessionBuilder identifier(TableIdentifier identifier);
public TableReadSessionBuilder requiredDataColumns(List<String> requiredDataColumns);
public TableReadSessionBuilder requiredPartitionColumns(List<String> requiredPartitionColumns);
public TableReadSessionBuilder requiredPartitions(List<PartitionSpec> requiredPartitions);
public TableReadSessionBuilder requiredBucketIds(List<Integer> requiredBucketIds);
public TableReadSessionBuilder withSplitOptions(SplitOptions splitOptions);
public TableReadSessionBuilder withArrowOptions(ArrowOptions arrowOptions);
public TableReadSessionBuilder withFilterPredicate(Predicate filterPredicate);
public TableReadSessionBuilder withSettings(EnvironmentSettings settings);
public TableReadSessionBuilder withSessionId(String sessionId);
public TableBatchReadSession buildBatchReadSession();
}介面說明
方法名稱 | 說明 |
| 將傳入的參數Table,定義為當前會話中的目標表。 |
| 將傳入的參數TableIdentifier ,定義為當前會話中的目標表。 |
| 讀取指定欄位的資料,並確保返回的資料中的欄位順序與參數 說明 如果參數 |
| 讀取指定表下的指定分區中指定列的資料。適用於進行分區裁剪的情境。 說明 如果參數 |
| 讀取指定表下指定分區的資料,適用於進行分區裁剪的情境。 說明 如果參數 |
| 讀取指定的Bucket資料,僅對聚簇表生效,適用於進行Bucket裁剪情境。 說明 如果參數 |
| 切分表資料,其中SplitOptions對象參數定義如下:
使用樣本 |
| 指定Arrow資料選項,
使用樣本 |
| 指定謂詞下推(Predicate Pushdown)選項,其中Predicate定義如下: 使用樣本 |
| 指定運行環境資訊,EnvironmentSettings 介面定義如下:
|
| 指定SessionID資訊,用於重新載入已建立的會話。 |
| 建立或擷取讀表會話。若提供入參SessionID,則根據SessionID返回已建立的Session;若未提供入參,將建立一個新的讀表會話。 說明 建立操作開銷較大,當檔案數很多時,耗時會比較長。 |
TableBatchReadSession
TableBatchReadSession介面表示一個從MaxCompute表中讀取資料的會話,主要介面定義如下。
介面定義
public interface TableBatchReadSession {
String getId();
TableIdentifier getTableIdentifier();
SessionStatus getStatus();
DataSchema readSchema();
InputSplitAssigner getInputSplitAssigner() throws IOException;
SplitReader<ArrayRecord> createRecordReader(InputSplit split, ReaderOptions options) throws IOException;
SplitReader<VectorSchemaRoot> createArrowReader(InputSplit split, ReaderOptions options) throws IOException;
}介面說明
方法名稱 | 說明 |
| 擷取當前會話ID,讀取會話ID的預設逾時時間長度為:24小時(h)。 |
| 擷取當前會話下的表名稱。 |
| 擷取當前工作階段狀態,狀態值如下:
|
| 擷取當前會話的表結構資訊,DataSchema定義如下:
|
| 擷取當前會話的InputSplitAssigner。InputSplitAssigner介面定義了在當前讀取會話中分配InputSplit執行個體的方法。每InputSplit代表一個資料分區,可由單個SplitReader處理。InputSplitAssigner定義如下:
|
| 構建
|
| 構建 |
SplitReader
介紹SplitReader介面,此介面用於讀取表資料。
介面定義
public interface SplitReader<T> {
boolean hasNext() throws IOException;
T get();
Metrics currentMetricsValues();
void close() throws IOException;
}介面說明
方法名稱 | 說明 |
| 確認是否還有更多資料項目可讀。如果還有下一個資料項目可以讀取,則返回true;否則,返回false。 |
| 擷取當前的資料項目。調用此方法前應確保通過 |
| 擷取SplitReader相關的指標。 |
| 讀取結束後,關閉串連。 |
使用樣本
配置串連MaxCompute服務的環境。
// 阿里雲帳號或RAM使用者的AccessKey ID和AccessKey Secret // 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運,請登入RAM控制台建立RAM使用者 // 此處以把AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。您也可以根據業務需要,儲存到設定檔裡 // 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險 private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); //訪問MaxCompute使用的Quota名稱 String quotaName = "<quotaName>"; //MaxCompute專案名稱 String project = "<project>"; //建立Odps對象來串連MaxCompute服務 Account account = new AliyunAccount(accessId, accessKey); Odps odps = new Odps(account); odps.setDefaultProject(project); //MaxCompute服務的串連地址,當前僅支援使用阿里雲VPC網路 odps.setEndpoint(endpoint); Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()).withAppAccount(odps.getAppAccount()).build(); EnvironmentSettings settings = EnvironmentSettings.newBuilder().withCredentials(credentials).withServiceEndpoint(odps.getEndpoint()).withQuotaName(quotaName).build();說明擷取獨享Data Transmission Service資源群組(訂用帳戶)資源的Quota名稱的方式分別如下:
獨享Data Transmission Service資源群組:登入MaxCompute控制台,左上方切換地區後,在左側導覽列選擇工作區>配額(Quota)管理,查看可使用的Quota列表。具體操作,請參見計算資源-Quota管理。
開放儲存:登入MaxCompute控制台,在左側導覽列選擇租戶管理>租戶屬性,開啟開放儲存。
當前針對作業層級指定Quota的鑒權,預設所有帳號(包括阿里雲帳號)和角色都沒有許可權,需要進行授權操作。授權操作詳情,請參見授權。
讀表操作。
建立資料讀取會話,讀取MaxCompute資料。
//MaxCompute專案對應的表名稱 String tableName = "<table.name>"; //建立表資料讀取會話 TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder(); TableBatchReadSession scan = scanBuilder.identifier(TableIdentifier.of(project, tableName)).withSettings(settings) .withSplitOptions(SplitOptions.newBuilder() .SplitByByteSize(256 * 1024L * 1024L) .withCrossPartition(false).build()) .requiredDataColumns(Arrays.asList("timestamp")) .requiredPartitionColumns(Arrays.asList("pt1")) .buildBatchReadSession();說明在資料量較大、網路延遲或不穩定的情況下,可能會導致建立資料讀取會話時間過長,從而自動切換到非同步流程建立資料讀取會話。
遍曆每個切片中的MaxCompute資料,並使用Arrow讀取器逐個讀取每個切片中的資料並輸出資料內容。
//遍曆所有輸入切片資料,並使用Arrow讀取器逐個讀取每個切片中的資料批次,最後輸出每批資料的內容 InputSplitAssigner assigner = scan.getInputSplitAssigner(); for (InputSplit split : assigner.getAllSplits()) { SplitReader<VectorSchemaRoot> reader = scan.createArrowReader(split, ReaderOptions.newBuilder() .withSettings(settings) .withCompressionCodec(CompressionCodec.ZSTD) .withReuseBatch(true) .build()); int rowCount = 0; List<VectorSchemaRoot> batchList = new ArrayList<>(); while (reader.hasNext()) { VectorSchemaRoot data = reader.get(); rowCount += data.getRowCount(); System.out.println(data.contentToTSVString()); } reader.close(); }
相關文檔
關於MaxCompute開放儲存介紹,請參見開放儲存概述。